Add another point: we still use the asynchronous send method, and use the parameter alpha.feature.akka.channel.kafka.producer.batchSize to trigger producer.flush(). The balance between performance and reliability then depends on the value the user sets.
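For illustration, the setting could look like this in a Spring-style properties file. Only the key itself comes from the message above; the value and the comment are illustrative, not a recommendation:

```properties
# Flush the producer after this many buffered events.
# Smaller value = smaller loss window on failure; larger value = better throughput.
alpha.feature.akka.channel.kafka.producer.batchSize=1000
```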
The most important thing is that Omega knows when Alpha has failed.

List<BaseEvent> eventCache = new LinkedList<>();

@Override
public void publish(Object data) {
  if (logger.isDebugEnabled()) {
    logger.debug("send message [{}] to [{}]", data, topic);
  }
  try {
    if (data instanceof BaseEvent) {
      BaseEvent event = (BaseEvent) data;
      eventCache.add(event);
      if (eventCache.size() == batchSize) {
        try {
          List<Future> kafkaFutures = new LinkedList<>();
          // loop variable renamed to avoid shadowing "event" above
          for (BaseEvent cached : eventCache) {
            // callback argument elided here in the original message
            kafkaFutures.add(kafkaTemplate.send(topic, cached.getGlobalTxId(), Callback...));
          }
          producer.flush();
          for (Future future : kafkaFutures) {
            future.get();
          }
          eventCache.clear();
        } catch (Exception ex) {
          logger.warn("Sending events to Kafka failed", ex);
          // rethrow unchecked so publish() does not need a throws clause
          throw new RuntimeException("Commit failed as send to Kafka failed", ex);
        }
      }
    } else {
      throw new UnsupportedOperationException("data must be BaseEvent type");
    }
  } catch (RuntimeException e) {
    logger.error("publish Exception = [{}]", e.getMessage(), e);
    throw e;
  }
}

> On Aug 17, 2019, at 3:29 PM, Zhang Lei <zhang_...@boco.com.cn> wrote:
>
> Hi, Team
>
> Regarding our previous discussion on PR [1] about using synchronous or
> asynchronous methods to send Kafka messages, I think we need a trade-off
> between reliability and performance.
>
> Maybe we can give the choice to the user by letting them customize some
> parameters. I have the following suggestions about the Kafka producer
> parameters:
>
> Key requirement: messages must be ordered and must not be lost, but
> duplicates are allowed for the FSM.
>
> 1. Default parameters
>
> max.in.flight.requests.per.connection is 1 (user modification is prohibited,
> to preserve ordering)
> acks is -1
> retries is greater than 0
>
> 2. Allow users to define most parameters of the Kafka producer, e.g.
>
> acks
> retries
> buffer.memory
> compression.type
> min.insync.replicas > 1 (use together with acks)
> replication.factor > min.insync.replicas
> timeout.ms
> request.timeout.ms
> metadata.fetch.timeout.ms
> max.block.ms
> max.request.size
>
> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
>
> KafkaProducer.send(record).get() can cause performance problems, but we can
> mitigate them by deploying multiple Alpha instances.
>
> KafkaProducer.send(record, callback): set max.block.ms=0 and a large enough
> buffer.memory, but we still have to handle the callback failure scenario.
> In asynchronous mode, if messages have been sent but not yet acknowledged,
> the buffer pool can fill up; if the configuration does not limit the blocking
> timeout, the producer blocks until space frees up, which ensures data is not
> lost.
>
> Maybe we can use a parameter to let users choose between synchronous and
> asynchronous sending, and use asynchronous mode to get better performance
> when the network and the Kafka cluster are reliable.
>
> [1] https://github.com/apache/servicecomb-pack/pull/540
>
> Lei Zhang
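The sync-vs-async trade-off in point 3 above can be sketched without a broker. The following is a stdlib-only sketch that uses CompletableFuture as a stand-in for the Future returned by KafkaProducer.send(); the asyncSend method, the record type, and the ack strings are placeholders I introduce for illustration, not the real Kafka API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SendModes {
    // Stand-in for KafkaProducer.send(record): completes asynchronously.
    static CompletableFuture<String> asyncSend(String record) {
        return CompletableFuture.supplyAsync(() -> "ack:" + record);
    }

    // Synchronous style (like send(record).get()): block once per record.
    // Failures surface immediately, but throughput is limited by round trips.
    static List<String> sendSync(List<String> records) throws Exception {
        List<String> acks = new ArrayList<>();
        for (String r : records) {
            acks.add(asyncSend(r).get()); // blocks until this record is acknowledged
        }
        return acks;
    }

    // Asynchronous batched style: enqueue all sends without blocking, then
    // wait for the whole batch at once -- analogous to collecting futures,
    // calling flush(), and then future.get() in the publish() method above.
    static List<String> sendAsyncBatch(List<String> records) throws Exception {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String r : records) {
            futures.add(asyncSend(r)); // does not block
        }
        List<String> acks = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            acks.add(f.get()); // wait once per batch, not once per record
        }
        return acks;
    }

    public static void main(String[] args) throws Exception {
        List<String> records = List.of("e1", "e2", "e3");
        System.out.println(sendSync(records));
        System.out.println(sendAsyncBatch(records));
    }
}
```

Both styles deliver the same acknowledgments; the batched style only pays the blocking cost once per batch, which is why the batchSize-triggered flush in the reply above trades a larger loss window for throughput.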