Repository: kafka Updated Branches: refs/heads/trunk 87b894d68 -> 617a91a23
HOTFIX: fix StreamTask.close() guozhangwang Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #586 from ymatsuda/fix_streamtask_close Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/617a91a2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/617a91a2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/617a91a2 Branch: refs/heads/trunk Commit: 617a91a236a44431aeca9345ae954f7067da48f3 Parents: 87b894d Author: Yasuhiro Matsuda <[email protected]> Authored: Wed Nov 25 12:04:24 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 25 12:04:24 2015 -0800 ---------------------------------------------------------------------- .../kafka/streams/processor/internals/AbstractTask.java | 7 ++++++- .../apache/kafka/streams/processor/internals/StreamTask.java | 5 +++++ 2 files changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/617a91a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 64bb10d..14037ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.TaskId; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.Set; public abstract class AbstractTask { @@ -84,10 +85,14 @@ public abstract class AbstractTask { public void close() { try { - stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + stateMgr.close(recordCollectorOffsets()); } catch (IOException e) { throw new KafkaException("Error while closing the state manager in processor context", e); } } + protected Map<TopicPartition, Long> recordCollectorOffsets() { + return Collections.emptyMap(); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/617a91a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 5d170f8..16f0667 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -304,6 +304,11 @@ public class StreamTask extends AbstractTask implements Punctuator { super.close(); } + @Override + protected Map<TopicPartition, Long> recordCollectorOffsets() { + return recordCollector.offsets(); + } + private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) { return new RecordQueue(partition, source); }
