Repository: kafka Updated Branches: refs/heads/trunk 9bed8fbcf -> 7c7becd4c
KAFKA-4486: Don't commit offsets on exception Author: Eno Thereska <[email protected]> Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang Closes #2225 from enothereska/KAFKA-4486-exception-commit Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c7becd4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c7becd4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c7becd4 Branch: refs/heads/trunk Commit: 7c7becd4cb88b372d199b05661f8efb1e1d022e6 Parents: 9bed8fb Author: Eno Thereska <[email protected]> Authored: Fri Dec 9 17:17:55 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Fri Dec 9 17:17:55 2016 -0800 ---------------------------------------------------------------------- .../kafka/streams/processor/internals/StreamThread.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7c7becd4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 96e9963..a2cac71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -198,7 +198,7 @@ public class StreamThread extends Thread { final StateDirectory stateDirectory; private StreamPartitionAssignor partitionAssignor = null; - + private boolean cleanRun = false; private long timerStartedMs; private long lastCleanMs; private long lastCommitMs; @@ -343,6 +343,7 @@ public class StreamThread extends Thread { try { runLoop(); + cleanRun = true; } catch (KafkaException e) { // just re-throw the exception as it should be logged already throw e; @@ -368,6 +369,7 @@ public class StreamThread extends Thread { return Collections.unmodifiableMap(activeTasks); } + private void shutdown() { log.info("{} Shutting down", logPrefix); shutdownTasksAndState(false); @@ -414,8 +416,11 @@ public class StreamThread extends Thread { log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix, activeTasks.keySet(), standbyTasks.keySet()); - // Commit first as there may be cached records that have not been flushed yet. - commitOffsets(rethrowExceptions); + // only commit under clean exit + if (cleanRun) { + // Commit first as there may be cached records that have not been flushed yet. + commitOffsets(rethrowExceptions); + } // Close all processors in topology order closeAllTasks(); // flush state
