Hi ShuQi,

>> There are 4 brokers in our Kafka cluster; when one of the brokers goes
>> down, Samza can no longer fetch or send messages. Is this normal?
I'm not entirely sure this is intrinsically a Samza problem. It is possible
that your topic partition was entirely offline. The Kafka producer will
refresh the metadata before each send attempt based on the *retry.backoff.ms*
setting (default *10ms*).

In general, the following Kafka properties govern the availability versus
consistency trade-off when a broker is dead:

- Replication factor
- Minimum in-sync replicas (*min.insync.replicas*; the set of in-sync
  replicas is what Kafka calls the ISR)
- Unclean leader election
- Acknowledgements (the producer-side *acks* setting)
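For concreteness, here is a rough sketch of where those knobs live. The
values below, and the system name "kafka" in the Samza job config, are
placeholders for illustration, not a recommendation for your cluster:

    # Broker / topic level (server.properties, or per-topic overrides)
    default.replication.factor=3          # replicas per partition for auto-created topics
    min.insync.replicas=2                 # with acks=all, a write needs this many in-sync replicas
    unclean.leader.election.enable=false  # favor consistency: never elect an out-of-sync leader

    # Producer level, passed through the Samza job config as systems.<system>.producer.*
    systems.kafka.producer.acks=all       # wait for all in-sync replicas to acknowledge a write
    systems.kafka.producer.retries=30     # bound the retries instead of Samza's MAX_INT default

With a replication factor of 3 and min.insync.replicas=2, losing a single
broker should still leave every partition writable (assuming the replicas are
spread across brokers); acks=all trades a little latency for that durability.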
On Mon, Apr 24, 2017 at 6:42 PM, 舒琦 <sh...@eefung.com> wrote:

> Hi Jagadish,
>
> Thanks for your patient explanation.
>
> I understand the exceptions now.
>
> But there is still a question: there are 4 brokers in our Kafka cluster;
> when one of the brokers goes down, Samza can no longer fetch or send
> messages. Is this normal?
>
> ————————
> Qi Shu
>
> > On Apr 25, 2017, at 01:24, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
> >
> > Hi ShuQi,
> >
> > My apologies for the late reply.
> >
> > There are 2 categories of exceptions here (both occurring presumably due
> > to your Kafka broker failure):
> >
> > *Producer side:*
> >
> > - This is a network exception from the *Sender* instance inside the
> > *KafkaProducer* used by Samza.
> > - The default number of retries in Samza is MAX_INT. You can configure
> > retries by over-riding systems.system-name.producer.retries.
> >
> > More generally, any Kafka producer property can be over-ridden as
> > systems.system-name.producer.*. Any Kafka producer configuration
> > <http://kafka.apache.org/documentation.html#newproducerconfigs> can be
> > included here. For example, to change the request timeout, you can set
> > systems.system-name.producer.timeout.ms. (There is no need to configure
> > client.id as it is automatically configured by Samza.)
> >
> > *Consumer side:*
> >
> > - The exception is a timeout triggered from the DefaultFetchSimpleConsumer.
> > It happens in a separate thread where we poll for messages and hence
> > should not affect the *SamzaContainer* main thread.
> > - The default behavior is to attempt a re-connect and then re-create the
> > consumer instance. The number of reconnect attempts is unbounded (and not
> > configurable).
> >
> > Best,
> > Jagadish
> >
> > On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <sh...@eefung.com> wrote:
> >
> >> Hi Guys,
> >>
> >> One of the brokers in our Kafka cluster went down, and Samza got the
> >> following exception:
> >>
> >> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1] DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
> >> java.net.SocketTimeoutException
> >>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
> >>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> >>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> >>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
> >>     at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
> >>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
> >>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
> >>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
> >>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
> >>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
> >>     at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
> >>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
> >>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
> >>     at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> >>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
> >>     at java.lang.Thread.run(Thread.java:745)
> >> 2017-04-19 10:42:44.507 [kafka-producer-network-thread | samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error produce response with correlation id 64783117 on topic-partition tweets_distinctContent-5, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> >>
> >> Does “2147483646 attempts left” mean that Samza will try to reconnect to
> >> the broken broker 2147483646 times?
> >> The log shows that Samza keeps connecting to the broken broker, and the
> >> Samza job can’t read any new messages even though the Kafka cluster is
> >> fault tolerant.
> >>
> >> How can I override this property (“2147483646 attempts left”)?
> >>
> >> Thanks.
> >>
> >> ————————
> >> ShuQi
> >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University