dawidwys commented on a change in pull request #7249: [FLINK-11048] Ability to
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r242605967
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
##########
@@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port,
Configuration clientConfig
}
}
- @Override
- public JobExecutionResult execute(String jobName) throws
ProgramInvocationException {
- StreamGraph streamGraph = getStreamGraph();
- streamGraph.setJobName(jobName);
- transformations.clear();
- return executeRemotely(streamGraph, jarFiles);
+ /**
+ * Set savepoint restore settings that will be used when executing the
job.
+ * @param savepointRestoreSettings savepoint restore settings
+ */
+ public void setSavepointRestoreSettings(SavepointRestoreSettings
savepointRestoreSettings) {
+ this.savepointRestoreSettings = savepointRestoreSettings;
Review comment:
I second @mxm here. Why not add a method such as
`execute(String jobName, SavepointRestoreSettings savepointRestoreSettings)` that
a user who already has a `RemoteEnvironment` could use?
So, to sum up, the methods we would have are:
* the actual submission
```
private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
ClassLoader envClassLoader,
ExecutionConfig executionConfig,
List<URL> jarFiles,
String host,
int port,
Configuration clientConfiguration,
List<URL> globalClasspaths,
SavepointRestoreSettings savepointRestoreSettings
) throws ProgramInvocationException {
```
Also, I would annotate all new methods that we introduce here as `@Internal`
so that we can change them more easily in the future.
* because we have to be backwards compatible
```
protected JobExecutionResult executeRemotely(StreamGraph streamGraph,
List<URL> jarFiles) throws ProgramInvocationException {
```
* because we cannot change `StreamExecutionEnvironment`:
```
@Override
public JobExecutionResult execute(String jobName) throws
ProgramInvocationException {
```
* because we want to make it easier for users who already have a
`RemoteEnvironment`:
```
public JobExecutionResult execute(String jobName, SavepointRestoreSettings
savepointRestoreSettings) throws ProgramInvocationException {
```
* because we want to be able to submit any `StreamExecutionEnvironment`:
```
public static JobExecutionResult executeRemotely(StreamExecutionEnvironment
streamExecutionEnvironment,
List<URL> jarFiles,
String host,
int port,
Configuration clientConfiguration,
List<URL> globalClasspaths,
String jobName,
SavepointRestoreSettings savepointRestoreSettings
) throws ProgramInvocationException {
```
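To illustrate how these overloads could funnel into a single submission path, here is a minimal sketch. It deliberately uses hypothetical stand-in types (`RestoreSettings`, `RemoteEnv`, `submit`) instead of the real Flink classes, so it compiles on its own; it only shows the delegation idea, not the actual implementation in this PR.

```java
public class DelegationSketch {

    /** Hypothetical stand-in for SavepointRestoreSettings; not the Flink class. */
    static final class RestoreSettings {
        final String savepointPath; // null means "start without restoring"

        private RestoreSettings(String savepointPath) {
            this.savepointPath = savepointPath;
        }

        static RestoreSettings none() {
            return new RestoreSettings(null);
        }

        static RestoreSettings forPath(String path) {
            return new RestoreSettings(path);
        }
    }

    /** Hypothetical stand-in for a remote environment that knows its host and port. */
    static class RemoteEnv {
        private final String host;
        private final int port;

        RemoteEnv(String host, int port) {
            this.host = host;
            this.port = port;
        }

        // Existing signature, kept for backwards compatibility: no restore.
        String execute(String jobName) {
            return execute(jobName, RestoreSettings.none());
        }

        // Proposed overload: restore settings are passed per call instead of via a setter.
        String execute(String jobName, RestoreSettings settings) {
            return submit(jobName, host, port, settings);
        }

        // The single place where submission happens; every overload ends up here.
        // The real method would hand the job to the cluster; this one returns a description.
        private static String submit(String jobName, String host, int port,
                RestoreSettings settings) {
            String restore = settings.savepointPath == null ? "none" : settings.savepointPath;
            return jobName + "@" + host + ":" + port + " restore=" + restore;
        }
    }

    public static void main(String[] args) {
        RemoteEnv env = new RemoteEnv("localhost", 8081);
        System.out.println(env.execute("job-a"));
        System.out.println(env.execute("job-b", RestoreSettings.forPath("/tmp/sp-1")));
    }
}
```

With this shape the environment carries no mutable savepoint state: the one-argument `execute` keeps its old behaviour, and callers who want a restore say so at the call site.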