Hi, Willem

If we use asynchronous sending, we also need to consider the case where Alpha 
crashes.

Lei Zhang

> On Aug 17, 2019, at 4:29 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
> 
> I think we are overthinking the Omega side.
> From my experience, the Kafka client gets much better performance
> by using async invocation with batch processing.
> If we want to guarantee delivery to Kafka, we'd better build a
> Kafka cluster on the server side instead of notifying Omega that Alpha
> cannot process any messages.
> 
> For the publish method, I think we just need to log any event that
> cannot be sent to Kafka, for further investigation.
> 
> Willem Jiang
> 
> Twitter: willemjiang
> Weibo: 姜宁willem
> 
> On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zhang_...@boco.com.cn> wrote:
>> 
>> One more point:
>> 
>> We still use the asynchronous send method. The parameter 
>> alpha.feature.akka.channel.kafka.producer.batchSize determines when 
>> producer.flush() is triggered, so the balance between performance and 
>> reliability depends on the value set by the user.
>> 
>> The most important thing is that Omega learns when Alpha has failed.
>> 
>> private final List<BaseEvent> eventCache = new LinkedList<>();
>> 
>> @Override
>> public void publish(Object data) {
>>    if (logger.isDebugEnabled()) {
>>        logger.debug("send message [{}] to [{}]", data, topic);
>>    }
>> 
>>    try {
>>        if (data instanceof BaseEvent) {
>>            eventCache.add((BaseEvent) data);
>>            // Flush once the cache reaches the configured batch size.
>>            if (eventCache.size() >= batchSize) {
>>                List<Future<?>> kafkaFutures = new LinkedList<>();
>>                for (BaseEvent cached : eventCache) {
>>                    kafkaFutures.add(kafkaTemplate.send(topic,
>>                        cached.getGlobalTxId(), Callback...));
>>                }
>>                producer.flush();
>>                // Wait for every acknowledgment; a failed send surfaces here.
>>                for (Future<?> future : kafkaFutures) {
>>                    future.get();
>>                }
>>                eventCache.clear();
>>            }
>>        } else {
>>            throw new UnsupportedOperationException(
>>                "data must be BaseEvent type");
>>        }
>>    } catch (InterruptedException | ExecutionException e) {
>>        // Propagate the failure so that the caller (Omega) is notified.
>>        logger.warn("Sending events to Kafka failed", e);
>>        throw new RuntimeException(
>>            "Commit failed as send to Kafka failed", e);
>>    } catch (UnsupportedOperationException e) {
>>        logger.error("publish Exception = [{}]", e.getMessage(), e);
>>        throw new RuntimeException(e);
>>    }
>> }
>> 
>> 
>> 
>>> On Aug 17, 2019, at 3:29 PM, Zhang Lei <zhang_...@boco.com.cn> wrote:
>>> 
>>> Hi, Team
>>> 
>>> Following our previous discussion on PR [1] about whether to send Kafka 
>>> messages synchronously or asynchronously, I think we need a trade-off 
>>> between reliability and performance.
>>> 
>>> Maybe we can leave the choice to the user by allowing them to customize 
>>> some parameters. I have the following suggestions about the Kafka producer 
>>> parameters:
>>> 
>>> Key requirement: messages must stay ordered and must not be lost, but 
>>> duplicates are acceptable for the FSM
>>> 
>>> 1. Default parameter
>>> 
>>> max.in.flight.requests.per.connection is 1 (users may not change this, so 
>>> that ordering is preserved)
>>> acks is -1
>>> retries is greater than 0
>>> 
>>> 2. Allow users to define most parameters of the Kafka producer, e.g.
>>> 
>>> acks
>>> retries
>>> buffer.memory
>>> compression.type
>>> min.insync.replicas > 1 (use with acks)
>>> replication.factor > min.insync.replicas
>>> timeout.ms
>>> request.timeout.ms
>>> metadata.fetch.timeout.ms
>>> max.block.ms
>>> max.request.size
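
A minimal sketch of how points 1 and 2 might fit together, assuming plain
java.util.Properties and the producer key names listed above. The class
ProducerDefaults and its merge method are hypothetical and not part of
servicecomb-pack, and retries=3 is an arbitrary value greater than 0. User
values override the defaults, except max.in.flight.requests.per.connection,
which is forced back to 1 to preserve ordering.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of servicecomb-pack.
final class ProducerDefaults {
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";

    // Point 1: defaults for ordered delivery with full acknowledgment.
    static Properties defaults() {
        Properties p = new Properties();
        p.setProperty(MAX_IN_FLIGHT, "1");
        p.setProperty("acks", "-1");
        p.setProperty("retries", "3"); // assumption: any value > 0 would do
        return p;
    }

    // Point 2: user-supplied values win, except the ordering-critical one.
    static Properties merge(Properties user) {
        Properties merged = defaults();
        for (String name : user.stringPropertyNames()) {
            merged.setProperty(name, user.getProperty(name));
        }
        // Re-apply the fixed setting in case the user tried to change it.
        merged.setProperty(MAX_IN_FLIGHT, "1");
        return merged;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("acks", "1");
        user.setProperty(MAX_IN_FLIGHT, "5");
        System.out.println(merge(user));
    }
}
```

Silently resetting the value is only one option; rejecting the configuration
with an error would make the restriction more visible to the user.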
>>> 
>>> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
>>> 
>>> KafkaProducer.send(record).get() can cause performance problems, but we can 
>>> mitigate them by deploying multiple Alpha instances
>>> 
>>> For KafkaProducer.send(record, callback), set max.block.ms=0 and a large 
>>> enough buffer.memory. We still have to handle the case where the callback 
>>> reports a failure.
>>> In asynchronous mode, if messages have been sent but not yet acknowledged 
>>> and the buffer pool fills up, and the configuration sets no limit on the 
>>> blocking timeout, the producer stays blocked indefinitely. This ensures 
>>> that data is not lost.
>>> 
>>> Maybe we can add a parameter that lets users choose between synchronous 
>>> and asynchronous sending, and use asynchronous mode to get better 
>>> performance when the network and Kafka cluster are reliable.
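
A rough sketch of the two sending modes from point 3, under stated
assumptions: a CompletableFuture stands in for the Future returned by the
producer so the example runs without a Kafka client, and the names SendModes,
sendSync, sendAsync and the transport function are all hypothetical. The
synchronous path blocks and turns a failed send into an exception; the
asynchronous path returns at once and only records the failure in a callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration; the transport function stands in for a producer.
final class SendModes {

    // Synchronous mode: wait for the acknowledgment, fail loudly on error.
    static void sendSync(Function<String, CompletableFuture<Void>> transport,
                         String message) {
        try {
            transport.apply(message).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    // Asynchronous mode: return immediately, count failures in the callback.
    static CompletableFuture<Void> sendAsync(
            Function<String, CompletableFuture<Void>> transport,
            String message, AtomicInteger failures) {
        return transport.apply(message).whenComplete((ok, error) -> {
            if (error != null) {
                failures.incrementAndGet();
            }
        });
    }

    public static void main(String[] args) {
        // Stand-in transport that rejects the message "bad".
        Function<String, CompletableFuture<Void>> transport = msg -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            if ("bad".equals(msg)) {
                f.completeExceptionally(new RuntimeException("broker down"));
            } else {
                f.complete(null);
            }
            return f;
        };
        AtomicInteger failures = new AtomicInteger();
        sendSync(transport, "ok");
        sendAsync(transport, "bad", failures);
        System.out.println("async failures: " + failures.get());
    }
}
```

In the asynchronous path the caller is not interrupted by the failure, so
whatever the callback does (logging, counting, retrying) decides whether the
lost message is ever noticed.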
>>> 
>>> [1] https://github.com/apache/servicecomb-pack/pull/540 
>>> 
>>> Lei Zhang
>>> 
>> 
> 


