KAFKA-5702; Extract TaskManager and refactor StreamThread

Extracted `TaskManager` to handle all task-related activities.
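
The new `Task` interface added by this patch is presumably what `TaskManager`
programs against, since both `StreamTask` and `StandbyTask` now implement it.
A rough sketch of the interface, reconstructed from the `@Override` methods
visible in the hunks below (the actual `Task.java` may differ in detail):

    // Hypothetical reconstruction of Task, inferred from the overrides seen in
    // AbstractTask, StreamTask and StandbyTask in this patch -- not the literal file.
    package org.apache.kafka.streams.processor.internals;

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.TaskId;

    public interface Task {
        TaskId id();
        String applicationId();
        Set<TopicPartition> partitions();
        ProcessorTopology topology();
        ProcessorContext context();
        StateStore getStore(final String name);

        void resume();
        void commit();
        void suspend();
        void close(final boolean clean);
        void closeSuspended(final boolean clean, final RuntimeException firstException);

        boolean process();
        boolean commitNeeded();
        boolean maybePunctuateStreamTime();
        boolean maybePunctuateSystemTime();

        int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records);
        List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining);
        Map<TopicPartition, Long> checkpointedOffsets();
    }

Methods a given task type does not support (e.g. `process()` on `StandbyTask`)
simply throw `UnsupportedOperationException`, as the StandbyTask hunk further
down shows.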
Made `StandbyTaskCreator`, `TaskCreator`, and `RebalanceListener` static
classes so they must declare their dependencies explicitly and can be tested
independently of `StreamThread`.
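
The point of the static nested classes is that their collaborators arrive
through the constructor rather than implicitly through the enclosing
`StreamThread`. A purely illustrative sketch of the pattern (the field list and
method signature here are hypothetical, not the actual code in
`StreamThread.java`):

    // Illustrative only: a static nested creator with explicit dependencies.
    // The real TaskCreator/StandbyTaskCreator in StreamThread.java may look different.
    static class TaskCreator {
        private final InternalTopologyBuilder builder;   // hypothetical dependency
        private final StreamsConfig config;              // hypothetical dependency
        private final ChangelogReader changelogReader;   // hypothetical dependency

        TaskCreator(final InternalTopologyBuilder builder,
                    final StreamsConfig config,
                    final ChangelogReader changelogReader) {
            this.builder = builder;
            this.config = config;
            this.changelogReader = changelogReader;
        }

        StreamTask createTask(final TaskId id, final Set<TopicPartition> partitions) {
            // construction details omitted in this sketch
            throw new UnsupportedOperationException("sketch only");
        }
    }

Because nothing here reaches back into `StreamThread`, a unit test can
instantiate the creator directly and pass mocks for the constructor arguments.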
Added interfaces between `StreamPartitionAssignor` and `StreamThread` to
reduce coupling.
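
The two interfaces are intentionally narrow. Sketches reconstructed from the
call sites visible in the `StreamPartitionAssignor` hunks below (return types
are inferred; the real `ThreadDataProvider.java`/`ThreadMetadataProvider.java`
may declare more or less):

    // Hypothetical reconstruction from usage in StreamPartitionAssignor; each
    // interface lives in its own file under org.apache.kafka.streams.processor.internals.
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.state.HostInfo;

    // What the assignor reads from the thread:
    public interface ThreadDataProvider {
        String name();
        UUID processId();
        StreamsConfig config();
        InternalTopologyBuilder builder();
        PartitionGrouper partitionGrouper();
        Set<TaskId> prevActiveTasks();
        Set<TaskId> cachedTasks();
        void setThreadMetadataProvider(final ThreadMetadataProvider provider);
    }

    // What the thread reads back from the assignor:
    public interface ThreadMetadataProvider {
        Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState();
        Cluster clusterMetadata();
        Map<TaskId, Set<TopicPartition>> activeTasks();
        Map<TaskId, Set<TopicPartition>> standbyTasks();
    }

`StreamPartitionAssignor` implements `ThreadMetadataProvider` and talks to the
thread only through `ThreadDataProvider`, so either side can be mocked in tests.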

Author: Damian Guy <damian....@gmail.com>

Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>, Eno Thereska <eno.there...@gmail.com>

Closes #3624 from dguy/stream-thread-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e69ce80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e69ce80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e69ce80

Branch: refs/heads/trunk
Commit: 3e69ce80157eb6a5e6dd05e3be522b5208a41bc5
Parents: b2b5295
Author: Damian Guy <damian....@gmail.com>
Authored: Fri Aug 11 12:14:01 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Fri Aug 11 12:14:01 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |   51 +-
 .../processor/internals/AbstractTask.java       |   16 +-
 .../processor/internals/ChangelogReader.java    |    5 +
 .../processor/internals/StandbyTask.java        |   32 +-
 .../internals/StoreChangelogReader.java         |    6 +
 .../internals/StreamPartitionAssignor.java      |   50 +-
 .../streams/processor/internals/StreamTask.java |   20 +-
 .../processor/internals/StreamThread.java       | 1038 ++++++---------
 .../internals/StreamsMetadataState.java         |    2 +-
 .../kafka/streams/processor/internals/Task.java |   67 +
 .../processor/internals/TaskManager.java        |  524 ++++++++
 .../processor/internals/ThreadDataProvider.java |   36 +
 .../internals/ThreadMetadataProvider.java       |   36 +
 .../apache/kafka/streams/state/HostInfo.java    |    3 +-
 .../StreamThreadStateStoreProvider.java         |    4 +-
 .../integration/RegexSourceIntegrationTest.java |  280 ++--
 .../processor/internals/AbstractTaskTest.java   |   43 +
 .../processor/internals/StandbyTaskTest.java    |   46 +
 .../internals/StreamPartitionAssignorTest.java  |  397 ++----
 .../processor/internals/StreamTaskTest.java     |   92 +-
 .../processor/internals/StreamThreadTest.java   | 1222 +++---------------
 .../processor/internals/TaskManagerTest.java    |  240 ++++
 .../StreamThreadStateStoreProviderTest.java     |   51 +-
 .../apache/kafka/test/MockChangelogReader.java  |    5 +
 24 files changed, 2038 insertions(+), 2228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ec09730..46f6cd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -145,6 +146,29 @@ public class KafkaStreams {
     private final StreamsMetadataState streamsMetadataState;
     private final StreamsConfig config;
     private final StateDirectory stateDirectory;
+    private StateRestoreListener globalStateRestoreListener;
+    private final StateRestoreListener delegatingStateRestoreListener = new StateRestoreListener() {
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
+            if (globalStateRestoreListener != null) {
+                globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+            }
+        }
+
+        @Override
+        public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
+            if (globalStateRestoreListener != null) {
+                globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+            }
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
+            if (globalStateRestoreListener != null) {
+                globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+            }
+        }
+    };
 
     // container states
     /**
@@ -525,18 +549,19 @@ public class KafkaStreams {
             globalThreadState = globalStreamThread.state();
         }
 
+
         for (int i = 0; i < threads.length; i++) {
-            threads[i] = new StreamThread(internalTopologyBuilder,
-                                          config,
-                                          clientSupplier,
-                                          applicationId,
-                                          clientId,
-                                          processId,
-                                          metrics,
-                                          time,
-                                          streamsMetadataState,
-                                          cacheSizeBytes,
-                                          stateDirectory);
+            threads[i] = StreamThread.create(internalTopologyBuilder,
+                                             config,
+                                             clientSupplier,
+                                             processId,
+                                             clientId,
+                                             metrics,
+                                             time,
+                                             streamsMetadataState,
+                                             cacheSizeBytes,
+                                             stateDirectory,
+                                             delegatingStateRestoreListener);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
@@ -823,9 +848,7 @@ public class KafkaStreams {
     public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) {
         synchronized (stateLock) {
             if (state == State.CREATED) {
-                for (StreamThread thread : threads) {
-                    thread.setGlobalStateRestoreListener(globalStateRestoreListener);
-                }
+                this.globalStateRestoreListener = globalStateRestoreListener;
             } else {
                 throw new IllegalStateException("Can only set the GlobalRestoreListener in the CREATED state");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8427e11..2688a8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -37,7 +37,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-public abstract class AbstractTask {
+public abstract class AbstractTask implements Task {
     private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
 
     final TaskId id;
@@ -87,32 +87,32 @@ public abstract class AbstractTask {
         }
     }
 
-    public abstract void resume();
-
-    public abstract void commit();
-    public abstract void suspend();
-    public abstract void close(final boolean clean);
-
-    public final TaskId id() {
+    @Override
+    public TaskId id() {
         return id;
     }
 
+    @Override
     public final String applicationId() {
         return applicationId;
     }
 
+    @Override
     public final Set<TopicPartition> partitions() {
         return partitions;
     }
 
+    @Override
     public final ProcessorTopology topology() {
         return topology;
     }
 
+    @Override
     public final ProcessorContext context() {
         return processorContext;
     }
 
+    @Override
     public StateStore getStore(final String name) {
         return stateMgr.getStore(name);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 2e006a0..f06f760 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -49,4 +49,9 @@ public interface ChangelogReader {
      * @return the restored offsets for all persistent stores.
      */
     Map<TopicPartition, Long> restoredOffsets();
+
+    /**
+     * Clear out any internal state so this can be re-used
+     */
+    void clear();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 754700f..98a907b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -133,6 +133,26 @@ public class StandbyTask extends AbstractTask {
         }
     }
 
+    @Override
+    public void closeSuspended(final boolean clean, final RuntimeException e) {
+        throw new UnsupportedOperationException("closeSuspended not supported by StandbyTask");
+    }
+
+    @Override
+    public boolean maybePunctuateStreamTime() {
+        throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
+    }
+
+    @Override
+    public boolean maybePunctuateSystemTime() {
+        throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
+    }
+
+    @Override
+    public boolean commitNeeded() {
+        return false;
+    }
+
     /**
      * Updates a state store using records from one change log partition
      *
@@ -144,8 +164,18 @@ public class StandbyTask extends AbstractTask {
         return stateMgr.updateStandbyStates(partition, records);
     }
 
-    Map<TopicPartition, Long> checkpointedOffsets() {
+    @Override
+    public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        throw new UnsupportedOperationException("addRecords not supported by StandbyTask");
+    }
+
+    public Map<TopicPartition, Long> checkpointedOffsets() {
         return checkpointedOffsets;
     }
 
+    @Override
+    public boolean process() {
+        throw new UnsupportedOperationException("process not supported by StandbyTask");
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 842721d..1887e73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -186,6 +186,12 @@ public class StoreChangelogReader implements ChangelogReader {
         return restoredOffsets;
     }
 
+    @Override
+    public void clear() {
+        partitionInfo.clear();
+        stateRestorers.clear();
+    }
+
     private void restorePartition(final Map<TopicPartition, Long> endOffsets,
                                   final ConsumerRecords<byte[], byte[]> allRecords,
                                   final Iterator<TopicPartition> partitionIterator) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index ebdd64d..d479a72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -53,7 +53,7 @@ import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
 
-public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
+public class StreamPartitionAssignor implements PartitionAssignor, Configurable, ThreadMetadataProvider {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
     private Time time = Time.SYSTEM;
@@ -168,7 +168,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
     };
 
-    private StreamThread streamThread;
+    private ThreadDataProvider threadDataProvider;
 
     private String userEndPoint;
     private int numStandbyReplicas;
@@ -207,16 +207,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             throw ex;
         }
 
-        if (!(o instanceof StreamThread)) {
-            KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), StreamThread.class.getName()));
+        if (!(o instanceof ThreadDataProvider)) {
+            KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), ThreadDataProvider.class.getName()));
             log.error(ex.getMessage(), ex);
             throw ex;
         }
 
-        streamThread = (StreamThread) o;
-        streamThread.setPartitionAssignor(this);
+        threadDataProvider = (ThreadDataProvider) o;
+        threadDataProvider.setThreadMetadataProvider(this);
 
-        logPrefix = String.format("stream-thread [%s]", streamThread.getName());
+        logPrefix = String.format("stream-thread [%s]", threadDataProvider.name());
 
         String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
@@ -237,13 +237,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         internalTopicManager = new InternalTopicManager(
-                StreamsKafkaClient.create(this.streamThread.config),
+                StreamsKafkaClient.create(this.threadDataProvider.config()),
                 configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
                 configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
                         (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
                         : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
 
-        this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName());
+        this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(threadDataProvider.name());
     }
 
     @Override
@@ -258,13 +258,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // 2. Task ids of previously running tasks
         // 3. Task ids of valid local states on the client's state directory.
 
-        final Set<TaskId> previousActiveTasks = streamThread.prevActiveTasks();
-        Set<TaskId> standbyTasks = streamThread.cachedTasks();
+        final Set<TaskId> previousActiveTasks = threadDataProvider.prevActiveTasks();
+        Set<TaskId> standbyTasks = threadDataProvider.cachedTasks();
         standbyTasks.removeAll(previousActiveTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint);
+        SubscriptionInfo data = new SubscriptionInfo(threadDataProvider.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
 
-        if (streamThread.builder.sourceTopicPattern() != null &&
-            !streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) {
+        if (threadDataProvider.builder().sourceTopicPattern() != null &&
+            !threadDataProvider.builder().subscriptionUpdates().getUpdates().equals(topics)) {
             updateSubscribedTopics(topics);
         }
 
@@ -276,7 +276,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         log.debug("{} found {} topics possibly matching regex", logPrefix, topics);
         // update the topic groups with the returned subscription set for regex pattern subscriptions
         subscriptionUpdates.updateTopics(topics);
-        streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
+        threadDataProvider.builder().updateSubscriptions(subscriptionUpdates, threadDataProvider.name());
     }
 
     /*
@@ -329,7 +329,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // parse the topology to determine the repartition source topics,
         // making sure they are created with the number of partitions as
         // the maximum of the depending sub-topologies source topics' number of partitions
-        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = threadDataProvider.builder().topicGroups();
 
         Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
         for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
@@ -401,7 +401,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // ensure the co-partitioning topics within the group have the same number of partitions,
         // and enforce the number of partitions for those repartition topics to be the same if they
         // are co-partitioned as well.
-        ensureCopartitioning(streamThread.builder.copartitionGroups(), repartitionTopicMetadata, metadata);
+        ensureCopartitioning(threadDataProvider.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
 
         // make sure the repartition source topics exist with the right number of partitions,
         // create these topics if necessary
@@ -421,7 +421,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
         }
 
-        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
+        Map<TaskId, Set<TopicPartition>> partitionsForTask = threadDataProvider.partitionGrouper().partitionGroups(
                 sourceTopicsByGroup, metadataWithInternalTopics);
 
         // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
@@ -627,13 +627,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     }
 
     private void checkForNewTopicAssignments(Assignment assignment) {
-        if (streamThread.builder.sourceTopicPattern() != null) {
+        if (threadDataProvider.builder().sourceTopicPattern() != null) {
             final Set<String> assignedTopics = new HashSet<>();
             for (final TopicPartition topicPartition : assignment.partitions()) {
                 assignedTopics.add(topicPartition.topic());
             }
-            if (!streamThread.builder.subscriptionUpdates().getUpdates().containsAll(assignedTopics)) {
-                assignedTopics.addAll(streamThread.builder.subscriptionUpdates().getUpdates());
+            if (!threadDataProvider.builder().subscriptionUpdates().getUpdates().containsAll(assignedTopics)) {
+                assignedTopics.addAll(threadDataProvider.builder().subscriptionUpdates().getUpdates());
                 updateSubscribedTopics(assignedTopics);
             }
         }
@@ -702,28 +702,28 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
     }
 
-    Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+    public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
         if (partitionsByHostState == null) {
             return Collections.emptyMap();
         }
         return Collections.unmodifiableMap(partitionsByHostState);
     }
 
-    Cluster clusterMetadata() {
+    public Cluster clusterMetadata() {
         if (metadataWithInternalTopics == null) {
             return Cluster.empty();
         }
         return metadataWithInternalTopics;
     }
 
-    Map<TaskId, Set<TopicPartition>> activeTasks() {
+    public Map<TaskId, Set<TopicPartition>> activeTasks() {
         if (activeTasks == null) {
             return Collections.emptyMap();
         }
         return Collections.unmodifiableMap(activeTasks);
     }
 
-    Map<TaskId, Set<TopicPartition>> standbyTasks() {
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
         if (standbyTasks == null) {
             return Collections.emptyMap();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d4bd668..697bda8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static java.lang.String.format;
@@ -400,7 +401,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     // helper to avoid calling suspend() twice if a suspended task is not reassigned and closed
-    void closeSuspended(boolean clean, RuntimeException firstException) {
+    public void closeSuspended(boolean clean, RuntimeException firstException) {
         try {
             closeStateManager(clean);
         } catch (final RuntimeException e) {
@@ -437,6 +438,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    @Override
+    public Map<TopicPartition, Long> checkpointedOffsets() {
+        throw new UnsupportedOperationException("checkpointedOffsets is not supported by StreamTasks");
+    }
+
     /**
      * <pre>
      * - {@link #suspend(boolean) suspend(clean)}
@@ -529,7 +535,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * current partition group timestamp has reached the defined stamp
      * Note, this is only called in the presence of new records
      */
-    boolean maybePunctuateStreamTime() {
+    public boolean maybePunctuateStreamTime() {
         final long timestamp = partitionGroup.timestamp();
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
@@ -546,11 +552,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * current system timestamp has reached the defined stamp
      * Note, this is called irrespective of the presence of new records
      */
-    boolean maybePunctuateSystemTime() {
+    public boolean maybePunctuateSystemTime() {
         final long timestamp = time.milliseconds();
 
         return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.SYSTEM_TIME, this);
     }
+
+    @Override
+    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) {
+        throw new UnsupportedOperationException("update is not implemented");
+    }
+
     /**
      * Request committing the current task's state
      */
@@ -561,7 +573,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     /**
      * Whether or not a request has been made to commit the current state
      */
-    boolean commitNeeded() {
+    public boolean commitNeeded() {
         return commitRequested;
     }
 
