iemejia commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r509318085
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
options.setShutdownSourcesAfterIdleMs(0L);
}
}
+ }
- applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
- if (options.getAutoWatermarkInterval() != null) {
- flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
- }
-
- // State backend
- if (options.getStateBackendFactory() != null) {
+ private static void configureStateBackend(
+ FlinkPipelineOptions options, StreamExecutionEnvironment env) {
+ if (options.getStateBackend() != null) {
+ final String storagePath = options.getStateBackendStoragePath();
+ Preconditions.checkArgument(
+ storagePath != null,
+ "State backend was set to '%s' but no storage path was provided.",
+ options.getStateBackend());
+
+ final StateBackend stateBackend;
+ if (options.getStateBackend().equalsIgnoreCase("rocksdb")) {
+ try {
+ stateBackend = new RocksDBStateBackend(storagePath);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create RocksDB state backend.", e);
+ }
+ } else if (options.getStateBackend().equalsIgnoreCase("filesystem")) {
+ stateBackend = new FsStateBackend(storagePath);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown state backend '%s'. Use 'rocksdb' or 'filesystem' or configure via Flink config file.",
+ options.getStateBackend()));
+ }
+ env.setStateBackend(stateBackend);
+ } else if (options.getStateBackendFactory() != null) {
+ // Legacy way of setting the state backend
final StateBackend stateBackend =
InstanceBuilder.ofType(FlinkStateBackendFactory.class)
.fromClass(options.getStateBackendFactory())
.build()
.createStateBackend(options);
- flinkStreamEnv.setStateBackend(stateBackend);
+ env.setStateBackend(stateBackend);
Review comment:
We can move the `env.setStateBackend` call out of the if/else block.
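A minimal sketch of the suggested restructuring, assuming the branches only need to *choose* a backend: each branch assigns a local `stateBackend`, and `setStateBackend` is called once after the if/else. `StateBackend`, `RocksDbBackend`, `FsBackend` and `Env` are hypothetical stand-ins for the Flink types (and a `Supplier` stands in for the legacy factory), so the sketch compiles without Flink. It is an illustration, not the code from this PR.

```java
import java.util.function.Supplier;

final class ConfigureStateBackendSketch {
  // Hypothetical stand-ins for Flink's StateBackend and two implementations.
  interface StateBackend {}

  static final class RocksDbBackend implements StateBackend {
    final String path;
    RocksDbBackend(String path) { this.path = path; }
  }

  static final class FsBackend implements StateBackend {
    final String path;
    FsBackend(String path) { this.path = path; }
  }

  // Hypothetical stand-in for StreamExecutionEnvironment.
  static final class Env {
    StateBackend stateBackend; // null means "keep Flink's default"
    void setStateBackend(StateBackend backend) { this.stateBackend = backend; }
  }

  static void configureStateBackend(
      String backendName, String storagePath, Supplier<StateBackend> legacyFactory, Env env) {
    final StateBackend stateBackend;
    if (backendName != null) {
      if (storagePath == null) {
        throw new IllegalArgumentException(String.format(
            "State backend was set to '%s' but no storage path was provided.", backendName));
      }
      if (backendName.equalsIgnoreCase("rocksdb")) {
        stateBackend = new RocksDbBackend(storagePath);
      } else if (backendName.equalsIgnoreCase("filesystem")) {
        stateBackend = new FsBackend(storagePath);
      } else {
        throw new IllegalArgumentException(String.format(
            "Unknown state backend '%s'. Use 'rocksdb' or 'filesystem'.", backendName));
      }
    } else if (legacyFactory != null) {
      // Legacy path: the backend comes from a user-supplied factory.
      stateBackend = legacyFactory.get();
    } else {
      return; // nothing configured: leave the environment untouched
    }
    // Single call site instead of one per branch.
    env.setStateBackend(stateBackend);
  }
}
```

Because `stateBackend` is `final` and must be definitely assigned before the single `setStateBackend` call, the compiler flags any future branch that forgets to pick a backend.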
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
##########
@@ -464,6 +467,63 @@ public void shouldSetSavepointRestoreForRemoteStreaming() {
assertThat(getSavepointPath(sev), is(path));
}
+ @Test
+ public void shouldFailOnUnknownStateBackend() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setStreaming(true);
+ options.setStateBackend("unknown");
+ options.setStateBackendStoragePath("/path");
+
+ assertThrows(
+ "State backend was set to 'unkown' but no storage path was provided.",
+ IllegalArgumentException.class,
+ () ->
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ options, Collections.emptyList()));
+ }
+
+ @Test
+ public void shouldFailOnNoStoragePathProvided() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setStreaming(true);
+ options.setStateBackend("unknown");
+
+ assertThrows(
+ "State backend was set to 'unkown' but no storage path was provided.",
Review comment:
s/unkown/unknown
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
options.setShutdownSourcesAfterIdleMs(0L);
}
}
+ }
- applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
- if (options.getAutoWatermarkInterval() != null) {
- flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
- }
-
- // State backend
- if (options.getStateBackendFactory() != null) {
+ private static void configureStateBackend(
Review comment:
Is the intention behind supporting this to make it possible to configure new backends too, like the new one by the RISE team? Is this what is intended for Nexmark?
##########
File path: runners/flink/flink_runner.gradle
##########
@@ -148,6 +148,8 @@ dependencies {
compile "org.apache.flink:flink-java:$flink_version"
compile "org.apache.flink:flink-runtime_2.11:$flink_version"
compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
+ // RocksDB state backend
+ compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"
Review comment:
I checked, and the dependency does not seem to be part of the default Flink distribution (quite surprising to me, btw). However, the way the backend is instantiated makes the dependency necessary. Making it provided would solve that, but it is a bit less user-friendly.
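If the RocksDB dependency were made provided, one way to soften the user-friendliness cost could be to check for the class only when `rocksdb` is actually selected and fail with an actionable message. A minimal sketch, assuming hypothetical helpers `isOnClasspath` and `requireOnClasspath`; the class name is Flink's `RocksDBStateBackend` from the `flink-statebackend-rocksdb` module referenced in this diff.

```java
final class ProvidedDependencyCheckSketch {
  // Fully qualified name of Flink's RocksDB backend (flink-statebackend-rocksdb).
  static final String ROCKSDB_BACKEND_CLASS =
      "org.apache.flink.contrib.streaming.state.RocksDBStateBackend";

  /** Returns true if the class can be loaded; the class is not initialized. */
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, ProvidedDependencyCheckSketch.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  /** Hypothetical helper: fails early with a hint about which artifact to add. */
  static void requireOnClasspath(String className, String artifact) {
    if (!isOnClasspath(className)) {
      throw new IllegalStateException(String.format(
          "State backend class %s is not on the classpath. Add %s to your job's dependencies.",
          className, artifact));
    }
  }
}
```

In the `rocksdb` branch this would be called as `requireOnClasspath(ROCKSDB_BACKEND_CLASS, "org.apache.flink:flink-statebackend-rocksdb_2.11")` before the backend is constructed, so users who forget the dependency get a hint instead of a bare `NoClassDefFoundError`.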
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
##########
@@ -464,6 +467,63 @@ public void shouldSetSavepointRestoreForRemoteStreaming() {
assertThat(getSavepointPath(sev), is(path));
}
+ @Test
+ public void shouldFailOnUnknownStateBackend() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setStreaming(true);
+ options.setStateBackend("unknown");
+ options.setStateBackendStoragePath("/path");
+
+ assertThrows(
+ "State backend was set to 'unkown' but no storage path was provided.",
Review comment:
s/unkown/unknown
##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -157,9 +157,19 @@
<td>Shuts down sources which have been idle for the configured time of
milliseconds. Once a source has been shut down, checkpointing is not possible
anymore. Shutting down the sources eventually leads to pipeline shutdown
(=Flink job finishes) once all input has been processed. Unless explicitly set,
this will default to Long.MAX_VALUE when checkpointing is enabled and to 0 when
checkpointing is disabled. See https://issues.apache.org/jira/browse/FLINK-2491
for progress on this issue.</td>
<td>Default: <code>-1</code></td>
</tr>
+<tr>
+ <td><code>stateBackend</code></td>
+ <td>State backend to store Beam's state. Use RocksDB or Filesystem. Defaults to heap.</td>
Review comment:
ignorable nit: I would probably list the explicit values there as examples, 'rocksdb' or 'filesystem' (or the class names if we decide to change this).
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
options.setShutdownSourcesAfterIdleMs(0L);
}
}
+ }
- applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
- if (options.getAutoWatermarkInterval() != null) {
- flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
- }
-
- // State backend
- if (options.getStateBackendFactory() != null) {
+ private static void configureStateBackend(
+ FlinkPipelineOptions options, StreamExecutionEnvironment env) {
+ if (options.getStateBackend() != null) {
+ final String storagePath = options.getStateBackendStoragePath();
+ Preconditions.checkArgument(
+ storagePath != null,
+ "State backend was set to '%s' but no storage path was provided.",
+ options.getStateBackend());
+
+ final StateBackend stateBackend;
+ if (options.getStateBackend().equalsIgnoreCase("rocksdb")) {
Review comment:
Shouldn't this be the class name of the backend? I am assuming that what we want to achieve here is to allow new backends (not available in Flink) to be configured too (see comment above).
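A minimal sketch of what configuring the backend by class name could look like, assuming the option holds a fully qualified class name and that the backend has a public constructor taking the storage path. `StateBackend` and `CustomBackend` are hypothetical stand-ins (not Flink's types), used so the sketch runs without Flink.

```java
import java.lang.reflect.Constructor;

final class StateBackendByClassNameSketch {
  // Hypothetical stand-in for Flink's StateBackend interface.
  public interface StateBackend {}

  // Hypothetical backend that does not ship with Flink.
  public static final class CustomBackend implements StateBackend {
    final String storagePath;

    public CustomBackend(String storagePath) {
      this.storagePath = storagePath;
    }
  }

  /** Instantiates the named backend via its public (String storagePath) constructor. */
  static StateBackend create(String className, String storagePath) {
    try {
      Class<? extends StateBackend> clazz =
          Class.forName(className).asSubclass(StateBackend.class);
      Constructor<? extends StateBackend> ctor = clazz.getConstructor(String.class);
      return ctor.newInstance(storagePath);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("State backend class not found: " + className, e);
    } catch (ClassCastException e) {
      // asSubclass fails when the class does not implement StateBackend.
      throw new IllegalArgumentException(className + " is not a StateBackend", e);
    } catch (ReflectiveOperationException e) {
      // No matching constructor, abstract class, or constructor threw.
      throw new IllegalArgumentException("Could not instantiate state backend " + className, e);
    }
  }
}
```

Short aliases such as `rocksdb` and `filesystem` could still be mapped to the built-in class names before this lookup, which would keep the documented values working while also allowing backends that are not part of Flink.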
----------------------------------------------------------------
This is an automated message from the Apache Git Service.