zentol closed pull request #7345: [FLINK-11163][tests] Use random port in
RestClusterClientTest
URL: https://github.com/apache/flink/pull/7345
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 3e0f2f525a0..bc0432ea4ee 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -32,7 +32,6 @@
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -88,7 +87,9 @@
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
@@ -105,8 +106,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
import javax.annotation.Nonnull;
@@ -150,16 +149,12 @@
*/
public class RestClusterClientTest extends TestLogger {
- @Mock
- private Dispatcher mockRestfulGateway;
+ private final DispatcherGateway mockRestfulGateway = new
TestingDispatcherGateway.Builder().build();
- @Mock
private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;
private RestServerEndpointConfiguration restServerEndpointConfiguration;
- private RestClusterClient<StandaloneClusterId> restClusterClient;
-
private volatile FailHttpRequestPredicate failHttpRequest =
FailHttpRequestPredicate.never();
private ExecutorService executor;
@@ -167,28 +162,58 @@
private JobGraph jobGraph;
private JobID jobId;
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
+ private static final Configuration restConfig;
+ static {
final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
config.setLong(RestOptions.RETRY_DELAY, 0);
+ config.setInteger(RestOptions.PORT, 0);
+
+ restConfig = config;
+ }
- restServerEndpointConfiguration =
RestServerEndpointConfiguration.fromConfiguration(config);
+ @Before
+ public void setUp() throws Exception {
+ restServerEndpointConfiguration =
RestServerEndpointConfiguration.fromConfiguration(restConfig);
mockGatewayRetriever = () ->
CompletableFuture.completedFuture(mockRestfulGateway);
executor = Executors.newSingleThreadExecutor(new
ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName()));
- final RestClient restClient = new
RestClient(RestClientConfiguration.fromConfiguration(config), executor) {
+
+ jobGraph = new JobGraph("testjob");
+ jobId = jobGraph.getJobID();
+ }
+
+ @After
+ public void tearDown() {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+
+ private RestClusterClient<StandaloneClusterId>
createRestClusterClient(final int port) throws Exception {
+ final Configuration clientConfig = new
Configuration(restConfig);
+ clientConfig.setInteger(RestOptions.PORT, port);
+ return new RestClusterClient<>(
+ clientConfig,
+ createRestClient(),
+ StandaloneClusterId.getInstance(),
+ (attempt) -> 0,
+ null);
+ }
+
+ @Nonnull
+ private RestClient createRestClient() throws ConfigurationException {
+ return new
RestClient(RestClientConfiguration.fromConfiguration(restConfig), executor) {
@Override
public <M extends MessageHeaders<R, P, U>, U extends
MessageParameters, R extends RequestBody, P extends ResponseBody>
CompletableFuture<P>
sendRequest(
- final String targetAddress,
- final int targetPort,
- final M messageHeaders,
- final U messageParameters,
- final R request) throws IOException {
+ final String targetAddress,
+ final int targetPort,
+ final M messageHeaders,
+ final U messageParameters,
+ final R request) throws IOException {
if (failHttpRequest.test(messageHeaders,
messageParameters, request)) {
return
FutureUtils.completedExceptionally(new IOException("expected"));
} else {
@@ -196,26 +221,6 @@ public void setUp() throws Exception {
}
}
};
- restClusterClient = new RestClusterClient<>(
- config,
- restClient,
- StandaloneClusterId.getInstance(),
- (attempt) -> 0,
- null);
-
- jobGraph = new JobGraph("testjob");
- jobId = jobGraph.getJobID();
- }
-
- @After
- public void tearDown() throws Exception {
- if (restClusterClient != null) {
- restClusterClient.shutdown();
- }
-
- if (executor != null) {
- executor.shutdown();
- }
}
@Test
@@ -230,22 +235,27 @@ public void testJobSubmitCancelStop() throws Exception {
.netRuntime(Long.MAX_VALUE)
.build()));
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(
submitHandler,
terminationHandler,
testJobExecutionResultHandler)) {
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- Assert.assertFalse(submitHandler.jobSubmitted);
- restClusterClient.submitJob(jobGraph,
ClassLoader.getSystemClassLoader());
- Assert.assertTrue(submitHandler.jobSubmitted);
+ try {
+ Assert.assertFalse(submitHandler.jobSubmitted);
+ restClusterClient.submitJob(jobGraph,
ClassLoader.getSystemClassLoader());
+ Assert.assertTrue(submitHandler.jobSubmitted);
- Assert.assertFalse(terminationHandler.jobCanceled);
- restClusterClient.cancel(jobId);
- Assert.assertTrue(terminationHandler.jobCanceled);
+
Assert.assertFalse(terminationHandler.jobCanceled);
+ restClusterClient.cancel(jobId);
+
Assert.assertTrue(terminationHandler.jobCanceled);
- Assert.assertFalse(terminationHandler.jobStopped);
- restClusterClient.stop(jobId);
- Assert.assertTrue(terminationHandler.jobStopped);
+
Assert.assertFalse(terminationHandler.jobStopped);
+ restClusterClient.stop(jobId);
+
Assert.assertTrue(terminationHandler.jobStopped);
+ } finally {
+ restClusterClient.shutdown();
+ }
}
}
@@ -257,16 +267,21 @@ public void testDetachedJobSubmission() throws Exception {
final TestJobSubmitHandler testJobSubmitHandler = new
TestJobSubmitHandler();
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(
testJobSubmitHandler)) {
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- restClusterClient.setDetached(true);
- final JobSubmissionResult jobSubmissionResult =
restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
-
- // if the detached mode didn't work, then we would not
reach this point because the execution result
- // retrieval would have failed.
- assertThat(jobSubmissionResult,
is(not(instanceOf(JobExecutionResult.class))));
- assertThat(jobSubmissionResult.getJobID(), is(jobId));
+ try {
+ restClusterClient.setDetached(true);
+ final JobSubmissionResult jobSubmissionResult =
restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
+
+ // if the detached mode didn't work, then we
would not reach this point because the execution result
+ // retrieval would have failed.
+ assertThat(jobSubmissionResult,
is(not(instanceOf(JobExecutionResult.class))));
+ assertThat(jobSubmissionResult.getJobID(),
is(jobId));
+ } finally {
+ restClusterClient.shutdown();
+ }
}
}
@@ -365,29 +380,34 @@ public void testSubmitJobAndWaitForExecutionResult()
throws Exception {
failHttpRequest = (messageHeaders, messageParameters,
requestBody) ->
messageHeaders instanceof JobExecutionResultHeaders &&
!firstPollFailed.getAndSet(true);
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(
testJobExecutionResultHandler,
new TestJobSubmitHandler())) {
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- JobExecutionResult jobExecutionResult;
+ try {
+ JobExecutionResult jobExecutionResult;
- jobExecutionResult = (JobExecutionResult)
restClusterClient.submitJob(
- jobGraph,
- ClassLoader.getSystemClassLoader());
- assertThat(jobExecutionResult.getJobID(),
equalTo(jobId));
- assertThat(jobExecutionResult.getNetRuntime(),
equalTo(Long.MAX_VALUE));
- assertThat(
- jobExecutionResult.getAllAccumulatorResults(),
- equalTo(Collections.singletonMap("testName",
1.0)));
+ jobExecutionResult = (JobExecutionResult)
restClusterClient.submitJob(
+ jobGraph,
+ ClassLoader.getSystemClassLoader());
+ assertThat(jobExecutionResult.getJobID(),
equalTo(jobId));
+ assertThat(jobExecutionResult.getNetRuntime(),
equalTo(Long.MAX_VALUE));
+ assertThat(
+
jobExecutionResult.getAllAccumulatorResults(),
+
equalTo(Collections.singletonMap("testName", 1.0)));
- try {
- restClusterClient.submitJob(jobGraph,
ClassLoader.getSystemClassLoader());
- fail("Expected exception not thrown.");
- } catch (final ProgramInvocationException e) {
- final Optional<RuntimeException> cause =
ExceptionUtils.findThrowable(e, RuntimeException.class);
+ try {
+ restClusterClient.submitJob(jobGraph,
ClassLoader.getSystemClassLoader());
+ fail("Expected exception not thrown.");
+ } catch (final ProgramInvocationException e) {
+ final Optional<RuntimeException> cause
= ExceptionUtils.findThrowable(e, RuntimeException.class);
- assertThat(cause.isPresent(), is(true));
- assertThat(cause.get().getMessage(),
equalTo("expected"));
+ assertThat(cause.isPresent(), is(true));
+ assertThat(cause.get().getMessage(),
equalTo("expected"));
+ }
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -420,43 +440,48 @@ public void testTriggerSavepoint() throws Exception {
failHttpRequest = (messageHeaders, messageParameters,
requestBody) ->
messageHeaders instanceof SavepointStatusHeaders &&
!firstPollFailed.getAndSet(true);
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(
triggerHandler,
savepointHandler)) {
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- JobID id = new JobID();
- {
- CompletableFuture<String> savepointPathFuture =
restClusterClient.triggerSavepoint(id, null);
- String savepointPath =
savepointPathFuture.get();
- assertEquals(savepointLocationDefaultDir,
savepointPath);
- }
+ try {
+ JobID id = new JobID();
+ {
+ CompletableFuture<String>
savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
+ String savepointPath =
savepointPathFuture.get();
+
assertEquals(savepointLocationDefaultDir, savepointPath);
+ }
- {
- CompletableFuture<String> savepointPathFuture =
restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
- String savepointPath =
savepointPathFuture.get();
- assertEquals(savepointLocationRequestedDir,
savepointPath);
- }
+ {
+ CompletableFuture<String>
savepointPathFuture = restClusterClient.triggerSavepoint(id,
targetSavepointDirectory);
+ String savepointPath =
savepointPathFuture.get();
+
assertEquals(savepointLocationRequestedDir, savepointPath);
+ }
+
+ {
+ try {
+
restClusterClient.triggerSavepoint(id, null).get();
+ fail("Expected exception not
thrown.");
+ } catch (ExecutionException e) {
+ final Throwable cause =
e.getCause();
+ assertThat(cause,
instanceOf(SerializedThrowable.class));
+
assertThat(((SerializedThrowable) cause)
+
.deserializeError(ClassLoader.getSystemClassLoader())
+ .getMessage(),
equalTo("expected"));
+ }
+ }
- {
try {
- restClusterClient.triggerSavepoint(id,
null).get();
+ restClusterClient.triggerSavepoint(new
JobID(), null).get();
fail("Expected exception not thrown.");
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- assertThat(cause,
instanceOf(SerializedThrowable.class));
- assertThat(((SerializedThrowable) cause)
-
.deserializeError(ClassLoader.getSystemClassLoader())
- .getMessage(),
equalTo("expected"));
+ } catch (final ExecutionException e) {
+ assertTrue(
+ "RestClientException not in
causal chain",
+ ExceptionUtils.findThrowable(e,
RestClientException.class).isPresent());
}
- }
-
- try {
- restClusterClient.triggerSavepoint(new JobID(),
null).get();
- fail("Expected exception not thrown.");
- } catch (final ExecutionException e) {
- assertTrue(
- "RestClientException not in causal
chain",
- ExceptionUtils.findThrowable(e,
RestClientException.class).isPresent());
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -540,32 +565,38 @@ public void testDisposeSavepoint() throws Exception {
OptionalFailure.of(AsynchronousOperationInfo.completeExceptional(new
SerializedThrowable(testException))),
OptionalFailure.ofFailure(testException));
- try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(
testSavepointDisposalStatusHandler,
testSavepointDisposalTriggerHandler)) {
- {
- final CompletableFuture<Acknowledge>
disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
- assertThat(disposeSavepointFuture.get(),
is(Acknowledge.get()));
- }
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- {
- final CompletableFuture<Acknowledge>
disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+ try {
+ {
+ final CompletableFuture<Acknowledge>
disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+
assertThat(disposeSavepointFuture.get(), is(Acknowledge.get()));
+ }
- try {
- disposeSavepointFuture.get();
- fail("Expected an exception");
- } catch (ExecutionException ee) {
-
assertThat(ExceptionUtils.findThrowableWithMessage(ee,
exceptionMessage).isPresent(), is(true));
+ {
+ final CompletableFuture<Acknowledge>
disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+
+ try {
+ disposeSavepointFuture.get();
+ fail("Expected an exception");
+ } catch (ExecutionException ee) {
+
assertThat(ExceptionUtils.findThrowableWithMessage(ee,
exceptionMessage).isPresent(), is(true));
+ }
}
- }
- {
- try {
-
restClusterClient.disposeSavepoint(savepointPath).get();
- fail("Expected an exception.");
- } catch (ExecutionException ee) {
-
assertThat(ExceptionUtils.findThrowable(ee,
RestClientException.class).isPresent(), is(true));
+ {
+ try {
+
restClusterClient.disposeSavepoint(savepointPath).get();
+ fail("Expected an exception.");
+ } catch (ExecutionException ee) {
+
assertThat(ExceptionUtils.findThrowable(ee,
RestClientException.class).isPresent(), is(true));
+ }
}
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -626,14 +657,18 @@ private
TestSavepointDisposalStatusHandler(OptionalFailure<AsynchronousOperation
@Test
public void testListJobs() throws Exception {
- try (TestRestServerEndpoint ignored =
createRestServerEndpoint(new TestListJobsHandler())) {
- {
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(new TestListJobsHandler())) {
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
+ try {
CompletableFuture<Collection<JobStatusMessage>>
jobDetailsFuture = restClusterClient.listJobs();
Collection<JobStatusMessage> jobDetails =
jobDetailsFuture.get();
Iterator<JobStatusMessage> jobDetailsIterator =
jobDetails.iterator();
JobStatusMessage job1 =
jobDetailsIterator.next();
JobStatusMessage job2 =
jobDetailsIterator.next();
Assert.assertNotEquals("The job status should
not be equal.", job1.getJobState(), job2.getJobState());
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -642,17 +677,22 @@ public void testListJobs() throws Exception {
public void testGetAccumulators() throws Exception {
TestAccumulatorHandler accumulatorHandler = new
TestAccumulatorHandler();
- try (TestRestServerEndpoint ignored =
createRestServerEndpoint(accumulatorHandler)){
+ try (TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(accumulatorHandler)){
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- JobID id = new JobID();
+ try {
+ JobID id = new JobID();
- {
- Map<String, OptionalFailure<Object>>
accumulators = restClusterClient.getAccumulators(id);
- assertNotNull(accumulators);
- assertEquals(1, accumulators.size());
+ {
+ Map<String, OptionalFailure<Object>>
accumulators = restClusterClient.getAccumulators(id);
+ assertNotNull(accumulators);
+ assertEquals(1, accumulators.size());
- assertEquals(true,
accumulators.containsKey("testKey"));
- assertEquals("testValue",
accumulators.get("testKey").get().toString());
+ assertEquals(true,
accumulators.containsKey("testKey"));
+ assertEquals("testValue",
accumulators.get("testKey").get().toString());
+ }
+ } finally {
+ restClusterClient.shutdown();
}
}
}
@@ -698,11 +738,17 @@ public void
testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() th
CompletableFuture.completedFuture(EmptyResponseBody.getInstance()));
try (final TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(pingRestHandler)) {
- final AtomicBoolean firstPollFailed = new
AtomicBoolean();
- failHttpRequest = (messageHeaders, messageParameters,
requestBody) ->
- messageHeaders instanceof
PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
-
restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+ try {
+ final AtomicBoolean firstPollFailed = new
AtomicBoolean();
+ failHttpRequest = (messageHeaders,
messageParameters, requestBody) ->
+ messageHeaders instanceof
PingRestHandlerHeaders && !firstPollFailed.getAndSet(true);
+
+
restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
+ } finally {
+ restClusterClient.shutdown();
+ }
}
}
@@ -716,11 +762,15 @@ public void testSendIsNotRetriableIfHttpNotFound() throws
Exception {
FutureUtils.completedExceptionally(new
RestHandlerException(exceptionMessage, HttpResponseStatus.NOT_FOUND)));
try (final TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(pingRestHandler)) {
+ RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+
try {
restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
fail("The rest request should have failed.");
} catch (Exception e) {
assertThat(ExceptionUtils.findThrowableWithMessage(e,
exceptionMessage).isPresent(), is(true));
+ } finally {
+ restClusterClient.shutdown();
}
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services