[ 
https://issues.apache.org/jira/browse/STORM-3176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated STORM-3176:
-----------------------------
    Description: 
KafkaSpout use the commitAsync api of Consumer, if the interval time between 
the call of consumer.poll() more than _max.poll.interval.ms_ or the heartbeat 
of consumer timeout, that will occur CommitFailedException,  and then the 
worker will die, the log like this: 
{code:java}
// 2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer th
an the configured max.poll.interval.ms, which typically implies that the poll 
loop is spending too much time message processing. You can address this either 
by increasing the session timeout or by reducing the maximum size of batches 
returned in
poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX)
 ~[stormjar.jar:?]
at 
org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430)
 ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) 
~[stormjar.jar:?]
at 
org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647)
 ~[XXX.jar:?]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [XXX.jar:?]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
{code}
I find it will catch the Exception in auto-commit mode of consumer, the source 
code is:
{code:java}
// private void maybeAutoCommitOffsetsSync(long timeoutMs) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {} for group 
{}", allConsumedOffsets, groupId);
            if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
                log.debug("Auto-commit of offsets {} for group {} timed out 
before completion",
                        allConsumedOffsets, groupId);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} for group {} was interrupted 
before completion",
                    allConsumedOffsets, groupId);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate 
the exception
            log.warn("Auto-commit of offsets {} failed for group {}: {}", 
allConsumedOffsets, groupId,
                    e.getMessage());
        }
    }
}
{code}
I think KafkaSpout should do like this, catch the Exception avoid to worker 
die. And when the msg ack failed, Spout should judge the offset of the msgID is 
larger than the last commit offset(Spout can guarantee that these msgs which 
offset less than the last commit offset are all ack), if not, the msg should 
not retry.

 

  was:
KafkaSpout use the commitAsync api of Consumer, if the interval time between 
the call of consumer.poll() more than _max.poll.interval.ms_ or the heartbeat 
of consumer timeout, that will occur CommitFailedException,  and then the 
worker will die, the log like this: 
{code:java}
// 2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
org.apache.mtkafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer th
an the configured max.poll.interval.ms, which typically implies that the poll 
loop is spending too much time message processing. You can address this either 
by increasing the session timeout or by reducing the maximum size of batches 
returned in
poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126)
 ~[stormjar.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX)
 ~[stormjar.jar:?]
at 
org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430)
 ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) 
~[stormjar.jar:?]
at 
org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647)
 ~[storm-core-1.1.2-mt001.jar:?]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) 
[storm-core-1.1.2-mt001.jar:?]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
{code}
I find it will catch the Exception in auto-commit mode of consumer, the source 
code is:
{code:java}
// private void maybeAutoCommitOffsetsSync(long timeoutMs) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {} for group 
{}", allConsumedOffsets, groupId);
            if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
                log.debug("Auto-commit of offsets {} for group {} timed out 
before completion",
                        allConsumedOffsets, groupId);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} for group {} was interrupted 
before completion",
                    allConsumedOffsets, groupId);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate 
the exception
            log.warn("Auto-commit of offsets {} failed for group {}: {}", 
allConsumedOffsets, groupId,
                    e.getMessage());
        }
    }
}
{code}
I think KafkaSpout should do like this, catch the Exception avoid to worker 
die. And when the msg ack failed, Spout should judge the offset of the msgID is 
larger than the last commit offset(Spout can guarantee that these msgs which 
offset less than the last commit offset are all ack), if not, the msg should 
not retry.

 


> KafkaSpout commit offset occurs CommitFailedException which leads to worker 
> dead
> --------------------------------------------------------------------------------
>
>                 Key: STORM-3176
>                 URL: https://issues.apache.org/jira/browse/STORM-3176
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.2
>            Reporter: Matt Wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.2.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> KafkaSpout use the commitAsync api of Consumer, if the interval time between 
> the call of consumer.poll() more than _max.poll.interval.ms_ or the heartbeat 
> of consumer timeout, that will occur CommitFailedException,  and then the 
> worker will die, the log like this: 
> {code:java}
> // 2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer th
> an the configured max.poll.interval.ms, which typically implies that the poll 
> loop is spending too much time message processing. You can address this 
> either by increasing the session timeout or by reducing the maximum size of 
> batches returned in
> poll() with max.poll.records.
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126)
>  ~[stormjar.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430)
>  ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) 
> ~[stormjar.jar:?]
> at 
> org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647)
>  ~[XXX.jar:?]
> at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [XXX.jar:?]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
> 2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
> {code}
> I find it will catch the Exception in auto-commit mode of consumer, the 
> source code is:
> {code:java}
> // private void maybeAutoCommitOffsetsSync(long timeoutMs) {
>     if (autoCommitEnabled) {
>         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
> subscriptions.allConsumed();
>         try {
>             log.debug("Sending synchronous auto-commit of offsets {} for 
> group {}", allConsumedOffsets, groupId);
>             if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
>                 log.debug("Auto-commit of offsets {} for group {} timed out 
> before completion",
>                         allConsumedOffsets, groupId);
>         } catch (WakeupException | InterruptException e) {
>             log.debug("Auto-commit of offsets {} for group {} was interrupted 
> before completion",
>                     allConsumedOffsets, groupId);
>             // rethrow wakeups since they are triggered by the user
>             throw e;
>         } catch (Exception e) {
>             // consistent with async auto-commit failures, we do not 
> propagate the exception
>             log.warn("Auto-commit of offsets {} failed for group {}: {}", 
> allConsumedOffsets, groupId,
>                     e.getMessage());
>         }
>     }
> }
> {code}
> I think KafkaSpout should do like this, catch the Exception avoid to worker 
> die. And when the msg ack failed, Spout should judge the offset of the msgID 
> is larger than the last commit offset(Spout can guarantee that these msgs 
> which offset less than the last commit offset are all ack), if not, the msg 
> should not retry.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to