This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new afe00ef  KAFKA-3514: Part II, Choose tasks with data on all partitions 
to process (#5398)
afe00ef is described below

commit afe00effe2d79e89e159972cf1f5f7ffeb0c6e97
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu Aug 2 18:34:53 2018 -0700

    KAFKA-3514: Part II, Choose tasks with data on all partitions to process 
(#5398)
    
    1. In each iteration, decide that a task is processable only if all of its 
partitions contain data, so it can decide which record to process next.
    
    1.a Add one exception: if the task does have data on some but not 
all of its partitions, we only consider it not processable for some finite 
number of rounds of iterations.
    1.b Add a task-level metric to record whenever we are forced to process a 
task that has only partial data available, since doing so may lead to 
non-determinism.
    
    2. Break the main loop into put-raw-data and process-them phases, since now 
not all data put into the queue will be processed completely within a single iteration.
    
    3. NOTE that within an iteration, if a task has exhausted one of its queues 
it will still be processed, since we only update the processable list once in each 
iteration. I'm improving on this in the follow-up Part III PR.
    
    4. Found and fixed a bug in metrics recording: the taskName and sensorName 
parameters were exchanged.
    
    5. Optimized task stream time computation again since our current partition 
stream time reasoning has been simplified.
    
    6. Added unit tests.
    
    Reviewers: Matthias J. Sax <[email protected]>, John Roesler 
<[email protected]>, Bill Bejeck <[email protected]>
---
 .../processor/internals/AssignedStreamsTasks.java  |   5 +-
 .../processor/internals/PartitionGroup.java        |  55 +++---
 .../streams/processor/internals/StreamTask.java    |  33 +++-
 .../streams/processor/internals/StreamThread.java  |  13 +-
 .../internals/metrics/StreamsMetricsImpl.java      |   8 +-
 .../kafka/streams/state/internals/NamedCache.java  |   2 +-
 .../integration/utils/IntegrationTestUtils.java    |  41 ++++
 .../internals/AssignedStreamsTasksTest.java        |  27 +++
 .../processor/internals/PartitionGroupTest.java    |  16 +-
 .../processor/internals/StreamTaskTest.java        | 212 +++++++++------------
 ...StreamToTableJoinScalaIntegrationTestBase.scala |  10 +-
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |   4 -
 12 files changed, 242 insertions(+), 184 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index f98e635..0a83965 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -87,11 +87,13 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
      */
     int process() {
         int processed = 0;
+
         final Iterator<Map.Entry<TaskId, StreamTask>> it = 
running.entrySet().iterator();
         while (it.hasNext()) {
             final StreamTask task = it.next().getValue();
+
             try {
-                if (task.process()) {
+                if (task.isProcessable() && task.process()) {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
@@ -108,6 +110,7 @@ class AssignedStreamsTasks extends 
AssignedTasks<StreamTask> implements Restorin
                 throw e;
             }
         }
+
         return processed;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 34252bf..f17c63a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -27,15 +27,23 @@ import java.util.Set;
 
 /**
  * A PartitionGroup is composed from a set of partitions. It also maintains 
the timestamp of this
- * group, hence the associated task as the min timestamp across all partitions 
in the group.
+ * group, a.k.a. the stream time of the associated task. It is defined as the 
maximum timestamp of
+ * all the records having been retrieved for processing from this 
PartitionGroup so far.
+ *
+ * We decide from which partition to retrieve the next record to process based 
on partitions' timestamps.
+ * The timestamp of a specific partition is initialized as UNKNOWN (-1), and 
is updated with the head record's timestamp
+ * if it is smaller (i.e. it should be monotonically increasing); when the 
partition's buffer becomes empty and there is
+ * no head record, the partition's timestamp will not be updated any more.
  */
 public class PartitionGroup {
 
     private final Map<TopicPartition, RecordQueue> partitionQueues;
-
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
+
     private long streamTime;
     private int totalBuffered;
+    private boolean allBuffered;
+
 
     public static class RecordInfo {
         RecordQueue queue;
@@ -53,11 +61,11 @@ public class PartitionGroup {
         }
     }
 
-
     PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), 
Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
         totalBuffered = 0;
+        allBuffered = false;
         streamTime = RecordQueue.NOT_KNOWN;
     }
 
@@ -79,17 +87,16 @@ public class PartitionGroup {
             if (record != null) {
                 --totalBuffered;
 
-                if (!queue.isEmpty()) {
+                if (queue.isEmpty()) {
+                    // if a certain queue has been drained, reset the flag
+                    allBuffered = false;
+                } else {
                     nonEmptyQueuesByTime.offer(queue);
                 }
 
-                // Since this was previously a queue with min timestamp,
-                // streamTime could only advance if this queue's time did.
-                if (queue.timestamp() > streamTime) {
-                    computeStreamTime();
-                }
+                // always update the stream time to the record's timestamp yet 
to be processed if it is larger
+                streamTime = Math.max(streamTime, record.timestamp);
             }
-
         }
 
         return record;
@@ -106,17 +113,18 @@ public class PartitionGroup {
         final RecordQueue recordQueue = partitionQueues.get(partition);
 
         final int oldSize = recordQueue.size();
-        final long oldTimestamp = recordQueue.timestamp();
         final int newSize = recordQueue.addRawRecords(rawRecords);
 
         // add this record queue to be considered for processing in the future 
if it was empty before
         if (oldSize == 0 && newSize > 0) {
             nonEmptyQueuesByTime.offer(recordQueue);
-        }
 
-        // Adding to this queue could only advance streamTime if it was 
previously the queue with min timestamp (= streamTime)
-        if (oldTimestamp <= streamTime && recordQueue.timestamp() > 
streamTime) {
-            computeStreamTime();
+            // if all partitions now are non-empty, set the flag
+            // we do not need to update the stream time here since this task 
will definitely be
+            // processed next, and hence the stream time will be updated when 
we retrieved records by then
+            if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
+                allBuffered = true;
+            }
         }
 
         totalBuffered += newSize - oldSize;
@@ -136,18 +144,6 @@ public class PartitionGroup {
         return streamTime;
     }
 
-    private void computeStreamTime() {
-        // we should always return the smallest timestamp of all partitions
-        // to avoid group partition time goes backward
-        long timestamp = Long.MAX_VALUE;
-        for (final RecordQueue queue : partitionQueues.values()) {
-            if (queue.timestamp() < timestamp) {
-                timestamp = queue.timestamp();
-            }
-        }
-        this.streamTime = timestamp;
-    }
-
     /**
      * @throws IllegalStateException if the record's partition does not belong 
to this partition group
      */
@@ -165,7 +161,12 @@ public class PartitionGroup {
         return totalBuffered;
     }
 
+    boolean allPartitionsBuffered() {
+        return allBuffered;
+    }
+
     public void close() {
+        clear();
         partitionQueues.clear();
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7f121fe..6f3b031 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -59,6 +59,8 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new 
ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
+    private static final int WAIT_ON_PARTIAL_INPUT = 5;
+
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo;
     private final PunctuationQueue streamTimePunctuationQueue;
@@ -72,12 +74,14 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
     private boolean transactionInFlight = false;
+    private int waits = WAIT_ON_PARTIAL_INPUT;
     private final Time time;
     private final TaskMetrics taskMetrics;
 
     protected static final class TaskMetrics {
         final StreamsMetricsImpl metrics;
         final Sensor taskCommitTimeSensor;
+        final Sensor taskEnforcedProcessSensor;
         private final String taskName;
 
 
@@ -108,7 +112,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
 
             // add the operation metrics with additional tags
             final Map<String, String> tagMap = metrics.tagMap("task-id", 
taskName);
-            taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, 
Sensor.RecordingLevel.DEBUG, parent);
+            taskCommitTimeSensor = metrics.taskLevelSensor(taskName, "commit", 
Sensor.RecordingLevel.DEBUG, parent);
             taskCommitTimeSensor.add(
                 new MetricName("commit-latency-avg", group, "The average 
latency of commit operation.", tagMap),
                 new Avg()
@@ -125,6 +129,18 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
                 new MetricName("commit-total", group, "The total number of 
occurrence of commit operations.", tagMap),
                 new Count()
             );
+
+            // add the metrics for enforced processing
+            taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, 
"enforced-process", Sensor.RecordingLevel.DEBUG, parent);
+            taskEnforcedProcessSensor.add(
+                    new MetricName("enforced-process-rate", group, "The 
average number of occurrence of enforced-process per second.", tagMap),
+                    new Rate(TimeUnit.SECONDS, new Count())
+            );
+            taskEnforcedProcessSensor.add(
+                    new MetricName("enforced-process-total", group, "The total 
number of occurrence of enforced-process operations.", tagMap),
+                    new Count()
+            );
+
         }
 
         void removeAllSensors() {
@@ -264,6 +280,21 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     }
 
     /**
+     * An active task is processable if its buffer contains data for all of 
its input source topic partitions
+     */
+    public boolean isProcessable() {
+        if (partitionGroup.allPartitionsBuffered()) {
+            return true;
+        } else if (partitionGroup.numBuffered() > 0 && --waits < 0) {
+            taskMetrics.taskEnforcedProcessSensor.record();
+            waits = WAIT_ON_PARTIAL_INPUT;
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
      * Process one record.
      *
      * @return true if this method processes a record, false if it does not 
process a record.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 428aa1d..42f55ef 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -831,18 +831,21 @@ public class StreamThread extends Thread {
             }
         }
 
-        if (records != null && !records.isEmpty() && 
taskManager.hasActiveRunningTasks()) {
+        if (records != null && !records.isEmpty()) {
             streamsMetrics.pollTimeSensor.record(computeLatency(), 
timerStartedMs);
             addRecordsToTasks(records);
+        }
+
+        if (taskManager.hasActiveRunningTasks()) {
             final long totalProcessed = 
processAndMaybeCommit(recordsProcessedBeforeCommit);
             if (totalProcessed > 0) {
                 final long processLatency = computeLatency();
                 streamsMetrics.processTimeSensor.record(processLatency / 
(double) totalProcessed, timerStartedMs);
                 processedBeforeCommit = adjustRecordsProcessedBeforeCommit(
-                    recordsProcessedBeforeCommit,
-                    totalProcessed,
-                    processLatency,
-                    commitTimeMs);
+                        recordsProcessedBeforeCommit,
+                        totalProcessed,
+                        processLatency,
+                        commitTimeMs);
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 662ded5..e99a5b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -88,13 +88,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     public final Sensor taskLevelSensor(final String taskName,
-                                         final String sensorName,
-                                         final Sensor.RecordingLevel 
recordingLevel,
-                                         final Sensor... parents) {
+                                        final String sensorName,
+                                        final Sensor.RecordingLevel 
recordingLevel,
+                                        final Sensor... parents) {
         final String key = threadName + "." + taskName;
         synchronized (taskLevelSensors) {
             if (!taskLevelSensors.containsKey(key)) {
-                taskLevelSensors.put(key, new LinkedList<String>());
+                taskLevelSensors.put(key, new LinkedList<>());
             }
 
             final String fullSensorName = key + "." + sensorName;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 77b9c1e..12b4cf3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -370,7 +370,7 @@ class NamedCache {
                 "record-cache-id", "all",
                 "task-id", taskName
             );
-            final Sensor taskLevelHitRatioSensor = 
metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG);
+            final Sensor taskLevelHitRatioSensor = 
metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG);
             taskLevelHitRatioSensor.add(
                 new MetricName("hitRatio-avg", group, "The average cache hit 
ratio.", allMetricTags),
                 new Avg()
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 749d748..1a78ed3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -50,7 +50,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -350,6 +352,45 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    public static <K, V> List<KeyValue<K, V>> 
waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                               
     final String topic,
+                                                                               
     final List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
+        return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, 
expectedRecords, DEFAULT_TIMEOUT);
+    }
+
+    public static <K, V> List<KeyValue<K, V>> 
waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                               
     final String topic,
+                                                                               
     final List<KeyValue<K, V>> expectedRecords,
+                                                                               
     final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, V>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, V>> readData =
+                            readKeyValues(topic, consumer, waitTime, 
expectedRecords.size());
+                    accumData.addAll(readData);
+
+                    final Map<K, V> finalData = new HashMap<>();
+
+                    for (final KeyValue<K, V> keyValue : accumData) {
+                        finalData.put(keyValue.key, keyValue.value);
+                    }
+
+                    for (final KeyValue<K, V> keyValue : expectedRecords) {
+                        if 
(!keyValue.value.equals(finalData.get(keyValue.key)))
+                            return false;
+                    }
+
+                    return true;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + 
expectedRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <K, V> List<ConsumerRecord<K, V>> 
waitUntilMinRecordsReceived(final Properties consumerConfig,
                                                                                
 final String topic,
                                                                                
 final int expectedNumRecords,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 8a8d625..7efe653 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -337,6 +337,7 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
         mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(true);
         t1.process();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
@@ -354,6 +355,32 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
+    public void shouldNotProcessUnprocessableTasks() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(false);
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        assertThat(assignedTasks.process(), equalTo(0));
+
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldAlwaysProcessProcessableTasks() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(true);
+        EasyMock.expect(t1.process()).andReturn(true).once();
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assertThat(assignedTasks.process(), equalTo(1));
+
+        EasyMock.verify(t1);
+    }
+
+    @Test
     public void shouldPunctuateRunningTasks() {
         mockTaskInitialization();
         EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index b3123e4..2df4f66 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -88,12 +88,12 @@ public class PartitionGroupTest {
         group.addRawRecords(partition2, list2);
         // 1:[1, 3, 5]
         // 2:[2, 4, 6]
-        // st: 1
+        // st: -1 since no records was being processed yet
 
         assertEquals(6, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(1L, group.timestamp());
+        assertEquals(-1L, group.timestamp());
 
         StampedRecord record;
         final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -108,7 +108,7 @@ public class PartitionGroupTest {
         assertEquals(5, group.numBuffered());
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(2L, group.timestamp());
+        assertEquals(1L, group.timestamp());
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
@@ -120,7 +120,7 @@ public class PartitionGroupTest {
         assertEquals(4, group.numBuffered());
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(2L, group.timestamp());
 
         // add 2 more records with timestamp 2, 4 to partition-1
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -134,7 +134,7 @@ public class PartitionGroupTest {
         assertEquals(6, group.numBuffered());
         assertEquals(4, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(2L, group.timestamp());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -146,7 +146,7 @@ public class PartitionGroupTest {
         assertEquals(5, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        assertEquals(3L, group.timestamp());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -158,7 +158,7 @@ public class PartitionGroupTest {
         assertEquals(4, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        assertEquals(4L, group.timestamp());
 
         // get one more record, now time should be advanced
         record = group.nextRecord(info);
@@ -206,7 +206,7 @@ public class PartitionGroupTest {
         assertEquals(0, group.numBuffered());
         assertEquals(0, group.numBuffered(partition1));
         assertEquals(0, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        assertEquals(6L, group.timestamp());
 
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bfbb2a0..146bcb3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -57,6 +57,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -111,7 +112,7 @@ public class StreamTaskTest {
 
     private final ProcessorTopology topology = ProcessorTopology.withSources(
         Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, 
processorSystemTime),
-        mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, 
(SourceNode) source2))
+        mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
     );
 
     private final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -309,97 +310,6 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testMaybePunctuateStreamTime() {
-        task = createStatelessTask(createConfig(false));
-        task.initializeStateStores();
-        task.initializeTopology();
-
-        task.addRecords(partition1, Arrays.asList(
-            getConsumerRecord(partition1, 0),
-            getConsumerRecord(partition1, 20),
-            getConsumerRecord(partition1, 32),
-            getConsumerRecord(partition1, 40),
-            getConsumerRecord(partition1, 60)
-        ));
-
-        task.addRecords(partition2, Arrays.asList(
-            getConsumerRecord(partition2, 25),
-            getConsumerRecord(partition2, 35),
-            getConsumerRecord(partition2, 45),
-            getConsumerRecord(partition2, 61)
-        ));
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(8, task.numBuffered());
-        assertEquals(1, source1.numReceived);
-        assertEquals(0, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(7, task.numBuffered());
-        assertEquals(2, source1.numReceived);
-        assertEquals(0, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(6, task.numBuffered());
-        assertEquals(2, source1.numReceived);
-        assertEquals(1, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(5, task.numBuffered());
-        assertEquals(3, source1.numReceived);
-        assertEquals(1, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(4, task.numBuffered());
-        assertEquals(3, source1.numReceived);
-        assertEquals(2, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(3, task.numBuffered());
-        assertEquals(4, source1.numReceived);
-        assertEquals(2, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(2, task.numBuffered());
-        assertEquals(4, source1.numReceived);
-        assertEquals(3, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(1, task.numBuffered());
-        assertEquals(5, source1.numReceived);
-        assertEquals(3, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(0, task.numBuffered());
-        assertEquals(5, source1.numReceived);
-        assertEquals(4, source2.numReceived);
-
-        assertFalse(task.process());
-        assertFalse(task.maybePunctuateStreamTime());
-
-        
processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 0L, 20L, 32L, 40L, 60L);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
     public void shouldPunctuateOnceStreamTimeAfterGap() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
@@ -419,64 +329,67 @@ public class StreamTaskTest {
             getConsumerRecord(partition2, 161)
         ));
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+        // st: -1
+        assertFalse(task.maybePunctuateStreamTime()); // punctuate at 20
 
+        // st: 20
         assertTrue(task.process());
         assertEquals(7, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 25
         assertTrue(task.process());
         assertEquals(6, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(1, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
-
-        // only one punctuation after 100ms gap
         assertFalse(task.maybePunctuateStreamTime());
 
+        // st: 142
+        // punctuate at 142
         assertTrue(task.process());
         assertEquals(5, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 145
+        // only one punctuation after 100ms gap
         assertTrue(task.process());
         assertEquals(4, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
-
+        // st: 155
+        // punctuate at 155
         assertTrue(task.process());
         assertEquals(3, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 159
         assertTrue(task.process());
         assertEquals(2, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, 
still aligned on the initial punctuation
-
+        // st: 160, aligned at 0
         assertTrue(task.process());
         assertEquals(1, task.numBuffered());
         assertEquals(4, source1.numReceived);
         assertEquals(3, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 161
         assertTrue(task.process());
         assertEquals(0, task.numBuffered());
         assertEquals(4, source1.numReceived);
         assertEquals(4, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
@@ -484,9 +397,8 @@ public class StreamTaskTest {
         
processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
 20L, 142L, 155L, 160L);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testCancelPunctuateStreamTime() {
+    public void shouldRespectPunctuateCancellationStreamTime() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
         task.initializeTopology();
@@ -503,12 +415,19 @@ public class StreamTaskTest {
             getConsumerRecord(partition2, 45)
         ));
 
+        assertFalse(task.maybePunctuateStreamTime());
+
+        // st is now 20
+        assertTrue(task.process());
+
         assertTrue(task.maybePunctuateStreamTime());
 
+        // st is now 25
         assertTrue(task.process());
 
         assertFalse(task.maybePunctuateStreamTime());
 
+        // st is now 30
         assertTrue(task.process());
 
         processorStreamTime.mockProcessor.scheduleCancellable.cancel();
@@ -519,6 +438,61 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldRespectPunctuateCancellationSystemTime() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+        final long now = time.milliseconds();
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
+        time.sleep(10);
+        assertFalse(task.maybePunctuateSystemTime());
+        
processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
 now + 10);
+    }
+
+    @Test
+    public void shouldBeProcessableIfAllPartitionsBuffered() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        assertFalse(task.isProcessable());
+
+        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+        task.addRecords(partition1, Collections.singleton(new 
ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable());
+
+        task.addRecords(partition2, Collections.singleton(new 
ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
+
+        assertTrue(task.isProcessable());
+    }
+
+    @Test
+    public void shouldBeProcessableIfWaitedForTooLong() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        assertFalse(task.isProcessable());
+
+        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+        task.addRecords(partition1, Collections.singleton(new 
ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+
+        assertTrue(task.isProcessable());
+    }
+
+
+    @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
@@ -576,20 +550,6 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void testCancelPunctuateSystemTime() {
-        task = createStatelessTask(createConfig(false));
-        task.initializeStateStores();
-        task.initializeTopology();
-        final long now = time.milliseconds();
-        time.sleep(10);
-        assertTrue(task.maybePunctuateSystemTime());
-        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
-        time.sleep(10);
-        assertFalse(task.maybePunctuateSystemTime());
-        
processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
 now + 10);
-    }
-
-    @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         task = createTaskThatThrowsException();
         task.initializeStateStores();
@@ -1110,7 +1070,7 @@ public class StreamTaskTest {
     private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {
         final ProcessorTopology topology = ProcessorTopology.withSources(
             Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, 
processorSystemTime),
-            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, 
(SourceNode) source2))
+            mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
         );
 
         source1.addChild(processorStreamTime);
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
index 32ad793..cf87eb5 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -24,11 +24,7 @@ import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, 
IntegrationTestUtils}
-import org.apache.kafka.streams.processor.internals.StreamThread
-import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.test.TestUtils
-import org.junit.Assert._
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.scalatest.junit.JUnitSuite
@@ -129,9 +125,9 @@ class StreamToTableJoinScalaIntegrationTestBase extends 
JUnitSuite with StreamTo
       // consume and verify result
       val consumerConfig = getConsumerConfig()
 
-      IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
-                                                               outputTopic,
-                                                               
expectedClicksPerRegion.size)
+      
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig,
+                                                                 outputTopic,
+                                                                 
expectedClicksPerRegion.asJava)
     } else {
       java.util.Collections.emptyList()
     }
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index e5253f9..3d1bab5 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -82,9 +82,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends StreamToTableJ
     val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
     streams.close()
-
-    import collection.JavaConverters._
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), 
expectedClicksPerRegion.sortBy(_.key))
   }
 
   @Test def testShouldCountClicksPerRegionJava(): Unit = {
@@ -149,6 +146,5 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends StreamToTableJ
       produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
 
     streams.close()
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), 
expectedClicksPerRegion.sortBy(_.key))
   }
 }

Reply via email to