aljoscha commented on a change in pull request #10205: [FLINK-14797][tests]
Remove power mockito from RemoteStreamExecutionEnvironmentTest
URL: https://github.com/apache/flink/pull/10205#discussion_r346733325
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
##########
@@ -20,96 +20,84 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.RemoteExecutor;
-import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.concurrent.CompletableFuture;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.when;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
/**
* Tests for the {@link RemoteStreamEnvironment}.
*/
-@RunWith(PowerMockRunner.class)
-// TODO: I don't like that I have to do this
-@PrepareForTest({RemoteStreamEnvironment.class, RemoteExecutor.class})
public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
/**
* Verifies that the port passed to the RemoteStreamEnvironment is used
for connecting to the cluster.
*/
@Test
public void testPortForwarding() throws Exception {
+ final String host = "fakeHost";
+ final int port = 99;
+ final Configuration conf = new Configuration();
+
+ final RemoteStreamEnvironment env = (RemoteStreamEnvironment)
StreamExecutionEnvironment.createRemoteEnvironment(
+ host,
+ port,
+ conf);
+
+ final JobID jobID = new JobID();
+
+ env.setPlanExecutorFactory((_host, _port, _conf) -> {
+ assertThat(_host, is(host));
+ assertThat(_port, is(port));
+ assertThat(_conf, is(conf));
+ return new RemoteExecutor(_host, _port, _conf) {
+ @Override
+ public JobExecutionResult executePlan(Pipeline
plan, List<URL> jarFiles, List<URL> classpathList) {
+ return new JobExecutionResult(jobID, 0,
Collections.emptyMap());
+ }
+ };
+ });
- String host = "fakeHost";
- int port = 99;
- JobID jobID = new JobID();
- JobResult jobResult = (new JobResult.Builder())
- .jobId(jobID)
- .netRuntime(0)
- .applicationStatus(ApplicationStatus.SUCCEEDED)
- .build();
-
- RestClusterClient mockedClient =
Mockito.mock(RestClusterClient.class);
-
when(mockedClient.submitJob(any())).thenReturn(CompletableFuture.completedFuture(new
JobSubmissionResult(jobID)));
-
when(mockedClient.requestJobResult(any())).thenReturn(CompletableFuture.completedFuture(jobResult));
-
-
PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenAnswer((invocation)
-> {
- Object[] args = invocation.getArguments();
- Configuration config = (Configuration) args[0];
-
- Assert.assertEquals(host,
config.getString(RestOptions.ADDRESS));
- Assert.assertEquals(port,
config.getInteger(RestOptions.PORT));
- return mockedClient;
- }
- );
-
- final Configuration clientConfiguration = new Configuration();
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(
- host, port, clientConfiguration);
env.fromElements(1).map(x -> x * 2);
- JobExecutionResult actualResult = env.execute("fakeJobName");
- Assert.assertEquals(jobID, actualResult.getJobID());
+ JobExecutionResult actualResult = env.execute();
+ assertThat(actualResult.getJobID(), is(jobID));
}
@Test
public void testRemoteExecutionWithSavepoint() throws Exception {
- SavepointRestoreSettings restoreSettings =
SavepointRestoreSettings.forPath("fakePath");
- RemoteStreamEnvironment env = new
RemoteStreamEnvironment("fakeHost", 1,
- null, new String[]{}, null, restoreSettings);
- env.fromElements(1).map(x -> x * 2);
-
- JobID jobID = new JobID();
- JobResult jobResult = (new JobResult.Builder())
- .jobId(jobID)
- .netRuntime(0)
- .applicationStatus(ApplicationStatus.SUCCEEDED)
- .build();
-
- RestClusterClient mockedClient =
Mockito.mock(RestClusterClient.class);
+ final String host = "fakeHost";
Review comment:
I just noticed that this test was actually broken by an earlier change. If
you look at the original version
(https://github.com/apache/flink/commit/bb9b176357bef502391f612de85cce87f45c8d7f)
you can see that mockito is used to check whether the right savepoint settings
are passed to the client. The current version doesn't do that anymore, this
test doesn't really test anything.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services