GJL commented on a change in pull request #7659: [FLINK-11514][client] Port ClusterClientTest URL: https://github.com/apache/flink/pull/7659#discussion_r255027696
########## File path: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; +import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link RestClusterClient} for operations that trigger savepoints. + * + * <p>These tests verify that the client uses the appropriate headers for each + * request, properly constructs the request bodies/parameters and processes the responses correctly. + */ +public class RestClusterClientSavepointTriggerTest extends TestLogger { + + private static final DispatcherGateway mockRestfulGateway = new TestingDispatcherGateway.Builder().build(); + + private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway); + + private static RestServerEndpointConfiguration restServerEndpointConfiguration; + + private static ExecutorService executor; + + private static final Configuration restConfig; + + static { + final Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10); + config.setLong(RestOptions.RETRY_DELAY, 0); + config.setInteger(RestOptions.PORT, 0); + + restConfig = config; + } + + @BeforeClass + public static void setUp() throws ConfigurationException { + restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig); + executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(RestClusterClientSavepointTriggerTest.class.getSimpleName())); + } + + @AfterClass + public static void tearDown() { + if (executor != null) { + executor.shutdown(); + } + } + + @Test + public void testTriggerSavepointDefaultDirectory() throws Exception { + final TriggerId triggerId = new TriggerId(); + final String expectedReturnedSavepointDir = "hello"; + + try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint( + request -> { + assertNull(request.getTargetDirectory()); + assertFalse(request.isCancelJob()); + return triggerId; + }, + trigger -> { + assertEquals(triggerId, trigger); + return new SavepointInfo(expectedReturnedSavepointDir, null); + })) { + + final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); + + final String savepointPath = restClusterClient.triggerSavepoint(new JobID(), null).get(); + assertEquals(expectedReturnedSavepointDir, savepointPath); + } + } + + @Test + public void testTriggerSavepointTargetDirectory() throws Exception { + final TriggerId triggerId = new TriggerId(); + final String expectedSubmittedSavepointDir = "world"; + final String expectedReturnedSavepointDir = "hello"; + + try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint( + triggerRequestBody -> { + assertEquals(expectedSubmittedSavepointDir, triggerRequestBody.getTargetDirectory()); + assertFalse(triggerRequestBody.isCancelJob()); + return triggerId; + }, + statusRequestTriggerId -> { + assertEquals(triggerId, statusRequestTriggerId); + return new SavepointInfo(expectedReturnedSavepointDir, null); + })) { + + final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); + + final String savepointPath = restClusterClient.triggerSavepoint(new JobID(), expectedSubmittedSavepointDir).get(); + assertEquals(expectedReturnedSavepointDir, savepointPath); + } + } + + @Test + public void testTriggerSavepointCancelJob() throws Exception { + final TriggerId triggerId = new TriggerId(); + final String expectedSavepointDir = "hello"; + + try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint( + request -> { + assertTrue(request.isCancelJob()); + return triggerId; + }, + trigger -> { + assertEquals(triggerId, trigger); + return new SavepointInfo(expectedSavepointDir, null); + })) { + + final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); + + final String savepointPath = restClusterClient.cancelWithSavepoint(new JobID(), null); + assertEquals(expectedSavepointDir, savepointPath); + } + } + + @Test + public void testTriggerSavepointFailure() throws Exception { + final TriggerId triggerId = new TriggerId(); + + try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint( + request -> triggerId, + trigger -> new SavepointInfo(null, new SerializedThrowable(new RuntimeException("expected"))))) { + + final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); + + try { + restClusterClient.triggerSavepoint(new JobID(), null).get(); + } catch (ExecutionException e) { + final Throwable cause = e.getCause(); + assertThat(cause, instanceOf(SerializedThrowable.class)); + assertThat(((SerializedThrowable) cause) + .deserializeError(ClassLoader.getSystemClassLoader()) + .getMessage(), equalTo("expected")); + } + } + } + + @Test + public void testTriggerSavepointRetry() throws Exception { + final TriggerId triggerId = new TriggerId(); + final String expectedSavepointDir = "hello"; + + final AtomicBoolean failRequest = new AtomicBoolean(true); + try (final RestServerEndpoint restServerEndpoint = createRestServerEndpoint( + request -> triggerId, + trigger -> { + if (failRequest.compareAndSet(true, false)) { + throw new RestHandlerException("expected", HttpResponseStatus.SERVICE_UNAVAILABLE); + } else { + return new SavepointInfo(expectedSavepointDir, null); + } + })) { + + final RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort()); + + final String savepointPath = restClusterClient.triggerSavepoint(new JobID(), null).get(); + assertEquals(expectedSavepointDir, savepointPath); + } + } + + private static RestServerEndpoint createRestServerEndpoint( + final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic, + final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic) throws Exception { + return createRestServerEndpoint(restServerEndpointConfiguration, + new TestSavepointTriggerHandler(triggerHandlerLogic), + new TestSavepointHandler(savepointHandlerLogic)); + } + + private static final class TestSavepointTriggerHandler extends TestHandler<SavepointTriggerRequestBody, TriggerResponse, SavepointTriggerMessageParameters> { + + private final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic; + + TestSavepointTriggerHandler(final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic) { + super(SavepointTriggerHeaders.getInstance()); + this.triggerHandlerLogic = triggerHandlerLogic; + } + + @Override + protected CompletableFuture<TriggerResponse> handleRequest( + @Nonnull HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request, + @Nonnull DispatcherGateway gateway) throws RestHandlerException { + + return CompletableFuture.completedFuture(new TriggerResponse(triggerHandlerLogic.apply(request.getRequestBody()))); + } + } + + private static class TestSavepointHandler extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<SavepointInfo>, SavepointStatusMessageParameters> { + + private final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic; + + TestSavepointHandler(final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic) { + super(SavepointStatusHeaders.getInstance()); + this.savepointHandlerLogic = savepointHandlerLogic; + } + + @Override + protected CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request, + @Nonnull DispatcherGateway gateway) throws RestHandlerException { + + final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); + return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(savepointHandlerLogic.apply(triggerId))); + } + } + + private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> { + + private TestHandler(MessageHeaders<R, P, M> headers) { + super( + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT, + Collections.emptyMap(), + headers); + } + } + + private static TestRestServerEndpoint createRestServerEndpoint( + final RestServerEndpointConfiguration restServerEndpointConfiguration, + final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) throws Exception { + final TestRestServerEndpoint testRestServerEndpoint = new TestRestServerEndpoint(restServerEndpointConfiguration, abstractRestHandlers); + testRestServerEndpoint.start(); + return testRestServerEndpoint; + } + + private static class TestRestServerEndpoint extends RestServerEndpoint implements AutoCloseable { Review comment: I would extract this class to avoid code duplication with the other `RestClusterClientTest`. Same with `TestHandler`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
