This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef453dd1ad4 KAFKA-12634 enforce checkpoint after restoration (#13269)
ef453dd1ad4 is described below

commit ef453dd1ad4d6388bd7a74dcb0c2d2573ee945a6
Author: Philip Nee <[email protected]>
AuthorDate: Fri Apr 7 02:18:40 2023 -0700

    KAFKA-12634 enforce checkpoint after restoration (#13269)
    
    Under at-least-once, we want to checkpoint the progress right after
    restoration completes, so that the progress is not lost and the state
    does not need to be restored from scratch.
    
    Reviewers: Guozhang Wang <[email protected]>, Bruno Cadonna <[email protected]>
---
 .../streams/processor/internals/StreamTask.java    |  3 +
 .../processor/internals/StreamTaskTest.java        | 82 +++++++++++++++++++---
 2 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a44c13d8e13..130e88ff507 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -270,6 +270,9 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                 initializeTopology();
                 processorContext.initialize();
+                if (!eosEnabled) {
+                    maybeCheckpoint(true);
+                }
 
                 transitionTo(State.RUNNING);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f6d27f57281..3a9f8183390 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -59,7 +60,6 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockProcessorNode;
@@ -75,6 +75,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.MockitoSession;
+import org.mockito.quality.Strictness;
 
 import java.io.File;
 import java.io.IOException;
@@ -103,6 +106,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 import static 
org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
 import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
@@ -123,6 +127,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 @RunWith(EasyMockRunner.class)
 public class StreamTaskTest {
@@ -192,6 +200,7 @@ public class StreamTaskTest {
             punctuatedAt = timestamp;
         }
     };
+    private MockitoSession mockito;
 
     private static ProcessorTopology withRepartitionTopics(final 
List<ProcessorNode<?, ?, ?, ?>> processorNodes,
                                                            final Map<String, 
SourceNode<?, ?>> sourcesByTopic,
@@ -253,6 +262,10 @@ public class StreamTaskTest {
 
     @Before
     public void setup() {
+        mockito = Mockito.mockitoSession()
+            .initMocks(this)
+            .strictness(Strictness.STRICT_STUBS)
+            .startMocking();
         EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
         
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
 
@@ -277,6 +290,7 @@ public class StreamTaskTest {
             task = null;
         }
         Utils.delete(BASE_DIR);
+        mockito.finishMocking();
     }
 
     @Test
@@ -698,6 +712,8 @@ public class StreamTaskTest {
             }
         };
 
+        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
 // restoration checkpoint
+
         task = 
createStatelessTaskWithForwardingTopology(evenKeyForwardingSourceNode);
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
@@ -1534,7 +1550,9 @@ public class StreamTaskTest {
     public void shouldReInitializeTopologyWhenResuming() throws IOException {
         stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
         EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-        EasyMock.expect(recordCollector.offsets()).andThrow(new 
AssertionError("Should not try to read offsets")).anyTimes();
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap())
+            .andThrow(new AssertionError("Should not try to read 
offsets")).anyTimes();
+        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
 
         EasyMock.replay(recordCollector, stateDirectory, stateManager);
@@ -1577,6 +1595,7 @@ public class StreamTaskTest {
         stateManager.checkpoint();
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
+            .andReturn(singletonMap(changelogPartition, 10L)) // restoration 
checkpoint
             .andReturn(singletonMap(changelogPartition, 10L))
             .andReturn(singletonMap(changelogPartition, 20L));
         EasyMock.expectLastCall();
@@ -1700,6 +1719,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
 // restoration checkpoint
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.expectLastCall();
         stateManager.close();
@@ -1730,7 +1750,8 @@ public class StreamTaskTest {
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
 // restoration checkpoint
+        
EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
@@ -1748,7 +1769,7 @@ public class StreamTaskTest {
             mkSet(partition1, repartition),
             topology,
             consumer,
-            new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
+            new TopologyConfig(null, config, new Properties()).getTaskConfig(),
             streamsMetrics,
             stateDirectory,
             cache,
@@ -1842,11 +1863,13 @@ public class StreamTaskTest {
     @Test
     public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
         EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)) // restoration 
checkpoint
                 .andReturn(singletonMap(partition1, 1L))
                 .andReturn(singletonMap(partition1, 2L));
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
         stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint 
should not be called")).anyTimes();
-        EasyMock.replay(stateManager);
+        EasyMock.expectLastCall().once(); // checkpoint should only be called 
once
+        EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
@@ -1862,12 +1885,14 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()); 
// restoration checkpoint
         EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L))
                 .andReturn(singletonMap(partition1, 12000L))
                 .andReturn(singletonMap(partition1, 24000L));
         stateManager.checkpoint();
         EasyMock.expectLastCall().times(2);
-        EasyMock.replay(stateManager);
+        EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();
@@ -1889,8 +1914,9 @@ public class StreamTaskTest {
         
stateManager.updateChangelogOffsets(EasyMock.eq(checkpointableOffsets));
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(Collections.emptyMap()) // restoration checkpoint
                 .andReturn(checkpointableOffsets);
-        
EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).once();
+        
EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).times(2);
         EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig(), true);
@@ -2029,6 +2055,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall().once();
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, offset + 10000L)) // 
restoration checkpoint
                 .andReturn(singletonMap(partition1, offset + 12000L));
         EasyMock.replay(recordCollector, stateManager);
 
@@ -2091,6 +2118,10 @@ public class StreamTaskTest {
     public void shouldThrowOnCloseCleanFlushError() {
         final long offset = 543L;
 
+        stateManager.flush(); // restoration checkpoint
+        EasyMock.expectLastCall();
+        stateManager.checkpoint(); // checkpoint upon restoration
+        EasyMock.expectLastCall();
         
EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition,
 offset));
         stateManager.flushCache();
         EasyMock.expectLastCall().andThrow(new 
ProcessorStateException("KABOOM!")).anyTimes();
@@ -2310,6 +2341,7 @@ public class StreamTaskTest {
         EasyMock.expectLastCall().once();
         recordCollector.closeClean();
         EasyMock.expectLastCall().once();
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).once();
         EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig("100"), true);
@@ -2350,8 +2382,9 @@ public class StreamTaskTest {
 
     @Test
     public void shouldAlwaysSuspendRunningTasks() {
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()); 
// restoration checkpoint
         
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
+        EasyMock.replay(stateManager, recordCollector);
         task = createFaultyStatefulTask(createConfig("100"));
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
@@ -2515,6 +2548,37 @@ public class StreamTaskTest {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, 
processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager).checkpoint();
+    }
+
+    @Test
+    public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, 
processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, never()).checkpoint();
+        verify(processorStateManager, never()).changelogOffsets();
+        verify(recordCollector, never()).offsets();
+    }
+
+    private ProcessorStateManager mockStateManager() {
+        final ProcessorStateManager manager = 
mock(ProcessorStateManager.class);
+        doReturn(TaskType.ACTIVE).when(manager).taskType();
+        doReturn(taskId).when(manager).taskId();
+        return manager;
+    }
+
     private List<MetricName> getTaskMetrics() {
         return metrics.metrics().keySet().stream().filter(m -> 
m.tags().containsKey("task-id")).collect(Collectors.toList());
     }

Reply via email to