[
https://issues.apache.org/jira/browse/STORM-2494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16128296#comment-16128296
]
Stig Rohde Døssing commented on STORM-2494:
-------------------------------------------
[~hmclouro] This is caused by using the subscribe API. I don't think we should
try to make the code work around this. Upgrading to 1.2.0, which includes
https://issues.apache.org/jira/browse/STORM-2640, will solve this.
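To illustrate the point about the subscribe API: with group-managed subscriptions, an offset commit is only valid for the group generation in which the partitions were assigned, and a rebalance (triggered here by the slow poll loop) bumps the generation. Below is a minimal stand-in sketch of that generation check in plain Java; the classes and method names are illustrative models, not the kafka-clients API.

```java
// Illustrative model of the group-generation check behind CommitFailedException.
// These classes are local stand-ins, not Kafka's actual classes.
public class GenerationSketch {
    static class RebalanceError extends RuntimeException {}

    static class Coordinator {
        int generation = 1;

        void rebalance() { generation++; } // a member timed out or joined

        // A subscribe()-style commit is rejected if the group has moved on.
        void commit(int memberGeneration) {
            if (memberGeneration != generation) throw new RebalanceError();
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator();
        int myGeneration = c.generation; // partitions assigned at generation 1

        c.rebalance(); // slow poll loop -> session timeout -> rebalance

        try {
            c.commit(myGeneration);
        } catch (RebalanceError e) {
            System.out.println("commit rejected: group rebalanced");
        }
        // A manually assigned (assign()-style) consumer never enters this
        // membership/generation check, which is why the manual-assignment
        // work in STORM-2640 avoids this failure mode.
    }
}
```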
> KafkaSpout does not handle CommitFailedException
> ------------------------------------------------
>
> Key: STORM-2494
> URL: https://issues.apache.org/jira/browse/STORM-2494
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.0
> Reporter: Yuri Barseghyan
> Assignee: Hugo Louro
>
> In situations where tuple processing takes longer than the session timeout, we
> get a CommitFailedException, and instead of recovering from it the Storm
> worker dies.
> {code}
> 2017-04-26 11:07:04.902 o.a.s.util [ERROR] Async loop died!
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the partitions
> to another member. This means that the time between subsequent calls to
> poll() was longer than the configured session.timeout.ms, which typically
> implies that the poll loop is spending too much time message processing. You
> can address this either by increasing the session timeout or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:04.909 o.a.s.d.executor [ERROR]
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the partitions
> to another member. This means that the time between subsequent calls to
> poll() was longer than the configured session.timeout.ms, which typically
> implies that the poll loop is spending too much time message processing. You
> can address this either by increasing the session timeout or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
> 	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:04.953 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
> 	at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
> 	at org.apache.storm.daemon.worker$fn__5646$fn__5647.invoke(worker.clj:763) [storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.daemon.executor$mk_executor_data$fn__4863$fn__4864.invoke(executor.clj:274) [storm-core-1.1.0.jar:1.1.0]
> 	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.0.jar:1.1.0]
> 	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> 2017-04-26 11:07:44.507 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG]
> Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0,
> timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS},
> maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10,
> timeUnit=SECONDS}}
> 2017-04-26 11:07:44.516 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG]
> Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0,
> timeUnit=SECONDS}, ratio=TimeInterval{length=0, timeUnit=MILLISECONDS},
> maxRetries=2147483647, maxRetryDelay=TimeInterval{length=0,
> timeUnit=MILLISECONDS}}
> 2017-04-26 11:07:45.048 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with
> the following configuration:
> KafkaSpoutConfig{kafkaProps={enable.auto.commit=false,
> request.timeout.ms=30000, group.id=Group1,
> bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000},
> key=org.apache.kafka.common.serialization.StringDeserializer@1b5080fd,
> value=org.apache.kafka.common.serialization.StringDeserializer@2720873b,
> pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000,
> firstPollOffsetStrategy=UNCOMMITTED_EARLIEST,
> subscription=org.apache.storm.kafka.spout.NamedSubscription@7f068c1f,
> translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@1f1ca6a2,
> retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0,
> timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS},
> maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10,
> timeUnit=SECONDS}}}
> 2017-04-26 11:07:45.111 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with
> the following configuration:
> KafkaSpoutConfig{kafkaProps={enable.auto.commit=false,
> request.timeout.ms=30000, group.id=Group2,
> bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000},
> key=org.apache.kafka.common.serialization.StringDeserializer@45ffa954,
> value=org.apache.kafka.common.serialization.StringDeserializer@4b384f9b,
> pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000,
> firstPollOffsetStrategy=UNCOMMITTED_EARLIEST,
> subscription=org.apache.storm.kafka.spout.NamedSubscription@4f07c224,
> translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@a0545a0,
> retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0,
> timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS},
> maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10,
> timeUnit=SECONDS}}}
> 2017-04-26 11:07:45.297 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer
> subscribed topics [topic-1]
> 2017-04-26 11:07:45.302 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer
> subscribed topics [topic-2]
> 2017-04-26 11:07:45.456 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked.
> [consumer-group=Group1,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0,
> topic-partitions=[]]
> 2017-04-26 11:07:45.463 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked.
> [consumer-group=Group1,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222,
> topic-partitions=[]]
> 2017-04-26 11:07:45.545 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment.
> [consumer-group=Group1,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222,
> topic-partitions=[topic-1]]
> 2017-04-26 11:07:45.546 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment.
> [consumer-group=Group1,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0,
> topic-partitions=[topic-2]]
> 2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated
> OffsetManager{topic-partition=topic-1, fetchOffset=11803,
> committedOffset=11802, ackedMsgs=[]}
> 2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated
> OffsetManager{topic-partition=topic-2, fetchOffset=11801,
> committedOffset=11800, ackedMsgs=[]}
> 2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
> 2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
> {code}
> I think the expected behaviour would be for the KafkaSpout to recover from the
> exception (the client reconnects and gets its partitions reassigned) without
> the worker being killed.
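The recovery behaviour described above could be sketched as follows. This is not the actual KafkaSpout code: the CommitFailedException here is a local stand-in class so the sketch runs without a broker or the kafka-clients jar, and commitSync() is a hypothetical stub that always fails as if the group had rebalanced.

```java
// Sketch of catch-and-continue recovery: swallow the rebalance-induced commit
// failure and let the next poll() trigger reassignment, instead of letting
// the exception propagate and kill the worker.
public class CommitRecoverySketch {
    // Local stand-in for org.apache.kafka.clients.consumer.CommitFailedException.
    static class CommitFailedException extends RuntimeException {}

    // Hypothetical commit that fails because the group already rebalanced.
    static void commitSync() { throw new CommitFailedException(); }

    static boolean commitAckedOffsets() {
        try {
            commitSync();
            return true;
        } catch (CommitFailedException e) {
            // Offsets stay uncommitted; they are re-resolved once the consumer
            // rejoins the group and its partitions are reassigned.
            System.out.println("commit failed after rebalance; continuing");
            return false;
        }
    }

    public static void main(String[] args) {
        commitAckedOffsets(); // the worker keeps running either way
    }
}
```

Until the 1.2.0 upgrade, the mitigations named in the exception message itself (raising session.timeout.ms or lowering max.poll.records) at least reduce how often this path is hit.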
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)