KAFKA-5702; extract/refactor StreamThread Extracted `TaskManager` to handle all task-related activities. Made `StandbyTaskCreator`, `TaskCreator`, and `RebalanceListener` static classes so they must define their dependencies and can be tested independently of `StreamThread`. Added interfaces between `StreamPartitionAssignor` & `StreamThread` to reduce coupling.
Author: Damian Guy <damian....@gmail.com> Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>, Eno Thereska <eno.there...@gmail.com> Closes #3624 from dguy/stream-thread-refactor Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e69ce80 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e69ce80 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e69ce80 Branch: refs/heads/trunk Commit: 3e69ce80157eb6a5e6dd05e3be522b5208a41bc5 Parents: b2b5295 Author: Damian Guy <damian....@gmail.com> Authored: Fri Aug 11 12:14:01 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Fri Aug 11 12:14:01 2017 +0100 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 51 +- .../processor/internals/AbstractTask.java | 16 +- .../processor/internals/ChangelogReader.java | 5 + .../processor/internals/StandbyTask.java | 32 +- .../internals/StoreChangelogReader.java | 6 + .../internals/StreamPartitionAssignor.java | 50 +- .../streams/processor/internals/StreamTask.java | 20 +- .../processor/internals/StreamThread.java | 1038 ++++++--------- .../internals/StreamsMetadataState.java | 2 +- .../kafka/streams/processor/internals/Task.java | 67 + .../processor/internals/TaskManager.java | 524 ++++++++ .../processor/internals/ThreadDataProvider.java | 36 + .../internals/ThreadMetadataProvider.java | 36 + .../apache/kafka/streams/state/HostInfo.java | 3 +- .../StreamThreadStateStoreProvider.java | 4 +- .../integration/RegexSourceIntegrationTest.java | 280 ++-- .../processor/internals/AbstractTaskTest.java | 43 + .../processor/internals/StandbyTaskTest.java | 46 + .../internals/StreamPartitionAssignorTest.java | 397 ++---- .../processor/internals/StreamTaskTest.java | 92 +- .../processor/internals/StreamThreadTest.java | 1222 +++--------------- .../processor/internals/TaskManagerTest.java | 240 ++++ 
.../StreamThreadStateStoreProviderTest.java | 51 +- .../apache/kafka/test/MockChangelogReader.java | 5 + 24 files changed, 2038 insertions(+), 2228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ec09730..46f6cd8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.JmxReporter; @@ -145,6 +146,29 @@ public class KafkaStreams { private final StreamsMetadataState streamsMetadataState; private final StreamsConfig config; private final StateDirectory stateDirectory; + private StateRestoreListener globalStateRestoreListener; + private final StateRestoreListener delegatingStateRestoreListener = new StateRestoreListener() { + @Override + public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { + if (globalStateRestoreListener != null) { + globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); + } + } + + @Override + public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) { + if 
(globalStateRestoreListener != null) { + globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); + } + } + + @Override + public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) { + if (globalStateRestoreListener != null) { + globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); + } + } + }; // container states /** @@ -525,18 +549,19 @@ public class KafkaStreams { globalThreadState = globalStreamThread.state(); } + for (int i = 0; i < threads.length; i++) { - threads[i] = new StreamThread(internalTopologyBuilder, - config, - clientSupplier, - applicationId, - clientId, - processId, - metrics, - time, - streamsMetadataState, - cacheSizeBytes, - stateDirectory); + threads[i] = StreamThread.create(internalTopologyBuilder, + config, + clientSupplier, + processId, + clientId, + metrics, + time, + streamsMetadataState, + cacheSizeBytes, + stateDirectory, + delegatingStateRestoreListener); threadState.put(threads[i].getId(), threads[i].state()); storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } @@ -823,9 +848,7 @@ public class KafkaStreams { public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) { synchronized (stateLock) { if (state == State.CREATED) { - for (StreamThread thread : threads) { - thread.setGlobalStateRestoreListener(globalStateRestoreListener); - } + this.globalStateRestoreListener = globalStateRestoreListener; } else { throw new IllegalStateException("Can only set the GlobalRestoreListener in the CREATED state"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 8427e11..2688a8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -37,7 +37,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -public abstract class AbstractTask { +public abstract class AbstractTask implements Task { private static final Logger log = LoggerFactory.getLogger(AbstractTask.class); final TaskId id; @@ -87,32 +87,32 @@ public abstract class AbstractTask { } } - public abstract void resume(); - - public abstract void commit(); - public abstract void suspend(); - public abstract void close(final boolean clean); - - public final TaskId id() { + @Override + public TaskId id() { return id; } + @Override public final String applicationId() { return applicationId; } + @Override public final Set<TopicPartition> partitions() { return partitions; } + @Override public final ProcessorTopology topology() { return topology; } + @Override public final ProcessorContext context() { return processorContext; } + @Override public StateStore getStore(final String name) { return stateMgr.getStore(name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 2e006a0..f06f760 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -49,4 +49,9 @@ public interface ChangelogReader { * @return the restored offsets for all 
persistent stores. */ Map<TopicPartition, Long> restoredOffsets(); + + /** + * Clear out any internal state so this can be re-used + */ + void clear(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 754700f..98a907b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -133,6 +133,26 @@ public class StandbyTask extends AbstractTask { } } + @Override + public void closeSuspended(final boolean clean, final RuntimeException e) { + throw new UnsupportedOperationException("closeSuspended not supported by StandbyTask"); + } + + @Override + public boolean maybePunctuateStreamTime() { + throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask"); + } + + @Override + public boolean maybePunctuateSystemTime() { + throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask"); + } + + @Override + public boolean commitNeeded() { + return false; + } + /** * Updates a state store using records from one change log partition * @@ -144,8 +164,18 @@ public class StandbyTask extends AbstractTask { return stateMgr.updateStandbyStates(partition, records); } - Map<TopicPartition, Long> checkpointedOffsets() { + @Override + public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) { + throw new UnsupportedOperationException("addRecords not supported by StandbyTask"); + } + + public Map<TopicPartition, Long> checkpointedOffsets() { return checkpointedOffsets; } + @Override + public 
boolean process() { + throw new UnsupportedOperationException("process not supported by StandbyTask"); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 842721d..1887e73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -186,6 +186,12 @@ public class StoreChangelogReader implements ChangelogReader { return restoredOffsets; } + @Override + public void clear() { + partitionInfo.clear(); + stateRestorers.clear(); + } + private void restorePartition(final Map<TopicPartition, Long> endOffsets, final ConsumerRecords<byte[], byte[]> allRecords, final Iterator<TopicPartition> partitionIterator) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index ebdd64d..d479a72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -53,7 +53,7 @@ import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; import static 
org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT; -public class StreamPartitionAssignor implements PartitionAssignor, Configurable { +public class StreamPartitionAssignor implements PartitionAssignor, Configurable, ThreadMetadataProvider { private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class); private Time time = Time.SYSTEM; @@ -168,7 +168,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } }; - private StreamThread streamThread; + private ThreadDataProvider threadDataProvider; private String userEndPoint; private int numStandbyReplicas; @@ -207,16 +207,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable throw ex; } - if (!(o instanceof StreamThread)) { - KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), StreamThread.class.getName())); + if (!(o instanceof ThreadDataProvider)) { + KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), ThreadDataProvider.class.getName())); log.error(ex.getMessage(), ex); throw ex; } - streamThread = (StreamThread) o; - streamThread.setPartitionAssignor(this); + threadDataProvider = (ThreadDataProvider) o; + threadDataProvider.setThreadMetadataProvider(this); - logPrefix = String.format("stream-thread [%s]", streamThread.getName()); + logPrefix = String.format("stream-thread [%s]", threadDataProvider.name()); String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG); if (userEndPoint != null && !userEndPoint.isEmpty()) { @@ -237,13 +237,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } internalTopicManager = new InternalTopicManager( - StreamsKafkaClient.create(this.streamThread.config), + StreamsKafkaClient.create(this.threadDataProvider.config()), 
configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1, configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ? (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); - this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName()); + this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(threadDataProvider.name()); } @Override @@ -258,13 +258,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable // 2. Task ids of previously running tasks // 3. Task ids of valid local states on the client's state directory. - final Set<TaskId> previousActiveTasks = streamThread.prevActiveTasks(); - Set<TaskId> standbyTasks = streamThread.cachedTasks(); + final Set<TaskId> previousActiveTasks = threadDataProvider.prevActiveTasks(); + Set<TaskId> standbyTasks = threadDataProvider.cachedTasks(); standbyTasks.removeAll(previousActiveTasks); - SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint); + SubscriptionInfo data = new SubscriptionInfo(threadDataProvider.processId(), previousActiveTasks, standbyTasks, this.userEndPoint); - if (streamThread.builder.sourceTopicPattern() != null && - !streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) { + if (threadDataProvider.builder().sourceTopicPattern() != null && + !threadDataProvider.builder().subscriptionUpdates().getUpdates().equals(topics)) { updateSubscribedTopics(topics); } @@ -276,7 +276,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable log.debug("{} found {} topics possibly matching regex", logPrefix, topics); // update the topic groups with the returned subscription set for regex pattern subscriptions 
subscriptionUpdates.updateTopics(topics); - streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName()); + threadDataProvider.builder().updateSubscriptions(subscriptionUpdates, threadDataProvider.name()); } /* @@ -329,7 +329,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable // parse the topology to determine the repartition source topics, // making sure they are created with the number of partitions as // the maximum of the depending sub-topologies source topics' number of partitions - Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups(); + Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = threadDataProvider.builder().topicGroups(); Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>(); for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { @@ -401,7 +401,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable // ensure the co-partitioning topics within the group have the same number of partitions, // and enforce the number of partitions for those repartition topics to be the same if they // are co-partitioned as well. 
- ensureCopartitioning(streamThread.builder.copartitionGroups(), repartitionTopicMetadata, metadata); + ensureCopartitioning(threadDataProvider.builder().copartitionGroups(), repartitionTopicMetadata, metadata); // make sure the repartition source topics exist with the right number of partitions, // create these topics if necessary @@ -421,7 +421,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics); } - Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups( + Map<TaskId, Set<TopicPartition>> partitionsForTask = threadDataProvider.partitionGrouper().partitionGroups( sourceTopicsByGroup, metadataWithInternalTopics); // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks @@ -627,13 +627,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } private void checkForNewTopicAssignments(Assignment assignment) { - if (streamThread.builder.sourceTopicPattern() != null) { + if (threadDataProvider.builder().sourceTopicPattern() != null) { final Set<String> assignedTopics = new HashSet<>(); for (final TopicPartition topicPartition : assignment.partitions()) { assignedTopics.add(topicPartition.topic()); } - if (!streamThread.builder.subscriptionUpdates().getUpdates().containsAll(assignedTopics)) { - assignedTopics.addAll(streamThread.builder.subscriptionUpdates().getUpdates()); + if (!threadDataProvider.builder().subscriptionUpdates().getUpdates().containsAll(assignedTopics)) { + assignedTopics.addAll(threadDataProvider.builder().subscriptionUpdates().getUpdates()); updateSubscribedTopics(assignedTopics); } } @@ -702,28 +702,28 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } - Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() { + public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() { if 
(partitionsByHostState == null) { return Collections.emptyMap(); } return Collections.unmodifiableMap(partitionsByHostState); } - Cluster clusterMetadata() { + public Cluster clusterMetadata() { if (metadataWithInternalTopics == null) { return Cluster.empty(); } return metadataWithInternalTopics; } - Map<TaskId, Set<TopicPartition>> activeTasks() { + public Map<TaskId, Set<TopicPartition>> activeTasks() { if (activeTasks == null) { return Collections.emptyMap(); } return Collections.unmodifiableMap(activeTasks); } - Map<TaskId, Set<TopicPartition>> standbyTasks() { + public Map<TaskId, Set<TopicPartition>> standbyTasks() { if (standbyTasks == null) { return Collections.emptyMap(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d4bd668..697bda8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import static java.lang.String.format; @@ -400,7 +401,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } // helper to avoid calling suspend() twice if a suspended task is not reassigned and closed - void closeSuspended(boolean clean, RuntimeException firstException) { + public void closeSuspended(boolean clean, RuntimeException firstException) { try { closeStateManager(clean); } catch (final RuntimeException e) { @@ -437,6 +438,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator 
} } + @Override + public Map<TopicPartition, Long> checkpointedOffsets() { + throw new UnsupportedOperationException("checkpointedOffsets is not supported by StreamTasks"); + } + /** * <pre> * - {@link #suspend(boolean) suspend(clean)} @@ -529,7 +535,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * current partition group timestamp has reached the defined stamp * Note, this is only called in the presence of new records */ - boolean maybePunctuateStreamTime() { + public boolean maybePunctuateStreamTime() { final long timestamp = partitionGroup.timestamp(); // if the timestamp is not known yet, meaning there is not enough data accumulated @@ -546,11 +552,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * current system timestamp has reached the defined stamp * Note, this is called irrespective of the presence of new records */ - boolean maybePunctuateSystemTime() { + public boolean maybePunctuateSystemTime() { final long timestamp = time.milliseconds(); return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.SYSTEM_TIME, this); } + + @Override + public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) { + throw new UnsupportedOperationException("update is not implemented"); + } + /** * Request committing the current task's state */ @@ -561,7 +573,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator /** * Whether or not a request has been made to commit the current state */ - boolean commitNeeded() { + public boolean commitNeeded() { return commitRequested; }