WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1212817671


##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -80,33 +82,65 @@ private TestingJobResultStore(
     }
 
     @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-        createDirtyResultConsumer.accept(jobResultEntry);
+    public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+        try {
+            createDirtyResultConsumer.accept(jobResultEntry);
+        } catch (IOException e) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public void markResultAsClean(JobID jobId) throws IOException {
-        markResultAsCleanConsumer.accept(jobId);
+    public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+        try {
+            markResultAsCleanConsumer.accept(jobId);
+        } catch (IOException e) {

Review Comment:
   I've fixed the builder of TestingJobResultStore.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -44,106 +45,140 @@ public interface JobResultStoreContractTest {
     JobResultStore createJobResultStore() throws IOException;
 
     @Test
-    default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
+    default void testStoreJobResultsWithDuplicateIDsThrowsException()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
         final JobResultEntry otherEntryWithDuplicateId =
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 DUMMY_JOB_RESULT_ENTRY.getJobId()));
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
-                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(
+                        () ->
+                                jobResultStore
+                                        .createDirtyResultAsync(otherEntryWithDuplicateId)
+                                        .get())
+                .hasCauseInstanceOf(RuntimeException.class);
     }
 
     @Test
-    default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException() throws IOException {
+    default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
-                .isInstanceOf(IllegalStateException.class);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+        assertThatThrownBy(
+                        () -> jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get())
+                .hasCauseInstanceOf(RuntimeException.class);
     }
 
     @Test
-    default void testCleaningDuplicateEntryThrowsNoException() throws IOException {
+    default void testCleaningDuplicateEntryThrowsNoException()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
         assertThatNoException()
                 .isThrownBy(
-                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()));
+                        () ->
+                                jobResultStore
+                                        .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                        .get());
     }
 
     @Test
     default void testCleaningNonExistentEntryThrowsException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
         assertThatThrownBy(
-                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()))
-                .isInstanceOf(NoSuchElementException.class);
+                        () ->
+                                jobResultStore
+                                        .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                        .get())
+                .hasCauseInstanceOf(NoSuchElementException.class);
     }
 
     @Test
-    default void testHasJobResultEntryWithDirtyEntry() throws IOException {
+    default void testHasJobResultEntryWithDirtyEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        assertThat(
+                        jobResultStore
+                                .hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
                 .isTrue();
-        assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        assertThat(
+                        jobResultStore
+                                .hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
                 .isFalse();
-        assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
+        assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
+                .isTrue();
     }
 
     @Test
-    default void testHasJobResultEntryWithCleanEntry() throws IOException {
+    default void testHasJobResultEntryWithCleanEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
-        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+        assertThat(
+                        jobResultStore
+                                .hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
                 .isFalse();
-        assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        assertThat(
+                        jobResultStore
+                                .hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
+                .isTrue();
+        assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
                 .isTrue();
-        assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
     }
 
     @Test
-    default void testHasJobResultEntryWithEmptyStore() throws IOException {
+    default void testHasJobResultEntryWithEmptyStore()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
         JobID jobId = new JobID();
-        assertThat(jobResultStore.hasDirtyJobResultEntry(jobId)).isFalse();
-        assertThat(jobResultStore.hasCleanJobResultEntry(jobId)).isFalse();
-        assertThat(jobResultStore.hasJobResultEntry(jobId)).isFalse();
+        assertThat(jobResultStore.hasDirtyJobResultEntryAsync(jobId).get()).isFalse();
+        assertThat(jobResultStore.hasCleanJobResultEntryAsync(jobId).get()).isFalse();
+        assertThat(jobResultStore.hasJobResultEntryAsync(jobId).get()).isFalse();
     }
 
     @Test
-    default void testGetDirtyResultsWithNoEntry() throws IOException {
+    default void testGetDirtyResultsWithNoEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        assertThat(jobResultStore.getDirtyResults()).isEmpty();
+        assertThat(jobResultStore.getDirtyResultsAsync().get()).isEmpty();
     }
 
     @Test
-    default void testGetDirtyResultsWithDirtyEntry() throws IOException {
+    default void testGetDirtyResultsWithDirtyEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
         assertThat(
-                        jobResultStore.getDirtyResults().stream()
+                        jobResultStore.getDirtyResultsAsync().get().stream()
                                 .map(JobResult::getJobId)
                                 .collect(Collectors.toList()))
                 .singleElement()
                 .isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobId());
     }
 
     @Test
-    default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException {
+    default void testGetDirtyResultsWithDirtyAndCleanEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to