I think we are thinking too much about the Omega side. From my experience, the Kafka client can get much better performance by using async invocation with batch processing. If we want to guarantee delivery to Kafka, we'd better build a Kafka cluster on the server side instead of notifying Omega that Alpha cannot process any messages.
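Concretely, the async style could look something like this (just a rough sketch, not the actual Alpha channel code; the producer wiring and the AsyncKafkaPublisher name are made up for illustration). The client already groups records into batches via batch.size and linger.ms, so the send itself can be fire-and-forget with a callback that records failures:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncKafkaPublisher {
  private static final Logger logger = LoggerFactory.getLogger(AsyncKafkaPublisher.class);

  private final KafkaProducer<String, String> producer;
  private final String topic;

  public AsyncKafkaPublisher(KafkaProducer<String, String> producer, String topic) {
    this.producer = producer;
    this.topic = topic;
  }

  public void publish(String globalTxId, String payload) {
    // Asynchronous send: returns immediately; the client accumulates records
    // into batches (batch.size / linger.ms) and flushes them in the background.
    producer.send(new ProducerRecord<>(topic, globalTxId, payload), (metadata, exception) -> {
      if (exception != null) {
        // Just log events that could not be delivered, for later investigation.
        logger.error("failed to send event [{}] to topic [{}]", globalTxId, topic, exception);
      }
    });
  }
}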
For the publish method, I think we just need to log any event that cannot be sent to Kafka for further investigation.

Willem Jiang

Twitter: willemjiang
Weibo: 姜宁willem


On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zhang_...@boco.com.cn> wrote:
>
> Add another point:
>
> We can still use the asynchronous send method and use the parameter
> alpha.feature.akka.channel.kafka.producer.batchSize to trigger
> producer.flush(); the balance between performance and reliability then
> depends on the parameters set by the user.
>
> The most important thing is that Omega knows when Alpha has failed.
>
> List<BaseEvent> eventCache = new LinkedList<>();
>
> @Override
> public void publish(Object data) {
>   if (logger.isDebugEnabled()) {
>     logger.debug("send message [{}] to [{}]", data, topic);
>   }
>
>   try {
>     if (data instanceof BaseEvent) {
>       eventCache.add((BaseEvent) data);
>       // Once batchSize events are buffered, send the whole batch, flush,
>       // and wait for every acknowledgement before clearing the cache.
>       if (eventCache.size() == batchSize) {
>         List<Future> kafkaFutures = new LinkedList<>();
>         for (BaseEvent event : eventCache) {
>           kafkaFutures.add(kafkaTemplate.send(topic, event.getGlobalTxId(), event));
>         }
>         kafkaTemplate.flush();
>         for (Future future : kafkaFutures) {
>           future.get();
>         }
>         eventCache.clear();
>       }
>     } else {
>       throw new UnsupportedOperationException("data must be BaseEvent type");
>     }
>   } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
>     logger.error("publish Exception = [{}]", e.getMessage(), e);
>     throw new RuntimeException("Commit failed as send to Kafka failed", e);
>   }
> }
>
>
> On Aug 17, 2019, at 3:29 PM, Zhang Lei <zhang_...@boco.com.cn> wrote:
> >
> > Hi, Team
> >
> > In our previous discussion on PR [1] about using synchronous or
> > asynchronous methods to send Kafka messages, I think we need a trade-off
> > between reliability and performance.
> >
> > Maybe we can give the option to the user by allowing them to customize
> > some parameters. I have the following suggestions for the Kafka producer
> > parameters:
> >
> > Key requirement: for the FSM, messages must be ordered and must not be
> > lost, but duplicates are allowed.
> >
> > 1. Default parameters
> >
> > max.in.flight.requests.per.connection is 1 (user modification is
> > prohibited, to preserve ordering)
> > acks is -1
> > retries is greater than 0
> >
> > 2. Allow users to define most parameters of the Kafka producer, e.g.
> >
> > acks
> > retries
> > buffer.memory
> > compression.type
> > min.insync.replicas > 1 (use together with acks)
> > replication.factor > min.insync.replicas
> > timeout.ms
> > request.timeout.ms
> > metadata.fetch.timeout.ms
> > max.block.ms
> > max.request.size
> >
> > 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
> >
> > KafkaProducer.send(record).get() can cause performance problems, but we
> > can mitigate them by deploying multiple Alphas.
> >
> > With KafkaProducer.send(record, callback), set max.block.ms=0 and a large
> > enough buffer.memory; but we still have to deal with the callback failure
> > scenario. In asynchronous mode, if messages have been sent but not yet
> > acknowledged and the buffer pool fills up, then with no limit configured
> > on the blocking timeout the producer side blocks indefinitely, which
> > ensures that data is not lost.
> > Maybe we can use these parameters to let users choose between synchronous
> > and asynchronous sending modes, using the asynchronous mode to get better
> > performance when there is a reliable network and Kafka cluster.
> >
> > [1] https://github.com/apache/servicecomb-pack/pull/540
> >
> > Lei Zhang
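To make the defaults in point 1 above concrete, here is a rough sketch of the producer configuration (the bootstrap address, retries value, and the tunable settings are placeholders that user configuration would supply):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public final class FsmProducerFactory {
  public static KafkaProducer<String, String> create(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Fixed defaults from point 1: ordered, not lost, duplicates allowed.
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // preserve ordering
    props.put(ProducerConfig.ACKS_CONFIG, "-1");   // wait for all in-sync replicas
    props.put(ProducerConfig.RETRIES_CONFIG, "3"); // any value greater than 0

    // A few of the user-tunable settings from point 2 (placeholder values).
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");

    return new KafkaProducer<>(props);
  }
}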
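And for the sync-vs-async choice in point 3, the user-facing switch could be as simple as the following (the syncSend flag and EventSender class are hypothetical, not an existing ServiceComb Pack option):

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventSender {
  private final KafkaProducer<String, String> producer;
  private final boolean syncSend; // user-selected sending mode

  public EventSender(KafkaProducer<String, String> producer, boolean syncSend) {
    this.producer = producer;
    this.syncSend = syncSend;
  }

  public void send(String topic, String key, String value)
      throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
    if (syncSend) {
      // Synchronous: block until the broker acknowledges; reliable but slower.
      producer.send(record).get();
    } else {
      // Asynchronous: return immediately; failures surface in the callback.
      producer.send(record, (metadata, exception) -> {
        if (exception != null) {
          System.err.println("send failed for key " + key + ": " + exception);
        }
      });
    }
  }
}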