fredia commented on code in PR #21895:
URL: https://github.com/apache/flink/pull/21895#discussion_r1100916405


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java:
##########
@@ -201,6 +201,29 @@ void testPersistNonFailedChanges() throws Exception {
                 });
     }
 
+    @Test
+    void testPreEmptivelyUpload() throws Exception {
+        int preEmptivelyUploadThreshold = 100;
+        withWriter(
+                preEmptivelyUploadThreshold,
+                (writer, uploader) -> {
+                    writer.append(KEY_GROUP, getBytes(10));
+
+                    // trigger materialization, move ChangeSet[sqn=0] to notUploaded
+                    SequenceNumber materializationSqn = writer.nextSequenceNumber();
+
+                    // trigger pre-emptively upload
+                    writer.append(KEY_GROUP, getBytes(100));
+
+                    // assert pre-emptively upload happened
+                    SequenceNumber sqnAfterPreEmptivelyUpload = writer.lastAppendedSqnUnsafe();
+                    assertThat(sqnAfterPreEmptivelyUpload).isEqualTo(materializationSqn.next());
+
+                    // assert pre-emptively upload not persisted older data
+                    assertThat(writer.getNotUploaded()).isNotEmpty();

Review Comment:
   Could you please add some checks for when the notUploaded StateChangeSet is
   deleted? Alternatively, could you move the code at
   https://github.com/apache/flink/pull/21812/files#diff-cc303bbf16bfae7907b8983d0ef8ab52b964135d5e3efec60515d0cf377fab66R113
   into this test?


