Yeah, that is why I said we need a Kafka cluster on the back end. It's better if we have an Alpha cluster to receive the events. I agree we could provide different configuration options for the user to choose between performance and stability of the system.
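For example, the toggle could look roughly like this on the Alpha side (a minimal sketch only; the class name, the sendSync flag, and the property it would come from are made up for illustration, and I'm assuming a Spring KafkaTemplate as in Lei's snippet below):

    import java.util.concurrent.ExecutionException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.core.KafkaTemplate;

    // Sketch only: sendSync is a hypothetical flag, not existing Pack configuration.
    public class ConfigurableKafkaPublisher {

      private static final Logger logger =
          LoggerFactory.getLogger(ConfigurableKafkaPublisher.class);

      private final boolean sendSync;   // true = stability, false = performance
      private final String topic;
      private final KafkaTemplate<String, BaseEvent> kafkaTemplate;

      public ConfigurableKafkaPublisher(boolean sendSync, String topic,
          KafkaTemplate<String, BaseEvent> kafkaTemplate) {
        this.sendSync = sendSync;
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
      }

      public void publish(BaseEvent event) {
        if (sendSync) {
          try {
            // synchronous: block until the broker acknowledges the record
            kafkaTemplate.send(topic, event.getGlobalTxId(), event).get();
          } catch (InterruptedException | ExecutionException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("send to Kafka failed", e);
          }
        } else {
          // asynchronous: return immediately; failed events are only logged
          // for further investigation, as suggested earlier in the thread
          kafkaTemplate.send(topic, event.getGlobalTxId(), event)
              .addCallback(result -> { },
                  ex -> logger.error("send of [{}] to Kafka failed",
                      event.getGlobalTxId(), ex));
        }
      }
    }

With something like this, sendSync=true gives the stability path and sendSync=false the performance path.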
Willem Jiang

Twitter: willemjiang
Weibo: 姜宁willem

On Sat, Aug 17, 2019 at 4:37 PM Zhang Lei <cool...@qq.com> wrote:
>
> Hi, Willem
>
> If you use asynchronous sending, we also need to consider the case of an
> Alpha crash.
>
> Lei Zhang
>
> > On Aug 17, 2019, at 4:29 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
> >
> > I think we are thinking too much on the Omega side.
> > From my experience, the Kafka client can get much better performance
> > by using async invocation with batch processing.
> > If we want to guarantee delivery to Kafka, we'd better build a Kafka
> > cluster on the server side instead of notifying Omega that Alpha
> > cannot process any messages.
> >
> > For the publish method, I think we just need to log any event that
> > cannot be sent to Kafka for further investigation.
> >
> > Willem Jiang
> >
> > Twitter: willemjiang
> > Weibo: 姜宁willem
> >
> > On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zhang_...@boco.com.cn> wrote:
> >>
> >> Another point:
> >>
> >> We can still use the asynchronous send method. The parameter
> >> alpha.feature.akka.channel.kafka.producer.batchSize triggers
> >> producer.flush(), so the balance of performance and reliability depends
> >> on the parameters set by the user.
> >>
> >> The most important thing is that Omega knows when Alpha has failed.
> >>
> >> List<BaseEvent> eventCache = new LinkedList<>();
> >>
> >> @Override
> >> public void publish(Object data) {
> >>   if (logger.isDebugEnabled()) {
> >>     logger.debug("send message [{}] to [{}]", data, topic);
> >>   }
> >>
> >>   if (data instanceof BaseEvent) {
> >>     eventCache.add((BaseEvent) data);
> >>     if (eventCache.size() == batchSize) {
> >>       try {
> >>         List<Future> kafkaFutures = new LinkedList<>();
> >>         // key each record by its global transaction id so the events
> >>         // of one transaction stay ordered within a partition
> >>         for (BaseEvent event : eventCache) {
> >>           kafkaFutures.add(
> >>               kafkaTemplate.send(topic, event.getGlobalTxId(), event));
> >>         }
> >>         producer.flush();
> >>         // block until every send in the batch is acknowledged
> >>         for (Future future : kafkaFutures) {
> >>           future.get();
> >>         }
> >>         eventCache.clear();
> >>       } catch (InterruptedException | ExecutionException ex) {
> >>         logger.warn("Sending events to Kafka failed", ex);
> >>         throw new RuntimeException("Commit failed as send to Kafka failed", ex);
> >>       }
> >>     }
> >>   } else {
> >>     throw new UnsupportedOperationException("data must be BaseEvent type");
> >>   }
> >> }
> >>
> >>
> >>> On Aug 17, 2019, at 3:29 PM, Zhang Lei <zhang_...@boco.com.cn> wrote:
> >>>
> >>> Hi, Team
> >>>
> >>> In our previous discussion on PR [1] about using synchronous or
> >>> asynchronous methods to send Kafka messages, I think we need a
> >>> trade-off between reliability and performance.
> >>>
> >>> Maybe we can give the option to the user by allowing the user to
> >>> customize some parameters. I have the following suggestions about the
> >>> Kafka producer parameters:
> >>>
> >>> Key requirement: messages are ordered and cannot be lost, but they are
> >>> allowed to repeat, for the FSM
> >>>
> >>> 1. Default parameters
> >>>
> >>> max.in.flight.requests.per.connection is 1 (user modification is
> >>> prohibited, to keep messages ordered)
> >>> acks is -1
> >>> retries is greater than 0
> >>>
> >>> 2. Allow users to define most parameters of the Kafka producer, e.g.
> >>>
> >>> acks
> >>> retries
> >>> buffer.memory
> >>> compression.type
> >>> min.insync.replicas > 1 (use together with acks)
> >>> replication.factor > min.insync.replicas
> >>> timeout.ms
> >>> request.timeout.ms
> >>> metadata.fetch.timeout.ms
> >>> max.block.ms
> >>> max.request.size
> >>>
> >>> 3. KafkaProducer.send(record, callback) or
> >>> KafkaProducer.send(record).get()
> >>>
> >>> KafkaProducer.send(record).get() can cause performance problems, but
> >>> we can compensate by deploying multiple Alphas.
> >>>
> >>> KafkaProducer.send(record, callback) needs max.block.ms=0 and a large
> >>> enough buffer.memory, but we still have to deal with the callback
> >>> failure scenario. In asynchronous mode, if a message has been sent but
> >>> no acknowledgment has been received and the buffer pool is full, and
> >>> the configuration sets no limit on the blocking timeout, then the
> >>> producing side blocks indefinitely; this ensures that data is not lost.
> >>>
> >>> Maybe we can use these parameters to let users choose between the
> >>> synchronous and asynchronous sending modes, and use the asynchronous
> >>> mode to get better performance when there is a reliable network and a
> >>> reliable Kafka cluster.
> >>>
> >>> [1] https://github.com/apache/servicecomb-pack/pull/540
> >>>
> >>> Lei Zhang
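P.S. To make points 1 and 3 of Lei's first mail concrete, here is a small self-contained sketch against the plain Kafka producer API (the broker address, topic name, key, and payload are placeholders, not Pack code):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerModes {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder address
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // point 1 defaults: ordered, not lost, duplicates tolerated by the FSM
        props.put("max.in.flight.requests.per.connection", "1"); // keep ordering
        props.put("acks", "-1");    // equivalent to acks=all
        props.put("retries", "3");  // any value > 0; retries may duplicate

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          ProducerRecord<String, String> record =
              new ProducerRecord<>("alpha-event", "globalTxId-1", "payload");

          // point 3, synchronous mode: reliable, but blocks per record
          producer.send(record).get();

          // point 3, asynchronous mode: fast, failures handled in the callback
          producer.send(record, (metadata, exception) -> {
            if (exception != null) {
              exception.printStackTrace(); // stands in for real failure handling
            }
          });
          producer.flush();
        }
      }
    }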