Repository: kafka Updated Branches: refs/heads/trunk 5b5869383 -> d3ff902d6
MINOR: Fix restoring for source KTable Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda Closes #860 from guozhangwang/KRestoreChangelog Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3ff902d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3ff902d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3ff902d Branch: refs/heads/trunk Commit: d3ff902d6023eff257653a5dfb31f4e482204c44 Parents: 5b58693 Author: Guozhang Wang <[email protected]> Authored: Wed Feb 3 20:42:43 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Feb 3 20:42:43 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/streams/processor/TopologyBuilder.java | 2 ++ .../streams/processor/internals/ProcessorStateManager.java | 6 +++--- .../streams/processor/internals/StreamPartitionAssignor.java | 6 +++--- .../kafka/streams/processor/internals/StandbyTaskTest.java | 4 ++-- .../org/apache/kafka/test/ProcessorTopologyTestDriver.java | 2 +- 5 files changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 7af377f..785d3e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -495,6 +495,8 @@ public class TopologyBuilder { // if the node is connected to a state, add to the state topics for (StateStoreFactory stateFactory : stateFactories.values()) { + // we store the changelog topic here without the job id prefix + // since it is within a single job and is only used for if (stateFactory.isInternal && stateFactory.users.contains(node)) { stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index b90af48..c3bd82a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -168,18 +168,18 @@ public class ProcessorStateManager { if (store.persistent()) restoreCallbacks.put(topic, stateRestoreCallback); } else { - restoreActiveState(store, stateRestoreCallback); + restoreActiveState(topic, stateRestoreCallback); } } - private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) { + private void restoreActiveState(String topicName, StateRestoreCallback stateRestoreCallback) { // ---- try to restore the state from change-log ---- // // subscribe to the store's partition if (!restoreConsumer.subscription().isEmpty()) { throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand"); } - TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), getPartition(store.name())); + TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName)); restoreConsumer.assign(Collections.singletonList(storePartition)); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index d499534..74770a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -300,11 +300,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable stateChangelogTopicToTaskIds = new HashMap<>(); internalSourceTopicToTaskIds = new HashMap<>(); for (TaskId task : partitionsForTask.keySet()) { - for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) { - Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName); + for (String topicName : topicGroups.get(task.topicGroupId).stateChangelogTopics) { + Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topicName); if (tasks == null) { tasks = new HashSet<>(); - stateChangelogTopicToTaskIds.put(stateName, tasks); + stateChangelogTopicToTaskIds.put(topicName, tasks); } tasks.add(task); http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index ffcf9ae..fd6f49f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -59,8 +59,8 @@ public class StandbyTaskTest { private final String jobId = "test-job"; private final String storeName1 = "store1"; private final String storeName2 = "store2"; - private final String storeChangelogTopicName1 = jobId + "-" + storeName1 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX; - private final String storeChangelogTopicName2 = jobId + "-" + storeName2 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX; + private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1); + private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2); private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1); private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index af6d51b..5edff28 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -333,7 +333,7 @@ public class ProcessorTopologyTestDriver { }; // For each store name ... for (String storeName : storeNames) { - String topicName = jobId + "-" + storeName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX; + String topicName = ProcessorStateManager.storeChangelogTopic(jobId, storeName); // Set up the restore-state topic ... // consumer.subscribe(new TopicPartition(topicName, 1)); // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
