zoltar9264 commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1142866450
##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##########
@@ -246,6 +247,223 @@ void testFileAvailableAfterClose() throws Exception {
}
}
+ @Test
+ void testLocalFileDiscard() throws Exception {
+ long appendPersistThreshold = 100;
+ TaskChangelogRegistry taskChangelogRegistry =
+ new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+ try (DiscardRecordableStateChangeUploader uploader =
+ new
DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+ TestingBatchingUploadScheduler uploadScheduler =
+ new TestingBatchingUploadScheduler(uploader);
+ FsStateChangelogWriter writer =
+ new FsStateChangelogWriter(
+ UUID.randomUUID(),
+ KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+ uploadScheduler,
+ appendPersistThreshold,
+ new SyncMailboxExecutor(),
+ taskChangelogRegistry,
+ TestLocalRecoveryConfig.enabledForTest(),
+ new LocalChangelogRegistryImpl(
+
Executors.newDirectExecutorService()))) {
+ SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+ writer.append(KEY_GROUP, getBytes(10));
+
+ // checkpoint 1 trigger
+ SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+ writer.persist(initialSqn, 1L);
+ uploadScheduler.scheduleAll(); // checkpoint 1 completed
+ writer.confirm(initialSqn, checkpoint1sqn, 1);
+
+ // trigger pre-emptive upload
+ writer.append(KEY_GROUP, getBytes(100));
+ uploadScheduler.scheduleAll();
+ writer.append(KEY_GROUP, getBytes(10));
+ // checkpoint 2 trigger
+ SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+ CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
future2 =
+ writer.persist(initialSqn, 2L);
+ uploadScheduler.scheduleAll(); // checkpoint 2 completed
+ writer.confirm(initialSqn, checkpoint2sqn, 2);
+ SnapshotResult<ChangelogStateHandleStreamImpl> result2 =
future2.get();
+ for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+ result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+ assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+ }
+
+ // materialization 1 trigger
+ SequenceNumber materializationSqn = writer.nextSequenceNumber();
+ writer.append(KEY_GROUP, getBytes(10));
+
+ // materialization 1 completed
+ // checkpoint 3 trigger
+ SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+ writer.persist(materializationSqn, 3L);
+ uploadScheduler.scheduleAll(); // checkpoint 3 completed
+ writer.confirm(materializationSqn, checkpoint3sqn, 3);
+ for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+ result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+ assertThat(uploader.isDiscarded(handleAndOffset.f0)).isTrue();
+ }
+ }
+ }
+
+ @Test
+ void testLocalFileAfterMaterialize() throws Exception {
+ // If register local files when confirm(), the following case will
fail:
+ // cp1 trigger: file1,file1'(local)
+ // JM: register [file1] to sharedRegistry
+ // cp1 complete: stopTracking [file1], register [file1'] to
localRegistry
+ // cp2 trigger: file1,file1',file2,file2'
+ // JM: register [file1,file2] to sharedRegistry
+ // cp2 complete: stopTracking [file1,file1',file2,file2'], register
[file1',file2'] to
+ // localRegistry
+ // cp1 subsume
+ // cp3 trigger: file1,file1',file2,file2',file3,file3'
+ // materialization: uploaded.clear()
Review Comment:
In fact, the status quo is that the `uploaded` collection is not cleared after
materialization completes, but I think we really should do that. I will submit a
new ticket to discuss it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]