This is an automated email from the ASF dual-hosted git repository. dchen pushed a commit to branch fix-rollback-stale-checkpoints in repository https://gitbox.apache.org/repos/asf/samza.git
commit b10e8a006b00122b8cd2ba73d84989541e538969 Author: Daniel Chen <[email protected]> AuthorDate: Wed Dec 22 02:03:13 2021 -0500 Fix stale v2 checkpoints on rollback --- .../java/org/apache/samza/config/TaskConfig.java | 6 +++ .../checkpoint/kafka/KafkaCheckpointManager.scala | 27 ++++++++---- .../CheckpointVersionIntegrationTest.java | 48 ++++++++++++++++++---- 3 files changed, 64 insertions(+), 17 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index 06a8727..a2b96eb 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -127,6 +127,8 @@ public class TaskConfig extends MapConfig { // checkpoint version to read during container startup public static final String CHECKPOINT_READ_VERSIONS = "task.checkpoint.read.versions"; public static final List<String> DEFAULT_CHECKPOINT_READ_VERSIONS = ImmutableList.of("1"); + public static final String LIVE_CHECKPOINT_MAX_AGE_MS = "task.live.checkpoint.max.age"; + public static final long DEFAULT_LIVE_CHECKPOINT_MAX_AGE_MS = 600000L; // 10 mins public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled"; private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true; @@ -360,6 +362,10 @@ public class TaskConfig extends MapConfig { } } + public long getLiveCheckpointMaxAgeMillis() { + return getLong(LIVE_CHECKPOINT_MAX_AGE_MS, DEFAULT_LIVE_CHECKPOINT_MAX_AGE_MS); + } + public boolean getTransactionalStateCheckpointEnabled() { return getBoolean(TRANSACTIONAL_STATE_CHECKPOINT_ENABLED, DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED); } diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 7dbb9b3..d6c904f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -82,6 +82,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, val stopConsumerAfterFirstRead: Boolean = new TaskConfig(config).getCheckpointManagerConsumerStopAfterFirstRead val checkpointReadVersions: util.List[lang.Short] = new TaskConfig(config).getCheckpointReadVersions + val LiveCheckpointMaxAgeMillis: Long = new TaskConfig(config).getLiveCheckpointMaxAgeMillis + /** * Create checkpoint stream prior to start. @@ -243,12 +245,15 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, */ private def readCheckpoints(): Map[TaskName, Checkpoint] = { val checkpoints = mutable.Map[TaskName, Checkpoint]() + val checkpointAppendTime = mutable.Map[TaskName, Long]() val iterator = new SystemStreamPartitionIterator(systemConsumer, checkpointSsp) var numMessagesRead = 0 while (iterator.hasNext) { val checkpointEnvelope: IncomingMessageEnvelope = iterator.next + // Kafka log append time for the checkpoint message + val checkpointEnvelopeTs = checkpointEnvelope.getEventTime; val offset = checkpointEnvelope.getOffset numMessagesRead += 1 @@ -290,9 +295,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, // if checkpoint key version does not match configured checkpoint version to read, skip the message. if (checkpointReadVersions.contains( KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(checkpointKey.getType))) { - if (!checkpoints.contains(checkpointKey.getTaskName) || - shouldOverrideCheckpoint(checkpoints.get(checkpointKey.getTaskName), checkpointKey)) { - checkpoints.put(checkpointKey.getTaskName, deserializeCheckpoint(checkpointKey, msgBytes)) + val taskName = checkpointKey.getTaskName + if (!checkpoints.contains(taskName) || + shouldOverrideCheckpoint(checkpoints.get(taskName), checkpointKey, checkpointAppendTime.get(taskName), + checkpointEnvelopeTs)) { + checkpoints.put(taskName, deserializeCheckpoint(checkpointKey, msgBytes)) + checkpointAppendTime.put(taskName, checkpointEnvelopeTs) } // else ignore the de-prioritized checkpoint } else { // Ignore and skip the unknown checkpoint key type. We do not want to throw any exceptions for this case @@ -375,19 +383,22 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, } } - private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint], - newCheckpointKey: KafkaCheckpointLogKey): Boolean = { + private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint], newCheckpointKey: KafkaCheckpointLogKey, + currentCheckpointAppendTime: Option[Long], newCheckpointAppendTime: Long): Boolean = { val newCheckpointVersion = KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(newCheckpointKey.getType) if (newCheckpointVersion == null) { // Unknown checkpoint version throw new IllegalArgumentException("Unknown checkpoint key type: " + newCheckpointKey.getType + " for checkpoint key: " + newCheckpointKey) } - // Override checkpoint if the current checkpoint does not exist or if new checkpoint has a higher restore - // priority than the currently written checkpoint + // Override checkpoint if: + // 1. The current checkpoint does not exist or + // 2. The new checkpoint has a higher restore priority than the currently written checkpoint + // 3. The current checkpoint is determined to be stale compared to the new checkpoint timestamp currentCheckpoint.isEmpty || checkpointReadVersions.indexOf(newCheckpointVersion) <= - checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion) + checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion) || + (newCheckpointAppendTime - currentCheckpointAppendTime.get > LiveCheckpointMaxAgeMillis) } private def deserializeCheckpoint(checkpointKey: KafkaCheckpointLogKey, checkpointMsgBytes: Array[Byte]): Checkpoint = { diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java index 66eb79c..8d99803 100644 --- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java @@ -77,7 +77,7 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once"); // double check collectors.flush List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99"); - initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun); + runStatefulApp(inputMessagesOnInitialRun, inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun, CONFIGS); // first two are reverts for uncommitted messages from last run for keys 98 and 99 List<String> expectedChangelogMessagesAfterSecondRun = @@ -85,11 +85,42 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3"); Map<String, String> configOverrides = new HashMap<>(CONFIGS); configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2"); - secondRun(CHANGELOG_TOPIC, - expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, configOverrides); + finalRun(CHANGELOG_TOPIC, + expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, + Arrays.asList("4", "5", "5", ":shutdown"),configOverrides); } - private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) { + @Test + public void testStopCheckpointV1V2AndRestartStaleCheckpointV2() { + List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once"); + // double check collectors.flush + List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99"); + runStatefulApp(inputMessagesOnInitialRun, inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun, CONFIGS); + + + Map<String, String> secondConfigRunOverrides = new HashMap<>(CONFIGS); + // only write checkpoint v1, making checkpoint v2 stale + secondConfigRunOverrides.put(TaskConfig.CHECKPOINT_WRITE_VERSIONS, "1"); + secondConfigRunOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1"); + List<String> inputMessagesOnSecondRun = Arrays.asList("77", "78", "79", ":shutdown"); + // first two are reverts for uncommitted messages from last run for keys 98 and 99 + expectedChangelogMessagesOnInitialRun = Arrays.asList(null, null, "98", "99", "77", "78", "79"); + runStatefulApp(inputMessagesOnSecondRun, inputMessagesOnSecondRun, expectedChangelogMessagesOnInitialRun, + secondConfigRunOverrides); + + // takes the latest written checkpoint v1 from run 2 since v2 checkpoints are stale + List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "77", "78", "79", "98", "99"); + + Map<String, String> configOverrides = new HashMap<>(CONFIGS); + configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1"); + // Does not have to rewind to the last written v2 checkpoints (1, 2, 3) despite the v2 priority + configOverrides.put(TaskConfig.LIVE_CHECKPOINT_MAX_AGE_MS, "0"); // use the latest checkpoint + finalRun(CHANGELOG_TOPIC, + Collections.emptyList(), expectedInitialStoreContentsOnSecondRun, Collections.emptyList(), configOverrides); + } + + private void runStatefulApp(List<String> inputMessages, List<String> expectedInputTopicMessages, + List<String> expectedChangelogMessages, Map<String, String> configs) { // create input topic and produce the first batch of input messages createTopic(INPUT_TOPIC, 1); inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m)); @@ -99,12 +130,12 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati List<ConsumerRecord<String, String>> inputRecords = consumeMessages(Collections.singletonList(INPUT_TOPIC), inputMessages.size()); List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList()); - Assert.assertEquals(inputMessages, readInputMessages); + Assert.assertEquals(expectedInputTopicMessages, readInputMessages); } // run the application RunApplicationContext context = runApplication( - new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", CONFIGS); + new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", configs); // wait for the application to finish context.getRunner().waitForFinish(); @@ -120,14 +151,13 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati LOG.info("Finished initial run"); } - private void secondRun(String changelogTopic, List<String> expectedChangelogMessages, - List<String> expectedInitialStoreContents, Map<String, String> overriddenConfigs) { + private void finalRun(String changelogTopic, List<String> expectedChangelogMessages, + List<String> expectedInitialStoreContents, List<String> inputMessages, Map<String, String> overriddenConfigs) { // remove previous files so restore is from the checkpointV2 new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // produce the second batch of input messages - List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown"); inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m)); // run the application
