http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 5af50b2..43a9981 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -21,8 +21,6 @@ package org.apache.flink.runtime.rest; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; -import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; -import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.ErrorResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; @@ -45,6 +43,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; @@ -110,6 +109,7 @@ public class RestClient { bootstrap = new Bootstrap(); bootstrap + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout())) .group(group) .channel(NioSocketChannel.class) .handler(initializer); @@ -141,18 +141,6 @@ public class RestClient { } } - public <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters) throws IOException { - return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, EmptyRequestBody.getInstance()); - } - - public <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, R request) throws IOException { - return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), request); - } - - public <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders) throws IOException { - return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); - } - public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException { Preconditions.checkNotNull(targetAddress); Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536]."); @@ -264,6 +252,12 @@ public class RestClient { ctx.close(); } + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + jsonFuture.completeExceptionally(cause); + ctx.close(); + } + private void readRawResponse(FullHttpResponse msg) { ByteBuf content = msg.content();
http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java index 7bf0307..86578a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rest; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.util.ConfigurationException; @@ -36,8 +37,11 @@ public final class RestClientConfiguration { @Nullable private final SSLEngine sslEngine; - private RestClientConfiguration(@Nullable SSLEngine sslEngine) { + private final long connectionTimeout; + + private RestClientConfiguration(@Nullable SSLEngine sslEngine, final long connectionTimeout) { this.sslEngine = sslEngine; + this.connectionTimeout = connectionTimeout; } /** @@ -51,6 +55,13 @@ public final class RestClientConfiguration { } /** + * @see RestOptions#CONNECTION_TIMEOUT + */ + public long getConnectionTimeout() { + return connectionTimeout; + } + + /** * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}. * * @param config configuration from which the REST client endpoint configuration should be created from @@ -76,6 +87,8 @@ public final class RestClientConfiguration { } } - return new RestClientConfiguration(sslEngine); + final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT); + + return new RestClientConfiguration(sslEngine, connectionTimeout); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 30a68d1..a22e38c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; @@ -91,6 +93,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHead import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.FileUtils; @@ -104,6 +107,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -112,7 +116,7 @@ import java.util.concurrent.Executor; * * @param <T> type of the leader gateway */ -public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint { +public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender { protected final GatewayRetriever<T> leaderRetriever; private final Configuration clusterConfiguration; @@ -125,6 +129,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp private final MetricFetcher<T> metricFetcher; + private final LeaderElectionService leaderElectionService; + + private final FatalErrorHandler fatalErrorHandler; + public WebMonitorEndpoint( RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever<T> leaderRetriever, @@ -132,7 +140,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp RestHandlerConfiguration restConfiguration, GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever, Executor executor, - MetricQueryServiceRetriever metricQueryServiceRetriever) { + MetricQueryServiceRetriever metricQueryServiceRetriever, + LeaderElectionService leaderElectionService, + FatalErrorHandler fatalErrorHandler) { super(endpointConfiguration); this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration); @@ -152,6 +162,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp metricQueryServiceRetriever, executor, restConfiguration.getTimeout()); + + this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService); + this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); } @Override @@ -468,6 +481,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp } @Override + public void start() throws Exception { + super.start(); + leaderElectionService.start(this); + } + + @Override public void shutdown(Time timeout) { super.shutdown(timeout); @@ -481,5 +500,37 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp } catch (Throwable t) { log.warn("Error while deleting cache directory {}", tmpDir, t); } + + try { + leaderElectionService.stop(); + } catch (Exception e) { + log.warn("Error while stopping leaderElectionService", e); + } } + + //------------------------------------------------------------------------- + // LeaderContender + //------------------------------------------------------------------------- + + @Override + public void grantLeadership(final UUID leaderSessionID) { + log.info("{} was granted leadership with leaderSessionID={}", getRestAddress(), leaderSessionID); + leaderElectionService.confirmLeaderSessionID(leaderSessionID); + } + + @Override + public void revokeLeadership() { + log.info("{} lost leadership", getRestAddress()); + } + + @Override + public String getAddress() { + return getRestAddress(); + } + + @Override + public void handleError(final Exception exception) { + fatalErrorHandler.onFatalError(exception); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index eb0ce2a..7d14ff2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -31,14 +31,19 @@ import org.mockito.invocation.InvocationOnMock; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; @@ -241,4 +246,36 @@ public class FutureUtilsTest extends TestLogger { assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException); } } + + @Test + public void testRetryWithDelayAndPredicate() throws Exception { + final ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor(); + final String retryableExceptionMessage = "first exception"; + class TestStringSupplier implements Supplier<CompletableFuture<String>> { + private final AtomicInteger counter = new AtomicInteger(); + + @Override + public CompletableFuture<String> get() { + if (counter.getAndIncrement() == 0) { + return FutureUtils.completedExceptionally(new RuntimeException(retryableExceptionMessage)); + } else { + return FutureUtils.completedExceptionally(new RuntimeException("should propagate")); + } + } + } + + try { + FutureUtils.retryWithDelay( + new TestStringSupplier(), + 1, + Time.seconds(0), + throwable -> + throwable instanceof RuntimeException && throwable.getMessage().contains(retryableExceptionMessage), + new ScheduledExecutorServiceAdapter(retryExecutor)).get(); + } catch (final ExecutionException e) { + assertThat(e.getMessage(), containsString("Could not complete the operation")); + } finally { + retryExecutor.shutdownNow(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index db0b88e..98d5d74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -40,6 +40,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile LeaderRetrievalService dispatcherLeaderRetriever; + private volatile LeaderRetrievalService webMonitorEndpointLeaderRetriever; + private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>(); private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>(); @@ -48,6 +50,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile LeaderElectionService dispatcherLeaderElectionService; + private volatile LeaderElectionService webMonitorEndpointLeaderElectionService; + private volatile CheckpointRecoveryFactory checkpointRecoveryFactory; private volatile SubmittedJobGraphStore submittedJobGraphStore; @@ -66,6 +70,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices this.dispatcherLeaderRetriever = dispatcherLeaderRetriever; } + public void setWebMonitorEndpointLeaderRetriever(final LeaderRetrievalService webMonitorEndpointLeaderRetriever) { + this.webMonitorEndpointLeaderRetriever = webMonitorEndpointLeaderRetriever; + } + public void setJobMasterLeaderRetriever(JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) { this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever); } @@ -82,6 +90,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices this.dispatcherLeaderElectionService = leaderElectionService; } + public void setWebMonitorEndpointLeaderElectionService(final LeaderElectionService webMonitorEndpointLeaderElectionService) { + this.webMonitorEndpointLeaderElectionService = webMonitorEndpointLeaderElectionService; + } + public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) { this.checkpointRecoveryFactory = checkpointRecoveryFactory; } @@ -130,6 +142,11 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override + public LeaderRetrievalService getWebMonitorLeaderRetriever() { + return webMonitorEndpointLeaderRetriever; + } + + @Override public LeaderElectionService getResourceManagerLeaderElectionService() { LeaderElectionService service = resourceManagerLeaderElectionService; @@ -163,6 +180,11 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override + public LeaderElectionService getWebMonitorLeaderElectionService() { + return webMonitorEndpointLeaderElectionService; + } + + @Override public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { CheckpointRecoveryFactory factory = checkpointRecoveryFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java index 1f319eb..512f0d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java @@ -47,10 +47,13 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe private final ManualLeaderService dispatcherLeaderService; + private final ManualLeaderService webMonitorEndpointLeaderService; + public TestingManualHighAvailabilityServices() { jobManagerLeaderServices = new HashMap<>(4); resourceManagerLeaderService = new ManualLeaderService(); dispatcherLeaderService = new ManualLeaderService(); + webMonitorEndpointLeaderService = new ManualLeaderService(); } @Override @@ -76,6 +79,11 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe } @Override + public LeaderRetrievalService getWebMonitorLeaderRetriever() { + return webMonitorEndpointLeaderService.createLeaderRetrievalService(); + } + + @Override public LeaderElectionService getResourceManagerLeaderElectionService() { return resourceManagerLeaderService.createLeaderElectionService(); } @@ -93,6 +101,11 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe } @Override + public LeaderElectionService getWebMonitorLeaderElectionService() { + return webMonitorEndpointLeaderService.createLeaderElectionService(); + } + + @Override public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { return new StandaloneCheckpointRecoveryFactory(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java index 1cf2e5b..c93dbcc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java @@ -41,6 +41,7 @@ public class StandaloneHaServicesTest extends TestLogger { private final String jobManagerAddress = "jobManager"; private final String dispatcherAddress = "dispatcher"; private final String resourceManagerAddress = "resourceManager"; + private final String webMonitorAddress = "webMonitor"; private StandaloneHaServices standaloneHaServices; @@ -50,7 +51,8 @@ public class StandaloneHaServicesTest extends TestLogger { standaloneHaServices = new StandaloneHaServices( resourceManagerAddress, dispatcherAddress, - jobManagerAddress); + jobManagerAddress, + webMonitorAddress); } @After http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java new file mode 100644 index 0000000..eb77af1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link RestClient}. + */ +public class RestClientTest extends TestLogger { + + @Test + public void testConnectionTimeout() throws Exception { + final Configuration config = new Configuration(); + config.setLong(RestOptions.CONNECTION_TIMEOUT, 1); + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor()); + final String unroutableIp = "10.255.255.1"; + try { + restClient.sendRequest( + unroutableIp, + 80, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()) + .get(60, TimeUnit.SECONDS); + } catch (final ExecutionException e) { + final Throwable throwable = ExceptionUtils.stripExecutionException(e); + assertThat(throwable, instanceOf(ConnectTimeoutException.class)); + assertThat(throwable.getMessage(), containsString(unroutableIp)); + } + } + + private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<EmptyResponseBody> getResponseClass() { + return EmptyResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java index 9954ab7..1cdc70a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -80,11 +81,12 @@ public class RestEndpointITCase extends TestLogger { private static final Time timeout = Time.seconds(10L); private RestServerEndpoint serverEndpoint; - private RestClient clientEndpoint; + private RestClient restClient; @Before public void setup() throws Exception { Configuration config = new Configuration(); + config.setInteger(RestOptions.REST_PORT, 0); RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config); RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config); @@ -101,16 +103,16 @@ public class RestEndpointITCase extends TestLogger { RpcUtils.INF_TIMEOUT); serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler); - clientEndpoint = new TestRestClient(clientConfig); + restClient = new TestRestClient(clientConfig); serverEndpoint.start(); } @After public void teardown() { - if (clientEndpoint != null) { - clientEndpoint.shutdown(timeout); - clientEndpoint = null; + if (restClient != null) { + restClient.shutdown(timeout); + restClient = null; } if (serverEndpoint != null) { @@ -135,7 +137,7 @@ public class RestEndpointITCase extends TestLogger { final InetSocketAddress serverAddress = serverEndpoint.getServerAddress(); synchronized (TestHandler.LOCK) { - response1 = clientEndpoint.sendRequest( + response1 = restClient.sendRequest( serverAddress.getHostName(), serverAddress.getPort(), new TestHeaders(), @@ -145,7 +147,7 @@ public class RestEndpointITCase extends TestLogger { } // send second request and verify response - CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest( + CompletableFuture<TestResponse> response2 = restClient.sendRequest( serverAddress.getHostName(), serverAddress.getPort(), new TestHeaders(), @@ -176,7 +178,7 @@ public class RestEndpointITCase extends TestLogger { parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID); ((TestParameters) parameters).jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID)); - CompletableFuture<TestResponse> response = clientEndpoint.sendRequest( + CompletableFuture<TestResponse> response = restClient.sendRequest( serverAddress.getHostName(), serverAddress.getPort(), new TestHeaders(), http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 0f08512..0c3adae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -522,6 +522,7 @@ public class TaskExecutorTest extends TestLogger { final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress); final String dispatcherAddress = "localhost"; final String jobManagerAddress = "localhost"; + final String webMonitorAddress = "localhost"; // register a mock resource manager gateway ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); @@ -540,8 +541,9 @@ public class TaskExecutorTest extends TestLogger { StandaloneHaServices haServices = new StandaloneHaServices( resourceManagerAddress, - dispatcherAddress, - jobManagerAddress); + dispatcherAddress, + jobManagerAddress, + webMonitorAddress); final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); final SlotReport slotReport = new SlotReport(); http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java index 86db1c4..ce00741 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java @@ -177,6 +177,17 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices { } @Override + public LeaderElectionService getWebMonitorLeaderElectionService() { + enter(); + try { + throw new UnsupportedOperationException(); + } + finally { + exit(); + } + } + + @Override public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { enter(); try { @@ -198,6 +209,17 @@ public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices { } } + @Override + public LeaderRetrievalService getWebMonitorLeaderRetriever() { + enter(); + try { + throw new UnsupportedOperationException(); + } + finally { + exit(); + } + } + // ------------------------------------------------------------------------ // shutdown // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java index c1466d2..9d60d89 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java @@ -199,6 +199,17 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi } @Override + public LeaderElectionService getWebMonitorLeaderElectionService() { + enter(); + try { + throw new UnsupportedOperationException(); + } + finally { + exit(); + } + } + + @Override public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { enter(); try { @@ -218,4 +229,15 @@ public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServi exit(); } } + + @Override + public LeaderRetrievalService getWebMonitorLeaderRetriever() { + enter(); + try { + throw new UnsupportedOperationException(); + } + finally { + exit(); + } + } }
