This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3faa6cf6d0 MINOR: Use mock time in DefaultStateUpdaterTest (#12344)
3faa6cf6d0 is described below

commit 3faa6cf6d060887288fcf68adb8c3f1e2090b8ed
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Jun 29 12:33:00 2022 -0700

    MINOR: Use mock time in DefaultStateUpdaterTest (#12344)
    
    Most tests need an auto-ticking mock timer that works with the draining-with-timeout functions.
    Tests that verify that tasks are never checkpointed need a non-auto-ticking timer so that they can control exactly how much time has elapsed.
    
    Reviewers: Bruno Cadonna <cado...@apache.org>
---
 .../processor/internals/DefaultStateUpdater.java   |  5 ++--
 .../internals/DefaultStateUpdaterTest.java         | 34 ++++++++++++----------
 2 files changed, 22 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 0e84574c5c..886a37b314 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -356,8 +356,6 @@ public class DefaultStateUpdater implements StateUpdater {
         this.offsetResetter = offsetResetter;
         this.time = time;
         this.commitIntervalMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
-        // initialize the last commit as of now to prevent first commit 
happens immediately
-        this.lastCommitMs = time.milliseconds();
     }
 
     public void start() {
@@ -365,6 +363,9 @@ public class DefaultStateUpdater implements StateUpdater {
             stateUpdaterThread = new StateUpdaterThread("state-updater", 
changelogReader, offsetResetter);
             stateUpdaterThread.start();
             shutdownGate = new CountDownLatch(1);
+
+            // initialize the last commit as of now to prevent first commit 
happens immediately
+            this.lastCommitMs = time.milliseconds();
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 8bd81828f6..5e2d90de71 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -44,7 +45,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.easymock.EasyMock.anyBoolean;
@@ -79,24 +79,25 @@ class DefaultStateUpdaterTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
     private final static TaskId TASK_1_1 = new TaskId(1, 1);
 
-    private final StreamsConfig config = new 
StreamsConfig(configProps(COMMIT_INTERVAL));
+    // need an auto-tick timer to work for draining with timeout
+    private final Time time = new MockTime(1L);
+    private final StreamsConfig config = new StreamsConfig(configProps());
     private final ChangelogReader changelogReader = 
mock(ChangelogReader.class);
     private final java.util.function.Consumer<Set<TopicPartition>> 
offsetResetter = topicPartitions -> { };
-
-    private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, 
changelogReader, offsetResetter, Time.SYSTEM);
+    private final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, time);
 
     @AfterEach
     public void tearDown() {
         stateUpdater.shutdown(Duration.ofMinutes(1));
     }
 
-    private Properties configProps(final int commitInterval) {
+    private Properties configProps() {
         return mkObjectProperties(mkMap(
                 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
                 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:2171"),
                 mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
commitInterval),
-                
mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 
commitInterval)
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
COMMIT_INTERVAL),
+                
mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 
COMMIT_INTERVAL)
         ));
     }
 
@@ -437,16 +438,16 @@ class DefaultStateUpdaterTest {
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() 
throws Exception {
         final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
-        shouldNotRemoveTaskFromRestoredActiveTasks(task, 
Collections.singleton(TOPIC_PARTITION_A_0));
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
     }
 
     @Test
     public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws 
Exception {
         final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
-        shouldNotRemoveTaskFromRestoredActiveTasks(task, 
Collections.emptySet());
+        shouldNotRemoveTaskFromRestoredActiveTasks(task);
     }
 
-    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask 
task, final Set<TopicPartition> completedChangelogs) throws Exception {
+    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask 
task) throws Exception {
         final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
         
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@@ -701,8 +702,10 @@ class DefaultStateUpdaterTest {
         stateUpdater.add(task2);
         stateUpdater.add(task3);
         stateUpdater.add(task4);
+        // wait for all tasks added to the thread before advance timer
+        verifyUpdatingTasks(task1, task2, task3, task4);
 
-        sleep(COMMIT_INTERVAL);
+        time.sleep(COMMIT_INTERVAL + 1);
 
         verifyExceptionsAndFailedTasks();
         verifyCheckpointTasks(false, task1, task2, task3, task4);
@@ -710,10 +713,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
-        stateUpdater.shutdown(Duration.ofMinutes(1));
-        final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
-        stateUpdater = new DefaultStateUpdater(config, changelogReader, 
offsetResetter, Time.SYSTEM);
-
+        // we need to use a non auto-ticking timer here to control how much 
time elapsed exactly
+        final Time time = new MockTime();
+        final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, time);
         try {
             final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
             final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
@@ -727,6 +729,8 @@ class DefaultStateUpdaterTest {
             stateUpdater.add(task3);
             stateUpdater.add(task4);
 
+            time.sleep(COMMIT_INTERVAL);
+
             verifyNeverCheckpointTasks(task1, task2, task3, task4);
         } finally {
             stateUpdater.shutdown(Duration.ofMinutes(1));

Reply via email to