This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d69a79  KAFKA-4641: Add more unit test for stream thread (#4531)
5d69a79 is described below

commit 5d69a79948bce0fab4705c7d7d0f3b02f2548c0d
Author: Guozhang Wang <[email protected]>
AuthorDate: Wed Feb 7 14:08:53 2018 -0800

    KAFKA-4641: Add more unit test for stream thread (#4531)
    
    Before the patch, jacoco coverage test:
    
    Element     Missed Instructions     Cov.    Missed Branches Cov.    Missed  
Cxty    Missed  Lines   Missed  Methods Missed  Classes
    Total       3,386 of 22,177 84%     336 of 1,639    79%     350     1,589   
526     4,451   103     768     1       102
    StreamThread                77%             76%     27      102     48      
299     1       31      0       1
    After the patch:
    
    Element     Missed Instructions     Cov.    Missed Branches Cov.    Missed  
Cxty    Missed  Lines   Missed  Methods Missed  Classes
    Total       3,329 of 22,180 84%     329 of 1,639    79%     345     1,590   
516     4,452   102     769     1       102
    StreamThread                81%             80%     23      103     39      
300     1       32      0       1
    
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax 
<[email protected]>, Damian Guy <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  |   6 +-
 .../processor/internals/StreamThreadTest.java      | 161 ++++++++++++++++++++-
 2 files changed, 161 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 064a293..5e25d02 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1183,8 +1183,12 @@ public class StreamThread extends Thread {
         return sb.toString();
     }
 
-    // this is for testing only
+    // the following are for testing only
     TaskManager taskManager() {
         return taskManager;
     }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() 
{
+        return standbyRecords;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14..cc05604 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -40,7 +41,13 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -100,13 +107,16 @@ public class StreamThreadTest {
     }
 
     private final String topic1 = "topic1";
+    private final String topic2 = "topic2";
 
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
     private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+    private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
 
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(1, 1);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -129,7 +139,7 @@ public class StreamThreadTest {
     public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
 
-        final StreamThread thread = getStreamThread();
+        final StreamThread thread = createStreamThread(clientId, config, 
false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
@@ -685,10 +695,6 @@ public class StreamThreadTest {
         }
     }
 
-    private StreamThread getStreamThread() {
-        return createStreamThread(clientId, config, false);
-    }
-
     @Test
     public void shouldReturnActiveTaskMetadataWhileRunningState() throws 
InterruptedException {
         internalTopologyBuilder.addSource(null, "source", null, null, null, 
topic1);
@@ -759,6 +765,151 @@ public class StreamThreadTest {
         assertTrue(threadMetadata.activeTasks().isEmpty());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldUpdateStandbyTask() {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName = applicationId + "-" + storeName1 + 
"-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName, 1);
+        final TopicPartition partition2 = t2p1;
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+                .groupByKey().count(Materialized.<Object, Long, 
KeyValueStore<Bytes, byte[]>>as(storeName1));
+        internalStreamsBuilder.table(topic2, new ConsumedInternal(), new 
MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, ""));
+
+        final StreamThread thread = createStreamThread(clientId, config, 
false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName,
+                Collections.singletonList(new PartitionInfo(changelogName,
+                        1,
+                        null,
+                        new Node[0],
+                        new Node[0])));
+
+        restoreConsumer.assign(Utils.mkSet(partition1, partition2));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 
10L));
+        
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 
0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 
10L));
+        
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 
0L));
+        // let the store1 be restored from 0 to 10; store2 be restored from 0 
to (committed offset) 5
+        clientSupplier.consumer.assign(Utils.mkSet(partition2));
+        
clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new 
OffsetAndMetadata(5L, "")));
+
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, 
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" 
+ i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, 
Set<TopicPartition>>emptyMap(), standbyTasks);
+
+        
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+        thread.runOnce(-1);
+
+        final StandbyTask standbyTask1 = 
thread.taskManager().standbyTask(partition1);
+        final StandbyTask standbyTask2 = 
thread.taskManager().standbyTask(partition2);
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, 
Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, 
Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(5L, store2.approximateNumEntries());
+        assertEquals(Collections.singleton(partition2), 
restoreConsumer.paused());
+        assertEquals(1, thread.standbyRecords().size());
+        assertEquals(5, thread.standbyRecords().get(partition2).size());
+    }
+
+    @Test
+    public void shouldPunctuateActiveTask() {
+        final List<Long> punctuatedStreamTime = new ArrayList<>();
+        final List<Long> punctuatedWallClockTime = new ArrayList<>();
+        final ProcessorSupplier<Object, Object> punctuateProcessor = new 
ProcessorSupplier<Object, Object>() {
+            @Override
+            public Processor<Object, Object> get() {
+                return new Processor<Object, Object>() {
+                    @Override
+                    public void init(ProcessorContext context) {
+                        context.schedule(100L, PunctuationType.STREAM_TIME, 
new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedStreamTime.add(timestamp);
+                            }
+                        });
+                        context.schedule(100L, 
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedWallClockTime.add(timestamp);
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void process(Object key, Object value) { }
+
+                    @SuppressWarnings("deprecation")
+                    @Override
+                    public void punctuate(long timestamp) { }
+
+                    @Override
+                    public void close() { }
+                };
+            }
+        };
+
+        internalStreamsBuilder.stream(Collections.singleton(topic1), 
consumed).process(punctuateProcessor);
+
+        final StreamThread thread = createStreamThread(clientId, config, 
false);
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        clientSupplier.consumer.assign(assignedPartitions);
+        
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+
+        thread.runOnce(-1);
+
+        assertEquals(0, punctuatedStreamTime.size());
+        assertEquals(0, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+        for (long i = 0L; i < 10L; i++) {
+            clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, 
i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + 
i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + 
i).getBytes()));
+        }
+
+        thread.runOnce(-1);
+
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(1, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+
+        thread.runOnce(-1);
+
+        // we should skip stream time punctuation, only trigger wall-clock 
time punctuation
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(2, punctuatedWallClockTime.size());
+    }
+
     @Test
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws 
InterruptedException {
         final StreamThread thread = createStreamThread(clientId, config, 
false);

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to