Hi Jagadish,

Thanks for your help. I'll check our Kafka cluster first.
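For reference, this is the kind of override I plan to try based on your pointers. The system name "kafka", the broker addresses, and the concrete values below are only placeholders on my side, not something you suggested:

# Samza job config: cap producer retries instead of the default MAX_INT,
# and pass normal Kafka producer settings through systems.<name>.producer.*
# ("kafka" is our assumed system name, broker hosts are placeholders).
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.producer.bootstrap.servers=broker1:9092,broker2:9092,broker3:9092,broker4:9092
systems.kafka.producer.retries=5
systems.kafka.producer.retry.backoff.ms=1000
systems.kafka.producer.acks=all

# Kafka broker side (server.properties), for the availability/consistency
# trade-off you mentioned -- example values only:
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

My understanding is that with acks=all, a replication factor of 3, and min.insync.replicas=2, losing a single broker should still leave the partitions writable and readable; please correct me if I got that wrong.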
————————
Shu Qi
Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, 27 Wenxuan Road, Yuelu District, Changsha
Website: http://www.eefung.com
Weibo: http://weibo.com/eefung
Postal code: 410013
Phone: 400-677-0986
Fax: 0731-88519609

> On Apr 25, 2017, at 11:36, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
> 
> Hi ShuQi,
> 
>>> There are 4 brokers in our Kafka cluster; when one of the brokers goes
>>> down, Samza can no longer fetch or send messages. Is this normal?
> 
> I'm not entirely sure this is intrinsically a Samza problem. It is possible
> that your topic partition was entirely offline. The Kafka producer will
> refresh the metadata before each send attempt based on the *retry.backoff.ms*
> setting (default *10ms*).
> 
> In general, the following Kafka server-side properties govern the
> availability versus consistency trade-off when a broker is dead:
> 
> - Replication factor
> - Minimum in-sync replicas (which Kafka refers to as ISR)
> - Unclean leader election
> - Acknowledgements
> 
> 
> On Mon, Apr 24, 2017 at 6:42 PM, 舒琦 <sh...@eefung.com> wrote:
> 
>> Hi Jagadish,
>> 
>> Thanks for your patient explanation.
>> 
>> I now understand the exceptions.
>> 
>> But I still have one question: there are 4 brokers in our Kafka cluster,
>> and when one of them goes down, Samza can no longer fetch or send messages.
>> Is this normal?
>> 
>> ————————
>> Qi Shu
>> 
>> 
>>> On Apr 25, 2017, at 01:24, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
>>> 
>>> Hi ShuQi,
>>> 
>>> My apologies for the late reply.
>>> 
>>> There are 2 categories of exceptions here (both presumably caused by
>>> your Kafka broker failure):
>>> 
>>> *Producer side:*
>>> 
>>> - This is a network exception from the *Sender* instance inside the
>>> *KafkaProducer* used by Samza.
>>> - The default number of retries in Samza is MAX_INT. You can configure
>>> retries by overriding: systems.system-name.producer.retries
>>> 
>>> More generally, any Kafka producer property can be overridden as
>>> systems.system-name.producer.*. Any Kafka producer configuration
>>> <http://kafka.apache.org/documentation.html#newproducerconfigs> can be
>>> included here. For example, to change the request timeout, you can set
>>> systems.system-name.producer.timeout.ms. (There is no need to configure
>>> client.id, as it is automatically set by Samza.)
>>> 
>>> *Consumer side:*
>>> 
>>> - The exception is a timeout triggered from the DefaultFetchSimpleConsumer.
>>> It happens in a separate thread where we poll for messages and hence
>>> should not affect the *SamzaContainer* main thread.
>>> - The default behavior is to attempt a reconnect and then re-create the
>>> consumer instance. The number of reconnect attempts is unbounded (and not
>>> configurable).
>>> 
>>> 
>>> Best,
>>> Jagadish
>>> 
>>> 
>>> On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <sh...@eefung.com> wrote:
>>> 
>>>> Hi Guys,
>>>> 
>>>> One of the brokers in our Kafka cluster went down, and Samza got the
>>>> following exception:
>>>> 
>>>> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
>>>> 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1]
>>>> DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
>>>> java.net.SocketTimeoutException
>>>>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
>>>>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>>>>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>>>>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>>>>     at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>>>>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>>>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>>>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>>>>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>>>>     at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>>>>     at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
>>>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>>>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>>>>     at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>>>     at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> 
>>>> 2017-04-19 10:42:44.507 [kafka-producer-network-thread |
>>>> samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error
>>>> produce response with correlation id 64783117 on topic-partition
>>>> tweets_distinctContent-5, retrying (2147483646 attempts left). Error:
>>>> NETWORK_EXCEPTION
>>>> 
>>>> Does "2147483646 attempts left" mean that Samza will try to reconnect to
>>>> the broken broker 2147483646 times?
>>>> The log shows that Samza keeps connecting to the broken broker, and the
>>>> Samza job can't read any new messages even though the Kafka cluster is
>>>> fault tolerant.
>>>> 
>>>> How can I override this property ("2147483646 attempts left")?
>>>> 
>>>> Thanks.
>>>> 
>>>> ————————
>>>> ShuQi
>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> Jagadish V,
>>> Graduate Student,
>>> Department of Computer Science,
>>> Stanford University
>> 
>> 
> 
> 
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University