This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72cc925a88ad054449f0ade9a476ead7a287cb6f Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Jun 7 16:11:17 2022 +0200 [FLINK-27933][coordination] OperationResult is serializable --- .../AbstractAsynchronousOperationHandlers.java | 4 +++- .../handler/async/CompletedOperationCache.java | 8 +++++--- .../rest/handler/async/OperationResult.java | 11 +++++++---- .../dataset/ClusterDataSetDeleteHandlers.java | 22 ++++++++++++++++++---- 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java index 3123f3282be..81beed31dca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java @@ -33,6 +33,7 @@ import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nonnull; +import java.io.Serializable; import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -87,7 +88,8 @@ import java.util.concurrent.CompletableFuture; * @param <K> type of the operation key under which the result future is stored * @param <R> type of the operation result */ -public abstract class AbstractAsynchronousOperationHandlers<K extends OperationKey, R> { +public abstract class AbstractAsynchronousOperationHandlers< + K extends OperationKey, R extends Serializable> { private final CompletedOperationCache<K, R> completedOperationCache; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java index 6568551ad1c..3bd34d6602b 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.io.Serializable; import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -56,7 +57,8 @@ import static org.apache.flink.util.Preconditions.checkState; * operations will be removed from the cache automatically after a fixed timeout. */ @ThreadSafe -public class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync { +public class CompletedOperationCache<K extends OperationKey, R extends Serializable> + implements AutoCloseableAsync { private static final Logger LOGGER = LoggerFactory.getLogger(CompletedOperationCache.class); @@ -207,7 +209,7 @@ public class CompletedOperationCache<K extends OperationKey, R> implements AutoC } /** Stores the result of an asynchronous operation, and tracks accesses to it. */ - private static class ResultAccessTracker<R> { + private static class ResultAccessTracker<R extends Serializable> { /** Result of an asynchronous operation. */ private final OperationResult<R> operationResult; @@ -215,7 +217,7 @@ public class CompletedOperationCache<K extends OperationKey, R> implements AutoC /** Future that completes if {@link #operationResult} is accessed after it finished. 
*/ private final CompletableFuture<Void> accessed; - private static <R> ResultAccessTracker<R> inProgress() { + private static <R extends Serializable> ResultAccessTracker<R> inProgress() { return new ResultAccessTracker<>(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java index 5fcf545e2f1..b92e107e980 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.async; import javax.annotation.Nullable; +import java.io.Serializable; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -29,7 +30,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * OperationResultStatus}, it contains either the actual result (if completed successfully), or the * cause of failure (if it failed), or none of the two (if still in progress). 
*/ -public class OperationResult<R> { +public class OperationResult<R> implements Serializable { + private static final long serialVersionUID = 1L; + private final OperationResultStatus status; @Nullable private final R result; @Nullable private final Throwable throwable; @@ -58,17 +61,17 @@ public class OperationResult<R> { return throwable; } - public static <R> OperationResult<R> failure(Throwable throwable) { + public static <R extends Serializable> OperationResult<R> failure(Throwable throwable) { checkNotNull(throwable); return new OperationResult<>(OperationResultStatus.FAILURE, null, throwable); } - public static <R> OperationResult<R> success(R result) { + public static <R extends Serializable> OperationResult<R> success(R result) { checkNotNull(result); return new OperationResult<>(OperationResultStatus.SUCCESS, result, null); } - public static <R> OperationResult<R> inProgress() { + public static <R extends Serializable> OperationResult<R> inProgress() { return new OperationResult<>(OperationResultStatus.IN_PROGRESS, null, null); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java index e7c883c5f48..e273d1f5af9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java @@ -38,13 +38,15 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.SerializedThrowable; +import java.io.Serializable; import java.time.Duration; import java.util.Map; import java.util.concurrent.CompletableFuture; /** Handler for {@link ClusterDataSetDeleteTriggerHeaders}. 
*/ public class ClusterDataSetDeleteHandlers - extends AbstractAsynchronousOperationHandlers<OperationKey, Void> { + extends AbstractAsynchronousOperationHandlers< + OperationKey, ClusterDataSetDeleteHandlers.SerializableVoid> { public ClusterDataSetDeleteHandlers(Duration cacheDuration) { super(cacheDuration); @@ -73,7 +75,7 @@ public class ClusterDataSetDeleteHandlers } @Override - protected CompletableFuture<Void> triggerOperation( + protected CompletableFuture<SerializableVoid> triggerOperation( HandlerRequest<EmptyRequestBody> request, RestfulGateway gateway) throws RestHandlerException { final IntermediateDataSetID clusterPartitionId = @@ -81,7 +83,9 @@ public class ClusterDataSetDeleteHandlers ResourceManagerGateway resourceManagerGateway = AbstractResourceManagerHandler.getResourceManagerGateway( resourceManagerGatewayRetriever); - return resourceManagerGateway.releaseClusterPartitions(clusterPartitionId); + return resourceManagerGateway + .releaseClusterPartitions(clusterPartitionId) + .thenApply(ignored -> null); } @Override @@ -122,8 +126,18 @@ public class ClusterDataSetDeleteHandlers } @Override - protected AsynchronousOperationInfo operationResultResponse(Void ignored) { + protected AsynchronousOperationInfo operationResultResponse(SerializableVoid ignored) { return AsynchronousOperationInfo.complete(); } } + + /** + * A {@link Void} alternative that implements {@link Serializable}. Useful in cases where a type + * must be serializable but in practice is always null. + */ + public static class SerializableVoid implements Serializable { + private static final long serialVersionUID = 1L; + + private SerializableVoid() {} + } }
