Repository: kafka
Updated Branches:
  refs/heads/0.10.2 5719e8c9c -> e06cd3e55


KAFKA-5241: GlobalKTable should checkpoint offsets after restoring state

Ensure checkpointable offsets for GlobalKTables are always written on close.

Author: Tommy Becker <[email protected]>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #3054 from twbecker/KAFKA-5241

(cherry picked from commit 73703a15c5006ddca166a458dcde72e17c91de4a)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e06cd3e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e06cd3e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e06cd3e5

Branch: refs/heads/0.10.2
Commit: e06cd3e55f25a0bb414e0770493906ea8019420a
Parents: 5719e8c
Author: Tommy Becker <[email protected]>
Authored: Tue May 16 13:59:19 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue May 16 14:01:08 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/GlobalStateManagerImpl.java |  4 ++--
 .../internals/GlobalStateManagerImplTest.java       | 15 ++++++++++++++-
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e06cd3e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 3819bb5..dd40ad7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -227,8 +227,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {

     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
-        if (!offsets.isEmpty()) {
-            checkpointableOffsets.putAll(offsets);
+        checkpointableOffsets.putAll(offsets);
+        if (!checkpointableOffsets.isEmpty()) {
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e06cd3e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 8c9cf19..6f06ad9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -408,6 +408,19 @@ public class GlobalStateManagerImplTest {
         assertThat(updatedCheckpoint.get(t1), equalTo(101L));
     }

+    @Test
+    public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        initializeConsumer(10, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+        final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
+        assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 11L)));
+        assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
+    }
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
                 ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -444,4 +457,4 @@ public class GlobalStateManagerImplTest {
             restored.add(KeyValue.pair(key, value));
         }
     }
-}
\ No newline at end of file
+}
