This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9fe920f [FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators 9fe920f is described below commit 9fe920fdc6f7eeaf2d99901099c842cfd0f1380a Author: TsReaper <tsreape...@gmail.com> AuthorDate: Fri May 15 07:46:31 2020 +0800 [FLINK-14807][rest] Introduce REST API for communication between clients and operator coordinators This closes #12037 --- .../deployment/ClusterClientJobClientAdapter.java | 14 ++- .../deployment/application/EmbeddedJobClient.java | 20 +++- .../apache/flink/client/program/ClusterClient.java | 13 +++ .../flink/client/program/MiniClusterClient.java | 20 ++++ .../client/program/PerJobMiniClusterFactory.java | 19 +++- .../client/program/rest/RestClusterClient.java | 37 ++++++++ .../flink/client/program/TestingClusterClient.java | 11 +++ .../client/program/rest/RestClusterClientTest.java | 75 +++++++++++++++ .../src/test/resources/rest_api_v1.snapshot | 33 +++++++ .../flink/runtime/dispatcher/Dispatcher.java | 17 ++++ .../apache/flink/runtime/jobmaster/JobMaster.java | 15 +++ .../flink/runtime/jobmaster/JobMasterGateway.java | 19 ++++ .../flink/runtime/minicluster/MiniCluster.java | 14 +++ .../coordination/CoordinationRequest.java | 27 ++++++ .../coordination/CoordinationRequestGateway.java | 39 ++++++++ .../coordination/CoordinationRequestHandler.java | 36 +++++++ .../coordination/CoordinationResponse.java | 27 ++++++ .../coordination/ClientCoordinationHandler.java | 84 +++++++++++++++++ .../rest/messages/OperatorIDPathParameter.java | 49 ++++++++++ .../coordination/ClientCoordinationHeaders.java | 80 ++++++++++++++++ .../ClientCoordinationMessageParameters.java | 49 ++++++++++ .../ClientCoordinationRequestBody.java | 56 +++++++++++ .../ClientCoordinationResponseBody.java | 56 +++++++++++ .../flink/runtime/scheduler/SchedulerBase.java | 36 ++++++- .../flink/runtime/scheduler/SchedulerNG.java | 12 +++ 
.../flink/runtime/webmonitor/RestfulGateway.java | 24 +++++ .../runtime/webmonitor/WebMonitorEndpoint.java | 9 ++ .../jobmaster/utils/TestingJobMasterGateway.java | 14 ++- .../utils/TestingJobMasterGatewayBuilder.java | 11 ++- .../OperatorCoordinatorSchedulerTest.java | 48 ++++++++++ .../TestingCoordinationRequestHandler.java | 104 +++++++++++++++++++++ .../webmonitor/TestingDispatcherGateway.java | 14 ++- .../runtime/webmonitor/TestingRestfulGateway.java | 34 ++++++- .../environment/RemoteStreamEnvironmentTest.java | 11 +++ 34 files changed, 1112 insertions(+), 15 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java index 3fb0e5c..94c31f2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java @@ -26,6 +26,10 @@ import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.commons.io.IOUtils; @@ -41,7 +45,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * An implementation of the {@link JobClient} interface that uses a {@link ClusterClient} underneath.. 
*/ -public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { +public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, CoordinationRequestGateway { private final ClusterClientProvider<ClusterID> clusterClientProvider; @@ -115,6 +119,13 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { }))); } + @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) { + return bridgeClientRequest( + clusterClientProvider, + clusterClient -> clusterClient.sendCoordinationRequest(jobID, operatorId, request)); + } + private static <T> CompletableFuture<T> bridgeClientRequest( ClusterClientProvider<?> clusterClientProvider, Function<ClusterClient<?>, CompletableFuture<T>> resultRetriever) { @@ -132,5 +143,4 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { return resultFuture.whenCompleteAsync( (jobResult, throwable) -> IOUtils.closeQuietly(clusterClient::close)); } - } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java index 617ca08..e72c467 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java @@ -25,12 +25,19 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import 
org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -42,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * uses directly the {@link DispatcherGateway}. */ @Internal -public class EmbeddedJobClient implements JobClient { +public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway { private final JobID jobId; @@ -119,4 +126,15 @@ public class EmbeddedJobClient implements JobClient { } }); } + + @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) { + try { + SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request); + return dispatcherGateway.deliverCoordinationRequestToCoordinator( + jobId, operatorId, serializedRequest, timeout); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); + } + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 952e428..e6843dc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; 
import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.util.FlinkException; import javax.annotation.Nullable; @@ -162,4 +165,14 @@ public interface ClusterClient<T> extends AutoCloseable { * @return path future where the savepoint is located */ CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory); + + /** + * Sends out a request to a specified coordinator and returns the response. + * + * @param jobId specifies the job which the coordinator belongs to + * @param operatorId specifies which coordinator to receive the request + * @param request the request to send + * @return the response from the coordinator + */ + CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, OperatorID operatorId, CoordinationRequest request); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 2eeb1f4..4c81681 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -24,12 +24,17 @@ import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import 
org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +42,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -155,6 +161,20 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl } } + @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest( + JobID jobId, + OperatorID operatorId, + CoordinationRequest request) { + try { + SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request); + return miniCluster.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest); + } catch (IOException e) { + LOG.error("Error while sending coordination request", e); + return FutureUtils.completedExceptionally(e); + } + } + /** * The type of the Cluster ID for the local {@link MiniCluster}. 
*/ diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java index f8998a2..16ca6b6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java @@ -26,18 +26,25 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.util.MathUtils; +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -127,7 +134,7 @@ public final class PerJobMiniClusterFactory { /** * A {@link JobClient} for a {@link PerJobMiniClusterFactory}. 
*/ - private static final class PerJobMiniClusterJobClient implements JobClient { + private static final class PerJobMiniClusterJobClient implements JobClient, CoordinationRequestGateway { private final JobID jobID; private final MiniCluster miniCluster; @@ -182,5 +189,15 @@ public final class PerJobMiniClusterFactory { } }); } + + @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) { + try { + SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request); + return miniCluster.deliverCoordinationRequestToCoordinator(jobID, operatorId, serializedRequest); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); + } + } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index f2c6aa1..0b6601c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -37,9 +37,12 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.rest.FileUpload; import org.apache.flink.runtime.rest.RestClient; import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; @@ -67,6 +70,9 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; @@ -88,6 +94,7 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; @@ -411,6 +418,36 @@ public class RestClusterClient<T> implements ClusterClient<T> { return triggerSavepoint(jobId, savepointDirectory, false); } + @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest( + JobID jobId, + OperatorID operatorId, + CoordinationRequest request) { + ClientCoordinationHeaders headers = ClientCoordinationHeaders.getInstance(); + ClientCoordinationMessageParameters params = new ClientCoordinationMessageParameters(); + params.jobPathParameter.resolve(jobId); + params.operatorPathParameter.resolve(operatorId); + + SerializedValue<CoordinationRequest> serializedRequest; + try { + 
serializedRequest = new SerializedValue<>(request); + } catch (IOException e) { + return FutureUtils.completedExceptionally(e); + } + + ClientCoordinationRequestBody requestBody = new ClientCoordinationRequestBody(serializedRequest); + return sendRequest(headers, params, requestBody).thenApply( + responseBody -> { + try { + return responseBody + .getSerializedCoordinationResponse() + .deserializeValue(getClass().getClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new CompletionException("Failed to deserialize coordination response", e); + } + }); + } + private CompletableFuture<String> triggerSavepoint( final JobID jobId, final @Nullable String savepointDirectory, diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java index 46df5ea..25124a0 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java @@ -23,8 +23,11 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.util.function.TriFunction; import javax.annotation.Nonnull; @@ -133,6 +136,14 @@ public class TestingClusterClient<T> implements ClusterClient<T> { } @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest( + JobID jobId, + OperatorID operatorId, + CoordinationRequest request) { + throw new 
UnsupportedOperationException(); + } + + @Override public void close() { } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index ff1f5ff..a55b9e7 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -39,10 +39,13 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.rest.FileUpload; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.RestClient; @@ -76,6 +79,10 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody; +import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; @@ -701,6 +708,74 @@ public class RestClusterClientTest extends TestLogger { } } + @Test + public void testSendCoordinationRequest() throws Exception { + final TestClientCoordinationHandler handler = new TestClientCoordinationHandler(); + + try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) { + RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); + + String payload = "testing payload"; + TestCoordinationRequest<String> request = new TestCoordinationRequest<>(payload); + try { + CompletableFuture<CoordinationResponse> future = + restClusterClient.sendCoordinationRequest(jobId, new OperatorID(), request); + TestCoordinationResponse response = (TestCoordinationResponse) future.get(); + + assertEquals(payload, response.payload); + } finally { + restClusterClient.close(); + } + } + } + + private class TestClientCoordinationHandler extends TestHandler<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> { + + private TestClientCoordinationHandler() { + super(ClientCoordinationHeaders.getInstance()); + } + + @Override + @SuppressWarnings("unchecked") + protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(@Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + try { + TestCoordinationRequest req = + (TestCoordinationRequest) request + .getRequestBody() + .getSerializedCoordinationRequest() + .deserializeValue(getClass().getClassLoader()); 
+ TestCoordinationResponse resp = new TestCoordinationResponse(req.payload); + return CompletableFuture.completedFuture( + new ClientCoordinationResponseBody( + new SerializedValue<>(resp))); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + } + + private static class TestCoordinationRequest<T> implements CoordinationRequest { + + private static final long serialVersionUID = 1L; + + private final T payload; + + private TestCoordinationRequest(T payload) { + this.payload = payload; + } + } + + private static class TestCoordinationResponse<T> implements CoordinationResponse { + + private static final long serialVersionUID = 1L; + + private final T payload; + + private TestCoordinationResponse(T payload) { + this.payload = payload; + } + } + private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> { public TestAccumulatorHandler() { diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index af55a7a..b14b741 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1430,6 +1430,39 @@ "type" : "any" } }, { + "url" : "/jobs/:jobid/coordinators/:operatorid", + "method" : "POST", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + }, { + "key" : "operatorid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationRequestBody", + "properties" : { + "serializedCoordinationRequest" : { + "type" : "any" + } + } + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:coordination:ClientCoordinationResponseBody", + "properties" : { + 
"serializedCoordinationResult" : { + "type" : "any" + } + } + } + }, { "url" : "/jobs/:jobid/exceptions", "method" : "GET", "status-code" : "200 OK", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 05aab6e..dde3908 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl; @@ -55,6 +56,8 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceOverview; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; @@ -66,6 +69,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import 
org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.util.function.FunctionUtils; @@ -625,6 +629,19 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator( + JobID jobId, + OperatorID operatorId, + SerializedValue<CoordinationRequest> serializedRequest, + Time timeout) { + final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); + + return jobMasterGatewayFuture.thenCompose( + (JobMasterGateway jobMasterGateway) -> + jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout)); + } + /** * Cleans up the job related data from the dispatcher. If cleanupHA is true, then * the data will also be removed from HA. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 543ef12..d5087cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -62,6 +62,8 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; @@ -718,6 +720,19 @@ 
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast return CompletableFuture.completedFuture(aggregateFunction.getResult(accumulator)); } + @Override + public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator( + OperatorID operatorId, + SerializedValue<CoordinationRequest> serializedRequest, + Time timeout) { + try { + CoordinationRequest request = serializedRequest.deserializeValue(userCodeLoader); + return schedulerNG.deliverCoordinationRequestToCoordinator(operatorId, request); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 1006dad..16decc3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -33,8 +33,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; @@ -44,6 +47,7 @@ import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; @@ -271,4 +275,19 @@ public interface JobMasterGateway extends * @return The updated aggregate */ CompletableFuture<Object> updateGlobalAggregate(String aggregateName, Object aggregand, byte[] serializedAggregationFunction); + + /** + * Deliver a coordination request to a specified coordinator and return the response. + * + * @param operatorId identifying the coordinator to receive the request + * @param serializedRequest serialized request to deliver + * @return A future containing the response. + * The response will fail with a {@link org.apache.flink.util.FlinkException} + * if the task is not running, or no operator/coordinator exists for the given ID, + * or the coordinator cannot handle client events. 
+ */ + CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator( + OperatorID operatorId, + SerializedValue<CoordinationRequest> serializedRequest, + @RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 556af4a..58e08ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -60,6 +61,8 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.ReporterSetup; import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup; import org.apache.flink.runtime.metrics.util.MetricUtils; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; @@ -79,6 +82,7 @@ import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.SerializedValue; import 
org.apache.flink.util.function.FunctionUtils; import org.slf4j.Logger; @@ -601,6 +605,16 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJob(jobId, rpcTimeout)); } + public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator( + JobID jobId, + OperatorID operatorId, + SerializedValue<CoordinationRequest> serializedRequest) { + return runDispatcherCommand( + dispatcherGateway -> + dispatcherGateway.deliverCoordinationRequestToCoordinator( + jobId, operatorId, serializedRequest, rpcTimeout)); + } + private <T> CompletableFuture<T> runDispatcherCommand(Function<DispatcherGateway, CompletableFuture<T>> dispatcherCommand) { return getDispatcherGatewayFuture().thenApply(dispatcherCommand).thenCompose(Function.identity()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java new file mode 100644 index 0000000..a02678d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import java.io.Serializable; + +/** + * Root interface for all requests from the client to an {@link OperatorCoordinator}, + * each of which asks for a {@link CoordinationResponse}. + */ +public interface CoordinationRequest extends Serializable {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java new file mode 100644 index 0000000..0767569 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestGateway.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import org.apache.flink.runtime.jobgraph.OperatorID; + +import java.util.concurrent.CompletableFuture; + +/** + * Client interface which sends out a {@link CoordinationRequest} and + * expects a {@link CoordinationResponse} from an {@link OperatorCoordinator}.
+ */ +public interface CoordinationRequestGateway { + + /** + * Send out a request to a specified coordinator and return the response. + * + * @param operatorId specifies which coordinator should receive the request + * @param request the request to send + * @return a future containing the response from the coordinator + */ + CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java new file mode 100644 index 0000000..3514d1e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationRequestHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import java.util.concurrent.CompletableFuture; + +/** + * Coordinator interface which can handle {@link CoordinationRequest}s + * and respond with {@link CoordinationResponse}s to the client.
+ */ +public interface CoordinationRequestHandler { + + /** + * Called when receiving a request from the client. + * + * @param request the request received + * @return a future containing the response from the coordinator for this request + */ + CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java new file mode 100644 index 0000000..d28e30b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinationResponse.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import java.io.Serializable; + +/** + * Root interface for all responses from an {@link OperatorCoordinator} to the client, + * each of which answers a {@link CoordinationRequest}.
+ */ +public interface CoordinationResponse extends Serializable {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java new file mode 100644 index 0000000..e361f64 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/coordination/ClientCoordinationHandler.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.coordination; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationResponseBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * Handler that receives the coordination requests from the client and returns the response from the coordinator. + */ +public class ClientCoordinationHandler extends AbstractRestHandler<RestfulGateway, ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> { + + public ClientCoordinationHandler( + GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> messageHeaders) { + super(leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture<ClientCoordinationResponseBody> handleRequest( + @Nonnull HandlerRequest<ClientCoordinationRequestBody, ClientCoordinationMessageParameters> request, + @Nonnull RestfulGateway gateway) throws RestHandlerException { + JobID jobId = request.getPathParameter(JobIDPathParameter.class); + OperatorID operatorId = request.getPathParameter(OperatorIDPathParameter.class); + SerializedValue<CoordinationRequest> serializedRequest = + request.getRequestBody().getSerializedCoordinationRequest(); + CompletableFuture<CoordinationResponse> responseFuture = + gateway.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest, timeout); + return responseFuture.thenApply( + coordinationResponse -> { + try { + return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse)); + } catch (IOException e) { + throw new CompletionException( + new RestHandlerException( + "Failed to serialize coordination response", + HttpResponseStatus.INTERNAL_SERVER_ERROR, + e)); + } + }); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java new file mode 100644 index 0000000..87d985c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/OperatorIDPathParameter.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.util.StringUtils; + +/** + * Path parameter identifying operators. + */ +public class OperatorIDPathParameter extends MessagePathParameter<OperatorID> { + + public static final String KEY = "operatorid"; + + public OperatorIDPathParameter() { + super(KEY); + } + + @Override + protected OperatorID convertFromString(String value) throws ConversionException { + return new OperatorID(StringUtils.hexStringToByte(value)); + } + + @Override + protected String convertToString(OperatorID value) { + return value.toString(); + } + + @Override + public String getDescription() { + return "string value that identifies an operator."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java new file mode 100644 index 0000000..aded1e3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.coordination; + +import org.apache.flink.annotation.docs.Documentation; +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link ClientCoordinationHandler}. 
+ */ +@Documentation.ExcludeFromDocumentation( + "This API is not exposed to the users, as coordinators are used only internally.") +public class ClientCoordinationHeaders implements MessageHeaders<ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> { + + public static final String URL = "/jobs/:jobid/coordinators/:operatorid"; + + private static final ClientCoordinationHeaders INSTANCE = new ClientCoordinationHeaders(); + + private ClientCoordinationHeaders() {} + + @Override + public Class<ClientCoordinationRequestBody> getRequestClass() { + return ClientCoordinationRequestBody.class; + } + + @Override + public Class<ClientCoordinationResponseBody> getResponseClass() { + return ClientCoordinationResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public ClientCoordinationMessageParameters getUnresolvedMessageParameters() { + return new ClientCoordinationMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static ClientCoordinationHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Send a request to a specified coordinator of the specified job and get the response. 
" + + "This API is for internal use only."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java new file mode 100644 index 0000000..01be123 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationMessageParameters.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.job.coordination; + +import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.OperatorIDPathParameter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * {@link MessageParameters} for {@link ClientCoordinationHandler}. + */ +public class ClientCoordinationMessageParameters extends MessageParameters { + + public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + public final OperatorIDPathParameter operatorPathParameter = new OperatorIDPathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Arrays.asList(jobPathParameter, operatorPathParameter); + } + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.emptyList(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java new file mode 100644 index 0000000..e663320 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationRequestBody.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.coordination; + +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer; +import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +/** + * Request that carries a serialized {@link CoordinationRequest} to a specified coordinator. 
+ */ +public class ClientCoordinationRequestBody implements RequestBody { + + public static final String FIELD_NAME_SERIALIZED_COORDINATION_REQUEST = "serializedCoordinationRequest"; + + @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST) + @JsonSerialize(using = SerializedValueSerializer.class) + @JsonDeserialize(using = SerializedValueDeserializer.class) + private final SerializedValue<CoordinationRequest> serializedCoordinationRequest; + + @JsonCreator + public ClientCoordinationRequestBody( + @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_REQUEST) + SerializedValue<CoordinationRequest> serializedCoordinationRequest) { + this.serializedCoordinationRequest = serializedCoordinationRequest; + } + + @JsonIgnore + public SerializedValue<CoordinationRequest> getSerializedCoordinationRequest() { + return serializedCoordinationRequest; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java new file mode 100644 index 0000000..5d1cdf2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationResponseBody.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.coordination; + +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer; +import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +/** + * Response that carries a serialized {@link CoordinationResponse} to the client. 
+ */ +public class ClientCoordinationResponseBody implements ResponseBody { + + public static final String FIELD_NAME_SERIALIZED_COORDINATION_RESULT = "serializedCoordinationResult"; + + @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT) + @JsonSerialize(using = SerializedValueSerializer.class) + @JsonDeserialize(using = SerializedValueDeserializer.class) + private final SerializedValue<CoordinationResponse> serializedCoordinationResponse; + + @JsonCreator + public ClientCoordinationResponseBody( + @JsonProperty(FIELD_NAME_SERIALIZED_COORDINATION_RESULT) + SerializedValue<CoordinationResponse> serializedCoordinationResponse) { + this.serializedCoordinationResponse = serializedCoordinationResponse; + } + + @JsonIgnore + public SerializedValue<CoordinationResponse> getSerializedCoordinationResponse() { + return serializedCoordinationResponse; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 2be0e5c..7cc17dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -76,6 +76,9 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; @@ 
-107,6 +110,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -164,6 +168,8 @@ public abstract class SchedulerBase implements SchedulerNG { protected final ExecutionVertexVersioner executionVertexVersioner; + private final Map<OperatorID, OperatorCoordinator> coordinatorMap; + private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor( "SchedulerBase is not initialized with proper main thread executor. " + "Call to SchedulerBase.setMainThreadExecutor(...) required."); @@ -222,6 +228,8 @@ public abstract class SchedulerBase implements SchedulerNG { this.schedulingTopology = executionGraph.getSchedulingTopology(); this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph); + + this.coordinatorMap = createCoordinatorMap(); } private ExecutionGraph createAndRestoreExecutionGraph( @@ -932,6 +940,20 @@ public abstract class SchedulerBase implements SchedulerNG { } } + @Override + public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator( + OperatorID operator, + CoordinationRequest request) throws FlinkException { + OperatorCoordinator coordinator = coordinatorMap.get(operator); + if (coordinator instanceof CoordinationRequestHandler) { + return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request); + } else if (coordinator != null) { + throw new FlinkException("Coordinator of operator " + operator + " cannot handle client event"); + } else { + throw new FlinkException("Coordinator of operator " + operator + " does not exist"); + } + } + private void startAllOperatorCoordinators() { final Collection<OperatorCoordinator> coordinators = getAllCoordinators(); try { @@ -951,8 +973,16 @@ public abstract class SchedulerBase implements 
SchedulerNG { } private Collection<OperatorCoordinator> getAllCoordinators() { - return getExecutionGraph().getAllVertices().values().stream() - .flatMap((vertex) -> vertex.getOperatorCoordinators().stream()) - .collect(Collectors.toList()); + return coordinatorMap.values(); + } + + private Map<OperatorID, OperatorCoordinator> createCoordinatorMap() { + Map<OperatorID, OperatorCoordinator> coordinatorMap = new HashMap<>(); + for (ExecutionJobVertex vertex : getExecutionGraph().getAllVertices().values()) { + for (Map.Entry<OperatorID, OperatorCoordinator> entry : vertex.getOperatorCoordinatorMap().entrySet()) { + coordinatorMap.put(entry.getKey(), entry.getValue()); + } + } + return coordinatorMap; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java index 5a75d37..07f7e7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java @@ -40,6 +40,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.query.KvStateLocation; @@ -138,4 +140,14 @@ public interface SchedulerNG { * for the given ID. 
*/ void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) throws FlinkException; + + /** + * Delivers a coordination request to the {@link OperatorCoordinator} with the given {@link OperatorID} + * and returns the coordinator's response. + * + * @return A future containing the response. + * @throws FlinkException Thrown, if no coordinator exists for the given operator ID, + * or the coordinator cannot handle client events. + */ + CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index be0e45a..c736565 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -26,15 +26,19 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.util.SerializedValue; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -186,4 +190,24 @@ public interface RestfulGateway extends RpcGateway { default CompletableFuture<Acknowledge> shutDownCluster() { throw new UnsupportedOperationException(); } + + /** + * Deliver a coordination request to a specified coordinator and return the response. + * + * @param jobId identifying the job which the coordinator belongs to + * @param operatorId identifying the coordinator to receive the request + * @param serializedRequest serialized request to deliver + * @param timeout RPC timeout + * @return A future containing the response. + * The future will fail with a {@link org.apache.flink.util.FlinkException} + * if no coordinator exists for the given operator ID, + * or the coordinator cannot handle client events. 
+ */ + default CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator( + JobID jobId, + OperatorID operatorId, + SerializedValue<CoordinationRequest> serializedRequest, + @RpcTimeout Time timeout) { + throw new UnsupportedOperationException(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index c17b2b7..fd67f86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler; import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler; @@ -119,6 +120,7 @@ import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders; +import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders; import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders; @@ -559,6 +561,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp final ClusterDataSetDeleteHandlers.ClusterDataSetDeleteStatusHandler clusterDataSetDeleteStatusHandler = clusterDataSetDeleteHandlers.new ClusterDataSetDeleteStatusHandler(leaderRetriever, timeout, responseHeaders); + final ClientCoordinationHandler clientCoordinationHandler = new ClientCoordinationHandler( + leaderRetriever, + timeout, + responseHeaders, + ClientCoordinationHeaders.getInstance()); + final ShutdownHandler shutdownHandler = new ShutdownHandler( leaderRetriever, timeout, @@ -625,6 +633,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add(Tuple2.of(clusterDataSetListHandler.getMessageHeaders(), clusterDataSetListHandler)); handlers.add(Tuple2.of(clusterDataSetDeleteTriggerHandler.getMessageHeaders(), clusterDataSetDeleteTriggerHandler)); handlers.add(Tuple2.of(clusterDataSetDeleteStatusHandler.getMessageHeaders(), clusterDataSetDeleteStatusHandler)); + handlers.add(Tuple2.of(clientCoordinationHandler.getMessageHeaders(), clientCoordinationHandler)); // TODO: Remove once the Yarn proxy can forward all REST verbs handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), yarnJobCancelTerminationHandler)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 47b9206..59b3cc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -42,6 +42,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import 
org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -157,6 +159,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull private final TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender; + @Nonnull + private final BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction; + public TestingJobMasterGateway( @Nonnull String address, @Nonnull String hostname, @@ -185,7 +190,8 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction, @Nonnull Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction, @Nonnull TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction, - @Nonnull TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender) { + @Nonnull TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender, + @Nonnull BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction) { this.address = address; this.hostname = hostname; 
this.cancelFunction = cancelFunction; @@ -214,6 +220,7 @@ public class TestingJobMasterGateway implements JobMasterGateway { this.notifyKvStateUnregisteredFunction = notifyKvStateUnregisteredFunction; this.updateAggregateFunction = updateAggregateFunction; this.operatorEventSender = operatorEventSender; + this.deliverCoordinationRequestFunction = deliverCoordinationRequestFunction; } @Override @@ -360,4 +367,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID task, OperatorID operatorID, SerializedValue<OperatorEvent> event) { return operatorEventSender.apply(task, operatorID, event); } + + @Override + public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest, Time timeout) { + return deliverCoordinationRequestFunction.apply(operatorId, serializedRequest); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index a960448..20202cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -41,6 +41,8 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import 
org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; @@ -102,6 +104,7 @@ public class TestingJobMasterGatewayBuilder { private Function<Tuple4<JobID, JobVertexID, KeyGroupRange, String>, CompletableFuture<Acknowledge>> notifyKvStateUnregisteredFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); private TriFunction<String, Object, byte[], CompletableFuture<Object>> updateAggregateFunction = (a, b, c) -> CompletableFuture.completedFuture(new Object()); private TriFunction<ExecutionAttemptID, OperatorID, SerializedValue<OperatorEvent>, CompletableFuture<Acknowledge>> operatorEventSender = (a, b, c) -> CompletableFuture.completedFuture(Acknowledge.get()); + private BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction = (a, b) -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); public TestingJobMasterGatewayBuilder setAddress(String address) { this.address = address; @@ -243,6 +246,11 @@ public class TestingJobMasterGatewayBuilder { return this; } + public TestingJobMasterGatewayBuilder setDeliverCoordinationRequestFunction(BiFunction<OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestFunction) { + this.deliverCoordinationRequestFunction = deliverCoordinationRequestFunction; + return this; + } + public TestingJobMasterGateway build() { return new TestingJobMasterGateway( address, @@ -272,6 +280,7 @@ public class TestingJobMasterGatewayBuilder { notifyKvStateRegisteredFunction, notifyKvStateUnregisteredFunction, updateAggregateFunction, - operatorEventSender); + operatorEventSender, + deliverCoordinationRequestFunction); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index 17a82f0..8ecf172 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -163,6 +165,52 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { assertThat(result, futureFailedWith(TestException.class)); } + @Test + @SuppressWarnings("unchecked") + public void testDeliveringClientRequestToRequestHandler() throws Exception { + final OperatorCoordinator.Provider provider = new TestingCoordinationRequestHandler.Provider(testOperatorId); + final DefaultScheduler scheduler = createScheduler(provider); + + final String payload = "testing payload"; + final TestingCoordinationRequestHandler.Request<String> request = + new TestingCoordinationRequestHandler.Request<>(payload); + final TestingCoordinationRequestHandler.Response<String> response = + (TestingCoordinationRequestHandler.Response<String>) + scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request).get(); + + 
assertEquals(payload, response.getPayload()); + } + + @Test + public void testDeliveringClientRequestToNonRequestHandler() throws Exception { + final OperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(testOperatorId); + final DefaultScheduler scheduler = createScheduler(provider); + + final String payload = "testing payload"; + final TestingCoordinationRequestHandler.Request<String> request = + new TestingCoordinationRequestHandler.Request<>(payload); + + CommonTestUtils.assertThrows( + "cannot handle client event", + FlinkException.class, + () -> scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request)); + } + + @Test + public void testDeliveringClientRequestToNonExistingCoordinator() throws Exception { + final OperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(testOperatorId); + final DefaultScheduler scheduler = createScheduler(provider); + + final String payload = "testing payload"; + final TestingCoordinationRequestHandler.Request<String> request = + new TestingCoordinationRequestHandler.Request<>(payload); + + CommonTestUtils.assertThrows( + "does not exist", + FlinkException.class, + () -> scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), request)); + } + // ------------------------------------------------------------------------ // test setups // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java new file mode 100644 index 0000000..4aa3c43 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingCoordinationRequestHandler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import org.apache.flink.runtime.jobgraph.OperatorID; + +import java.util.concurrent.CompletableFuture; + +/** + * A simple testing implementation of the {@link CoordinationRequestHandler}. + */ +public class TestingCoordinationRequestHandler extends TestingOperatorCoordinator implements CoordinationRequestHandler { + + public TestingCoordinationRequestHandler(Context context) { + super(context); + } + + @Override + public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) { + Request req = (Request) request; + return CompletableFuture.completedFuture(new Response<>(req.getPayload())); + } + + /** + * A testing stub for an {@link OperatorCoordinator.Provider} that creates a + * {@link TestingCoordinationRequestHandler}. 
+ */ + public static final class Provider implements OperatorCoordinator.Provider { + + private static final long serialVersionUID = 1L; + + private final OperatorID operatorId; + + public Provider(OperatorID operatorId) { + this.operatorId = operatorId; + } + + @Override + public OperatorID getOperatorId() { + return operatorId; + } + + @Override + public OperatorCoordinator create(Context context) { + return new TestingCoordinationRequestHandler(context); + } + } + + /** + * A {@link CoordinationRequest} that a {@link TestingCoordinationRequestHandler} receives. + * + * @param <T> payload type + */ + public static class Request<T> implements CoordinationRequest { + + private static final long serialVersionUID = 1L; + + private final T payload; + + public Request(T payload) { + this.payload = payload; + } + + public T getPayload() { + return payload; + } + } + + /** + * A {@link CoordinationResponse} that a {@link TestingCoordinationRequestHandler} gives. + * + * @param <T> payload type + */ + public static class Response<T> implements CoordinationResponse { + + private static final long serialVersionUID = 1L; + + private final T payload; + + public Response(T payload) { + this.payload = payload; + } + + public T getPayload() { + return payload; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java index 4afd56d..becf513 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java @@ -29,12 +29,17 @@ import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import 
org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.function.TriFunction; import java.util.Collection; import java.util.Collections; @@ -92,7 +97,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem DispatcherId fencingToken, Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction, Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier, - Function<ApplicationStatus, CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction) { + Function<ApplicationStatus, CompletableFuture<Acknowledge>> clusterShutdownWithStatusFunction, + TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) { super( address, hostname, @@ -107,7 +113,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem requestOperatorBackPressureStatsFunction, triggerSavepointFunction, stopWithSavepointFunction, - clusterShutdownSupplier); + clusterShutdownSupplier, + deliverCoordinationRequestToCoordinatorFunction); this.submitFunction = submitFunction; this.listFunction = listFunction; this.blobServerPort = blobServerPort; @@ -219,7 +226,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway implem fencingToken, requestArchivedJobFunction, 
clusterShutdownSupplier, - clusterShutdownWithStatusFunction); + clusterShutdownWithStatusFunction, + deliverCoordinationRequestToCoordinatorFunction); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index b7cf02d..0acc5ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -26,12 +26,17 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.function.TriFunction; import java.util.Collection; import java.util.Collections; @@ -57,6 +62,7 @@ public class TestingRestfulGateway implements RestfulGateway { static final BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER = (jobId, jobVertexId) -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); static final BiFunction<JobID, String, 
CompletableFuture<String>> DEFAULT_TRIGGER_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); static final BiFunction<JobID, String, CompletableFuture<String>> DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION = (JobID jobId, String targetDirectory) -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); + static final TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION = (JobID jobId, OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest) -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); static final String LOCALHOST = "localhost"; protected String address; @@ -89,6 +95,8 @@ public class TestingRestfulGateway implements RestfulGateway { protected BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction; + protected TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction; + public TestingRestfulGateway() { this( LOCALHOST, @@ -104,7 +112,8 @@ public class TestingRestfulGateway implements RestfulGateway { DEFAULT_REQUEST_OPERATOR_BACK_PRESSURE_STATS_SUPPLIER, DEFAULT_TRIGGER_SAVEPOINT_FUNCTION, DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION, - DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER); + DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER, + DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION); } public TestingRestfulGateway( @@ -121,7 +130,8 @@ public class TestingRestfulGateway implements RestfulGateway { BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction, BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction, BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction, - 
Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier) { + Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier, + TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) { this.address = address; this.hostname = hostname; this.cancelJobFunction = cancelJobFunction; @@ -136,6 +146,7 @@ public class TestingRestfulGateway implements RestfulGateway { this.triggerSavepointFunction = triggerSavepointFunction; this.stopWithSavepointFunction = stopWithSavepointFunction; this.clusterShutdownSupplier = clusterShutdownSupplier; + this.deliverCoordinationRequestToCoordinatorFunction = deliverCoordinationRequestToCoordinatorFunction; } @Override @@ -199,6 +210,15 @@ public class TestingRestfulGateway implements RestfulGateway { } @Override + public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator( + JobID jobId, + OperatorID operatorId, + SerializedValue<CoordinationRequest> serializedRequest, + Time timeout) { + return deliverCoordinationRequestToCoordinatorFunction.apply(jobId, operatorId, serializedRequest); + } + + @Override public String getAddress() { return address; } @@ -229,6 +249,7 @@ public class TestingRestfulGateway implements RestfulGateway { protected BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction; protected BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction; protected BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction; + protected TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction; protected AbstractBuilder() { cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION; @@ -243,6 +264,7 @@ public class TestingRestfulGateway implements RestfulGateway { triggerSavepointFunction = 
DEFAULT_TRIGGER_SAVEPOINT_FUNCTION; stopWithSavepointFunction = DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION; clusterShutdownSupplier = DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER; + deliverCoordinationRequestToCoordinatorFunction = DEFAULT_DELIVER_COORDINATION_REQUEST_TO_COORDINATOR_FUNCTION; } public T setAddress(String address) { @@ -315,6 +337,11 @@ public class TestingRestfulGateway implements RestfulGateway { return self(); } + public T setDeliverCoordinationRequestToCoordinatorFunction(TriFunction<JobID, OperatorID, SerializedValue<CoordinationRequest>, CompletableFuture<CoordinationResponse>> deliverCoordinationRequestToCoordinatorFunction) { + this.deliverCoordinationRequestToCoordinatorFunction = deliverCoordinationRequestToCoordinatorFunction; + return self(); + } + protected abstract T self(); public abstract TestingRestfulGateway build(); @@ -346,7 +373,8 @@ public class TestingRestfulGateway implements RestfulGateway { requestOperatorBackPressureStatsFunction, triggerSavepointFunction, stopWithSavepointFunction, - clusterShutdownSupplier); + clusterShutdownSupplier, + deliverCoordinationRequestToCoordinatorFunction); } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java index 00bc2b9..e548f5e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java @@ -31,10 +31,13 @@ import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.OperatorID; import 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.JobResult.Builder; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -274,6 +277,14 @@ public class RemoteStreamEnvironmentTest extends TestLogger { @Nullable String savepointDirectory) { return null; } + + @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest( + JobID jobId, + OperatorID operatorId, + CoordinationRequest request) { + return null; + } } }