This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72cc925a88ad054449f0ade9a476ead7a287cb6f
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jun 7 16:11:17 2022 +0200

    [FLINK-27933][coordination] OperationResult is serializable
---
 .../AbstractAsynchronousOperationHandlers.java     |  4 +++-
 .../handler/async/CompletedOperationCache.java     |  8 +++++---
 .../rest/handler/async/OperationResult.java        | 11 +++++++----
 .../dataset/ClusterDataSetDeleteHandlers.java      | 22 ++++++++++++++++++----
 4 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
index 3123f3282be..81beed31dca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
@@ -33,6 +33,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 
 import javax.annotation.Nonnull;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
@@ -87,7 +88,8 @@ import java.util.concurrent.CompletableFuture;
  * @param <K> type of the operation key under which the result future is stored
  * @param <R> type of the operation result
  */
-public abstract class AbstractAsynchronousOperationHandlers<K extends OperationKey, R> {
+public abstract class AbstractAsynchronousOperationHandlers<
+        K extends OperationKey, R extends Serializable> {
 
     private final CompletedOperationCache<K, R> completedOperationCache;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
index 6568551ad1c..3bd34d6602b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -36,6 +36,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
@@ -56,7 +57,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  * operations will be removed from the cache automatically after a fixed timeout.
  */
 @ThreadSafe
-public class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync {
+public class CompletedOperationCache<K extends OperationKey, R extends Serializable>
+        implements AutoCloseableAsync {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CompletedOperationCache.class);
 
@@ -207,7 +209,7 @@ public class CompletedOperationCache<K extends OperationKey, R> implements AutoC
     }
 
     /** Stores the result of an asynchronous operation, and tracks accesses to it. */
-    private static class ResultAccessTracker<R> {
+    private static class ResultAccessTracker<R extends Serializable> {
 
         /** Result of an asynchronous operation. */
         private final OperationResult<R> operationResult;
@@ -215,7 +217,7 @@ public class CompletedOperationCache<K extends OperationKey, R> implements AutoC
         /** Future that completes if {@link #operationResult} is accessed after it finished. */
         private final CompletableFuture<Void> accessed;
 
-        private static <R> ResultAccessTracker<R> inProgress() {
+        private static <R extends Serializable> ResultAccessTracker<R> inProgress() {
             return new ResultAccessTracker<>();
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java
index 5fcf545e2f1..b92e107e980 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationResult.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.async;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -29,7 +30,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * OperationResultStatus}, it contains either the actual result (if completed successfully), or the
  * cause of failure (if it failed), or none of the two (if still in progress).
  */
-public class OperationResult<R> {
+public class OperationResult<R> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
     private final OperationResultStatus status;
     @Nullable private final R result;
     @Nullable private final Throwable throwable;
@@ -58,17 +61,17 @@ public class OperationResult<R> {
         return throwable;
     }
 
-    public static <R> OperationResult<R> failure(Throwable throwable) {
+    public static <R extends Serializable> OperationResult<R> failure(Throwable throwable) {
         checkNotNull(throwable);
         return new OperationResult<>(OperationResultStatus.FAILURE, null, throwable);
     }
 
-    public static <R> OperationResult<R> success(R result) {
+    public static <R extends Serializable> OperationResult<R> success(R result) {
         checkNotNull(result);
         return new OperationResult<>(OperationResultStatus.SUCCESS, result, null);
     }
 
-    public static <R> OperationResult<R> inProgress() {
+    public static <R extends Serializable> OperationResult<R> inProgress() {
         return new OperationResult<>(OperationResultStatus.IN_PROGRESS, null, null);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java
index e7c883c5f48..e273d1f5af9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/dataset/ClusterDataSetDeleteHandlers.java
@@ -38,13 +38,15 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.SerializedThrowable;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /** Handler for {@link ClusterDataSetDeleteTriggerHeaders}. */
 public class ClusterDataSetDeleteHandlers
-        extends AbstractAsynchronousOperationHandlers<OperationKey, Void> {
+        extends AbstractAsynchronousOperationHandlers<
+                OperationKey, ClusterDataSetDeleteHandlers.SerializableVoid> {
 
     public ClusterDataSetDeleteHandlers(Duration cacheDuration) {
         super(cacheDuration);
@@ -73,7 +75,7 @@ public class ClusterDataSetDeleteHandlers
         }
 
         @Override
-        protected CompletableFuture<Void> triggerOperation(
+        protected CompletableFuture<SerializableVoid> triggerOperation(
                 HandlerRequest<EmptyRequestBody> request, RestfulGateway gateway)
                 throws RestHandlerException {
             final IntermediateDataSetID clusterPartitionId =
@@ -81,7 +83,9 @@ public class ClusterDataSetDeleteHandlers
             ResourceManagerGateway resourceManagerGateway =
                     AbstractResourceManagerHandler.getResourceManagerGateway(
                             resourceManagerGatewayRetriever);
-            return resourceManagerGateway.releaseClusterPartitions(clusterPartitionId);
+            return resourceManagerGateway
+                    .releaseClusterPartitions(clusterPartitionId)
+                    .thenApply(ignored -> null);
         }
 
         @Override
@@ -122,8 +126,18 @@ public class ClusterDataSetDeleteHandlers
         }
 
         @Override
-        protected AsynchronousOperationInfo operationResultResponse(Void ignored) {
+        protected AsynchronousOperationInfo operationResultResponse(SerializableVoid ignored) {
             return AsynchronousOperationInfo.complete();
         }
     }
+
+    /**
+     * A {@link Void} alternative that implements {@link Serializable}. Useful in cases where a type
+     * must be serializable but in practice is always null.
+     */
+    public static class SerializableVoid implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private SerializableVoid() {}
+    }
 }

Reply via email to