Hi, Willem

If we use asynchronous sending, we also need to consider the case of an Alpha crash.
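[Editor's sketch] The failure case raised above (and Willem's suggestion below to log events that cannot be sent) boils down to: with async sending, the callback is the only place the error surfaces, so it must be captured rather than swallowed. A minimal stdlib-only sketch of that pattern; the class and method names are hypothetical, and the Kafka producer is stubbed with a `Function` returning a `CompletableFuture` so the example is self-contained:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical publisher that records every event whose async send failed,
// so the failure can be logged for investigation or surfaced back to Omega.
// The real KafkaProducer.send(record, callback) is stubbed here.
class FailureAwarePublisher {
  private final Function<String, CompletableFuture<Void>> send; // stand-in for the Kafka send
  final List<String> failedEvents = new CopyOnWriteArrayList<>();

  FailureAwarePublisher(Function<String, CompletableFuture<Void>> send) {
    this.send = send;
  }

  void publish(String event) {
    send.apply(event).whenComplete((ok, err) -> {
      if (err != null) {
        // In a real Alpha this is where the unsent event would be logged.
        failedEvents.add(event);
      }
    });
  }

  public static void main(String[] args) {
    // Stub producer: events containing "bad" fail, everything else succeeds.
    FailureAwarePublisher p = new FailureAwarePublisher(e ->
        e.contains("bad")
            ? CompletableFuture.failedFuture(new RuntimeException("broker down"))
            : CompletableFuture.completedFuture(null));
    p.publish("txStarted");
    p.publish("bad-event");
    System.out.println(p.failedEvents); // only the failed event is retained
  }
}
```

The point of the sketch is only that the callback failure path keeps the event around; whether it is then retried, logged, or reported to Omega is exactly the design question debated in this thread.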
Lei Zhang

> On Aug 17, 2019, at 4:29 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
>
> I think we are overthinking the Omega side.
> In my experience, the Kafka client can get much better performance
> by using async invocation with batch processing.
> If we want to guarantee delivery to Kafka, we'd better build a
> Kafka cluster on the server side instead of notifying Omega that Alpha
> cannot process any messages.
>
> For the publish method, I think we just need to log any event which
> cannot be sent to Kafka, for further investigation.
>
> Willem Jiang
>
> Twitter: willemjiang
> Weibo: 姜宁willem
>
> On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zhang_...@boco.com.cn> wrote:
>>
>> Add another point:
>>
>> We still use the asynchronous send method. We use the parameter
>> alpha.feature.akka.channel.kafka.producer.batchSize to trigger
>> producer.flush(); the balance between performance and reliability depends
>> on the parameters set by the user.
>>
>> The most important thing is that Omega knows when Alpha has made a mistake.
>>
>> List<BaseEvent> eventCache = new LinkedList<>();
>>
>> @Override
>> public void publish(Object data) {
>>   if (logger.isDebugEnabled()) {
>>     logger.debug("send message [{}] to [{}]", data, topic);
>>   }
>>
>>   try {
>>     if (data instanceof BaseEvent) {
>>       BaseEvent event = (BaseEvent) data;
>>       eventCache.add(event);
>>       if (eventCache.size() == batchSize) {
>>         try {
>>           List<Future> kafkaFutures = new LinkedList<>();
>>           for (BaseEvent cached : eventCache) {
>>             kafkaFutures.add(kafkaTemplate.send(topic,
>>                 cached.getGlobalTxId(), Callback...));
>>           }
>>           producer.flush();
>>           for (Future future : kafkaFutures) {
>>             future.get();
>>           }
>>           eventCache.clear();
>>         } catch (Exception ex) {
>>           logger.warn("Sending events to Kafka failed", ex);
>>           throw new RuntimeException("Commit failed as send to Kafka failed", ex);
>>         }
>>       }
>>     } else {
>>       throw new UnsupportedOperationException("data must be of BaseEvent type");
>>     }
>>   } catch (InterruptedException | ExecutionException |
>>       UnsupportedOperationException e) {
>>     logger.error("publish Exception = [{}]", e.getMessage(), e);
>>     throw new RuntimeException(e);
>>   }
>> }
>>
>>
>>> On Aug 17, 2019, at 3:29 PM, Zhang Lei <zhang_...@boco.com.cn> wrote:
>>>
>>> Hi, Team
>>>
>>> Following our previous discussion on PR [1] about using synchronous or
>>> asynchronous methods to send Kafka messages, I think we need a trade-off
>>> between reliability and performance.
>>>
>>> Maybe we can give the choice to the user by allowing them to customize
>>> some parameters. I have the following suggestions for the Kafka producer
>>> parameters:
>>>
>>> Key requirement: messages are ordered and cannot be lost, but duplicates
>>> are allowed (for the FSM)
>>>
>>> 1. Default parameters
>>>
>>> max.in.flight.requests.per.connection is 1 (user modification is
>>> prohibited, to preserve ordering)
>>> acks is -1
>>> retries is greater than 0
>>>
>>> 2. Allow users to define most parameters of the Kafka producer, e.g.
>>>
>>> acks
>>> retries
>>> buffer.memory
>>> compression.type
>>> min.insync.replicas > 1 (use together with acks)
>>> replication.factor > min.insync.replicas
>>> timeout.ms
>>> request.timeout.ms
>>> metadata.fetch.timeout.ms
>>> max.block.ms
>>> max.request.size
>>>
>>> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
>>>
>>> KafkaProducer.send(record).get() can cause performance problems, but we
>>> can mitigate them by deploying multiple Alphas.
>>>
>>> For KafkaProducer.send(record, callback), set max.block.ms=0 and a large
>>> enough buffer.memory, but we still have to deal with the callback failure
>>> scenario. In asynchronous mode, if messages have been sent but the
>>> acknowledgments have not been received, the buffer pool fills up; if the
>>> configuration does not limit the blocking timeout, the producer blocks
>>> indefinitely, which ensures that data is not lost.
>>>
>>> Maybe we can use a parameter to let users choose between synchronous and
>>> asynchronous sending mode, and use asynchronous mode to get better
>>> performance when there is a reliable network and Kafka cluster.
>>>
>>> [1] https://github.com/apache/servicecomb-pack/pull/540
>>>
>>> Lei Zhang
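[Editor's sketch] The fixed defaults and user-tunable parameters proposed in this thread can be collected into a producer configuration. Below is a sketch using plain java.util.Properties; the values marked "hypothetical" are illustrative choices, not ServiceComb Pack's actual defaults, and the class name is invented:

```java
import java.util.Properties;

// Sketch of the producer settings suggested in this thread (not the actual
// ServiceComb Pack configuration). Ordering is preserved by allowing only one
// in-flight request per connection; acks=-1 plus retries > 0 guards against
// losing events, at the cost of throughput.
class SuggestedProducerConfig {
  static Properties build() {
    Properties props = new Properties();
    // Fixed defaults from the thread (user modification prohibited for the first):
    props.setProperty("max.in.flight.requests.per.connection", "1"); // keep ordering
    props.setProperty("acks", "-1");   // wait for all in-sync replicas
    props.setProperty("retries", "3"); // hypothetical value; the thread only requires > 0
    // User-tunable knobs mentioned in the thread (hypothetical values):
    props.setProperty("compression.type", "snappy");
    props.setProperty("max.block.ms", "0"); // async mode: fail fast when the buffer is full
    props.setProperty("buffer.memory", String.valueOf(64L * 1024 * 1024));
    return props;
  }

  public static void main(String[] args) {
    System.out.println(build());
  }
}
```

Note the trade-off the thread describes: `max.block.ms=0` makes the async path fail immediately when the buffer fills (so the failure must be handled in the callback), whereas leaving the blocking timeout unbounded makes the producer block until space frees up, trading availability for no data loss.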