Yeah, that's why I said we need a Kafka cluster on the back end.
It's better to have an Alpha cluster to receive the events.
I agree we could provide different configurations for the user to
choose between performance and stability of the system.
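
As a rough sketch of what that option could look like (the sync/async
property name below is hypothetical; only the batchSize parameter has
been mentioned so far):

import java.util.concurrent.ExecutionException;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.KafkaTemplate;

public class ConfigurableKafkaSender {
   private final KafkaTemplate<String, Object> kafkaTemplate;
   private final boolean syncSend;

   public ConfigurableKafkaSender(KafkaTemplate<String, Object> kafkaTemplate,
                                  Environment env) {
       this.kafkaTemplate = kafkaTemplate;
       // Hypothetical property name, for illustration only.
       this.syncSend = Boolean.parseBoolean(env.getProperty(
           "alpha.feature.akka.channel.kafka.producer.sync", "false"));
   }

   public void send(String topic, String key, Object event)
           throws ExecutionException, InterruptedException {
       if (syncSend) {
           // Stability: block until the broker acknowledges the record.
           kafkaTemplate.send(topic, key, event).get();
       } else {
           // Performance: asynchronous send, acknowledged in the background.
           kafkaTemplate.send(topic, key, event);
       }
   }
}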

Willem Jiang

Twitter: willemjiang
Weibo: 姜宁willem

On Sat, Aug 17, 2019 at 4:37 PM Zhang Lei <cool...@qq.com> wrote:
>
> Hi, Willem
>
> If we use asynchronous sending, we also need to consider the case of an
> Alpha crash.
>
> Lei Zhang
>
> > On Aug 17, 2019, at 4:29 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
> >
> > I think we are overthinking this on the Omega side.
> > From my experience, the Kafka client can get much better performance
> > by using async invocation with batch processing.
> > If we want to guarantee delivery to Kafka, we'd better build a
> > Kafka cluster on the server side instead of notifying Omega that Alpha
> > cannot process any message.
> >
> > For the publish method, I think we just need to log any event which
> > cannot be sent to Kafka for further investigation.
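> >
> > A minimal sketch of that logging, assuming a plain KafkaProducer (the
> > class and field names here are illustrative):
> >
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > public class LoggingEventPublisher {
> >    private static final Logger logger =
> >        LoggerFactory.getLogger(LoggingEventPublisher.class);
> >    private final KafkaProducer<String, String> producer;
> >
> >    public LoggingEventPublisher(KafkaProducer<String, String> producer) {
> >        this.producer = producer;
> >    }
> >
> >    public void publish(String topic, String key, String event) {
> >        // Asynchronous send; the callback only logs the failed event
> >        // for further investigation instead of failing the caller.
> >        producer.send(new ProducerRecord<>(topic, key, event),
> >            (metadata, exception) -> {
> >                if (exception != null) {
> >                    logger.error("event [{}] could not be sent to Kafka",
> >                        event, exception);
> >                }
> >            });
> >    }
> > }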
> >
> > Willem Jiang
> >
> > Twitter: willemjiang
> > Weibo: 姜宁willem
> >
> > On Sat, Aug 17, 2019 at 4:11 PM Zhang Lei <zhang_...@boco.com.cn> wrote:
> >>
> >> Add another point:
> >>
> >> We still use the asynchronous send method, and use the parameter
> >> alpha.feature.akka.channel.kafka.producer.batchSize to trigger
> >> producer.flush(); the balance between performance and reliability then
> >> depends on the parameters set by the user.
> >>
> >> The most important thing is that Omega knows when Alpha has failed.
> >>
> >> private final List<BaseEvent> eventCache = new LinkedList<>();
> >>
> >> @Override
> >> public void publish(Object data) {
> >>    if (logger.isDebugEnabled()) {
> >>        logger.debug("send message [{}] to [{}]", data, topic);
> >>    }
> >>
> >>    if (!(data instanceof BaseEvent)) {
> >>        throw new UnsupportedOperationException("data must be BaseEvent type");
> >>    }
> >>
> >>    eventCache.add((BaseEvent) data);
> >>    if (eventCache.size() == batchSize) {
> >>        try {
> >>            List<Future<?>> kafkaFutures = new LinkedList<>();
> >>            for (BaseEvent event : eventCache) {
> >>                // send each cached event keyed by its global transaction id
> >>                kafkaFutures.add(
> >>                    kafkaTemplate.send(topic, event.getGlobalTxId(), event));
> >>            }
> >>            // flush the producer buffer, then block until every send
> >>            // in the batch has been acknowledged
> >>            kafkaTemplate.flush();
> >>            for (Future<?> future : kafkaFutures) {
> >>                future.get();
> >>            }
> >>            eventCache.clear();
> >>        } catch (InterruptedException e) {
> >>            Thread.currentThread().interrupt();
> >>            logger.error("Interrupted while sending events to Kafka", e);
> >>            throw new RuntimeException(e);
> >>        } catch (ExecutionException e) {
> >>            logger.warn("Sending events to Kafka failed", e);
> >>            throw new RuntimeException("Commit failed as send to Kafka failed", e);
> >>        }
> >>    }
> >> }
> >>
> >>
> >>
> >>> On Aug 17, 2019, at 3:29 PM, Zhang Lei <zhang_...@boco.com.cn> wrote:
> >>>
> >>> Hi, Team
> >>>
> >>> Following our previous discussion on PR [1] about using synchronous or
> >>> asynchronous methods to send Kafka messages, I think we need a
> >>> trade-off between reliability and performance.
> >>>
> >>> Maybe we can give the choice to the user by allowing them to customize
> >>> some parameters. I have the following suggestions about the Kafka
> >>> producer parameters:
> >>>
> >>> Key requirement: messages must be ordered and must not be lost, but
> >>> duplicates are allowed for the FSM.
> >>>
> >>> 1. Default parameters (see the sketch after this list)
> >>>
> >>> max.in.flight.requests.per.connection is 1 (user modification is
> >>> prohibited, to preserve ordering)
> >>> acks is -1
> >>> retries is greater than 0
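> >>>
> >>> A sketch of those defaults in code (the retries value of 3 is just an
> >>> illustrative "greater than 0"):
> >>>
> >>> import java.util.Properties;
> >>> import org.apache.kafka.clients.producer.ProducerConfig;
> >>>
> >>> public class DefaultProducerProps {
> >>>    public static Properties defaults() {
> >>>        Properties props = new Properties();
> >>>        // One in-flight request per connection so retries cannot
> >>>        // reorder events.
> >>>        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> >>>        // acks=-1 (all): wait for the full in-sync replica set.
> >>>        props.put(ProducerConfig.ACKS_CONFIG, "-1");
> >>>        // retries > 0 so transient failures are retried, not dropped.
> >>>        props.put(ProducerConfig.RETRIES_CONFIG, "3");
> >>>        return props;
> >>>    }
> >>> }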
> >>>
> >>> 2. Allow users to define most parameters of the Kafka producer
> >>> (see the override sketch after this list), e.g.:
> >>>
> >>> acks
> >>> retries
> >>> buffer.memory
> >>> compression.type
> >>> min.insync.replicas > 1 (use with acks)
> >>> replication.factor > min.insync.replicas
> >>> timeout.ms
> >>> request.timeout.ms
> >>> metadata.fetch.timeout.ms
> >>> max.block.ms
> >>> max.request.size
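> >>>
> >>> One way to apply user overrides while still protecting the ordering
> >>> guarantee (a sketch, reusing the hypothetical defaults() above):
> >>>
> >>> import java.util.Map;
> >>> import java.util.Properties;
> >>> import org.apache.kafka.clients.producer.ProducerConfig;
> >>>
> >>> public class UserProducerProps {
> >>>    public static Properties merge(Properties defaults,
> >>>                                   Map<String, String> userConfig) {
> >>>        Properties props = new Properties();
> >>>        props.putAll(defaults);
> >>>        // Apply user-supplied values such as acks, retries, buffer.memory.
> >>>        props.putAll(userConfig);
> >>>        // Re-pin the ordering guarantee: not user-overridable.
> >>>        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> >>>        return props;
> >>>    }
> >>> }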
> >>>
> >>> 3. KafkaProducer.send(record, callback) or
> >>> KafkaProducer.send(record).get()
> >>>
> >>> KafkaProducer.send(record).get() can cause performance problems, but we
> >>> can mitigate them by deploying multiple Alpha instances.
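> >>>
> >>> A minimal sketch of this synchronous variant:
> >>>
> >>> import java.util.concurrent.ExecutionException;
> >>> import org.apache.kafka.clients.producer.KafkaProducer;
> >>> import org.apache.kafka.clients.producer.ProducerRecord;
> >>> import org.apache.kafka.clients.producer.RecordMetadata;
> >>>
> >>> public class SyncSender {
> >>>    // Synchronous send: get() blocks until the broker acknowledges
> >>>    // the record (or the send fails), trading throughput for safety.
> >>>    public static RecordMetadata sendSync(
> >>>            KafkaProducer<String, String> producer,
> >>>            ProducerRecord<String, String> record)
> >>>            throws ExecutionException, InterruptedException {
> >>>        return producer.send(record).get();
> >>>    }
> >>> }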
> >>>
> >>> KafkaProducer.send(record, callback) with max.block.ms=0 and a large
> >>> enough buffer.memory; but we still have to deal with the callback
> >>> failure scenario.
> >>> In asynchronous mode, if messages have been sent but their
> >>> acknowledgments have not been received, the buffer pool is full, and
> >>> the configuration does not limit the blocking timeout, then the
> >>> producing end blocks indefinitely, which ensures that data is not lost.
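> >>>
> >>> A sketch of the asynchronous variant with a failure-handling callback
> >>> (the buffer size is illustrative, and the max.block.ms comment reflects
> >>> the fail-fast versus block-forever trade-off described above):
> >>>
> >>> import java.util.Properties;
> >>> import org.apache.kafka.clients.producer.KafkaProducer;
> >>> import org.apache.kafka.clients.producer.ProducerConfig;
> >>> import org.apache.kafka.clients.producer.ProducerRecord;
> >>> import org.slf4j.Logger;
> >>> import org.slf4j.LoggerFactory;
> >>>
> >>> public class AsyncSender {
> >>>    private static final Logger logger =
> >>>        LoggerFactory.getLogger(AsyncSender.class);
> >>>
> >>>    public static Properties asyncProps() {
> >>>        Properties props = new Properties();
> >>>        // Illustrative 64 MB buffer; "large enough" per the discussion.
> >>>        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> >>>            String.valueOf(64L * 1024 * 1024));
> >>>        // 0 fails fast when the buffer is full; a very large value
> >>>        // would block indefinitely instead, ensuring no loss.
> >>>        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0");
> >>>        return props;
> >>>    }
> >>>
> >>>    public static void sendAsync(KafkaProducer<String, String> producer,
> >>>                                 ProducerRecord<String, String> record) {
> >>>        producer.send(record, (metadata, exception) -> {
> >>>            if (exception != null) {
> >>>                // The callback failure scenario we still must handle.
> >>>                logger.error("async send failed for [{}]", record, exception);
> >>>            }
> >>>        });
> >>>    }
> >>> }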
> >>>
> >>> Maybe we can use these parameters to let users choose between the
> >>> synchronous and asynchronous sending modes, and use the asynchronous
> >>> mode for better performance when the network and Kafka cluster are
> >>> reliable.
> >>>
> >>> [1] https://github.com/apache/servicecomb-pack/pull/540
> >>>
> >>> Lei Zhang
> >>>
> >>
> >
>
>
>
