This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bf30a9b175648fea4dda7aab748a6b6d73dfba27 Author: Matthias Pohl <[email protected]> AuthorDate: Thu Feb 3 17:53:18 2022 +0100 [FLINK-25432][runtime] Adds generic interfaces for cleaning up Job-related data --- .../dispatcher/cleanup/DefaultResourceCleaner.java | 145 +++++++++++++ .../cleanup/GloballyCleanableResource.java | 46 ++++ .../cleanup/LocallyCleanableResource.java | 47 +++++ .../dispatcher/cleanup/ResourceCleaner.java | 36 ++++ .../dispatcher/cleanup/ResourceCleanerFactory.java | 54 +++++ .../cleanup/DefaultResourceCleanerTest.java | 235 +++++++++++++++++++++ 6 files changed, 563 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java new file mode 100644 index 0000000..ce4b2d4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** {@code DefaultResourceCleaner} is the default implementation of {@link ResourceCleaner}. */ +public class DefaultResourceCleaner<T> implements ResourceCleaner { + + private final ComponentMainThreadExecutor mainThreadExecutor; + private final Executor cleanupExecutor; + private final CleanupFn<T> cleanupFn; + + private final Collection<T> prioritizedCleanup; + private final Collection<T> regularCleanup; + + public static Builder<LocallyCleanableResource> forLocallyCleanableResources( + ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) { + return forCleanableResources( + mainThreadExecutor, cleanupExecutor, LocallyCleanableResource::localCleanupAsync); + } + + public static Builder<GloballyCleanableResource> forGloballyCleanableResources( + ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) { + return forCleanableResources( + mainThreadExecutor, cleanupExecutor, GloballyCleanableResource::globalCleanupAsync); + } + + @VisibleForTesting + static <T> Builder<T> forCleanableResources( + ComponentMainThreadExecutor mainThreadExecutor, + Executor cleanupExecutor, + CleanupFn<T> cleanupFunction) { + return new Builder<>(mainThreadExecutor, cleanupExecutor, cleanupFunction); + } + + @VisibleForTesting + @FunctionalInterface + interface CleanupFn<T> { + CompletableFuture<Void> cleanupAsync(T resource, JobID jobId, Executor cleanupExecutor); + } + + /** + * {@code Builder} for creating {@code DefaultResourceCleaner} instances. + * + * @param <T> The functional interface that's being translated into the internally used {@link + * CleanupFn}. + */ + public static class Builder<T> { + + private final ComponentMainThreadExecutor mainThreadExecutor; + private final Executor cleanupExecutor; + private final CleanupFn<T> cleanupFn; + + private final Collection<T> prioritizedCleanup = new ArrayList<>(); + private final Collection<T> regularCleanup = new ArrayList<>(); + + private Builder( + ComponentMainThreadExecutor mainThreadExecutor, + Executor cleanupExecutor, + CleanupFn<T> cleanupFn) { + this.mainThreadExecutor = mainThreadExecutor; + this.cleanupExecutor = cleanupExecutor; + this.cleanupFn = cleanupFn; + } + + public Builder<T> withPrioritizedCleanup(T prioritizedCleanup) { + this.prioritizedCleanup.add(prioritizedCleanup); + return this; + } + + public Builder<T> withRegularCleanup(T regularCleanup) { + this.regularCleanup.add(regularCleanup); + return this; + } + + public DefaultResourceCleaner<T> build() { + return new DefaultResourceCleaner<>( + mainThreadExecutor, + cleanupExecutor, + cleanupFn, + prioritizedCleanup, + regularCleanup); + } + } + + private DefaultResourceCleaner( + ComponentMainThreadExecutor mainThreadExecutor, + Executor cleanupExecutor, + CleanupFn<T> cleanupFn, + Collection<T> prioritizedCleanup, + Collection<T> regularCleanup) { + this.mainThreadExecutor = mainThreadExecutor; + this.cleanupExecutor = cleanupExecutor; + this.cleanupFn = cleanupFn; + this.prioritizedCleanup = prioritizedCleanup; + this.regularCleanup = regularCleanup; + } + + @Override + public CompletableFuture<Void> cleanupAsync(JobID jobId) { + mainThreadExecutor.assertRunningInMainThread(); + CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture(); + for (T cleanup : prioritizedCleanup) { + cleanupFuture = + cleanupFuture.thenCompose( + ignoredValue -> + cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor)); + } + return cleanupFuture.thenCompose( + ignoredValue -> + FutureUtils.completeAll( + regularCleanup.stream() + .map( + cleanup -> + cleanupFn.cleanupAsync( + cleanup, jobId, cleanupExecutor)) + .collect(Collectors.toList()))); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java new file mode 100644 index 0000000..66bfd32 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for + * a given job that can be cleaned up globally. Globally available artifacts should survive a + * JobManager failover and are, in contrast to {@link GloballyCleanableResource}, only cleaned up + * after the corresponding job reached a globally-terminal state. + * + * @see org.apache.flink.api.common.JobStatus + */ +@FunctionalInterface +public interface GloballyCleanableResource { + + /** + * {@code globalCleanupAsync} is expected to be called from the main thread. Heavy IO tasks + * should be outsourced into the passed {@code cleanupExecutor}. Thread-safety must be ensured. + * + * @param jobId The {@link JobID} of the job for which the local data should be cleaned up. + * @param cleanupExecutor The fallback executor for IO-heavy operations. + * @return The cleanup result future. + */ + CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor cleanupExecutor); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java new file mode 100644 index 0000000..9e44a55 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code LocallyCleanableResource} is supposed to be used by any class that provides artifacts for + * a given job that can be cleaned up locally. Artifacts considered to be local are located on the + * JobManager instance itself and won't survive a failover scenario. These artifacts are, in + * contrast to {@link GloballyCleanableResource} artifacts, going to be cleaned up even after the + * job reaches a locally-terminated state. + * + * @see org.apache.flink.api.common.JobStatus + */ +@FunctionalInterface +public interface LocallyCleanableResource { + + /** + * {@code localCleanupAsync} is expected to be called from the main thread. Heavy IO tasks + * should be outsourced into the passed {@code cleanupExecutor}. Thread-safety must be ensured. + * + * @param jobId The {@link JobID} of the job for which the local data should be cleaned up. + * @param cleanupExecutor The fallback executor for IO-heavy operations. + * @return The cleanup result future. + */ + CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java new file mode 100644 index 0000000..bfea159 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleaner.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; + +/** {@code ResourceCleaner} executes instances on the given {@code JobID}. */ +@FunctionalInterface +public interface ResourceCleaner { + + /** + * Cleans job-related data from resources asynchronously. + * + * @param jobId The {@link JobID} referring to the job for which the data shall be cleaned up. + * @return the cleanup result future. + */ + CompletableFuture<Void> cleanupAsync(JobID jobId); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java new file mode 100644 index 0000000..9f724b1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ResourceCleanerFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; + +import java.util.concurrent.Executor; + +/** + * {@code ResourceCleanerFactory} provides methods to create {@link ResourceCleaner} for local and + * global cleanup. + * + * @see GloballyCleanableResource + * @see LocallyCleanableResource + */ +public interface ResourceCleanerFactory { + + /** + * Creates {@link ResourceCleaner} that initiates {@link + * LocallyCleanableResource#localCleanupAsync(JobID, Executor)} calls. + * + * @param mainThreadExecutor Used for validating that the {@link + * LocallyCleanableResource#localCleanupAsync(JobID, Executor)} is called from the main + * thread. + */ + ResourceCleaner createLocalResourceCleaner(ComponentMainThreadExecutor mainThreadExecutor); + + /** + * Creates {@link ResourceCleaner} that initiates {@link + * GloballyCleanableResource#globalCleanupAsync(JobID, Executor)} calls. + * + * @param mainThreadExecutor Used for validating that the {@link + * GloballyCleanableResource#globalCleanupAsync(JobID, Executor)} is called from the main + * thread. + */ + ResourceCleaner createGlobalResourceCleaner(ComponentMainThreadExecutor mainThreadExecutor); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java new file mode 100644 index 0000000..e8bbd4f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; + +/** {@code DefaultResourceCleanerTest} tests {@link DefaultResourceCleaner}. */ +public class DefaultResourceCleanerTest { + + private static final Executor EXECUTOR = Executors.directExecutor(); + private static final JobID JOB_ID = new JobID(); + + private DefaultResourceCleaner<CleanupCallback> testInstance; + private CleanupCallback cleanup0; + private CleanupCallback cleanup1; + + @BeforeEach + public void setup() { + cleanup0 = CleanupCallback.withoutCompletionOnCleanup(); + cleanup1 = CleanupCallback.withoutCompletionOnCleanup(); + + testInstance = + createTestInstanceBuilder() + .withRegularCleanup(cleanup0) + .withRegularCleanup(cleanup1) + .build(); + } + + @Test + public void testSuccessfulConcurrentCleanup() { + CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(cleanupResult).isNotCompleted(); + assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID); + assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID); + + cleanup0.completeCleanup(); + assertThat(cleanupResult).isNotCompleted(); + + cleanup1.completeCleanup(); + assertThat(cleanupResult).isCompleted(); + } + + @Test + public void testConcurrentCleanupWithExceptionFirst() { + CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(cleanupResult).isNotCompleted(); + assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID); + assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + cleanup0.completeCleanupExceptionally(expectedException); + assertThat(cleanupResult).isNotCompleted(); + + cleanup1.completeCleanup(); + assertThat(cleanupResult) + .failsWithin(Duration.ZERO) + .withThrowableOfType(ExecutionException.class) + .withCause(expectedException); + } + + @Test + public void testConcurrentCleanupWithExceptionSecond() { + CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(cleanupResult).isNotCompleted(); + assertThat(cleanup0).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID); + assertThat(cleanup1).extracting(CleanupCallback::getProcessedJobId).isEqualTo(JOB_ID); + + cleanup0.completeCleanup(); + assertThat(cleanupResult).isNotCompleted(); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + cleanup1.completeCleanupExceptionally(expectedException); + assertThat(cleanupResult) + .failsWithin(Duration.ZERO) + .withThrowableOfType(ExecutionException.class) + .withCause(expectedException); + } + + @Test + public void testHighestPriorityCleanupBlocksAllOtherCleanups() { + final CleanupCallback highPriorityCleanup = CleanupCallback.withoutCompletionOnCleanup(); + final CleanupCallback lowerThanHighPriorityCleanup = + CleanupCallback.withCompletionOnCleanup(); + final CleanupCallback noPriorityCleanup0 = CleanupCallback.withCompletionOnCleanup(); + final CleanupCallback noPriorityCleanup1 = CleanupCallback.withCompletionOnCleanup(); + + final DefaultResourceCleaner<CleanupCallback> testInstance = + createTestInstanceBuilder() + .withPrioritizedCleanup(highPriorityCleanup) + .withPrioritizedCleanup(lowerThanHighPriorityCleanup) + .withRegularCleanup(noPriorityCleanup0) + .withRegularCleanup(noPriorityCleanup1) + .build(); + + final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(highPriorityCleanup.isDone()).isFalse(); + assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse(); + assertThat(noPriorityCleanup0.isDone()).isFalse(); + assertThat(noPriorityCleanup1.isDone()).isFalse(); + + assertThat(overallCleanupResult.isDone()).isFalse(); + + highPriorityCleanup.completeCleanup(); + + assertThat(overallCleanupResult).succeedsWithin(Duration.ofMillis(100)); + + assertThat(highPriorityCleanup.isDone()).isTrue(); + assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue(); + assertThat(noPriorityCleanup0.isDone()).isTrue(); + assertThat(noPriorityCleanup1.isDone()).isTrue(); + } + + @Test + public void testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() { + final CleanupCallback highPriorityCleanup = CleanupCallback.withCompletionOnCleanup(); + final CleanupCallback lowerThanHighPriorityCleanup = + CleanupCallback.withoutCompletionOnCleanup(); + final CleanupCallback noPriorityCleanup0 = CleanupCallback.withCompletionOnCleanup(); + final CleanupCallback noPriorityCleanup1 = CleanupCallback.withCompletionOnCleanup(); + + final DefaultResourceCleaner<CleanupCallback> testInstance = + createTestInstanceBuilder() + .withPrioritizedCleanup(highPriorityCleanup) + .withPrioritizedCleanup(lowerThanHighPriorityCleanup) + .withRegularCleanup(noPriorityCleanup0) + .withRegularCleanup(noPriorityCleanup1) + .build(); + + assertThat(highPriorityCleanup.isDone()).isFalse(); + + final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID); + + assertThat(highPriorityCleanup.isDone()).isTrue(); + assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse(); + assertThat(noPriorityCleanup0.isDone()).isFalse(); + assertThat(noPriorityCleanup1.isDone()).isFalse(); + + assertThat(overallCleanupResult.isDone()).isFalse(); + + lowerThanHighPriorityCleanup.completeCleanup(); + + assertThat(overallCleanupResult).succeedsWithin(Duration.ofMillis(100)); + + assertThat(highPriorityCleanup.isDone()).isTrue(); + assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue(); + assertThat(noPriorityCleanup0.isDone()).isTrue(); + assertThat(noPriorityCleanup1.isDone()).isTrue(); + } + + private static DefaultResourceCleaner.Builder<CleanupCallback> createTestInstanceBuilder() { + return DefaultResourceCleaner.forCleanableResources( + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + EXECUTOR, + CleanupCallback::cleanup); + } + + private static class CleanupCallback { + + private final CompletableFuture<Void> resultFuture = new CompletableFuture<>(); + private JobID jobId; + + private final Consumer<CompletableFuture<Void>> internalFunction; + + public static CleanupCallback withCompletionOnCleanup() { + return new CleanupCallback(resultFuture -> resultFuture.complete(null)); + } + + public static CleanupCallback withoutCompletionOnCleanup() { + return new CleanupCallback(ignoredResultFuture -> {}); + } + + private CleanupCallback(Consumer<CompletableFuture<Void>> internalFunction) { + this.internalFunction = internalFunction; + } + + public CompletableFuture<Void> cleanup(JobID jobId, Executor executor) { + Preconditions.checkState(this.jobId == null); + this.jobId = jobId; + + internalFunction.accept(resultFuture); + + return resultFuture; + } + + public boolean isDone() { + return resultFuture.isDone(); + } + + public JobID getProcessedJobId() { + return jobId; + } + + public void completeCleanup() { + this.resultFuture.complete(null); + } + + public void completeCleanupExceptionally(Throwable expectedException) { + this.resultFuture.completeExceptionally(expectedException); + } + } +}
