One more point:

We still use the asynchronous send method. The parameter 
alpha.feature.akka.channel.kafka.producer.batchSize determines when 
producer.flush() is triggered, so the balance between performance and 
reliability depends on the value set by the user. 

The most important thing is that Omega finds out when Alpha has failed to 
send an event.

List<BaseEvent> eventCache = new LinkedList<>();

@Override
public void publish(Object data) {
    if (logger.isDebugEnabled()) {
        logger.debug("send message [{}] to [{}]", data, topic);
    }

    try {
        if (data instanceof BaseEvent) {
            eventCache.add((BaseEvent) data);
            // Once batchSize events are cached, send them all and wait
            // for every acknowledgment before clearing the cache.
            if (eventCache.size() >= batchSize) {
                List<Future<?>> kafkaFutures = new LinkedList<>();
                for (BaseEvent cached : eventCache) {
                    kafkaFutures.add(kafkaTemplate.send(topic,
                        cached.getGlobalTxId(), Callback...));
                }
                producer.flush();
                // get() rethrows the failure of any individual send.
                for (Future<?> future : kafkaFutures) {
                    future.get();
                }
                eventCache.clear();
            }
        } else {
            throw new UnsupportedOperationException(
                "data must be BaseEvent type");
        }
    } catch (InterruptedException | ExecutionException e) {
        logger.warn("Sending events to Kafka failed", e);
        throw new RuntimeException(
            "Commit failed as send to Kafka failed", e);
    } catch (UnsupportedOperationException e) {
        logger.error("publish Exception = [{}]", e.getMessage(), e);
        throw new RuntimeException(e);
    }
}
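
The batching logic above can also be tried without a Kafka broker. The 
following is only a minimal sketch under my own assumptions: BatchingPublisher 
and its sender function are hypothetical names, and the sender stands in for 
kafkaTemplate.send(...). It shows the point that matters here: a failed send 
surfaces as an exception to the caller, and the cached batch is kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/** Hypothetical stand-in for the publisher: caches events and sends them in batches. */
class BatchingPublisher<E> {
    private final List<E> eventCache = new ArrayList<>();
    private final int batchSize;
    // Assumption: stands in for kafkaTemplate.send(...); starts an async send.
    private final Function<E, CompletableFuture<Void>> sender;

    BatchingPublisher(int batchSize, Function<E, CompletableFuture<Void>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Caches the event; once batchSize events are cached, sends them all and waits. */
    void publish(E event) {
        eventCache.add(event);
        if (eventCache.size() < batchSize) {
            return;
        }
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (E cached : eventCache) {
            futures.add(sender.apply(cached)); // asynchronous: no waiting here
        }
        try {
            for (CompletableFuture<Void> future : futures) {
                future.get(); // rethrows the failure of any individual send
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for sends", e);
        } catch (ExecutionException e) {
            // The cache is not cleared, so the batch is sent again on the next publish.
            throw new IllegalStateException("Sending events failed", e.getCause());
        }
        eventCache.clear();
    }

    int cachedCount() {
        return eventCache.size();
    }
}
```

Because a failed batch stays in the cache, events from it may be sent more 
than once, so the consumer has to tolerate duplicates.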



> On August 17, 2019, at 3:29 PM, Zhang Lei <zhang_...@boco.com.cn> wrote:
> 
> Hi, Team
> 
> Regarding our previous discussion on PR[1] about whether to send Kafka 
> messages synchronously or asynchronously, I think we need a trade-off 
> between reliability and performance.
> 
> Maybe we can leave the choice to the user by allowing them to customize some 
> parameters. I have the following suggestions about the Kafka producer 
> parameters:
> 
> Key: messages must be ordered and must not be lost, but duplicates are 
> allowed for the FSM
> 
> 1. Default parameters
> 
> max.in.flight.requests.per.connection is 1 (users may not change this, so 
> that ordering is preserved)
> acks is -1
> retries is greater than 0
> 
> 2. Allow users to define most parameters of the Kafka producer, e.g.
> 
> acks
> retries
> buffer.memory
> compression.type
> min.insync.replicas > 1 (use with acks)
> replication.factor > min.insync.replicas
> timeout.ms
> request.timeout.ms
> metadata.fetch.timeout.ms
> max.block.ms
> max.request.size
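
A minimal sketch of how points 1 and 2 could fit together, assuming the 
settings are assembled as plain java.util.Properties. ProducerSettings and 
build are hypothetical names, and the default of 3 retries is an assumed 
value; point 1 only asks for a value greater than 0. Note that 
min.insync.replicas and the replication factor are configured on the broker 
or topic rather than on the producer, so this sketch does not treat them 
specially.

```java
import java.util.Properties;

/** Hypothetical helper: builds producer settings from defaults plus user overrides. */
class ProducerSettings {
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";

    static Properties build(Properties userOverrides) {
        Properties props = new Properties();
        // Defaults from point 1.
        props.setProperty("acks", "-1");   // wait for all in-sync replicas
        props.setProperty("retries", "3"); // assumed value; anything > 0 fits point 1
        // User-defined values from point 2 replace the defaults...
        for (String key : userOverrides.stringPropertyNames()) {
            props.setProperty(key, userOverrides.getProperty(key));
        }
        // ...except the in-flight limit, which stays at 1 to preserve ordering.
        props.setProperty(MAX_IN_FLIGHT, "1");
        return props;
    }
}
```

Applying the in-flight limit last means a user-supplied value for it is 
silently replaced; rejecting such a configuration with an error would be an 
equally reasonable choice.
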
> 
> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
> 
> KafkaProducer.send(record).get() can cause performance problems, but we can 
> mitigate them by deploying multiple Alpha instances.
> 
> KafkaProducer.send(record, callback) needs max.block.ms=0 and a large enough 
> buffer.memory, and we still have to handle the case where the callback 
> reports a failure.
> In asynchronous mode, if messages have been sent but not yet acknowledged, 
> the buffer pool is full, and the configuration puts no limit on the blocking 
> timeout, the producer stays blocked, which ensures that data is not lost.
> 
> Maybe we can add a parameter that lets users choose between the synchronous 
> and asynchronous sending modes, so that asynchronous mode can be used for 
> better performance when the network and the Kafka cluster are reliable.
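
A rough sketch of such a mode switch, written against CompletableFuture so 
that it runs without Kafka. SendDispatcher, Mode and onFailure are 
hypothetical names; the supplier stands in for the actual producer call. SYNC 
mirrors send(record).get(), and ASYNC mirrors send(record, callback).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical dispatcher that applies a user-selected send mode. */
class SendDispatcher {
    enum Mode { SYNC, ASYNC }

    private final Mode mode;
    // Called with the error when an asynchronous send fails.
    private final Consumer<Throwable> onFailure;

    SendDispatcher(Mode mode, Consumer<Throwable> onFailure) {
        this.mode = mode;
        this.onFailure = onFailure;
    }

    /** Starts the send and handles its result according to the configured mode. */
    void dispatch(Supplier<CompletableFuture<?>> send) {
        CompletableFuture<?> future = send.get();
        if (mode == Mode.SYNC) {
            try {
                future.join(); // block until the send is acknowledged
            } catch (CompletionException e) {
                throw new IllegalStateException("Send failed", e.getCause());
            }
        } else {
            // Do not block; report a failure through the callback instead.
            future.whenComplete((result, error) -> {
                if (error != null) {
                    onFailure.accept(error);
                }
            });
        }
    }
}
```

In SYNC mode the caller sees the failure as an exception; in ASYNC mode the 
failure only reaches onFailure, so that callback has to do whatever is needed 
to make the failure visible.
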
> 
> [1] https://github.com/apache/servicecomb-pack/pull/540 
> 
> Lei Zhang
> 
