zentol commented on a change in pull request #17474:
URL: https://github.com/apache/flink/pull/17474#discussion_r731914875



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
##########
@@ -103,33 +106,56 @@
  * }
  * </pre>
  */
-public class SavepointHandlers
-        extends 
AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, String> {
+public class SavepointHandlers {
 
     @Nullable private final String defaultSavepointDir;
 
     public SavepointHandlers(@Nullable final String defaultSavepointDir) {
         this.defaultSavepointDir = defaultSavepointDir;
     }
 
-    private abstract class SavepointHandlerBase<T extends RequestBody>
-            extends TriggerHandler<RestfulGateway, T, 
SavepointTriggerMessageParameters> {
+    private abstract static class SavepointHandlerBase<B extends RequestBody>
+            extends AbstractRestHandler<
+                    RestfulGateway, B, TriggerResponse, 
SavepointTriggerMessageParameters> {
 
         SavepointHandlerBase(
                 final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                 final Time timeout,
                 Map<String, String> responseHeaders,
-                final MessageHeaders<T, TriggerResponse, 
SavepointTriggerMessageParameters>
+                final MessageHeaders<B, TriggerResponse, 
SavepointTriggerMessageParameters>
                         messageHeaders) {
             super(leaderRetriever, timeout, responseHeaders, messageHeaders);
         }
 
-        @Override
         protected AsynchronousJobOperationKey createOperationKey(
-                final HandlerRequest<T, SavepointTriggerMessageParameters> 
request) {
+                final HandlerRequest<B, SavepointTriggerMessageParameters> 
request) {
             final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
             return AsynchronousJobOperationKey.of(new TriggerId(), jobId);
         }
+
+        @Override
+        public void close() {}

Review comment:
       you don't need to override `close`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import 
org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link DispatcherCachedOperationsHandler} component. */
+public class DispatcherCachedOperationsHandlerTest extends TestLogger {
+
+    private CompletedOperationCache<AsynchronousJobOperationKey, String> cache;
+    private DispatcherCachedOperationsHandler handler;
+
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            triggerSavepointFunction;
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    private CompletableFuture<String> savepointLocationFuture = new 
CompletableFuture<>();
+    private final TriggerSavepointParameters savepointTriggerParameters =
+            new TriggerSavepointParameters(new JobID(), "dummyDirectory", 
false, Time.minutes(1));
+    private AsynchronousJobOperationKey operationKey;
+
+    @Before
+    public void setup() {
+        savepointLocationFuture = new CompletableFuture<>();
+        triggerSavepointFunction = SpyFunction.wrap(parameters -> 
savepointLocationFuture);
+        stopWithSavepointFunction = SpyFunction.wrap(parameters -> 
savepointLocationFuture);
+        cache = new CompletedOperationCache<>();
+        handler =
+                new DispatcherCachedOperationsHandler(
+                        triggerSavepointFunction, stopWithSavepointFunction, 
cache);
+        operationKey =
+                AsynchronousJobOperationKey.of(
+                        new TriggerId(), 
savepointTriggerParameters.getJobID());
+    }
+
+    @Test
+    public void triggerSavepointRepeatedly() throws ExecutionException, 
InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+
+        assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                triggerSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void stopWithSavepointRepeatedly() throws ExecutionException, 
InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.stopWithSavepoint(operationKey, 
savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.stopWithSavepoint(operationKey, 
savepointTriggerParameters);
+
+        assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                stopWithSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void returnsFailedFutureIfOperationFails()
+            throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> acknowledgeRegisteredOperation =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+        savepointLocationFuture.completeExceptionally(new 
RuntimeException("Expected failure"));
+        CompletableFuture<Acknowledge> failedAcknowledgeFuture =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+
+        assertThat(acknowledgeRegisteredOperation.get(), 
is(Acknowledge.get()));
+        assertThrows(ExecutionException.class, failedAcknowledgeFuture::get);
+    }
+
+    @Test
+    public void returnsFailedFutureIfCacheIsShuttingDown() throws 
InterruptedException {
+        cache.closeAsync();
+        CompletableFuture<Acknowledge> returnedFuture =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+
+        try {
+            returnedFuture.get();

Review comment:
       Use `FlinkMatchers#futureWillCompleteExceptionally` instead

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointParameters.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class TriggerSavepointParameters {

Review comment:
       Why did you go with this instead of a 
`Trigger[StopWith]SavepointFunction` interface(s)?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import 
org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose 
trigger parameters and
+ * results are cached.
+ */
+public class DispatcherCachedOperationsHandler {
+
+    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
+            savepointTriggerCache;
+
+    private final Function<TriggerSavepointParameters, 
CompletableFuture<String>>
+            triggerSavepointFunction;
+
+    private final Function<TriggerSavepointParameters, 
CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this(triggerSavepointFunction, stopWithSavepointFunction, new 
CompletedOperationCache<>());
+    }
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction,
+            CompletedOperationCache<AsynchronousJobOperationKey, String> 
savepointTriggerCache) {
+        this.triggerSavepointFunction = triggerSavepointFunction;
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+        this.savepointTriggerCache = savepointTriggerCache;
+    }
+
+    public CompletableFuture<Acknowledge> triggerSavepoint(
+            AsynchronousJobOperationKey operationKey, 
TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, 
triggerSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<Acknowledge> stopWithSavepoint(
+            AsynchronousJobOperationKey operationKey, 
TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, 
stopWithSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<OperationResult<String>> getSavepointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(CompletableFuture::completedFuture)
+                .orElse(
+                        CompletableFuture.failedFuture(
+                                new 
UnknownOperationKeyException(operationKey)));
+    }
+
+    public CompletableFuture<Void> shutDownCache() {
+        return savepointTriggerCache.closeAsync();
+    }
+
+    private <P> CompletableFuture<Acknowledge> registerOperationIdempotently(
+            AsynchronousJobOperationKey operationKey,
+            Function<P, CompletableFuture<String>> operation,
+            P parameters) {
+        Optional<OperationResult<String>> resultOptional = 
savepointTriggerCache.get(operationKey);
+        if (resultOptional.isPresent()) {
+            return convertToFuture(resultOptional.get());
+        }
+
+        try {
+            savepointTriggerCache.registerOngoingOperation(
+                    operationKey, operation.apply(parameters));
+        } catch (IllegalStateException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(this::convertToFuture)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Internal cache error: Failed to 
retrieve status"));

Review comment:
       We should be able to just return an Acknowledge here. We already covered 
the case of pre-existing operations at the top, and thus know that the operation 
that we just added is still ongoing.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import 
org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link DispatcherCachedOperationsHandler} component. */
+public class DispatcherCachedOperationsHandlerTest extends TestLogger {
+
+    private CompletedOperationCache<AsynchronousJobOperationKey, String> cache;
+    private DispatcherCachedOperationsHandler handler;
+
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            triggerSavepointFunction;
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    private CompletableFuture<String> savepointLocationFuture = new 
CompletableFuture<>();
+    private final TriggerSavepointParameters savepointTriggerParameters =
+            new TriggerSavepointParameters(new JobID(), "dummyDirectory", 
false, Time.minutes(1));
+    private AsynchronousJobOperationKey operationKey;
+
+    @Before
+    public void setup() {
+        savepointLocationFuture = new CompletableFuture<>();
+        triggerSavepointFunction = SpyFunction.wrap(parameters -> 
savepointLocationFuture);
+        stopWithSavepointFunction = SpyFunction.wrap(parameters -> 
savepointLocationFuture);
+        cache = new CompletedOperationCache<>();
+        handler =
+                new DispatcherCachedOperationsHandler(
+                        triggerSavepointFunction, stopWithSavepointFunction, 
cache);
+        operationKey =
+                AsynchronousJobOperationKey.of(
+                        new TriggerId(), 
savepointTriggerParameters.getJobID());
+    }
+
+    @Test
+    public void triggerSavepointRepeatedly() throws ExecutionException, 
InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+
+        assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                triggerSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void stopWithSavepointRepeatedly() throws ExecutionException, 
InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.stopWithSavepoint(operationKey, 
savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.stopWithSavepoint(operationKey, 
savepointTriggerParameters);
+
+        assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                stopWithSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void returnsFailedFutureIfOperationFails()
+            throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> acknowledgeRegisteredOperation =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+        savepointLocationFuture.completeExceptionally(new 
RuntimeException("Expected failure"));
+        CompletableFuture<Acknowledge> failedAcknowledgeFuture =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+
+        assertThat(acknowledgeRegisteredOperation.get(), 
is(Acknowledge.get()));
+        assertThrows(ExecutionException.class, failedAcknowledgeFuture::get);
+    }
+
+    @Test
+    public void returnsFailedFutureIfCacheIsShuttingDown() throws 
InterruptedException {
+        cache.closeAsync();
+        CompletableFuture<Acknowledge> returnedFuture =
+                handler.triggerSavepoint(operationKey, 
savepointTriggerParameters);
+
+        try {
+            returnedFuture.get();
+            fail("Future should have completed exceptionally");
+        } catch (ExecutionException e) {
+            assertThat((IllegalStateException) e.getCause(), 
isA(IllegalStateException.class));
+        }
+    }
+
+    @Test
+    public void getStatus() throws ExecutionException, InterruptedException {
+        handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        String savepointLocation = "location";
+        savepointLocationFuture.complete(savepointLocation);
+
+        CompletableFuture<OperationResult<String>> statusFuture =
+                handler.getSavepointStatus(operationKey);
+
+        assertEquals(statusFuture.get(), 
OperationResult.success(savepointLocation));
+    }
+
+    @Test
+    public void getStatusFailsIfKeyUnknown() throws InterruptedException {
+        CompletableFuture<OperationResult<String>> statusFuture =
+                handler.getSavepointStatus(operationKey);
+
+        try {
+            statusFuture.get();
+            fail("Retrieving the status should have failed");
+        } catch (ExecutionException e) {

Review comment:
       same as above: use `FlinkMatchers#futureWillCompleteExceptionally` instead

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
##########
@@ -194,11 +195,12 @@ public void testUnknownTriggerId() throws Exception {
      */
     @Test
     public void testCloseShouldFinishOnFirstServedResult() throws Exception {
-        final CompletableFuture<String> savepointFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Acknowledge> acknowledgeFuture = new 
CompletableFuture<>();
         final TestingRestfulGateway testingRestfulGateway =
                 new TestingRestfulGateway.Builder()
                         .setTriggerSavepointFunction(
-                                (JobID jobId, String directory) -> 
savepointFuture)
+                                (AsynchronousJobOperationKey operationKey, 
String directory) ->

Review comment:
       I'm wondering if we shouldn't use a different method (bit weird that 
we're using the one method that isn't used by the AsyncOperationHandlers), or 
even completely isolate this test from the RestfulGateway (because we really 
shouldn't have to change anything here).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import 
org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose 
trigger parameters and
+ * results are cached.
+ */
+public class DispatcherCachedOperationsHandler {
+
+    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
+            savepointTriggerCache;
+
+    private final Function<TriggerSavepointParameters, 
CompletableFuture<String>>
+            triggerSavepointFunction;
+
+    private final Function<TriggerSavepointParameters, 
CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this(triggerSavepointFunction, stopWithSavepointFunction, new 
CompletedOperationCache<>());
+    }
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction,
+            CompletedOperationCache<AsynchronousJobOperationKey, String> 
savepointTriggerCache) {
+        this.triggerSavepointFunction = triggerSavepointFunction;
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+        this.savepointTriggerCache = savepointTriggerCache;
+    }
+
+    public CompletableFuture<Acknowledge> triggerSavepoint(
+            AsynchronousJobOperationKey operationKey, 
TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, 
triggerSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<Acknowledge> stopWithSavepoint(
+            AsynchronousJobOperationKey operationKey, 
TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, 
stopWithSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<OperationResult<String>> getSavepointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(CompletableFuture::completedFuture)
+                .orElse(
+                        CompletableFuture.failedFuture(
+                                new 
UnknownOperationKeyException(operationKey)));
+    }
+
+    public CompletableFuture<Void> shutDownCache() {
+        return savepointTriggerCache.closeAsync();
+    }
+
+    private <P> CompletableFuture<Acknowledge> registerOperationIdempotently(
+            AsynchronousJobOperationKey operationKey,
+            Function<P, CompletableFuture<String>> operation,
+            P parameters) {
+        Optional<OperationResult<String>> resultOptional = 
savepointTriggerCache.get(operationKey);
+        if (resultOptional.isPresent()) {
+            return convertToFuture(resultOptional.get());
+        }
+
+        try {
+            savepointTriggerCache.registerOngoingOperation(
+                    operationKey, operation.apply(parameters));
+        } catch (IllegalStateException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(this::convertToFuture)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Internal cache error: Failed to 
retrieve status"));
+    }
+
+    private CompletableFuture<Acknowledge> 
convertToFuture(OperationResult<String> result) {
+        if (result.getStatus() == OperationResultStatus.FAILURE) {
+            return CompletableFuture.failedFuture(result.getThrowable());

Review comment:
       Let's wrap the Throwable in something like an 
`OperationAlreadyFailedException`, so that the handlers can differentiate 
between this kind of failure and some request validation error in the 
`Dispatcher`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import 
org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose 
trigger parameters and
+ * results are cached.
+ */
+public class DispatcherCachedOperationsHandler {
+
+    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
+            savepointTriggerCache;
+
+    private final Function<TriggerSavepointParameters, 
CompletableFuture<String>>
+            triggerSavepointFunction;
+
+    private final Function<TriggerSavepointParameters, 
CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this(triggerSavepointFunction, stopWithSavepointFunction, new 
CompletedOperationCache<>());
+    }
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction,
+            CompletedOperationCache<AsynchronousJobOperationKey, String> 
savepointTriggerCache) {
+        this.triggerSavepointFunction = triggerSavepointFunction;
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+        this.savepointTriggerCache = savepointTriggerCache;
+    }
+
+    public CompletableFuture<Acknowledge> triggerSavepoint(
+            AsynchronousJobOperationKey operationKey, 
TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, 
triggerSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<Acknowledge> stopWithSavepoint(
+            AsynchronousJobOperationKey operationKey, 
TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, 
stopWithSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<OperationResult<String>> getSavepointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(CompletableFuture::completedFuture)
+                .orElse(
+                        CompletableFuture.failedFuture(
+                                new 
UnknownOperationKeyException(operationKey)));
+    }
+
+    public CompletableFuture<Void> shutDownCache() {
+        return savepointTriggerCache.closeAsync();
+    }
+
+    private <P> CompletableFuture<Acknowledge> registerOperationIdempotently(
+            AsynchronousJobOperationKey operationKey,
+            Function<P, CompletableFuture<String>> operation,
+            P parameters) {
+        Optional<OperationResult<String>> resultOptional = 
savepointTriggerCache.get(operationKey);
+        if (resultOptional.isPresent()) {
+            return convertToFuture(resultOptional.get());
+        }
+
+        try {
+            savepointTriggerCache.registerOngoingOperation(
+                    operationKey, operation.apply(parameters));
+        } catch (IllegalStateException e) {
+            return CompletableFuture.failedFuture(e);

Review comment:
       There's no need to catch this exception; it just makes the code more 
complicated without any real benefit.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import 
org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose 
trigger parameters and

Review comment:
       Where are we caching the trigger parameters?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointTestUtilities.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/** Utility functions used in tests. */
+public class SavepointTestUtilities {
+    public static BiFunction<AsynchronousJobOperationKey, String, 
CompletableFuture<Acknowledge>>

Review comment:
       These need some javadocs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to