This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ef453dd1ad4 KAFKA-12634 enforce checkpoint after restoration (#13269)
ef453dd1ad4 is described below
commit ef453dd1ad4d6388bd7a74dcb0c2d2573ee945a6
Author: Philip Nee <[email protected]>
AuthorDate: Fri Apr 7 02:18:40 2023 -0700
KAFKA-12634 enforce checkpoint after restoration (#13269)
Under at-least-once, we want to ensure that progress is checkpointed as soon
as restoration completes, so that the progress is not lost and the task does
not need to restore from scratch.
Reviewers: Guozhang Wang <[email protected]>, Bruno Cadonna
<[email protected]>
---
.../streams/processor/internals/StreamTask.java | 3 +
.../processor/internals/StreamTaskTest.java | 82 +++++++++++++++++++---
2 files changed, 76 insertions(+), 9 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a44c13d8e13..130e88ff507 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -270,6 +270,9 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
initializeTopology();
processorContext.initialize();
+ if (!eosEnabled) {
+ maybeCheckpoint(true);
+ }
transitionTo(State.RUNNING);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f6d27f57281..3a9f8183390 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -59,7 +60,6 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
@@ -75,6 +75,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.MockitoSession;
+import org.mockito.quality.Strictness;
import java.io.File;
import java.io.IOException;
@@ -103,6 +106,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
import static
org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
@@ -123,6 +127,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
@RunWith(EasyMockRunner.class)
public class StreamTaskTest {
@@ -192,6 +200,7 @@ public class StreamTaskTest {
punctuatedAt = timestamp;
}
};
+ private MockitoSession mockito;
private static ProcessorTopology withRepartitionTopics(final
List<ProcessorNode<?, ?, ?, ?>> processorNodes,
final Map<String,
SourceNode<?, ?>> sourcesByTopic,
@@ -253,6 +262,10 @@ public class StreamTaskTest {
@Before
public void setup() {
+ mockito = Mockito.mockitoSession()
+ .initMocks(this)
+ .strictness(Strictness.STRICT_STUBS)
+ .startMocking();
EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
@@ -277,6 +290,7 @@ public class StreamTaskTest {
task = null;
}
Utils.delete(BASE_DIR);
+ mockito.finishMocking();
}
@Test
@@ -698,6 +712,8 @@ public class StreamTaskTest {
}
};
+
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
// restoration checkpoint
+
task =
createStatelessTaskWithForwardingTopology(evenKeyForwardingSourceNode);
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
@@ -1534,7 +1550,9 @@ public class StreamTaskTest {
public void shouldReInitializeTopologyWhenResuming() throws IOException {
stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
- EasyMock.expect(recordCollector.offsets()).andThrow(new
AssertionError("Should not try to read offsets")).anyTimes();
+
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap())
+ .andThrow(new AssertionError("Should not try to read
offsets")).anyTimes();
+
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.replay(recordCollector, stateDirectory, stateManager);
@@ -1577,6 +1595,7 @@ public class StreamTaskTest {
stateManager.checkpoint();
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(singletonMap(changelogPartition, 10L)) // restoration
checkpoint
.andReturn(singletonMap(changelogPartition, 10L))
.andReturn(singletonMap(changelogPartition, 20L));
EasyMock.expectLastCall();
@@ -1700,6 +1719,7 @@ public class StreamTaskTest {
@Test
public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
+
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap());
// restoration checkpoint
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expectLastCall();
stateManager.close();
@@ -1730,7 +1750,8 @@ public class StreamTaskTest {
consumer.assign(asList(partition1, repartition));
consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
// restoration checkpoint
+
EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
@@ -1748,7 +1769,7 @@ public class StreamTaskTest {
mkSet(partition1, repartition),
topology,
consumer,
- new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
+ new TopologyConfig(null, config, new Properties()).getTaskConfig(),
streamsMetrics,
stateDirectory,
cache,
@@ -1842,11 +1863,13 @@ public class StreamTaskTest {
@Test
public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(singletonMap(partition1, 0L)) // restoration
checkpoint
.andReturn(singletonMap(partition1, 1L))
.andReturn(singletonMap(partition1, 2L));
+
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
stateManager.checkpoint();
- EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint
should not be called")).anyTimes();
- EasyMock.replay(stateManager);
+ EasyMock.expectLastCall().once(); // checkpoint should only be called
once
+ EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
@@ -1862,12 +1885,14 @@ public class StreamTaskTest {
@Test
public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
+
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
// restoration checkpoint
EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(singletonMap(partition1, 0L))
.andReturn(singletonMap(partition1, 12000L))
.andReturn(singletonMap(partition1, 24000L));
stateManager.checkpoint();
EasyMock.expectLastCall().times(2);
- EasyMock.replay(stateManager);
+ EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
@@ -1889,8 +1914,9 @@ public class StreamTaskTest {
stateManager.updateChangelogOffsets(EasyMock.eq(checkpointableOffsets));
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(Collections.emptyMap()) // restoration checkpoint
.andReturn(checkpointableOffsets);
-
EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).once();
+
EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).times(2);
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig(), true);
@@ -2029,6 +2055,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(singletonMap(partition1, offset + 10000L)) //
restoration checkpoint
.andReturn(singletonMap(partition1, offset + 12000L));
EasyMock.replay(recordCollector, stateManager);
@@ -2091,6 +2118,10 @@ public class StreamTaskTest {
public void shouldThrowOnCloseCleanFlushError() {
final long offset = 543L;
+ stateManager.flush(); // restoration checkpoint
+ EasyMock.expectLastCall();
+ stateManager.checkpoint(); // checkpoint upon restoration
+ EasyMock.expectLastCall();
EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition,
offset));
stateManager.flushCache();
EasyMock.expectLastCall().andThrow(new
ProcessorStateException("KABOOM!")).anyTimes();
@@ -2310,6 +2341,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall().once();
recordCollector.closeClean();
EasyMock.expectLastCall().once();
+
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).once();
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), true);
@@ -2350,8 +2382,9 @@ public class StreamTaskTest {
@Test
public void shouldAlwaysSuspendRunningTasks() {
+
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
// restoration checkpoint
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
- EasyMock.replay(stateManager);
+ EasyMock.replay(stateManager, recordCollector);
task = createFaultyStatefulTask(createConfig("100"));
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
@@ -2515,6 +2548,37 @@ public class StreamTaskTest {
);
}
+ @Test
+ public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() { // under at-least-once, completing restoration should force a state checkpoint
+ final ProcessorStateManager processorStateManager = mockStateManager(); // Mockito mock, so checkpoint() can be verified below
+ recordCollector = mock(RecordCollectorImpl.class); // replace recordCollector with a Mockito mock for this test
+
+ task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true,
processorStateManager);
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { }); // no-op offset resetter
+ verify(processorStateManager).checkpoint(); // Mockito verify() defaults to exactly one invocation
+ }
+
+ @Test
+ public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() { // under exactly-once v2, completing restoration must not checkpoint
+ final ProcessorStateManager processorStateManager = mockStateManager();
+ recordCollector = mock(RecordCollectorImpl.class); // Mockito mock so offsets() calls can be verified below
+
+ task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true,
processorStateManager);
+ task.initializeIfNeeded();
+ task.completeRestoration(noOpResetter -> { }); // no-op offset resetter
+ verify(processorStateManager, never()).checkpoint();
+ verify(processorStateManager, never()).changelogOffsets(); // the offsets a restoration checkpoint would read must not be read either
+ verify(recordCollector, never()).offsets();
+ }
+
+ private ProcessorStateManager mockStateManager() { // Mockito state-manager mock that reports an ACTIVE task with this test's taskId
+ final ProcessorStateManager manager =
mock(ProcessorStateManager.class);
+ doReturn(TaskType.ACTIVE).when(manager).taskType(); // the STRICT_STUBS session started in setup() flags stubs that go unused
+ doReturn(taskId).when(manager).taskId();
+ return manager;
+ }
+
private List<MetricName> getTaskMetrics() {
return metrics.metrics().keySet().stream().filter(m ->
m.tags().containsKey("task-id")).collect(Collectors.toList());
}