WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1212817671
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -80,33 +82,65 @@ private TestingJobResultStore(
}
@Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws
IOException {
- createDirtyResultConsumer.accept(jobResultEntry);
+ public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry
jobResultEntry) {
+ try {
+ createDirtyResultConsumer.accept(jobResultEntry);
+ } catch (IOException e) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ return CompletableFuture.completedFuture(null);
}
@Override
- public void markResultAsClean(JobID jobId) throws IOException {
- markResultAsCleanConsumer.accept(jobId);
+ public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+ try {
+ markResultAsCleanConsumer.accept(jobId);
+ } catch (IOException e) {
Review Comment:
I've fixed the builder of TestingJobResultStore.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -44,106 +45,140 @@ public interface JobResultStoreContractTest {
JobResultStore createJobResultStore() throws IOException;
@Test
- default void testStoreJobResultsWithDuplicateIDsThrowsException() throws
IOException {
+ default void testStoreJobResultsWithDuplicateIDsThrowsException()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
final JobResultEntry otherEntryWithDuplicateId =
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
DUMMY_JOB_RESULT_ENTRY.getJobId()));
- assertThatThrownBy(() ->
jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
- .isInstanceOf(IllegalStateException.class);
+ assertThatThrownBy(
+ () ->
+ jobResultStore
+
.createDirtyResultAsync(otherEntryWithDuplicateId)
+ .get())
+ .hasCauseInstanceOf(RuntimeException.class);
}
@Test
- default void
testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException() throws
IOException {
+ default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
- assertThatThrownBy(() ->
jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
- .isInstanceOf(IllegalStateException.class);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+
jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+ assertThatThrownBy(
+ () ->
jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get())
+ .hasCauseInstanceOf(RuntimeException.class);
}
@Test
- default void testCleaningDuplicateEntryThrowsNoException() throws
IOException {
+ default void testCleaningDuplicateEntryThrowsNoException()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+
jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
assertThatNoException()
.isThrownBy(
- () ->
jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()));
+ () ->
+ jobResultStore
+
.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get());
}
@Test
default void testCleaningNonExistentEntryThrowsException() throws
IOException {
JobResultStore jobResultStore = createJobResultStore();
assertThatThrownBy(
- () ->
jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()))
- .isInstanceOf(NoSuchElementException.class);
+ () ->
+ jobResultStore
+
.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
+ .hasCauseInstanceOf(NoSuchElementException.class);
}
@Test
- default void testHasJobResultEntryWithDirtyEntry() throws IOException {
+ default void testHasJobResultEntryWithDirtyEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-
assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+ assertThat(
+ jobResultStore
+
.hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
.isTrue();
-
assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ assertThat(
+ jobResultStore
+
.hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
.isFalse();
-
assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
+
assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
+ .isTrue();
}
@Test
- default void testHasJobResultEntryWithCleanEntry() throws IOException {
+ default void testHasJobResultEntryWithCleanEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
-
assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+
jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+ assertThat(
+ jobResultStore
+
.hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
.isFalse();
-
assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ assertThat(
+ jobResultStore
+
.hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
+ .isTrue();
+
assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
.isTrue();
-
assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
}
@Test
- default void testHasJobResultEntryWithEmptyStore() throws IOException {
+ default void testHasJobResultEntryWithEmptyStore()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
JobID jobId = new JobID();
- assertThat(jobResultStore.hasDirtyJobResultEntry(jobId)).isFalse();
- assertThat(jobResultStore.hasCleanJobResultEntry(jobId)).isFalse();
- assertThat(jobResultStore.hasJobResultEntry(jobId)).isFalse();
+
assertThat(jobResultStore.hasDirtyJobResultEntryAsync(jobId).get()).isFalse();
+
assertThat(jobResultStore.hasCleanJobResultEntryAsync(jobId).get()).isFalse();
+
assertThat(jobResultStore.hasJobResultEntryAsync(jobId).get()).isFalse();
}
@Test
- default void testGetDirtyResultsWithNoEntry() throws IOException {
+ default void testGetDirtyResultsWithNoEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- assertThat(jobResultStore.getDirtyResults()).isEmpty();
+ assertThat(jobResultStore.getDirtyResultsAsync().get()).isEmpty();
}
@Test
- default void testGetDirtyResultsWithDirtyEntry() throws IOException {
+ default void testGetDirtyResultsWithDirtyEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
assertThat(
- jobResultStore.getDirtyResults().stream()
+ jobResultStore.getDirtyResultsAsync().get().stream()
.map(JobResult::getJobId)
.collect(Collectors.toList()))
.singleElement()
.isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobId());
}
@Test
- default void testGetDirtyResultsWithDirtyAndCleanEntry() throws
IOException {
+ default void testGetDirtyResultsWithDirtyAndCleanEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]