Repository: kafka Updated Branches: refs/heads/0.10.2 8c0005740 -> 444110eac
MINOR: don't throw CommitFailedException during suspendTasksAndState Cherrypicked from trunk https://github.com/apache/kafka/pull/2535 Author: Eno Thereska <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #2804 from enothereska/CommitFailedException-0.10.2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/444110ea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/444110ea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/444110ea Branch: refs/heads/0.10.2 Commit: 444110eacb88a5f6cffbdb3a773f5d50eca2aea8 Parents: 8c00057 Author: Eno Thereska <[email protected]> Authored: Tue Apr 4 15:12:25 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Apr 4 15:12:25 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/processor/internals/StreamThread.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/444110ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 70c1392..d053431 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -478,7 +478,9 @@ public class StreamThread extends Thread { firstException.compareAndSet(null, flushAllState()); // only commit after all state has been flushed and there hasn't been an exception if (firstException.get() == null) { - firstException.set(commitOffsets()); + // TODO: currently commit failures 
will not be thrown to users + // while suspending tasks; this needs to be revisited after KIP-98 + commitOffsets(); } // remove the changelog partitions from restore consumer firstException.compareAndSet(null, unAssignChangeLogPartitions());
