Hi Jagadish,

Thanks for your patient explanation. I now understand the exceptions, but
I still have one question. There are 4 brokers in our Kafka cluster. When
one of the brokers goes down, Samza can no longer fetch or send messages.
Is this normal?

————————
Qi Shu

> On Apr 25, 2017, at 01:24, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
>
> Hi ShuQi,
>
> My apologies for the late reply.
>
> There are 2 categories of exceptions here (both presumably caused by
> your Kafka broker failure):
>
> *Producer side:*
>
> - This is a network exception from the *Sender* instance inside the
>   *KafkaProducer* used by Samza.
> - The default number of retries in Samza is MAX_INT. You can configure
>   retries by overriding systems.system-name.producer.retries.
>
> More generally, any Kafka property can be overridden as follows:
>
>     systems.system-name.producer.*
>
> Any Kafka producer configuration
> <http://kafka.apache.org/documentation.html#newproducerconfigs> can be
> included here. For example, to change the request timeout, you can set
> systems.system-name.producer.timeout.ms. (There is no need to configure
> client.id as it is automatically configured by Samza.)
>
> *Consumer side:*
>
> - The exception is a timeout triggered from the DefaultFetchSimpleConsumer.
>   It happens in a separate thread where we poll for messages, and hence
>   should not affect the *SamzaContainer* main thread.
> - The default behavior is to attempt a reconnect, and then re-create the
>   Consumer instance. The number of reconnect attempts is unbounded (and not
>   configurable).
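The consumer-side behavior described here (reconnect, no attempt limit) fits the `ExponentialSleepStrategy.run` frame that appears under `BrokerProxy` in the stack trace quoted in this thread. The sketch below shows the general pattern of an unbounded retry loop with exponential backoff. It is not Samza's code: the class `ReconnectBackoff`, the `Attempt` interface, and the multiplier, initial delay, and cap values are illustrative assumptions, not confirmed Samza defaults.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of an unbounded reconnect loop with exponential backoff.
 * Hypothetical class; the multiplier, initial delay, and cap below are
 * illustrative assumptions, not values confirmed for Samza.
 */
public class ReconnectBackoff {
    static final double MULTIPLIER = 2.0;
    static final long INITIAL_DELAY_MS = 100;
    static final long MAX_DELAY_MS = 10_000;

    /** First sleep is the initial delay; later sleeps grow geometrically up to the cap. */
    static long nextDelay(long previousDelayMs) {
        if (previousDelayMs == 0) {
            return INITIAL_DELAY_MS;
        }
        return Math.min((long) (previousDelayMs * MULTIPLIER), MAX_DELAY_MS);
    }

    /** Hypothetical stand-in for "fetch from the broker"; returns true on success. */
    interface Attempt {
        boolean tryOnce();
    }

    /**
     * Retries until an attempt succeeds. There is no attempt limit, mirroring the
     * "unbounded (and not configurable)" reconnect behavior described in the reply.
     * Returns the sleeps taken so callers can inspect the backoff schedule.
     */
    static List<Long> runUntilSuccess(Attempt attempt, boolean actuallySleep) {
        List<Long> sleeps = new ArrayList<>();
        long delay = 0;
        while (!attempt.tryOnce()) {
            delay = nextDelay(delay);
            sleeps.add(delay);
            if (actuallySleep) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep interrupt status, stop retrying
                    break;
                }
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // Simulate a broker that only becomes reachable on the 10th attempt.
        int[] calls = {0};
        List<Long> sleeps = runUntilSuccess(() -> ++calls[0] >= 10, false);
        System.out.println(sleeps); // [100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000]
    }
}
```

Once the delay reaches the cap, a permanently unreachable broker produces a steady stream of "Reconnect due to error" log lines at that interval rather than an error that stops the job.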
>
>
> Best,
> Jagadish
>
>
> On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <sh...@eefung.com> wrote:
>
>> Hi guys,
>>
>> One of the brokers in our Kafka cluster went down, and Samza got the
>> following exception:
>>
>> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1] DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
>> java.net.SocketTimeoutException
>>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
>>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>>     at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>>     at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>>     at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>>     at java.lang.Thread.run(Thread.java:745)
>> 2017-04-19 10:42:44.507 [kafka-producer-network-thread | samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error produce response with correlation id 64783117 on topic-partition tweets_distinctContent-5, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
>>
>> Does "2147483646 attempts left" mean that Samza will try to reconnect
>> to the broken broker 2147483646 times?
>> The log also shows that Samza keeps connecting to the broken broker,
>> and the Samza cluster can't read any new messages even though the
>> Kafka cluster is fault tolerant.
>>
>> How can I override this property ("2147483646 attempts left")?
>>
>> Thanks.
>>
>> ————————
>> ShuQi
>>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
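The "attempts left" line in the log comes from the producer's `Sender` on the `kafka-producer-network-thread`, not from the consumer reconnect loop, so it is governed by the producer `retries` setting that Jagadish says can be overridden through `systems.system-name.producer.*`. The sketch below illustrates that override convention and the arithmetic behind the number. `ProducerOverrides`, `producerProps`, and `attemptsLeft` are hypothetical names, not Samza or Kafka APIs; the system name `kafka` is assumed; and the formula `retries - attemptsSoFar - 1` is an assumption about how the producer reports the count, chosen because it reproduces 2147483646 for a first retry with MAX_INT retries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Sketch of how "systems.<system>.producer.*" job properties could map onto
 * Kafka producer settings, plus the arithmetic behind "attempts left".
 * Hypothetical class and methods; not Samza's actual implementation.
 */
public class ProducerOverrides {

    /** Copies every systems.<system>.producer.* entry into producer properties, prefix stripped. */
    static Properties producerProps(Map<String, String> jobConfig, String system) {
        String prefix = "systems." + system + ".producer.";
        Properties props = new Properties();
        for (Map.Entry<String, String> e : jobConfig.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                props.setProperty(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        // Per the reply in this thread, Samza defaults to MAX_INT retries when none is set.
        if (props.getProperty("retries") == null) {
            props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        }
        return props;
    }

    /** Assumed formula: configured retries minus attempts already made, minus the retry about to start. */
    static long attemptsLeft(int retries, int attemptsSoFar) {
        return (long) retries - attemptsSoFar - 1;
    }

    public static void main(String[] args) {
        Map<String, String> jobConfig = new HashMap<>();
        // "kafka" is an assumed system name; use the name defined in your job config.
        jobConfig.put("systems.kafka.producer.retries", "3");
        jobConfig.put("job.name", "example-job"); // not a producer key, so it is ignored

        Properties props = producerProps(jobConfig, "kafka");
        System.out.println(props.getProperty("retries"));      // 3
        System.out.println(attemptsLeft(Integer.MAX_VALUE, 0)); // 2147483646
    }
}
```

In a job's properties file, the override in `main` corresponds to a line such as `systems.kafka.producer.retries=3`, again assuming the system is named `kafka`.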