This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 283a194  KAFKA-3514: Part III, Refactor StreamThread main loop (#5428)
283a194 is described below

commit 283a19481d1ce4a77f5f465e7b96288db22a8ff1
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Sep 11 16:16:52 2018 -0700

    KAFKA-3514: Part III, Refactor StreamThread main loop (#5428)
    
    * Refactor the StreamThread main loop, in the following:
    
    1. Fetch from consumer and enqueue data to tasks.
    2. Check if any tasks should be enforced process.
    3/ Loop over processable tasks and process them for N iterations, and then 
check for 1) commit, 2) punctuate, 3) need to call consumer.poll
    4. Even if there is not data to process in this iteration, still need to 
check if commit / punctuate is needed
    5. Finally, try update standby tasks.
    
    *Add an optimization to only commit when it is needed (i.e. at least some 
process() or punctuate() was triggered since last commit).
    
    *Found and fixed a ProducerFencedException scenario: producer.send() call 
would never throw a ProducerFencedException directly, but it may throw a 
KafkaException whose "cause" is a ProducerFencedException.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, John Roesler 
<j...@confluent.io>, Bill Bejeck <b...@confluent.io>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  12 +-
 .../streams/processor/internals/AbstractTask.java  |   8 +-
 .../processor/internals/AssignedStreamsTasks.java  |  66 +++---
 .../streams/processor/internals/AssignedTasks.java |  39 ++--
 .../processor/internals/PartitionGroup.java        |   2 +-
 .../processor/internals/ProcessorContextImpl.java  |   2 +-
 .../processor/internals/ProcessorStateManager.java |   1 +
 .../streams/processor/internals/RecordQueue.java   |   6 +-
 .../processor/internals/StandbyContextImpl.java    |   2 +-
 .../streams/processor/internals/StandbyTask.java   |   7 +
 .../streams/processor/internals/StreamTask.java    | 148 ++++++------
 .../streams/processor/internals/StreamThread.java  | 247 +++++++++++----------
 .../kafka/streams/processor/internals/Task.java    |   2 +
 .../streams/processor/internals/TaskManager.java   |   8 +-
 .../KStreamAggregationDedupIntegrationTest.java    | 173 +++++----------
 .../integration/QueryableStateIntegrationTest.java |  23 +-
 .../integration/utils/IntegrationTestUtils.java    |  19 +-
 .../internals/AssignedStreamsTasksTest.java        |  36 +--
 .../processor/internals/RecordQueueTest.java       |   4 +-
 .../processor/internals/StreamTaskTest.java        | 126 ++++++++---
 .../processor/internals/StreamThreadTest.java      | 202 +++++++++++++----
 .../processor/internals/TaskManagerTest.java       |   8 +-
 .../java/org/apache/kafka/test/MockProcessor.java  |  10 +
 24 files changed, 663 insertions(+), 490 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 54d065f..1fcabca 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -499,7 +499,7 @@ public class ConsumerConfig extends AbstractConfig {
         super(CONFIG, props);
     }
 
-    ConsumerConfig(Map<?, ?> props, boolean doLog) {
+    protected ConsumerConfig(Map<?, ?> props, boolean doLog) {
         super(CONFIG, props, doLog);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b9eaaa6..736e9cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -278,7 +278,7 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code buffered.records.per.partition} */
     @SuppressWarnings("WeakerAccess")
     public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = 
"buffered.records.per.partition";
-    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The 
maximum number of records to buffer per partition.";
+    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum 
number of records to buffer per partition.";
 
     /** {@code cache.max.bytes.buffering} */
     @SuppressWarnings("WeakerAccess")
@@ -298,6 +298,11 @@ public class StreamsConfig extends AbstractConfig {
         " (Note, if <code>processing.guarantee</code> is set to <code>" + 
EXACTLY_ONCE + "</code>, the default value is <code>" + 
EOS_DEFAULT_COMMIT_INTERVAL_MS + "</code>," +
         " otherwise the default value is <code>" + DEFAULT_COMMIT_INTERVAL_MS 
+ "</code>.";
 
+    /** {@code max.task.idle.ms} */
+    public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
+    private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time 
a stream task will stay idle when not all of its partition buffers contain 
records," +
+        " to avoid potential out-of-order record processing across multiple 
input streams.";
+
     /** {@code connections.max.idle.ms} */
     @SuppressWarnings("WeakerAccess")
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = 
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -538,6 +543,11 @@ public class StreamsConfig extends AbstractConfig {
                     1,
                     Importance.MEDIUM,
                     NUM_STREAM_THREADS_DOC)
+            .define(MAX_TASK_IDLE_MS_CONFIG,
+                    Type.LONG,
+                    0L,
+                    Importance.MEDIUM,
+                    MAX_TASK_IDLE_MS_DOC)
             .define(PROCESSING_GUARANTEE_CONFIG,
                     Type.STRING,
                     AT_LEAST_ONCE,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 94e4c71..8eb024c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -51,9 +51,11 @@ public abstract class AbstractTask implements Task {
     final boolean eosEnabled;
     final Logger log;
     final LogContext logContext;
+    final StateDirectory stateDirectory;
+
     boolean taskInitialized;
     boolean taskClosed;
-    final StateDirectory stateDirectory;
+    boolean commitNeeded;
 
     InternalProcessorContext processorContext;
 
@@ -267,6 +269,10 @@ public abstract class AbstractTask implements Task {
         return taskClosed;
     }
 
+    public boolean commitNeeded() {
+        return commitNeeded;
+    }
+
     public boolean hasStateStores() {
         return !topology.stateStores().isEmpty();
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 0a83965..1eb3ab9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -21,37 +21,14 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
 
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
 class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements 
RestoringTasks {
-    private final Logger log;
-    private final TaskAction<StreamTask> maybeCommitAction;
-    private int committed = 0;
-
     AssignedStreamsTasks(final LogContext logContext) {
         super(logContext, "stream task");
-
-        this.log = logContext.logger(getClass());
-
-        maybeCommitAction = new TaskAction<StreamTask>() {
-            @Override
-            public String name() {
-                return "maybeCommit";
-            }
-
-            @Override
-            public void apply(final StreamTask task) {
-                if (task.commitNeeded()) {
-                    committed++;
-                    task.commit();
-                    log.debug("Committed active task {} per user request in", 
task.id());
-                }
-            }
-        };
     }
 
     @Override
@@ -63,9 +40,41 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> 
implements Restorin
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
      */
-    int maybeCommit() {
-        committed = 0;
-        applyToRunningTasks(maybeCommitAction);
+    int maybeCommitPerUserRequested() {
+        int committed = 0;
+        RuntimeException firstException = null;
+
+        for (final Iterator<StreamTask> it = running().iterator(); 
it.hasNext(); ) {
+            final StreamTask task = it.next();
+            try {
+                if (task.commitRequested() && task.commitNeeded()) {
+                    task.commit();
+                    committed++;
+                    log.debug("Committed active task {} per user request in", 
task.id());
+                }
+            } catch (final TaskMigratedException e) {
+                log.info("Failed to commit {} since it got migrated to another 
thread already. " +
+                        "Closing it as zombie before triggering a new 
rebalance.", task.id());
+                final RuntimeException fatalException = closeZombieTask(task);
+                if (fatalException != null) {
+                    throw fatalException;
+                }
+                it.remove();
+                throw e;
+            } catch (final RuntimeException t) {
+                log.error("Failed to commit StreamTask {} due to the following 
error:",
+                        task.id(),
+                        t);
+                if (firstException == null) {
+                    firstException = t;
+                }
+            }
+        }
+
+        if (firstException != null) {
+            throw firstException;
+        }
+
         return committed;
     }
 
@@ -85,15 +94,14 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process() {
+    int process(final long now) {
         int processed = 0;
 
         final Iterator<Map.Entry<TaskId, StreamTask>> it = 
running.entrySet().iterator();
         while (it.hasNext()) {
             final StreamTask task = it.next().getValue();
-
             try {
-                if (task.isProcessable() && task.process()) {
+                if (task.isProcessable(now) && task.process()) {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 079d405..3cc396d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -37,14 +37,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 abstract class AssignedTasks<T extends Task> {
-    private final Logger log;
+    final Logger log;
     private final String taskTypeName;
-    private final TaskAction<T> commitAction;
     private final Map<TaskId, T> created = new HashMap<>();
     private final Map<TaskId, T> suspended = new HashMap<>();
     private final Map<TaskId, T> restoring = new HashMap<>();
     private final Set<TopicPartition> restoredPartitions = new HashSet<>();
     private final Set<TaskId> previousActiveTasks = new HashSet<>();
+
     // IQ may access this map.
     final Map<TaskId, T> running = new ConcurrentHashMap<>();
     private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
@@ -53,20 +53,7 @@ abstract class AssignedTasks<T extends Task> {
     AssignedTasks(final LogContext logContext,
                   final String taskTypeName) {
         this.taskTypeName = taskTypeName;
-
         this.log = logContext.logger(getClass());
-
-        commitAction = new TaskAction<T>() {
-            @Override
-            public String name() {
-                return "commit";
-            }
-
-            @Override
-            public void apply(final T task) {
-                task.commit();
-            }
-        };
     }
 
     void addNewTask(final T task) {
@@ -349,17 +336,16 @@ abstract class AssignedTasks<T extends Task> {
      *                               or if the task producer got fenced (EOS)
      */
     int commit() {
-        applyToRunningTasks(commitAction);
-        return running.size();
-    }
-
-    void applyToRunningTasks(final TaskAction<T> action) {
+        int committed = 0;
         RuntimeException firstException = null;
 
         for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {
-                action.apply(task);
+                if (task.commitNeeded()) {
+                    task.commit();
+                    committed++;
+                }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to commit {} {} since it got migrated to 
another thread already. " +
                         "Closing it as zombie before triggering a new 
rebalance.", taskTypeName, task.id());
@@ -370,11 +356,10 @@ abstract class AssignedTasks<T extends Task> {
                 it.remove();
                 throw e;
             } catch (final RuntimeException t) {
-                log.error("Failed to {} {} {} due to the following error:",
-                          action.name(),
-                          taskTypeName,
-                          task.id(),
-                          t);
+                log.error("Failed to commit {} {} due to the following error:",
+                        taskTypeName,
+                        task.id(),
+                        t);
                 if (firstException == null) {
                     firstException = t;
                 }
@@ -384,6 +369,8 @@ abstract class AssignedTasks<T extends Task> {
         if (firstException != null) {
             throw firstException;
         }
+
+        return committed;
     }
 
     void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> 
newAssignment) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index f17c63a..7020253 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -66,7 +66,7 @@ public class PartitionGroup {
         this.partitionQueues = partitionQueues;
         totalBuffered = 0;
         allBuffered = false;
-        streamTime = RecordQueue.NOT_KNOWN;
+        streamTime = RecordQueue.UNKNOWN;
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index beab35f..ee21379 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -146,7 +146,7 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
 
     @Override
     public void commit() {
-        task.needCommit();
+        task.requestCommit();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 76d1a0c..15a5c21 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -318,6 +318,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             return (BatchingStateRestoreCallback) callback;
         }
 
+        // TODO: avoid creating a new object for each update call?
         return new WrappedBatchingStateRestoreCallback(callback);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 86340bb..d06d7f3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -36,7 +36,7 @@ import java.util.ArrayDeque;
  */
 public class RecordQueue {
 
-    static final long NOT_KNOWN = -1L;
+    static final long UNKNOWN = -1L;
 
     private final Logger log;
     private final SourceNode source;
@@ -46,7 +46,7 @@ public class RecordQueue {
     private final RecordDeserializer recordDeserializer;
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
-    private long partitionTime = NOT_KNOWN;
+    private long partitionTime = UNKNOWN;
     private StampedRecord headRecord = null;
 
     RecordQueue(final TopicPartition partition,
@@ -151,7 +151,7 @@ public class RecordQueue {
     public void clear() {
         fifoQueue.clear();
         headRecord = null;
-        partitionTime = NOT_KNOWN;
+        partitionTime = UNKNOWN;
     }
 
     private void maybeUpdateTimestamp() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 67834d7..e8631aa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -74,7 +74,7 @@ class StandbyContextImpl extends AbstractProcessorContext 
implements RecordColle
         }
     };
 
-    private long streamTime = RecordQueue.NOT_KNOWN;
+    private long streamTime = RecordQueue.UNKNOWN;
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 72cc629..3ac6414 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -103,6 +103,8 @@ public class StandbyTask extends AbstractTask {
         flushAndCheckpointState();
         // reinitialize offset limits
         updateOffsetLimits();
+
+        commitNeeded = false;
     }
 
     /**
@@ -185,6 +187,11 @@ public class StandbyTask extends AbstractTask {
         }
 
         stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset);
+
+        if (!restoreRecords.isEmpty()) {
+            commitNeeded = true;
+        }
+
         return remainingRecords;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7f3d31f..2f97b7f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -60,26 +60,23 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new 
ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
-    private static final int WAIT_ON_PARTIAL_INPUT = 5;
-
+    private final Time time;
+    private final long maxTaskIdleMs;
+    private final int maxBufferedSize;
+    private final TaskMetrics taskMetrics;
     private final PartitionGroup partitionGroup;
+    private final RecordCollector recordCollector;
     private final PartitionGroup.RecordInfo recordInfo;
+    private final Map<TopicPartition, Long> consumedOffsets;
     private final PunctuationQueue streamTimePunctuationQueue;
     private final PunctuationQueue systemTimePunctuationQueue;
-
-    private final Map<TopicPartition, Long> consumedOffsets;
-    private final RecordCollector recordCollector;
     private final ProducerSupplier producerSupplier;
-    private Producer<byte[], byte[]> producer;
-    private final int maxBufferedSize;
 
+    private Sensor closeSensor;
+    private long idleStartTime;
+    private Producer<byte[], byte[]> producer;
     private boolean commitRequested = false;
-    private boolean commitOffsetNeeded = false;
     private boolean transactionInFlight = false;
-    private int waits = WAIT_ON_PARTIAL_INPUT;
-    private final Time time;
-    private final TaskMetrics taskMetrics;
-    private Sensor closeSensor;
 
     protected static final class TaskMetrics {
         final StreamsMetricsImpl metrics;
@@ -87,7 +84,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         final Sensor taskEnforcedProcessSensor;
         private final String taskName;
 
-
         TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) {
             taskName = id.toString();
             this.metrics = metrics;
@@ -134,13 +130,13 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             );
 
             // add the metrics for enforced processing
-            taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, 
"enforced-process", Sensor.RecordingLevel.DEBUG, parent);
+            taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, 
"enforced-processing", Sensor.RecordingLevel.DEBUG, parent);
             taskEnforcedProcessSensor.add(
-                    new MetricName("enforced-process-rate", group, "The 
average number of occurrence of enforced-process per second.", tagMap),
+                    new MetricName("enforced-processing-rate", group, "The 
average number of occurrence of enforced-processing operation per second.", 
tagMap),
                     new Rate(TimeUnit.SECONDS, new Count())
             );
             taskEnforcedProcessSensor.add(
-                    new MetricName("enforced-process-total", group, "The total 
number of occurrence of enforced-process operations.", tagMap),
+                    new MetricName("enforced-processing-total", group, "The 
total number of occurrence of enforced-processing operations.", tagMap),
                     new CumulativeCount()
             );
 
@@ -207,6 +203,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
         streamTimePunctuationQueue = new PunctuationQueue();
         systemTimePunctuationQueue = new PunctuationQueue();
+        maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
         maxBufferedSize = 
config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 
         // initialize the consumed and committed offset cache
@@ -253,6 +250,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     public boolean initializeStateStores() {
         log.trace("Initializing state stores");
         registerStateStores();
+
         return changelogPartitions().isEmpty();
     }
 
@@ -277,7 +275,10 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         }
 
         processorContext.initialized();
+
         taskInitialized = true;
+
+        idleStartTime = RecordQueue.UNKNOWN;
     }
 
     /**
@@ -299,18 +300,27 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     }
 
     /**
-     * An active task is processable if its buffer contains data for all of 
its input source topic partitions
+     * An active task is processable if its buffer contains data for all of 
its input
+     * source topic partitions, or if it is enforced to be processable
      */
-    public boolean isProcessable() {
+    boolean isProcessable(final long now) {
         if (partitionGroup.allPartitionsBuffered()) {
+            idleStartTime = RecordQueue.UNKNOWN;
             return true;
-        } else if (partitionGroup.numBuffered() > 0 && --waits < 0) {
-            taskMetrics.taskEnforcedProcessSensor.record();
-            waits = WAIT_ON_PARTIAL_INPUT;
-            return true;
-        }
+        } else if (partitionGroup.numBuffered() > 0) {
+            if (idleStartTime == RecordQueue.UNKNOWN) {
+                idleStartTime = now;
+            }
 
-        return false;
+            if (now - idleStartTime >= maxTaskIdleMs) {
+                taskMetrics.taskEnforcedProcessSensor.record();
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            return false;
+        }
     }
 
     /**
@@ -343,7 +353,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
             // update the consumed offset map after processing is done
             consumedOffsets.put(partition, record.offset());
-            commitOffsetNeeded = true;
+            commitNeeded = true;
 
             // after processing this record, if its partition queue's buffered 
size has been
             // decreased to the threshold, we can then resume the consumption 
on this partition
@@ -435,10 +445,32 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             stateMgr.checkpoint(activeTaskCheckpointableOffsets());
         }
 
-        commitOffsets(startNewTransaction);
+        final Map<TopicPartition, OffsetAndMetadata> 
consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
+        for (final Map.Entry<TopicPartition, Long> entry : 
consumedOffsets.entrySet()) {
+            final TopicPartition partition = entry.getKey();
+            final long offset = entry.getValue() + 1;
+            consumedOffsetsAndMetadata.put(partition, new 
OffsetAndMetadata(offset));
+            stateMgr.putOffsetLimit(partition, offset);
+        }
 
-        commitRequested = false;
+        try {
+            if (eosEnabled) {
+                producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, 
applicationId);
+                producer.commitTransaction();
+                transactionInFlight = false;
+                if (startNewTransaction) {
+                    producer.beginTransaction();
+                    transactionInFlight = true;
+                }
+            } else {
+                consumer.commitSync(consumedOffsetsAndMetadata);
+            }
+        } catch (final CommitFailedException | ProducerFencedException error) {
+            throw new TaskMigratedException(this, error);
+        }
 
+        commitNeeded = false;
+        commitRequested = false;
         taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs);
     }
 
@@ -463,43 +495,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         }
     }
 
-    /**
-     * @throws TaskMigratedException if committing offsets failed (non-EOS)
-     *                               or if the task producer got fenced (EOS)
-     */
-    private void commitOffsets(final boolean startNewTransaction) {
-        try {
-            if (commitOffsetNeeded) {
-                log.trace("Committing offsets");
-                final Map<TopicPartition, OffsetAndMetadata> 
consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
-                for (final Map.Entry<TopicPartition, Long> entry : 
consumedOffsets.entrySet()) {
-                    final TopicPartition partition = entry.getKey();
-                    final long offset = entry.getValue() + 1;
-                    consumedOffsetsAndMetadata.put(partition, new 
OffsetAndMetadata(offset));
-                    stateMgr.putOffsetLimit(partition, offset);
-                }
-
-                if (eosEnabled) {
-                    
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-                } else {
-                    consumer.commitSync(consumedOffsetsAndMetadata);
-                }
-                commitOffsetNeeded = false;
-            }
-
-            if (eosEnabled) {
-                producer.commitTransaction();
-                transactionInFlight = false;
-                if (startNewTransaction) {
-                    producer.beginTransaction();
-                    transactionInFlight = true;
-                }
-            }
-        } catch (final CommitFailedException | ProducerFencedException fatal) {
-            throw new TaskMigratedException(this, fatal);
-        }
-    }
-
     Map<TopicPartition, Long> purgableOffsets() {
         final Map<TopicPartition, Long> purgableConsumedOffsets = new 
HashMap<>();
         for (final Map.Entry<TopicPartition, Long> entry : 
consumedOffsets.entrySet()) {
@@ -645,13 +640,12 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
     // helper to avoid calling suspend() twice if a suspended task is not 
reassigned and closed
     @Override
-    public void closeSuspended(boolean clean,
+    public void closeSuspended(final boolean clean,
                                final boolean isZombie,
                                RuntimeException firstException) {
         try {
             closeStateManager(clean);
         } catch (final RuntimeException e) {
-            clean = false;
             if (firstException == null) {
                 firstException = e;
             }
@@ -793,10 +787,16 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
         // if the timestamp is not known yet, meaning there is not enough data 
accumulated
         // to reason stream partition time, then skip.
-        if (timestamp == RecordQueue.NOT_KNOWN) {
+        if (timestamp == RecordQueue.UNKNOWN) {
             return false;
         } else {
-            return streamTimePunctuationQueue.mayPunctuate(timestamp, 
PunctuationType.STREAM_TIME, this);
+            final boolean punctuated = 
streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, 
this);
+
+            if (punctuated) {
+                commitNeeded = true;
+            }
+
+            return punctuated;
         }
     }
 
@@ -810,20 +810,26 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     public boolean maybePunctuateSystemTime() {
         final long timestamp = time.milliseconds();
 
-        return systemTimePunctuationQueue.mayPunctuate(timestamp, 
PunctuationType.WALL_CLOCK_TIME, this);
+        final boolean punctuated = 
systemTimePunctuationQueue.mayPunctuate(timestamp, 
PunctuationType.WALL_CLOCK_TIME, this);
+
+        if (punctuated) {
+            commitNeeded = true;
+        }
+
+        return punctuated;
     }
 
     /**
      * Request committing the current task's state
      */
-    void needCommit() {
+    void requestCommit() {
         commitRequested = true;
     }
 
     /**
      * Whether or not a request has been made to commit the current state
      */
-    boolean commitNeeded() {
+    boolean commitRequested() {
         return commitRequested;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 28cedbe..b43177d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
@@ -68,7 +69,6 @@ import static java.util.Collections.singleton;
 
 public class StreamThread extends Thread {
 
-    private final static int UNLIMITED_RECORDS = -1;
     private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new 
AtomicInteger(1);
 
     /**
@@ -554,18 +554,21 @@ public class StreamThread extends Thread {
     }
 
     private final Time time;
-    private final Duration pollTime;
-    private final long commitTimeMs;
-    private final Object stateLock;
     private final Logger log;
     private final String logPrefix;
+    private final Object stateLock;
+    private final Duration pollTime;
+    private final long commitTimeMs;
+    private final int maxPollTimeMs;
+    private final String originalReset;
     private final TaskManager taskManager;
     private final StreamsMetricsThreadImpl streamsMetrics;
     private final AtomicInteger assignmentErrorCode;
 
+    private long now;
+    private long lastPollMs;
     private long lastCommitMs;
-    private long timerStartedMs;
-    private final String originalReset;
+    private int numIterations;
     private Throwable rebalanceException = null;
     private boolean processStandbyRecords = false;
     private volatile State state = State.CREATED;
@@ -711,11 +714,21 @@ public class StreamThread extends Thread {
         this.assignmentErrorCode = assignmentErrorCode;
 
         this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
+        this.maxPollTimeMs = new 
InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", 
"dummyClientId"))
+                .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
         this.commitTimeMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
+        this.numIterations = 1;
+
         updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
     }
 
+    private static final class InternalConsumerConfig extends ConsumerConfig {
+        private InternalConsumerConfig(final Map<String, Object> props) {
+            super(ConsumerConfig.addDeserializerToConfig(props, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()), false);
+        }
+    }
+
     /**
      * Execute the stream processors
      *
@@ -757,12 +770,11 @@ public class StreamThread extends Thread {
      * @throws StreamsException      if the store's change log does not 
contain the partition
      */
     private void runLoop() {
-        long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
         consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
 
         while (isRunning()) {
             try {
-                recordsProcessedBeforeCommit = 
runOnce(recordsProcessedBeforeCommit);
+                runOnce();
                 if (assignmentErrorCode.get() == 
StreamsPartitionAssignor.Error.VERSION_PROBING.code()) {
                     log.info("Version probing detected. Triggering new 
rebalance.");
                     enforceRebalance();
@@ -791,12 +803,10 @@ public class StreamThread extends Thread {
      *                               or if the task producer got fenced (EOS)
      */
     // Visible for testing
-    long runOnce(final long recordsProcessedBeforeCommit) {
-        long processedBeforeCommit = recordsProcessedBeforeCommit;
-
+    void runOnce() {
         final ConsumerRecords<byte[], byte[]> records;
 
-        timerStartedMs = time.milliseconds();
+        now = time.milliseconds();
 
         if (state == State.PARTITIONS_ASSIGNED) {
             // try to fetch some records with zero poll millis
@@ -816,6 +826,13 @@ public class StreamThread extends Thread {
             throw new StreamsException(logPrefix + "Unexpected state " + state 
+ " during normal iteration");
         }
 
+        final long pollLatency = advanceNowAndComputeLatency();
+
+        if (records != null && !records.isEmpty()) {
+            streamsMetrics.pollTimeSensor.record(pollLatency, now);
+            addRecordsToTasks(records);
+        }
+
         // only try to initialize the assigned tasks
         // if the state is still in PARTITION_ASSIGNED after the poll call
         if (state == State.PARTITIONS_ASSIGNED) {
@@ -824,28 +841,60 @@ public class StreamThread extends Thread {
             }
         }
 
-        if (records != null && !records.isEmpty()) {
-            streamsMetrics.pollTimeSensor.record(computeLatency(), 
timerStartedMs);
-            addRecordsToTasks(records);
-        }
+        advanceNowAndComputeLatency();
 
+        // TODO: we will process some tasks even if the state is not RUNNING, 
i.e. some other
+        // tasks are still being restored.
         if (taskManager.hasActiveRunningTasks()) {
-            final long totalProcessed = 
processAndMaybeCommit(recordsProcessedBeforeCommit);
-            if (totalProcessed > 0) {
-                final long processLatency = computeLatency();
-                streamsMetrics.processTimeSensor.record(processLatency / 
(double) totalProcessed, timerStartedMs);
-                processedBeforeCommit = adjustRecordsProcessedBeforeCommit(
-                        recordsProcessedBeforeCommit,
-                        totalProcessed,
-                        processLatency,
-                        commitTimeMs);
-            }
+            /*
+             * Within an iteration, after N (N initialized as 1 upon start up) 
round of processing one-record-each on the applicable tasks, check the current 
time:
+             *  1. If it is time to commit, do it;
+             *  2. If it is time to punctuate, do it;
+             *  3. If elapsed time is close to consumer's 
max.poll.interval.ms, end the current iteration immediately.
+             *  4. If none of the the above happens, increment N.
+             *  5. If one of the above happens, half the value of N.
+             */
+            int processed = 0;
+            long timeSinceLastPoll = 0L;
+
+            do {
+                for (int i = 0; i < numIterations; i++) {
+                    processed = taskManager.process(now);
+
+                    if (processed > 0) {
+                        final long processLatency = 
advanceNowAndComputeLatency();
+                        streamsMetrics.processTimeSensor.record(processLatency 
/ (double) processed, now);
+
+                        // commit any tasks that have requested a commit
+                        final int committed = 
taskManager.maybeCommitActiveTasksPerUserRequested();
+
+                        if (committed > 0) {
+                            final long commitLatency = 
advanceNowAndComputeLatency();
+                            
streamsMetrics.commitTimeSensor.record(commitLatency / (double) committed, now);
+                        }
+                    } else {
+                        // if there is no records to be processed, exit 
immediately
+                        break;
+                    }
+                }
+
+                timeSinceLastPoll = Math.max(now - lastPollMs, 0);
+
+                if (maybePunctuate() || maybeCommit()) {
+                    numIterations = numIterations > 1 ? numIterations / 2 : 
numIterations;
+                } else if (timeSinceLastPoll > maxPollTimeMs / 2) {
+                    numIterations = numIterations > 1 ? numIterations / 2 : 
numIterations;
+                    break;
+                } else if (processed > 0) {
+                    numIterations++;
+                }
+            } while (processed > 0);
         }
 
-        punctuate();
-        maybeCommit(timerStartedMs);
-        maybeUpdateStandbyTasks(timerStartedMs);
-        return processedBeforeCommit;
+        // update standby tasks and maybe commit the standby tasks as well
+        maybeUpdateStandbyTasks();
+
+        maybeCommit();
     }
 
     /**
@@ -858,6 +907,8 @@ public class StreamThread extends Thread {
     private ConsumerRecords<byte[], byte[]> pollRequests(final Duration 
pollTime) {
         ConsumerRecords<byte[], byte[]> records = null;
 
+        lastPollMs = now;
+
         try {
             records = consumer.poll(pollTime);
         } catch (final InvalidOffsetException e) {
@@ -939,119 +990,64 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Schedule the records processing by selecting which record is processed 
next. Commits may
-     * happen as records are processed.
-     *
-     * @param recordsProcessedBeforeCommit number of records to be processed 
before commit is called.
-     *                                     if UNLIMITED_RECORDS, then commit 
is never called
-     * @return Number of records processed since last commit.
-     * @throws TaskMigratedException if committing offsets failed (non-EOS)
-     *                               or if the task producer got fenced (EOS)
-     */
-    private long processAndMaybeCommit(final long 
recordsProcessedBeforeCommit) {
-
-        long processed;
-        long totalProcessedSinceLastMaybeCommit = 0;
-        // Round-robin scheduling by taking one record from each task 
repeatedly
-        // until no task has any records left
-        do {
-            processed = taskManager.process();
-            if (processed > 0) {
-                streamsMetrics.processTimeSensor.record(computeLatency() / 
(double) processed, timerStartedMs);
-            }
-            totalProcessedSinceLastMaybeCommit += processed;
-
-            punctuate();
-
-            if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS &&
-                totalProcessedSinceLastMaybeCommit >= 
recordsProcessedBeforeCommit) {
-                totalProcessedSinceLastMaybeCommit = 0;
-                maybeCommit(timerStartedMs);
-            }
-            // commit any tasks that have requested a commit
-            final int committed = taskManager.maybeCommitActiveTasks();
-            if (committed > 0) {
-                streamsMetrics.commitTimeSensor.record(computeLatency() / 
(double) committed, timerStartedMs);
-            }
-        } while (processed != 0);
-
-        return totalProcessedSinceLastMaybeCommit;
-    }
-
-    /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void punctuate() {
+    private boolean maybePunctuate() {
         final int punctuated = taskManager.punctuate();
         if (punctuated > 0) {
-            streamsMetrics.punctuateTimeSensor.record(computeLatency() / 
(double) punctuated, timerStartedMs);
+            final long punctuateLatency = advanceNowAndComputeLatency();
+            streamsMetrics.punctuateTimeSensor.record(punctuateLatency / 
(double) punctuated, now);
         }
-    }
 
-    /**
-     * Adjust the number of records that should be processed by scheduler. 
This avoids
-     * scenarios where the processing time is higher than the commit time.
-     *
-     * @param prevRecordsProcessedBeforeCommit Previous number of records 
processed by scheduler.
-     * @param totalProcessed                   Total number of records 
processed in this last round.
-     * @param processLatency                   Total processing latency in ms 
processed in this last round.
-     * @param commitTime                       Desired commit time in ms.
-     * @return An adjusted number of records to be processed in the next round.
-     */
-    private long adjustRecordsProcessedBeforeCommit(final long 
prevRecordsProcessedBeforeCommit, final long totalProcessed,
-                                                    final long processLatency, 
final long commitTime) {
-        long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
-        // check if process latency larger than commit latency
-        // note that once we set recordsProcessedBeforeCommit, it will never 
be UNLIMITED_RECORDS again, so
-        // we will never process all records again. This might be an issue if 
the initial measurement
-        // was off due to a slow start.
-        if (processLatency > 0 && processLatency > commitTime) {
-            // push down
-            recordsProcessedBeforeCommit = Math.max(1, (commitTime * 
totalProcessed) / processLatency);
-            log.debug("processing latency {} > commit time {} for {} records. 
Adjusting down recordsProcessedBeforeCommit={}",
-                processLatency, commitTime, totalProcessed, 
recordsProcessedBeforeCommit);
-        } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && 
processLatency > 0) {
-            // push up
-            recordsProcessedBeforeCommit = Math.max(1, (commitTime * 
totalProcessed) / processLatency);
-            log.debug("processing latency {} < commit time {} for {} records. 
Adjusting up recordsProcessedBeforeCommit={}",
-                processLatency, commitTime, totalProcessed, 
recordsProcessedBeforeCommit);
-        }
-
-        return recordsProcessedBeforeCommit;
+        return punctuated > 0;
     }
 
     /**
-     * Commit all tasks owned by this thread if specified interval time has 
elapsed
+     * Try to commit all active tasks owned by this thread.
+     *
+     * Visible for testing.
      *
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
      */
-    void maybeCommit(final long now) {
-        if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
+    boolean maybeCommit() {
+        int committed = 0;
+
+        if (commitTimeMs >= 0 && now - lastCommitMs > commitTimeMs) {
             if (log.isTraceEnabled()) {
                 log.trace("Committing all active tasks {} and standby tasks {} 
since {}ms has elapsed (commit interval is {}ms)",
                     taskManager.activeTaskIds(), taskManager.standbyTaskIds(), 
now - lastCommitMs, commitTimeMs);
             }
 
-            final int committed = taskManager.commitAll();
+            committed += taskManager.commitAll();
             if (committed > 0) {
-                streamsMetrics.commitTimeSensor.record(computeLatency() / 
(double) committed, timerStartedMs);
+                final long intervalCommitLatency = 
advanceNowAndComputeLatency();
+                streamsMetrics.commitTimeSensor.record(intervalCommitLatency / 
(double) committed, now);
 
                 // try to purge the committed records for repartition topics 
if possible
                 taskManager.maybePurgeCommitedRecords();
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Committed all active tasks {} and standby tasks {} 
in {}ms",
-                    taskManager.activeTaskIds(), taskManager.standbyTaskIds(), 
timerStartedMs - now);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Committed all active tasks {} and standby tasks 
{} in {}ms",
+                        taskManager.activeTaskIds(), 
taskManager.standbyTaskIds(), intervalCommitLatency);
+                }
             }
 
             lastCommitMs = now;
-
             processStandbyRecords = true;
+        } else {
+            final int commitPerRequested = 
taskManager.maybeCommitActiveTasksPerUserRequested();
+            if (commitPerRequested > 0) {
+                final long requestCommitLatency = 
advanceNowAndComputeLatency();
+                streamsMetrics.commitTimeSensor.record(requestCommitLatency / 
(double) committed, now);
+                committed += commitPerRequested;
+            }
         }
+
+        return committed > 0;
     }
 
-    private void maybeUpdateStandbyTasks(final long now) {
+    private void maybeUpdateStandbyTasks() {
         if (state == State.RUNNING && taskManager.hasStandbyRunningTasks()) {
             if (processStandbyRecords) {
                 if (!standbyRecords.isEmpty()) {
@@ -1080,7 +1076,9 @@ public class StreamThread extends Thread {
 
                     standbyRecords = remainingStandbyRecords;
 
-                    log.debug("Updated standby tasks {} in {}ms", 
taskManager.standbyTaskIds(), time.milliseconds() - now);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Updated standby tasks {} in {}ms", 
taskManager.standbyTaskIds(), time.milliseconds() - now);
+                    }
                 }
                 processStandbyRecords = false;
             }
@@ -1130,6 +1128,9 @@ public class StreamThread extends Thread {
                 }
                 restoreConsumer.seekToBeginning(partitions);
             }
+
+            // update now if the standby restoration indeed executed
+            advanceNowAndComputeLatency();
         }
     }
 
@@ -1139,11 +1140,11 @@ public class StreamThread extends Thread {
      *
      * @return latency
      */
-    private long computeLatency() {
-        final long previousTimeMs = timerStartedMs;
-        timerStartedMs = time.milliseconds();
+    private long advanceNowAndComputeLatency() {
+        final long previous = now;
+        now = time.milliseconds();
 
-        return Math.max(timerStartedMs - previousTimeMs, 0);
+        return Math.max(now - previous, 0);
     }
 
     /**
@@ -1242,6 +1243,10 @@ public class StreamThread extends Thread {
     }
 
     // the following are for testing only
+    void setNow(final long now) {
+        this.now = now;
+    }
+
     TaskManager taskManager() {
         return taskManager;
     }
@@ -1250,6 +1255,10 @@ public class StreamThread extends Thread {
         return standbyRecords;
     }
 
+    int currentNumIterations() {
+        return numIterations;
+    }
+
     public Map<MetricName, Metric> producerMetrics() {
         final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
         if (producer != null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 2b43640..812e7e1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -34,6 +34,8 @@ public interface Task {
      */
     boolean initializeStateStores();
 
+    boolean commitNeeded();
+
     void initializeTopology();
 
     void commit();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index e251337..9cc5a19 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -409,8 +409,8 @@ public class TaskManager {
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process() {
-        return active.process();
+    int process(final long now) {
+        return active.process(now);
     }
 
     /**
@@ -424,8 +424,8 @@ public class TaskManager {
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
      */
-    int maybeCommitActiveTasks() {
-        return active.maybeCommit();
+    int maybeCommitActiveTasksPerUserRequested() {
+        return active.maybeCommitPerUserRequested();
     }
 
     void maybePurgeCommitedRecords() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 4593e59..b51511e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -24,14 +24,13 @@ import 
org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -40,9 +39,6 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
@@ -54,14 +50,9 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Properties;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
 /**
  * Similar to KStreamAggregationIntegrationTest but with dedupping enabled
  * by virtue of having a large commit interval
@@ -93,11 +84,9 @@ public class KStreamAggregationDedupIntegrationTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "kgrouped-stream-test-" +
-            testNo;
+        final String applicationId = "kgrouped-stream-test-" + testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL_MS);
@@ -111,12 +100,7 @@ public class KStreamAggregationDedupIntegrationTest {
                 mapper,
                 Serialized.with(Serdes.String(), Serdes.String()));
 
-        reducer = new Reducer<String>() {
-            @Override
-            public String apply(final String value1, final String value2) {
-                return value1 + ":" + value2;
-            }
-        };
+        reducer = (value1, value2) -> value1 + ":" + value2;
     }
 
     @After
@@ -132,7 +116,7 @@ public class KStreamAggregationDedupIntegrationTest {
     public void shouldReduce() throws Exception {
         produceMessages(System.currentTimeMillis());
         groupedStream
-                .reduce(reducer, Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("reduce-by-key"))
+                .reduce(reducer, Materialized.as("reduce-by-key"))
                 .toStream()
                 .to(outputTopic, Produced.with(Serdes.String(), 
Serdes.String()));
 
@@ -140,34 +124,15 @@ public class KStreamAggregationDedupIntegrationTest {
 
         produceMessages(System.currentTimeMillis());
 
-        final List<KeyValue<String, String>> results = receiveMessages(
-            new StringDeserializer(),
-            new StringDeserializer(),
-            5);
-
-        Collections.sort(results, new Comparator<KeyValue<String, String>>() {
-            @Override
-            public int compare(final KeyValue<String, String> o1, final 
KeyValue<String, String> o2) {
-                return KStreamAggregationDedupIntegrationTest.compare(o1, o2);
-            }
-        });
-
-        assertThat(results, is(Arrays.asList(
-            KeyValue.pair("A", "A:A"),
-            KeyValue.pair("B", "B:B"),
-            KeyValue.pair("C", "C:C"),
-            KeyValue.pair("D", "D:D"),
-            KeyValue.pair("E", "E:E"))));
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <K extends Comparable, V extends Comparable> int 
compare(final KeyValue<K, V> o1,
-                                                                            
final KeyValue<K, V> o2) {
-        final int keyComparison = o1.key.compareTo(o2.key);
-        if (keyComparison == 0) {
-            return o1.value.compareTo(o2.value);
-        }
-        return keyComparison;
+        validateReceivedMessages(
+                new StringDeserializer(),
+                new StringDeserializer(),
+                Arrays.asList(
+                        KeyValue.pair("A", "A:A"),
+                        KeyValue.pair("B", "B:B"),
+                        KeyValue.pair("C", "C:C"),
+                        KeyValue.pair("D", "D:D"),
+                        KeyValue.pair("E", "E:E")));
     }
 
     @Test
@@ -180,50 +145,31 @@ public class KStreamAggregationDedupIntegrationTest {
 
         groupedStream
             .windowedBy(TimeWindows.of(500L))
-            .reduce(reducer, Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("reduce-time-windows"))
-            .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
-                @Override
-                public String apply(final Windowed<String> windowedKey, final 
String value) {
-                    return windowedKey.key() + "@" + 
windowedKey.window().start();
-                }
-            })
+            .reduce(reducer, Materialized.as("reduce-time-windows"))
+            .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
         startStreams();
 
-        final List<KeyValue<String, String>> windowedOutput = receiveMessages(
-            new StringDeserializer(),
-            new StringDeserializer(),
-            10);
-
-        final Comparator<KeyValue<String, String>>
-            comparator =
-            new Comparator<KeyValue<String, String>>() {
-                @Override
-                public int compare(final KeyValue<String, String> o1,
-                                   final KeyValue<String, String> o2) {
-                    return KStreamAggregationDedupIntegrationTest.compare(o1, 
o2);
-                }
-            };
-
-        Collections.sort(windowedOutput, comparator);
         final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
         final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
-        assertThat(windowedOutput, is(
-            Arrays.asList(
-                new KeyValue<>("A@" + firstBatchWindow, "A"),
-                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
-                new KeyValue<>("B@" + firstBatchWindow, "B"),
-                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
-                new KeyValue<>("C@" + firstBatchWindow, "C"),
-                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
-                new KeyValue<>("D@" + firstBatchWindow, "D"),
-                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
-                new KeyValue<>("E@" + firstBatchWindow, "E"),
-                new KeyValue<>("E@" + secondBatchWindow, "E:E")
-            )
-        ));
+        validateReceivedMessages(
+                new StringDeserializer(),
+                new StringDeserializer(),
+                Arrays.asList(
+                        new KeyValue<>("A@" + firstBatchWindow, "A"),
+                        new KeyValue<>("A@" + secondBatchWindow, "A:A"),
+                        new KeyValue<>("B@" + firstBatchWindow, "B"),
+                        new KeyValue<>("B@" + secondBatchWindow, "B:B"),
+                        new KeyValue<>("C@" + firstBatchWindow, "C"),
+                        new KeyValue<>("C@" + secondBatchWindow, "C:C"),
+                        new KeyValue<>("D@" + firstBatchWindow, "D"),
+                        new KeyValue<>("D@" + secondBatchWindow, "D:D"),
+                        new KeyValue<>("E@" + firstBatchWindow, "E"),
+                        new KeyValue<>("E@" + secondBatchWindow, "E:E")
+                )
+        );
     }
 
     @Test
@@ -234,36 +180,25 @@ public class KStreamAggregationDedupIntegrationTest {
 
         stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
             .windowedBy(TimeWindows.of(500L))
-            .count(Materialized.<Integer, Long, WindowStore<Bytes, 
byte[]>>as("count-windows"))
-            .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
-                @Override
-                public String apply(final Windowed<Integer> windowedKey, final 
Long value) {
-                    return windowedKey.key() + "@" + 
windowedKey.window().start();
-                }
-            }).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+            .count(Materialized.as("count-windows"))
+            .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         startStreams();
 
-        final List<KeyValue<String, Long>> results = receiveMessages(
-            new StringDeserializer(),
-            new LongDeserializer(),
-            5);
-        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
-            @Override
-            public int compare(final KeyValue<String, Long> o1, final 
KeyValue<String, Long> o2) {
-                return KStreamAggregationDedupIntegrationTest.compare(o1, o2);
-            }
-        });
-
         final long window = timestamp / 500 * 500;
-        assertThat(results, is(Arrays.asList(
-            KeyValue.pair("1@" + window, 2L),
-            KeyValue.pair("2@" + window, 2L),
-            KeyValue.pair("3@" + window, 2L),
-            KeyValue.pair("4@" + window, 2L),
-            KeyValue.pair("5@" + window, 2L)
-        )));
 
+        validateReceivedMessages(
+                new StringDeserializer(),
+                new LongDeserializer(),
+                Arrays.asList(
+                        KeyValue.pair("1@" + window, 2L),
+                        KeyValue.pair("2@" + window, 2L),
+                        KeyValue.pair("3@" + window, 2L),
+                        KeyValue.pair("4@" + window, 2L),
+                        KeyValue.pair("5@" + window, 2L)
+                )
+        );
     }
 
 
@@ -298,11 +233,9 @@ public class KStreamAggregationDedupIntegrationTest {
     }
 
 
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                            keyDeserializer,
-                                                        final Deserializer<V>
-                                                            valueDeserializer,
-                                                        final int numMessages)
+    private <K, V> void validateReceivedMessages(final Deserializer<K> 
keyDeserializer,
+                                                 final Deserializer<V> 
valueDeserializer,
+                                                 final List<KeyValue<K, V>> 
expectedRecords)
         throws InterruptedException {
         final Properties consumerProperties = new Properties();
         consumerProperties
@@ -314,11 +247,11 @@ public class KStreamAggregationDedupIntegrationTest {
             keyDeserializer.getClass().getName());
         
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
             valueDeserializer.getClass().getName());
-        return 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
-            outputTopic,
-            numMessages,
-            60 * 1000);
 
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            expectedRecords);
     }
 
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 496ba58..97d1071 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -524,20 +524,24 @@ public class QueryableStateIntegrationTest {
             myFilterNotStore = kafkaStreams.store("queryFilterNot", 
QueryableStoreTypes.<String, Long>keyValueStore());
 
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
-            assertEquals(myFilterStore.get(expectedEntry.key), 
expectedEntry.value);
+            TestUtils.waitForCondition(() -> 
expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)),
+                    "Cannot get expected result");
         }
         for (final KeyValue<String, Long> batchEntry : batch1) {
             if (!expectedBatch1.contains(batchEntry)) {
-                assertNull(myFilterStore.get(batchEntry.key));
+                TestUtils.waitForCondition(() -> 
myFilterStore.get(batchEntry.key) == null,
+                        "Cannot get null result");
             }
         }
 
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
-            assertNull(myFilterNotStore.get(expectedEntry.key));
+            TestUtils.waitForCondition(() -> 
myFilterNotStore.get(expectedEntry.key) == null,
+                    "Cannot get null result");
         }
         for (final KeyValue<String, Long> batchEntry : batch1) {
             if (!expectedBatch1.contains(batchEntry)) {
-                assertEquals(myFilterNotStore.get(batchEntry.key), 
batchEntry.value);
+                TestUtils.waitForCondition(() -> 
batchEntry.value.equals(myFilterNotStore.get(batchEntry.key)),
+                        "Cannot get expected result");
             }
         }
     }
@@ -568,24 +572,25 @@ public class QueryableStateIntegrationTest {
             mockTime);
 
         final KTable<String, String> t1 = builder.table(streamOne);
-        final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, 
Long>() {
+        t1.mapValues(new ValueMapper<String, Long>() {
             @Override
             public Long apply(final String value) {
                 return Long.valueOf(value);
             }
-        }, Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
-        t2.toStream().to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
+        }, Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
-        waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
 
         final ReadOnlyKeyValueStore<String, Long>
             myMapStore = kafkaStreams.store("queryMapValues",
             QueryableStoreTypes.<String, Long>keyValueStore());
         for (final KeyValue<String, String> batchEntry : batch1) {
-            assertEquals(myMapStore.get(batchEntry.key), 
Long.valueOf(batchEntry.value));
+            assertEquals(Long.valueOf(batchEntry.value), 
myMapStore.get(batchEntry.key));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d9602f3..985b57f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -46,7 +46,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -369,19 +371,22 @@ public class IntegrationTestUtils {
                         readKeyValues(topic, consumer, waitTime, 
expectedRecords.size());
                 accumData.addAll(readData);
 
-                final int accumLastIndex = accumData.size() - 1;
-                final int expectedLastIndex = expectedRecords.size() - 1;
-
                 // filter out all intermediate records we don't want
                 final List<KeyValue<K, V>> accumulatedActual = 
accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
 
-                // need this check as filtering above could have removed the 
last record from accumData, but it did not
-                // equal the last expected record
-                final boolean lastRecordsMatch = 
accumData.get(accumLastIndex).equals(expectedRecords.get(expectedLastIndex));
+                // still need to check that for each key, the ordering is 
expected
+                final Map<K, List<KeyValue<K, V>>> finalAccumData = new 
HashMap<>();
+                for (final KeyValue<K, V> kv : accumulatedActual) {
+                    finalAccumData.computeIfAbsent(kv.key, key -> new 
ArrayList<>()).add(kv);
+                }
+                final Map<K, List<KeyValue<K, V>>> finalExpected = new 
HashMap<>();
+                for (final KeyValue<K, V> kv : expectedRecords) {
+                    finalExpected.computeIfAbsent(kv.key, key -> new 
ArrayList<>()).add(kv);
+                }
 
                 // returns true only if the remaining records in both lists 
are the same and in the same order
                 // and the last record received matches the last expected 
record
-                return accumulatedActual.equals(expectedRecords) && 
lastRecordsMatch;
+                return finalAccumData.equals(finalExpected);
 
             };
             final String conditionDetails = "Did not receive all " + 
expectedRecords + " records from topic " + topic;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index a3c47b5..fe71135 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -61,7 +61,7 @@ public class AssignedStreamsTasksTest {
     public void shouldInitializeNewTasks() {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -75,13 +75,13 @@ public class AssignedStreamsTasksTest {
         t1.initializeTopology();
         EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
         EasyMock.expect(t2.initializeStateStores()).andReturn(true);
         t2.initializeTopology();
         EasyMock.expectLastCall().once();
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
-        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
 
         EasyMock.replay(t1, t2);
 
@@ -101,7 +101,7 @@ public class AssignedStreamsTasksTest {
         t2.initializeTopology();
         EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
 
         EasyMock.replay(t2);
 
@@ -145,7 +145,7 @@ public class AssignedStreamsTasksTest {
     public void shouldCloseRestoringTasks() {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
         t1.close(false, false);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -247,12 +247,13 @@ public class AssignedStreamsTasksTest {
         t1.initializeTopology();
         EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList());
     }
 
     @Test
     public void shouldCommitRunningTasks() {
         mockTaskInitialization();
+        EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -266,6 +267,7 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldCloseTaskOnCommitIfTaskMigratedException() {
         mockTaskInitialization();
+        EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
@@ -285,6 +287,7 @@ public class AssignedStreamsTasksTest {
     @Test
     public void 
shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
         mockTaskInitialization();
+        EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new RuntimeException(""));
         EasyMock.replay(t1);
@@ -303,6 +306,7 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldCommitRunningTasksIfNeeded() {
         mockTaskInitialization();
+        EasyMock.expect(t1.commitRequested()).andReturn(true);
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall();
@@ -310,13 +314,14 @@ public class AssignedStreamsTasksTest {
 
         addAndInitTask();
 
-        assertThat(assignedTasks.maybeCommit(), equalTo(1));
+        assertThat(assignedTasks.maybeCommitPerUserRequested(), equalTo(1));
         EasyMock.verify(t1);
     }
 
     @Test
     public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
         mockTaskInitialization();
+        EasyMock.expect(t1.commitRequested()).andReturn(true);
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
@@ -326,7 +331,7 @@ public class AssignedStreamsTasksTest {
         addAndInitTask();
 
         try {
-            assignedTasks.maybeCommit();
+            assignedTasks.maybeCommitPerUserRequested();
             fail("Should have thrown TaskMigratedException.");
         } catch (final TaskMigratedException expected) { /* ignore */ }
 
@@ -337,7 +342,7 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
         mockTaskInitialization();
-        EasyMock.expect(t1.isProcessable()).andReturn(true);
+        EasyMock.expect(t1.isProcessable(0L)).andReturn(true);
         t1.process();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
@@ -346,7 +351,7 @@ public class AssignedStreamsTasksTest {
         addAndInitTask();
 
         try {
-            assignedTasks.process();
+            assignedTasks.process(0L);
             fail("Should have thrown TaskMigratedException.");
         } catch (final TaskMigratedException expected) { /* ignore */ }
 
@@ -357,11 +362,11 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldNotProcessUnprocessableTasks() {
         mockTaskInitialization();
-        EasyMock.expect(t1.isProcessable()).andReturn(false);
+        EasyMock.expect(t1.isProcessable(0L)).andReturn(false);
         EasyMock.replay(t1);
         addAndInitTask();
 
-        assertThat(assignedTasks.process(), equalTo(0));
+        assertThat(assignedTasks.process(0L), equalTo(0));
 
         EasyMock.verify(t1);
     }
@@ -369,13 +374,14 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldAlwaysProcessProcessableTasks() {
         mockTaskInitialization();
-        EasyMock.expect(t1.isProcessable()).andReturn(true);
+        EasyMock.expect(t1.isProcessable(0L)).andReturn(true);
         EasyMock.expect(t1.process()).andReturn(true).once();
+
         EasyMock.replay(t1);
 
         addAndInitTask();
 
-        assertThat(assignedTasks.process(), equalTo(1));
+        assertThat(assignedTasks.process(0L), equalTo(1));
 
         EasyMock.verify(t1);
     }
@@ -459,7 +465,7 @@ public class AssignedStreamsTasksTest {
         EasyMock.expectLastCall().once();
         EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
         
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
-        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
+        
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
         t1.suspend();
         EasyMock.expectLastCall();
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index a8cd2c8..b91aba5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -101,7 +101,7 @@ public class RecordQueueTest {
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
 
         // add three 3 out-of-order records with timestamp 2, 1, 3
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -173,7 +173,7 @@ public class RecordQueueTest {
         queue.clear();
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
 
         // re-insert the three records with 4, 5, 6
         queue.addRawRecords(list3);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 848cd4f..834ab3e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -21,16 +21,17 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -82,9 +83,9 @@ import static org.junit.Assert.fail;
 
 public class StreamTaskTest {
 
-    private final Serializer<Integer> intSerializer = new IntegerSerializer();
-    private final Deserializer<Integer> intDeserializer = new 
IntegerDeserializer();
-    private final Serializer<byte[]> bytesSerializer = new 
ByteArraySerializer();
+    private final Serializer<Integer> intSerializer = 
Serdes.Integer().serializer();
+    private final Serializer<byte[]> bytesSerializer = 
Serdes.ByteArray().serializer();
+    private final Deserializer<Integer> intDeserializer = 
Serdes.Integer().deserializer();
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
     private final TopicPartition partition1 = new TopicPartition(topic1, 1);
@@ -113,7 +114,7 @@ public class StreamTaskTest {
     private final Long offset = 543L;
 
     private final ProcessorTopology topology = ProcessorTopology.withSources(
-        Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, 
processorSystemTime),
+        Utils.mkList(source1, source2, processorStreamTime, 
processorSystemTime),
         mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
     );
 
@@ -129,8 +130,7 @@ public class StreamTaskTest {
     };
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
-    private final Metrics metrics = new Metrics();
-    private final Sensor skippedRecordsSensor = 
metrics.sensor("skipped-records");
+    private final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
     private final StreamsMetricsImpl streamsMetrics = new 
MockStreamsMetrics(metrics);
     private final TaskId taskId00 = new TaskId(0, 0);
     private final MockTime time = new MockTime();
@@ -159,7 +159,8 @@ public class StreamTaskTest {
             mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, canonicalPath),
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class.getName()),
-            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? 
StreamsConfig.EXACTLY_ONCE : StreamsConfig.AT_LEAST_ONCE)
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? 
StreamsConfig.EXACTLY_ONCE : StreamsConfig.AT_LEAST_ONCE),
+            mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "100")
         )));
     }
 
@@ -393,9 +394,6 @@ public class StreamTaskTest {
         assertEquals(4, source2.numReceived);
         assertFalse(task.maybePunctuateStreamTime());
 
-        assertFalse(task.process());
-        assertFalse(task.maybePunctuateStreamTime());
-
         
processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 20L, 142L, 155L, 160L);
     }
 
@@ -454,22 +452,61 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldRespectCommitNeeded() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        assertFalse(task.commitNeeded());
+
+        task.addRecords(partition1, 
singletonList(getConsumerRecord(partition1, 0)));
+        assertTrue(task.process());
+        assertTrue(task.commitNeeded());
+
+        task.commit();
+        assertFalse(task.commitNeeded());
+
+        assertTrue(task.maybePunctuateStreamTime());
+        assertTrue(task.commitNeeded());
+
+        task.commit();
+        assertFalse(task.commitNeeded());
+
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        assertTrue(task.commitNeeded());
+
+        task.commit();
+        assertFalse(task.commitNeeded());
+    }
+
+    @Test
+    public void shouldRespectCommitRequested() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        task.requestCommit();
+        assertTrue(task.commitRequested());
+    }
+
+    @Test
     public void shouldBeProcessableIfAllPartitionsBuffered() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
         task.initializeTopology();
 
-        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable(0L));
 
         final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
 
         task.addRecords(partition1, Collections.singleton(new 
ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
 
-        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable(0L));
 
         task.addRecords(partition2, Collections.singleton(new 
ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
 
-        assertTrue(task.isProcessable());
+        assertTrue(task.isProcessable(0L));
     }
 
     @Test
@@ -478,19 +515,42 @@ public class StreamTaskTest {
         task.initializeStateStores();
         task.initializeTopology();
 
-        assertFalse(task.isProcessable());
+        final MetricName enforcedProcessMetric = 
metrics.metricName("enforced-processing-total", "stream-task-metrics", 
mkMap(mkEntry("client-id", "test"), mkEntry("task-id", taskId00.toString())));
+
+        assertFalse(task.isProcessable(0L));
+        assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue());
 
         final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
 
         task.addRecords(partition1, Collections.singleton(new 
ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
 
-        assertFalse(task.isProcessable());
-        assertFalse(task.isProcessable());
-        assertFalse(task.isProcessable());
-        assertFalse(task.isProcessable());
-        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable(time.milliseconds()));
+
+        assertFalse(task.isProcessable(time.milliseconds() + 50L));
+
+        assertTrue(task.isProcessable(time.milliseconds() + 100L));
+        assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        // once decided to enforce, continue doing that
+        assertTrue(task.isProcessable(time.milliseconds() + 101L));
+        assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        task.addRecords(partition2, Collections.singleton(new 
ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
+
+        assertTrue(task.isProcessable(time.milliseconds() + 130L));
+        assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        // one resumed to normal processing, the timer should be reset
+        task.process();
+
+        assertFalse(task.isProcessable(time.milliseconds() + 150L));
+        assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
+
+        assertFalse(task.isProcessable(time.milliseconds() + 249L));
+        assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue());
 
-        assertTrue(task.isProcessable());
+        assertTrue(task.isProcessable(time.milliseconds() + 250L));
+        assertEquals(3.0, metrics.metric(enforcedProcessMetric).metricValue());
     }
 
 
@@ -1155,8 +1215,8 @@ public class StreamTaskTest {
         final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
         final ProcessorTopology topology = 
ProcessorTopology.withRepartitionTopics(
-            Utils.<ProcessorNode>mkList(source1, source2),
-            mkMap(mkEntry(topic1, (SourceNode) source1), 
mkEntry(repartition.topic(), (SourceNode) source2)),
+            Utils.mkList(source1, source2),
+            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
             Collections.singleton(repartition.topic())
         );
         consumer.assign(Arrays.asList(partition1, repartition));
@@ -1227,10 +1287,10 @@ public class StreamTaskTest {
 
     private StreamTask createStatefulTask(final StreamsConfig config, final 
boolean logged) {
         final ProcessorTopology topology = ProcessorTopology.with(
-            Utils.<ProcessorNode>mkList(source1, source2),
-            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, 
(SourceNode) source2)),
+            Utils.mkList(source1, source2),
+            mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)),
             singletonList(stateStore),
-            logged ? Collections.singletonMap(storeName, storeName + 
"-changelog") : Collections.<String, String>emptyMap());
+            logged ? Collections.singletonMap(storeName, storeName + 
"-changelog") : Collections.emptyMap());
 
         return new StreamTask(
             taskId00,
@@ -1249,10 +1309,10 @@ public class StreamTaskTest {
 
     private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {
         final ProcessorTopology topology = ProcessorTopology.with(
-            Utils.<ProcessorNode>mkList(source1, source3),
-            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, 
(SourceNode) source3)),
+            Utils.mkList(source1, source3),
+            mkMap(mkEntry(topic1, source1), mkEntry(topic2, source3)),
             singletonList(stateStore),
-            Collections.<String, String>emptyMap());
+            Collections.emptyMap());
 
         return new StreamTask(
             taskId00,
@@ -1271,7 +1331,7 @@ public class StreamTaskTest {
 
     private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {
         final ProcessorTopology topology = ProcessorTopology.withSources(
-            Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, 
processorSystemTime),
+            Utils.mkList(source1, source2, processorStreamTime, 
processorSystemTime),
             mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
         );
 
@@ -1298,8 +1358,8 @@ public class StreamTaskTest {
     // this task will throw exception when processing (on partition2), 
flushing, suspending and closing
     private StreamTask createTaskThatThrowsException(final boolean enableEos) {
         final ProcessorTopology topology = ProcessorTopology.withSources(
-            Utils.<ProcessorNode>mkList(source1, source3, processorStreamTime, 
processorSystemTime),
-            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, 
(SourceNode) source3))
+            Utils.mkList(source1, source3, processorStreamTime, 
processorSystemTime),
+            mkMap(mkEntry(topic1, source1), mkEntry(topic2, source3))
         );
 
         source1.addChild(processorStreamTime);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c1485fb..e691c54 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -61,8 +61,10 @@ import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -106,15 +108,16 @@ public class StreamThreadTest {
     private final MockTime mockTime = new MockTime();
     private final Metrics metrics = new Metrics();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
-    private UUID processId = UUID.randomUUID();
     private final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(new InternalTopologyBuilder());
-    private InternalTopologyBuilder internalTopologyBuilder;
     private final StreamsConfig config = new StreamsConfig(configProps(false));
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final StateDirectory stateDirectory = new StateDirectory(config, 
mockTime);
-    private StreamsMetadataState streamsMetadataState;
     private final ConsumedInternal<Object, Object> consumed = new 
ConsumedInternal<>();
 
+    private UUID processId = UUID.randomUUID();
+    private InternalTopologyBuilder internalTopologyBuilder;
+    private StreamsMetadataState streamsMetadataState;
+
     @Before
     public void setUp() {
         processId = UUID.randomUUID();
@@ -177,7 +180,7 @@ public class StreamThreadTest {
         mockConsumer.assign(assignedPartitions);
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
+        thread.runOnce();
         assertEquals(thread.state(), StreamThread.State.RUNNING);
         Assert.assertEquals(4, stateListener.numChanges);
         Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, 
stateListener.oldState);
@@ -307,13 +310,106 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger()
         );
-        thread.maybeCommit(mockTime.milliseconds());
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
         mockTime.sleep(commitInterval - 10L);
-        thread.maybeCommit(mockTime.milliseconds());
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
 
         EasyMock.verify(taskManager);
     }
 
+    @Test
+    public void shouldRespectNumIterationsInMainLoop() {
+        final MockProcessor mockProcessor = new 
MockProcessor(PunctuationType.WALL_CLOCK_TIME, 10L);
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
+        internalTopologyBuilder.addProcessor("processor1", () -> 
mockProcessor, "source1");
+        internalTopologyBuilder.addProcessor("processor2", () -> new 
MockProcessor(PunctuationType.STREAM_TIME, 10L), "source1");
+
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        final StreamsConfig config = new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(applicationId,
+            "localhost:2171",
+            Serdes.ByteArraySerde.class.getName(),
+            Serdes.ByteArraySerde.class.getName(),
+            properties));
+        final StreamThread thread = createStreamThread(clientId, config, 
false);
+
+        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+
+        final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
+        thread.taskManager().setAssignmentMetadata(
+            Collections.singletonMap(
+                new TaskId(0, t1p1.partition()),
+                assignedPartitions),
+            Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(Collections.singleton(t1p1));
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce();
+
+        // processed one record, punctuated after the first record, and hence 
num.iterations is still 1
+        long offset = -1;
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 0, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        thread.runOnce();
+
+        assertThat(thread.currentNumIterations(), equalTo(1));
+
+        // processed one more record without punctuation, and bump 
num.iterations to 2
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        thread.runOnce();
+
+        assertThat(thread.currentNumIterations(), equalTo(2));
+
+        // processed zero records, early exit and iterations stays as 2
+        thread.runOnce();
+        assertThat(thread.currentNumIterations(), equalTo(2));
+
+        // system time based punctutation halves to 1
+        mockTime.sleep(11L);
+
+        thread.runOnce();
+        assertThat(thread.currentNumIterations(), equalTo(1));
+
+        // processed two records, bumping up iterations to 2
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 5, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 6, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        thread.runOnce();
+
+        assertThat(thread.currentNumIterations(), equalTo(2));
+
+        // stream time based punctutation halves to 1
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 11, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        thread.runOnce();
+
+        assertThat(thread.currentNumIterations(), equalTo(1));
+
+        // processed three records, bumping up iterations to 3 (1 + 2)
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 12, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 13, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 14, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        thread.runOnce();
+
+        assertThat(thread.currentNumIterations(), equalTo(3));
+
+        mockProcessor.requestCommit();
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 15, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
+        thread.runOnce();
+
+        // user requested commit should not impact on iteration adjustment
+        assertThat(thread.currentNumIterations(), equalTo(3));
+
+        // time based commit, halves iterations to 3 / 2 = 1
+        mockTime.sleep(90L);
+        thread.runOnce();
+
+        assertThat(thread.currentNumIterations(), equalTo(1));
+
+    }
+
     @SuppressWarnings({"unchecked", "ThrowableNotThrown"})
     @Test
     public void shouldNotCauseExceptionIfNothingCommitted() {
@@ -341,9 +437,11 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger()
         );
-        thread.maybeCommit(mockTime.milliseconds());
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
         mockTime.sleep(commitInterval - 10L);
-        thread.maybeCommit(mockTime.milliseconds());
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
 
         EasyMock.verify(taskManager);
     }
@@ -376,9 +474,11 @@ public class StreamThreadTest {
             new LogContext(""),
             new AtomicInteger()
         );
-        thread.maybeCommit(mockTime.milliseconds());
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
         mockTime.sleep(commitInterval + 1);
-        thread.maybeCommit(mockTime.milliseconds());
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
 
         EasyMock.verify(taskManager);
     }
@@ -457,7 +557,7 @@ public class StreamThreadTest {
         mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new 
HashSet<>(assignedPartitions));
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         assertEquals(thread.tasks().size(), clientSupplier.producers.size());
         assertSame(clientSupplier.consumer, thread.consumer);
@@ -646,7 +746,7 @@ public class StreamThreadTest {
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
-        thread.runOnce(-1);
+        thread.runOnce();
         assertThat(thread.tasks().size(), equalTo(1));
         final MockProducer producer = clientSupplier.producers.get(0);
 
@@ -657,7 +757,7 @@ public class StreamThreadTest {
 
         consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new 
byte[0]));
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) 
+ 1);
-        thread.runOnce(-1);
+        thread.runOnce();
         assertThat(producer.history().size(), equalTo(1));
 
         assertFalse(producer.transactionCommitted());
@@ -666,16 +766,16 @@ public class StreamThreadTest {
             new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    return producer.commitCount() == 2;
+                    return producer.commitCount() == 1;
                 }
             },
             "StreamsThread did not commit transaction.");
 
         producer.fenceProducer();
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) 
+ 1L);
-        consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new 
byte[0]));
+        consumer.addRecord(new ConsumerRecord<>(topic1, 1, 1, new byte[0], new 
byte[0]));
         try {
-            thread.runOnce(-1);
+            thread.runOnce();
             fail("Should have thrown TaskMigratedException");
         } catch (final TaskMigratedException expected) { /* ignore */ }
         TestUtils.waitForCondition(
@@ -687,15 +787,16 @@ public class StreamThreadTest {
             },
             "StreamsThread did not remove fenced zombie task.");
 
-        assertThat(producer.commitCount(), equalTo(2L));
+        assertThat(producer.commitCount(), equalTo(1L));
     }
 
-    private StreamThread setupStreamThread() {
+    @Test
+    public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks()
 {
+        final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(configProps(true)), true);
+
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
 
-        final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(configProps(true)), true);
-
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
@@ -713,19 +814,12 @@ public class StreamThreadTest {
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         assertThat(thread.tasks().size(), equalTo(1));
-        return thread;
-    }
-
-    @Test
-    public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks()
 {
-        final StreamThread thread = setupStreamThread();
 
         clientSupplier.producers.get(0).fenceProducer();
         thread.rebalanceListener.onPartitionsRevoked(null);
-
         assertTrue(clientSupplier.producers.get(0).transactionInFlight());
         assertFalse(clientSupplier.producers.get(0).transactionCommitted());
         assertTrue(clientSupplier.producers.get(0).closed());
@@ -733,8 +827,32 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCloseTransactionWhenSuspendingTaks()
 {
-        final StreamThread thread = setupStreamThread();
+    public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCloseTransactionWhenSuspendingTasks()
 {
+        final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(configProps(true)), true);
+
+        internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
+        internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
+
+        thread.setState(StreamThread.State.RUNNING);
+        thread.rebalanceListener.onPartitionsRevoked(null);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        thread.runOnce();
+
+        assertThat(thread.tasks().size(), equalTo(1));
 
         clientSupplier.producers.get(0).fenceProducerOnClose();
         thread.rebalanceListener.onPartitionsRevoked(null);
@@ -789,7 +907,7 @@ public class StreamThreadTest {
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         final ThreadMetadata threadMetadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), 
threadMetadata.threadState());
@@ -834,7 +952,7 @@ public class StreamThreadTest {
 
         
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         final ThreadMetadata threadMetadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), 
threadMetadata.threadState());
@@ -900,7 +1018,7 @@ public class StreamThreadTest {
 
         
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         final StandbyTask standbyTask1 = 
thread.taskManager().standbyTask(partition1);
         final StandbyTask standbyTask2 = 
thread.taskManager().standbyTask(partition2);
@@ -967,7 +1085,7 @@ public class StreamThreadTest {
         
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         assertEquals(0, punctuatedStreamTime.size());
         assertEquals(0, punctuatedWallClockTime.size());
@@ -977,14 +1095,14 @@ public class StreamThreadTest {
             clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, 
i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + 
i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + 
i).getBytes()));
         }
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         assertEquals(1, punctuatedStreamTime.size());
         assertEquals(1, punctuatedWallClockTime.size());
 
         mockTime.sleep(100L);
 
-        thread.runOnce(-1);
+        thread.runOnce();
 
         // we should skip stream time punctuation, only trigger wall-clock 
time punctuation
         assertEquals(1, punctuatedStreamTime.size());
@@ -1177,7 +1295,7 @@ public class StreamThreadTest {
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
+        thread.runOnce();
 
         final MetricName skippedTotalMetric = 
metrics.metricName("skipped-records-total", "stream-metrics", 
Collections.singletonMap("client-id", thread.getName()));
         final MetricName skippedRateMetric = 
metrics.metricName("skipped-records-rate", "stream-metrics", 
Collections.singletonMap("client-id", thread.getName()));
@@ -1187,7 +1305,7 @@ public class StreamThreadTest {
         long offset = -1;
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], "I am not an integer.".getBytes()));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], "I am not an integer.".getBytes()));
-        thread.runOnce(-1);
+        thread.runOnce();
         assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
@@ -1221,7 +1339,7 @@ public class StreamThreadTest {
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
+        thread.runOnce();
 
         final MetricName skippedTotalMetric = 
metrics.metricName("skipped-records-total", "stream-metrics", 
Collections.singletonMap("client-id", thread.getName()));
         final MetricName skippedRateMetric = 
metrics.metricName("skipped-records-rate", "stream-metrics", 
Collections.singletonMap("client-id", thread.getName()));
@@ -1231,7 +1349,7 @@ public class StreamThreadTest {
         long offset = -1;
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
-        thread.runOnce(-1);
+        thread.runOnce();
         assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
@@ -1239,13 +1357,13 @@ public class StreamThreadTest {
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
-        thread.runOnce(-1);
+        thread.runOnce();
         assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), 
t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new 
byte[0], new byte[0]));
-        thread.runOnce(-1);
+        thread.runOnce();
         assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 508f2ee..b0e7fce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -590,19 +590,19 @@ public class TaskManagerTest {
 
     @Test
     public void shouldMaybeCommitActiveTasks() {
-        EasyMock.expect(active.maybeCommit()).andReturn(5);
+        EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5);
         replay();
 
-        assertThat(taskManager.maybeCommitActiveTasks(), equalTo(5));
+        assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), 
equalTo(5));
         verify(active);
     }
 
     @Test
     public void shouldProcessActiveTasks() {
-        EasyMock.expect(active.process()).andReturn(10);
+        EasyMock.expect(active.process(0L)).andReturn(10);
         replay();
 
-        assertThat(taskManager.process(), equalTo(10));
+        assertThat(taskManager.process(0L), equalTo(10));
         verify(active);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java 
b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 927be0b..c95f408 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -40,6 +40,8 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, 
V> {
     private final PunctuationType punctuationType;
     private final long scheduleInterval;
 
+    private boolean commitRequested = false;
+
     public MockProcessor(final PunctuationType punctuationType, final long 
scheduleInterval) {
         this.punctuationType = punctuationType;
         this.scheduleInterval = scheduleInterval;
@@ -76,6 +78,10 @@ public class MockProcessor<K, V> extends 
AbstractProcessor<K, V> {
         processed.add((key == null ? "null" : key) + ":" +
                 (value == null ? "null" : value));
 
+        if (commitRequested) {
+            context().commit();
+            commitRequested = false;
+        }
     }
 
     public void checkAndClearProcessResult(final String... expected) {
@@ -87,6 +93,10 @@ public class MockProcessor<K, V> extends 
AbstractProcessor<K, V> {
         processed.clear();
     }
 
+    public void requestCommit() {
+        commitRequested = true;
+    }
+
     public void checkEmptyAndClearProcessResult() {
         assertEquals("the number of outputs:", 0, processed.size());
         processed.clear();

Reply via email to