Hi ShuQi,

My apologies for the late reply.
There are two categories of exceptions here (both presumably caused by your Kafka broker failure):

*Producer side:*
- This is a network exception from the *Sender* instance inside the *KafkaProducer* used by Samza.
- The default number of retries in Samza is MAX_INT. You can configure retries by overriding:
  systems.system-name.producer.retries
  More generally, any Kafka producer property can be overridden as follows:
  systems.system-name.producer.*
  Any Kafka producer configuration <http://kafka.apache.org/documentation.html#newproducerconfigs> can be included here. For example, to change the request timeout, you can set systems.system-name.producer.timeout.ms. (There is no need to configure client.id, as it is automatically configured by Samza.)

*Consumer side:*
- The exception is a timeout triggered from the DefaultFetchSimpleConsumer. It happens in a separate thread where we poll for messages, and hence should not affect the *SamzaContainer* main thread.
- The default behavior is to attempt a reconnect, and then re-create the Consumer instance. The number of reconnect attempts is unbounded (and not configurable).

Best,
Jagadish

On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <sh...@eefung.com> wrote:
> Hi Guys,
>
> One of the brokers in our Kafka cluster went down, and Samza got the
> following exception:
>
> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1] DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
> java.net.SocketTimeoutException
>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>     at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>     at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>     at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>     at java.lang.Thread.run(Thread.java:745)
> 2017-04-19 10:42:44.507 [kafka-producer-network-thread | samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error produce response with correlation id 64783117 on topic-partition tweets_distinctContent-5, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
>
> Does “2147483646 attempts left” mean that Samza will try to reconnect to the
> broken broker 2147483646 times?
> The log also shows that Samza keeps connecting to the broken broker, and the
> Samza cluster can’t read any new messages even though the Kafka cluster is
> fault tolerant.
>
> How can I override this property (“2147483646 attempts left”)?
>
> Thanks.
>
> ————————
> ShuQi

--
Jagadish V
Graduate Student
Department of Computer Science
Stanford University
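To make the producer override pattern from the reply concrete, here is a minimal Java sketch. It is illustrative only: the class SamzaProducerOverrides and its methods are hypothetical (not Samza's actual implementation), and "kafka" is an assumed system name. It shows how a Kafka producer property such as retries maps to a Samza key of the form systems.<system-name>.producer.<property>, and how stripping that prefix recovers the plain Kafka property names. It also checks that the logged "2147483646 attempts left" is one less than Integer.MAX_VALUE, which is consistent with the MAX_INT default described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Samza's API) illustrating how Kafka
 * producer settings map onto Samza config keys of the form
 * systems.<system-name>.producer.<kafka-property>.
 */
public class SamzaProducerOverrides {

    /** Builds the Samza config key that overrides one Kafka producer property. */
    static String producerKey(String systemName, String kafkaProperty) {
        return "systems." + systemName + ".producer." + kafkaProperty;
    }

    /**
     * Collects every systems.<systemName>.producer.* entry and strips the
     * prefix, yielding plain Kafka producer property names. Entries for
     * other systems or non-producer keys are ignored.
     */
    static Map<String, String> extractProducerProps(Map<String, String> samzaConfig, String systemName) {
        String prefix = "systems." + systemName + ".producer.";
        Map<String, String> producerProps = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : samzaConfig.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                producerProps.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return producerProps;
    }

    public static void main(String[] args) {
        // Assumption: the Kafka system is named "kafka" in the job config.
        Map<String, String> samzaConfig = new LinkedHashMap<>();
        samzaConfig.put(producerKey("kafka", "retries"), "5");
        samzaConfig.put(producerKey("kafka", "timeout.ms"), "30000");
        samzaConfig.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");

        // The override lines as they would appear in a job's properties file.
        for (Map.Entry<String, String> e : samzaConfig.entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }

        // Only the producer.* entries survive, with the prefix removed.
        System.out.println(extractProducerProps(samzaConfig, "kafka")); // {retries=5, timeout.ms=30000}

        // "2147483646 attempts left" is Integer.MAX_VALUE - 1.
        System.out.println(Integer.MAX_VALUE - 1); // 2147483646
    }
}
```

In a job's properties file, the equivalent override would simply be a line such as systems.kafka.producer.retries=5 (again assuming the system is named kafka).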