Repository: kafka
Updated Branches:
  refs/heads/trunk 4e3092d27 -> 73703a15c
KAFKA-5241: GlobalKTable should checkpoint offsets after restoring state Ensure checkpointable offsets for GlobalKTables are always written on close. Author: Tommy Becker <[email protected]> Reviewers: Damian Guy, Eno Thereska, Guozhang Wang Closes #3054 from twbecker/KAFKA-5241 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73703a15 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73703a15 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73703a15 Branch: refs/heads/trunk Commit: 73703a15c5006ddca166a458dcde72e17c91de4a Parents: 4e3092d Author: Tommy Becker <[email protected]> Authored: Tue May 16 13:59:19 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue May 16 13:59:19 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/GlobalStateManagerImpl.java | 4 ++-- .../internals/GlobalStateManagerImplTest.java | 14 +++++++++++++- 2 files changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/73703a15/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 275dca5..6bd699f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -229,8 +229,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager { @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { - if (!offsets.isEmpty()) { - 
checkpointableOffsets.putAll(offsets); + checkpointableOffsets.putAll(offsets); + if (!checkpointableOffsets.isEmpty()) { try { checkpoint.write(checkpointableOffsets); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/73703a15/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 54062fc..98ef8f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -426,6 +426,18 @@ public class GlobalStateManagerImplTest { assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value)))); } + @Test + public void shouldCheckpointRestoredOffsetsToFile() throws IOException { + stateManager.initialize(context); + final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + initializeConsumer(10, 1, t1); + stateManager.register(store1, false, stateRestoreCallback); + stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + + final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed(); + assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 11L))); + assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); + } private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), @@ -487,4 +499,4 @@ public class GlobalStateManagerImplTest { restored.add(KeyValue.pair(key, value)); } } -} \ No newline at end of file +}
