Repository: kafka Updated Branches: refs/heads/trunk 9b47f9a7f -> 7802a90ed
KAFKA-3207: Fix StoreChangeLogger to use the right topic name Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda Closes #865 from guozhangwang/K3207 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7802a90e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7802a90e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7802a90e Branch: refs/heads/trunk Commit: 7802a90ed98ea5b9a2b2dcf2e04db1a50e34a2f8 Parents: 9b47f9a Author: Guozhang Wang <[email protected]> Authored: Thu Feb 4 14:51:10 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Feb 4 14:51:10 2016 -0800 ---------------------------------------------------------------------- .../kafka/streams/processor/ProcessorContext.java | 15 ++++++++++++++- .../streams/processor/internals/AbstractTask.java | 6 ++++++ .../processor/internals/ProcessorContextImpl.java | 12 +++++++++--- .../processor/internals/StandbyContextImpl.java | 15 ++++++++++++--- .../streams/processor/internals/StandbyTask.java | 2 +- .../state/internals/InMemoryKeyValueLoggedStore.java | 10 +++++----- .../state/internals/RawStoreChangeLogger.java | 8 ++++---- .../streams/state/internals/StoreChangeLogger.java | 13 +++++++------ .../kafka/streams/state/KeyValueStoreTestDriver.java | 2 +- .../org/apache/kafka/test/MockProcessorContext.java | 7 ++++++- 10 files changed, 65 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 9740fa3..79376ba 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -29,11 +29,18 @@ import java.io.File; public interface ProcessorContext { /** + * Returns the job id + * + * @return the job id + */ + String jobId(); + + /** * Returns the task id * * @return the task id */ - TaskId id(); + TaskId taskId(); /** * Returns the key serializer @@ -84,6 +91,12 @@ public interface ProcessorContext { */ void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback); + /** + * Get the state store given the store name. + * + * @param name The store name + * @return The state store instance + */ StateStore getStateStore(String name); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 46dd738..162a926 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -36,6 +36,7 @@ import java.util.Set; public abstract class AbstractTask { protected final TaskId id; + protected final String jobId; protected final ProcessorTopology topology; protected final Consumer consumer; protected final ProcessorStateManager stateMgr; @@ -51,6 +52,7 @@ public abstract class AbstractTask { StreamsConfig config, boolean isStandby) { this.id = id; + this.jobId = jobId; this.partitions = new HashSet<>(partitions); this.topology = topology; this.consumer = consumer; @@ -77,6 +79,10 @@ public abstract class AbstractTask { return id; } + public final String jobId() { + return jobId; + 
} + public final Set<TopicPartition> partitions() { return this.partitions; } http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 4b72394..c4acc01 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -74,12 +74,18 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S this.initialized = true; } - public TaskId id() { + public ProcessorStateManager getStateMgr() { + return stateMgr; + } + + @Override + public TaskId taskId() { return id; } - public ProcessorStateManager getStateMgr() { - return stateMgr; + @Override + public String jobId() { + return task.jobId(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 133d597..82633b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -35,6 +35,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class); private final TaskId id; + 
private final String jobId; private final StreamsMetrics metrics; private final ProcessorStateManager stateMgr; @@ -46,10 +47,12 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup private boolean initialized; public StandbyContextImpl(TaskId id, + String jobId, StreamsConfig config, ProcessorStateManager stateMgr, StreamsMetrics metrics) { this.id = id; + this.jobId = jobId; this.metrics = metrics; this.stateMgr = stateMgr; @@ -65,12 +68,18 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup this.initialized = true; } - public TaskId id() { + public ProcessorStateManager getStateMgr() { + return stateMgr; + } + + @Override + public TaskId taskId() { return id; } - public ProcessorStateManager getStateMgr() { - return stateMgr; + @Override + public String jobId() { + return jobId; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 861b830..7b6ab8c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -64,7 +64,7 @@ public class StandbyTask extends AbstractTask { super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true); // initialize the topology with its own context - this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); + this.processorContext = new StandbyContextImpl(id, jobId, config, stateMgr, metrics); initializeStateStores(); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index 5be6483..94349bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -29,25 +29,25 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { private final KeyValueStore<K, V> inner; private final Serdes<K, V> serdes; - private final String topic; + private final String storeName; private StoreChangeLogger<K, V> changeLogger; private StoreChangeLogger.ValueGetter<K, V> getter; - public InMemoryKeyValueLoggedStore(final String topic, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) { - this.topic = topic; + public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) { + this.storeName = storeName; this.inner = inner; this.serdes = serdes; } @Override public String name() { - return this.topic; + return this.storeName; } @Override public void init(ProcessorContext context) { - this.changeLogger = new StoreChangeLogger<>(topic, context, serdes); + this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes); inner.init(context); http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java index cff9d6b..4d99b59 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java @@ -39,12 +39,12 @@ public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> { } } - public RawStoreChangeLogger(String topic, ProcessorContext context) { - this(topic, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE); + public RawStoreChangeLogger(String storeName, ProcessorContext context) { + this(storeName, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE); } - public RawStoreChangeLogger(String topic, ProcessorContext context, int maxDirty, int maxRemoved) { - super(topic, context, context.id().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved); + public RawStoreChangeLogger(String storeName, ProcessorContext context, int maxDirty, int maxRemoved) { + super(storeName, context, context.taskId().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved); init(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java index b330334..3bbd522 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; import 
org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.Serdes; @@ -46,17 +47,17 @@ public class StoreChangeLogger<K, V> { protected Set<K> dirty; protected Set<K> removed; - public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) { - this(topic, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE); + public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization) { + this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE); } - public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) { - this(topic, context, context.id().partition, serialization, maxDirty, maxRemoved); + public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) { + this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved); init(); } - protected StoreChangeLogger(String topic, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) { - this.topic = topic; + protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) { + this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName); this.context = context; this.partition = partition; this.serialization = serialization; http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index daa7201..d8b034f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -284,7 +284,7 @@ public class KeyValueStoreTestDriver<K, V> { this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), recordCollector) { @Override - public TaskId id() { + public TaskId taskId() { return new TaskId(0, 1); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7802a90e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index 31b8335..d597fd2 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -101,11 +101,16 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S } @Override - public TaskId id() { + public TaskId taskId() { return new TaskId(0, 0); } @Override + public String jobId() { + return "mockJob"; + } + + @Override public Serializer<?> keySerializer() { return keySerializer; }
