This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 67acbe9c881 [FLINK-32023][API / DataStream] Add config
execution.buffer-timeout.enabled to flush only when the output buffer is full.
67acbe9c881 is described below
commit 67acbe9c8814dd56262053d443ae2712e03d1cb0
Author: liujiangang <[email protected]>
AuthorDate: Wed May 10 15:46:05 2023 +0800
[FLINK-32023][API / DataStream] Add config execution.buffer-timeout.enabled
to flush only when the output buffer is full.
---
.../generated/execution_configuration.html | 10 +++-
.../flink/configuration/ExecutionOptions.java | 20 ++++++--
.../environment/StreamExecutionEnvironment.java | 15 ++++--
.../api/StreamExecutionEnvironmentTest.java | 54 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 8 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/execution_configuration.html
b/docs/layouts/shortcodes/generated/execution_configuration.html
index 8de9b1cef17..5407666376d 100644
--- a/docs/layouts/shortcodes/generated/execution_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_configuration.html
@@ -15,10 +15,16 @@
<td>Defines how data is exchanged between tasks in batch
'execution.runtime-mode' if the shuffling behavior has not been set explicitly
for an individual exchange.<br />With pipelined exchanges, upstream and
downstream tasks run simultaneously. In order to achieve lower latency, a
result record is immediately sent to and processed by the downstream task.
Thus, the receiver back-pressures the sender. The streaming mode always uses
this exchange.<br />With blocking exchanges, u [...]
</tr>
<tr>
- <td><h5>execution.buffer-timeout</h5></td>
+ <td><h5>execution.buffer-timeout.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>If disabled, the config execution.buffer-timeout.interval will
not take effect and the flushing will be triggered only when the output buffer
is full thus maximizing throughput</td>
+ </tr>
+ <tr>
+ <td><h5>execution.buffer-timeout.interval</h5></td>
<td style="word-wrap: break-word;">100 ms</td>
<td>Duration</td>
- <td>The maximum time frequency (milliseconds) for the flushing of
the output buffers. By default the output buffers flush frequently to provide
low latency and to aid smooth developer experience. Setting the parameter can
result in three logical modes:<ul><li>A positive value triggers flushing
periodically by that interval</li><li>0 triggers flushing after every record
thus minimizing latency</li><li>-1 ms triggers flushing only when the output
buffer is full thus maximizing [...]
+ <td>The maximum time frequency (milliseconds) for the flushing of
the output buffers. By default the output buffers flush frequently to provide
low latency and to aid smooth developer experience. Setting the parameter can
result in three logical modes:<ul><li>A positive value triggers flushing
periodically by that interval</li><li>0 triggers flushing after every record
thus minimizing latency</li><li>If the config execution.buffer-timeout.enabled
is false, trigger flushing on [...]
</tr>
<tr>
<td><h5>execution.checkpointing.snapshot-compression</h5></td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
index fae0429c544..2a5f0b61736 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
@@ -92,10 +92,22 @@ public class ExecutionOptions {
.withDescription(
"Tells if we should use compression for the state
snapshot data or not");
+ public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+ ConfigOptions.key("execution.buffer-timeout.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ Description.builder()
+ .text(
+ "If disabled, the config
execution.buffer-timeout.interval will not take effect and the flushing will be
triggered only when the output "
+ + "buffer is full thus
maximizing throughput")
+ .build());
+
public static final ConfigOption<Duration> BUFFER_TIMEOUT =
- ConfigOptions.key("execution.buffer-timeout")
+ ConfigOptions.key("execution.buffer-timeout.interval")
.durationType()
.defaultValue(Duration.ofMillis(100))
+ .withDeprecatedKeys("execution.buffer-timeout")
.withDescription(
Description.builder()
.text(
@@ -109,8 +121,10 @@ public class ExecutionOptions {
FLUSH_AFTER_EVERY_RECORD
+ " triggers
flushing after every record thus minimizing latency"),
text(
-
DISABLED_NETWORK_BUFFER_TIMEOUT
- + " ms triggers
flushing only when the output buffer is full thus maximizing "
+ "If the config "
+ +
BUFFER_TIMEOUT_ENABLED.key()
+ + " is false,"
+ + " trigger
flushing only when the output buffer is full thus maximizing "
+ "throughput"))
.build());
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c516b074133..955afff19a6 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -990,9 +990,6 @@ public class StreamExecutionEnvironment implements
AutoCloseable {
configuration
.getOptional(PipelineOptions.OPERATOR_CHAINING)
.ifPresent(c -> this.isChainingEnabled = c);
- configuration
- .getOptional(ExecutionOptions.BUFFER_TIMEOUT)
- .ifPresent(t -> this.setBufferTimeout(t.toMillis()));
configuration
.getOptional(DeploymentOptions.JOB_LISTENERS)
.ifPresent(listeners -> registerCustomListeners(classLoader,
listeners));
@@ -1052,6 +1049,8 @@ public class StreamExecutionEnvironment implements
AutoCloseable {
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED,
flag));
+ configBufferTimeout(configuration);
+
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
}
@@ -1077,6 +1076,16 @@ public class StreamExecutionEnvironment implements
AutoCloseable {
}
}
+ private void configBufferTimeout(ReadableConfig configuration) {
+ if (configuration.get(ExecutionOptions.BUFFER_TIMEOUT_ENABLED)) {
+ configuration
+ .getOptional(ExecutionOptions.BUFFER_TIMEOUT)
+ .ifPresent(t -> this.setBufferTimeout(t.toMillis()));
+ } else {
+
this.setBufferTimeout(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
+ }
+ }
+
//
--------------------------------------------------------------------------------------------
// Data stream creations
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index a71a26cf54c..0d8836d8482 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -399,6 +400,59 @@ public class StreamExecutionEnvironmentTest {
assertEquals(new GenericTypeInfo<>(Row.class), source2.getType());
}
+ @Test
+ public void testBufferTimeoutByDefault() {
+ Configuration config = new Configuration();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ testBufferTimeout(config, env);
+ }
+
+ @Test
+ public void testBufferTimeoutEnabled() {
+ Configuration config = new Configuration();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, true);
+ testBufferTimeout(config, env);
+ }
+
+ @Test
+ public void testBufferTimeoutDisabled() {
+ Configuration config = new Configuration();
+ config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false);
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // The default value (100ms) of execution.buffer-timeout.interval will not take
effect.
+ env.configure(config, this.getClass().getClassLoader());
+ assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT,
env.getBufferTimeout());
+
+ // Setting execution.buffer-timeout.interval to 0ms will not take effect.
+ config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms");
+ env.configure(config, this.getClass().getClassLoader());
+ assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT,
env.getBufferTimeout());
+
+ // Setting execution.buffer-timeout.interval to -1ms will not take effect.
+ config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
+ env.configure(config, this.getClass().getClassLoader());
+ assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT,
env.getBufferTimeout());
+ }
+
+ private void testBufferTimeout(Configuration config,
StreamExecutionEnvironment env) {
+ env.configure(config, this.getClass().getClassLoader());
+ assertEquals(
+ ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis(),
env.getBufferTimeout());
+
+ config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms");
+ env.configure(config, this.getClass().getClassLoader());
+ assertEquals(0, env.getBufferTimeout());
+
+ try {
+ config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
+ env.configure(config, this.getClass().getClassLoader());
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+ }
+
/////////////////////////////////////////////////////////////
// Utilities
/////////////////////////////////////////////////////////////