tweise closed pull request #7249: [FLINK-11048] Ability to programmatically
execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 0af6d937294..4fcb5556223 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,6 +30,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
@@ -65,6 +68,9 @@
/** The classpaths that need to be attached to each job. */
private final List<URL> globalClasspaths;
+ /** The savepoint restore settings for job execution. */
+ private final SavepointRestoreSettings savepointRestoreSettings;
+
/**
* Creates a new RemoteStreamEnvironment that points to the master
* (JobManager) described by the given host name and port.
@@ -133,6 +139,36 @@ public RemoteStreamEnvironment(String host, int port,
Configuration clientConfig
* The protocol must be supported by the {@link
java.net.URLClassLoader}.
*/
public RemoteStreamEnvironment(String host, int port, Configuration
clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
+ this(host, port, clientConfiguration, jarFiles, null, null);
+ }
+
+ /**
+ * Creates a new RemoteStreamEnvironment that points to the master
+ * (JobManager) described by the given host name and port.
+ *
+ * @param host
+ * The host name or address of the master (JobManager),
where the
+ * program should be executed.
+ * @param port
+ * The port of the master (JobManager), where the program
should
+ * be executed.
+ * @param clientConfiguration
+ * The configuration used to parametrize the client that
connects to the
+ * remote cluster.
+ * @param jarFiles
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must
be
+ * provided in the JAR files.
+ * @param globalClasspaths
+ * The paths of directories and JAR files that are added to
each user code
+ * classloader on all nodes in the cluster. Note that the
paths must specify a
+ * protocol (e.g. file://) and be accessible on all nodes
(e.g. by means of a NFS share).
+ * The protocol must be supported by the {@link
java.net.URLClassLoader}.
+ * @param savepointRestoreSettings
+ * Optional savepoint restore settings for job execution.
+ */
+ public RemoteStreamEnvironment(String host, int port, Configuration
clientConfiguration, String[] jarFiles, URL[] globalClasspaths,
SavepointRestoreSettings savepointRestoreSettings) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be used
when submitting a program through a client, " +
@@ -167,35 +203,62 @@ public RemoteStreamEnvironment(String host, int port,
Configuration clientConfig
else {
this.globalClasspaths = Arrays.asList(globalClasspaths);
}
+ this.savepointRestoreSettings = savepointRestoreSettings;
}
- @Override
- public JobExecutionResult execute(String jobName) throws
ProgramInvocationException {
- StreamGraph streamGraph = getStreamGraph();
+ /**
+ * Executes the job remotely.
+ *
+ * <p>This method can be used independent of the {@link
StreamExecutionEnvironment} type.
+ * @return The result of the job execution, containing elapsed time and
accumulators.
+ */
+ @PublicEvolving
+ public static JobExecutionResult
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+ List<URL> jarFiles,
+ String host,
+ int port,
+ Configuration clientConfiguration,
+ List<URL> globalClasspaths,
+ String jobName,
+ SavepointRestoreSettings savepointRestoreSettings
+ ) throws ProgramInvocationException {
+ StreamGraph streamGraph =
streamExecutionEnvironment.getStreamGraph();
streamGraph.setJobName(jobName);
- transformations.clear();
- return executeRemotely(streamGraph, jarFiles);
+ return executeRemotely(streamGraph,
+ streamExecutionEnvironment.getClass().getClassLoader(),
+ streamExecutionEnvironment.getConfig(),
+ jarFiles,
+ host,
+ port,
+ clientConfiguration,
+ globalClasspaths,
+ savepointRestoreSettings);
}
/**
- * Executes the remote job.
+ * Execute the given stream graph remotely.
*
- * @param streamGraph
- * Stream Graph to execute
- * @param jarFiles
- * List of jar file URLs to ship to the cluster
- * @return The result of the job execution, containing elapsed time and
accumulators.
+ * <p>Method for internal use since it exposes stream graph and other
implementation details that are subject to change.
+ * @throws ProgramInvocationException
*/
- protected JobExecutionResult executeRemotely(StreamGraph streamGraph,
List<URL> jarFiles) throws ProgramInvocationException {
+ private static JobExecutionResult executeRemotely(StreamGraph
streamGraph,
+ ClassLoader envClassLoader,
+ ExecutionConfig executionConfig,
+ List<URL> jarFiles,
+ String host,
+ int port,
+ Configuration clientConfiguration,
+ List<URL> globalClasspaths,
+ SavepointRestoreSettings savepointRestoreSettings
+ ) throws ProgramInvocationException {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
- ClassLoader usercodeClassLoader =
JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
- getClass().getClassLoader());
+ ClassLoader userCodeClassLoader =
JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
envClassLoader);
Configuration configuration = new Configuration();
- configuration.addAll(this.clientConfiguration);
+ configuration.addAll(clientConfiguration);
configuration.setString(JobManagerOptions.ADDRESS, host);
configuration.setInteger(JobManagerOptions.PORT, port);
@@ -211,10 +274,15 @@ protected JobExecutionResult executeRemotely(StreamGraph
streamGraph, List<URL>
streamGraph.getJobGraph().getJobID(), e);
}
-
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+
client.setPrintStatusDuringExecution(executionConfig.isSysoutLoggingEnabled());
+
+ if (savepointRestoreSettings == null) {
+ savepointRestoreSettings =
SavepointRestoreSettings.none();
+ }
try {
- return client.run(streamGraph, jarFiles,
globalClasspaths, usercodeClassLoader).getJobExecutionResult();
+ return client.run(streamGraph, jarFiles,
globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
+ .getJobExecutionResult();
}
catch (ProgramInvocationException e) {
throw e;
@@ -233,6 +301,37 @@ protected JobExecutionResult executeRemotely(StreamGraph
streamGraph, List<URL>
}
}
+ @Override
+ public JobExecutionResult execute(String jobName) throws
ProgramInvocationException {
+ StreamGraph streamGraph = getStreamGraph();
+ streamGraph.setJobName(jobName);
+ transformations.clear();
+ return executeRemotely(streamGraph, jarFiles);
+ }
+
+ /**
+ * Executes the remote job.
+ *
+ * <p>Note: This method exposes stream graph internal in the public
API, but cannot be removed for backward compatibility.
+ * @param streamGraph
+ * Stream Graph to execute
+ * @param jarFiles
+ * List of jar file URLs to ship to the cluster
+ * @return The result of the job execution, containing elapsed time and
accumulators.
+ */
+ @Deprecated
+ protected JobExecutionResult executeRemotely(StreamGraph streamGraph,
List<URL> jarFiles) throws ProgramInvocationException {
+ return executeRemotely(streamGraph,
+ this.getClass().getClassLoader(),
+ getConfig(),
+ jarFiles,
+ host,
+ port,
+ clientConfiguration,
+ globalClasspaths,
+ savepointRestoreSettings);
+ }
+
@Override
public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + "
- parallelism = "
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
index d9a257c9cb5..39bf6dbd1a4 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
@@ -18,53 +18,78 @@
package org.apache.flink.streaming.api.environment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
-import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.Iterator;
+
+import static org.mockito.Mockito.when;
/**
* Tests for the {@link RemoteStreamEnvironment}.
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RemoteStreamEnvironment.class})
public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
- @ClassRule
- public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new
MiniClusterResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(1)
- .build());
-
/**
* Verifies that the port passed to the RemoteStreamEnvironment is used
for connecting to the cluster.
*/
@Test
public void testPortForwarding() throws Exception {
- final Configuration clientConfiguration = new Configuration();
- clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS,
0);
- final MiniCluster miniCluster =
MINI_CLUSTER_RESOURCE.getMiniCluster();
+ String host = "fakeHost";
+ int port = 99;
+ JobExecutionResult expectedResult = new
JobExecutionResult(null, 0, null);
+
+ RestClusterClient mockedClient =
Mockito.mock(RestClusterClient.class);
+ when(mockedClient.run(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(expectedResult);
+
+
PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenAnswer((invocation)
-> {
+ Object[] args = invocation.getArguments();
+ Configuration config = (Configuration) args[0];
+
+ Assert.assertEquals(host,
config.getString(RestOptions.ADDRESS));
+ Assert.assertEquals(port,
config.getInteger(RestOptions.PORT));
+ return mockedClient;
+ }
+ );
+
+ final Configuration clientConfiguration = new Configuration();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(
- miniCluster.getRestAddress().getHost(),
- miniCluster.getRestAddress().getPort(),
- clientConfiguration);
+ host, port, clientConfiguration);
+ env.fromElements(1).map(x -> x * 2);
+ JobExecutionResult actualResult = env.execute("fakeJobName");
+ Assert.assertEquals(expectedResult, actualResult);
+ }
+
+ @Test
+ public void testRemoteExecutionWithSavepoint() throws Exception {
+ SavepointRestoreSettings restoreSettings =
SavepointRestoreSettings.forPath("fakePath");
+ RemoteStreamEnvironment env = new
RemoteStreamEnvironment("fakeHost", 1,
+ null, new String[]{}, null, restoreSettings);
+ env.fromElements(1).map(x -> x * 2);
+
+ RestClusterClient mockedClient =
Mockito.mock(RestClusterClient.class);
+ JobExecutionResult expectedResult = new
JobExecutionResult(null, 0, null);
- final DataStream<Integer> resultStream = env.fromElements(1)
- .map(x -> x * 2);
+
PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenReturn(mockedClient);
+ when(mockedClient.run(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.eq(restoreSettings)))
+ .thenReturn(expectedResult);
- final Iterator<Integer> result =
DataStreamUtils.collect(resultStream);
- Assert.assertTrue(result.hasNext());
- Assert.assertEquals(2, result.next().intValue());
- Assert.assertFalse(result.hasNext());
+ JobExecutionResult actualResult = env.execute("fakeJobName");
+ Assert.assertEquals(expectedResult, actualResult);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services