Repository: flink Updated Branches: refs/heads/release-1.5 27061d35a -> 884c2e39b
[FLINK-9397] [DataStream API] Correctly propagate operator buffer timeout of 0. This also improves some JavaDocs. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/884c2e39 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/884c2e39 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/884c2e39 Branch: refs/heads/release-1.5 Commit: 884c2e39b401fc4f1e0623e856008af53ed5f98e Parents: 27061d3 Author: Stephan Ewen <[email protected]> Authored: Thu May 17 19:49:16 2018 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu May 17 20:33:03 2018 +0200 ---------------------------------------------------------------------- .../datastream/SingleOutputStreamOperator.java | 14 ++++++- .../api/graph/StreamGraphGenerator.java | 2 +- .../transformations/StreamTransformation.java | 16 ++++--- .../api/graph/StreamGraphGeneratorTest.java | 44 ++++++++++++++++++++ 4 files changed, 68 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 7885934..1ca3ece 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.Map; import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; /** * {@code 
SingleOutputStreamOperator} represents a user defined transformation @@ -226,14 +227,23 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { } /** - * Sets the maximum time frequency (ms) for the flushing of the output - * buffer. By default the output buffers flush only when they are full. + * Sets the buffering timeout for data produced by this operation. + * The timeout defines how long data may linger in a partially full buffer + * before being sent over the network. + * + * <p>Lower timeouts lead to lower tail latencies, but may affect throughput. + * Timeouts of 1 ms still sustain high throughput, even for jobs with high parallelism. + * + * <p>A value of '-1' means that the default buffer timeout should be used. A value + * of '0' indicates that no buffering should happen, and all records/events should be + * immediately sent through the network, without additional buffering. * * @param timeoutMillis * The maximum time between two output flushes. * @return The operator with buffer timeout set. 
*/ public SingleOutputStreamOperator<T> setBufferTimeout(long timeoutMillis) { + checkArgument(timeoutMillis >= -1, "timeout must be >= -1"); transformation.setBufferTimeout(timeoutMillis); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 7d0333f..11a7002 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -194,7 +194,7 @@ public class StreamGraphGenerator { alreadyTransformed.put(transform, transformedIds); } - if (transform.getBufferTimeout() > 0) { + if (transform.getBufferTimeout() >= 0) { streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); } if (transform.getUid() != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index 8cc4db9..1f763bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ -31,6 +31,7 @@ import org.apache.flink.util.Preconditions; import 
java.util.Collection; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -394,13 +395,18 @@ public abstract class StreamTransformation<T> { public abstract void setChainingStrategy(ChainingStrategy strategy); /** - * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when - * sending elements over the network. The timeout specifies how long a network buffer - * should be kept waiting before sending. A higher timeout means that more elements will - * be sent in one buffer, this increases throughput. The latency, however, is negatively - * affected by a higher timeout. + * Set the buffer timeout of this {@code StreamTransformation}. The timeout defines how long data + * may linger in a partially full buffer before being sent over the network. + * + * <p>Lower timeouts lead to lower tail latencies, but may affect throughput. + * For Flink 1.5+, timeouts of 1ms are feasible for jobs with high parallelism. + * + * <p>A value of -1 means that the default buffer timeout should be used. A value + * of zero indicates that no buffering should happen, and all records/events should be + * immediately sent through the network, without additional buffering. 
*/ public void setBufferTimeout(long bufferTimeout) { + checkArgument(bufferTimeout >= -1); this.bufferTimeout = bufferTimeout; } http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index d10fb3c..f2a268a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; @@ -58,6 +59,49 @@ import static org.junit.Assert.assertTrue; */ public class StreamGraphGeneratorTest { + @Test + public void testBufferTimeout() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setBufferTimeout(77); // set timeout to some recognizable number + + env + .fromElements(1, 2, 3, 4, 5) + + .map(value -> value) + .setBufferTimeout(-1) + .name("A") + .map(value -> value) + .setBufferTimeout(0) + .name("B") + .map(value -> value) + .setBufferTimeout(12) + .name("C") + .map(value -> value) + .name("D"); + + final StreamGraph sg = 
env.getStreamGraph(); + for (StreamNode node : sg.getStreamNodes()) { + switch (node.getOperatorName()) { + + case "A": + assertEquals(77L, node.getBufferTimeout().longValue()); + break; + case "B": + assertEquals(0L, node.getBufferTimeout().longValue()); + break; + case "C": + assertEquals(12L, node.getBufferTimeout().longValue()); + break; + case "D": + assertEquals(77L, node.getBufferTimeout().longValue()); + break; + default: + assertTrue(node.getOperator() instanceof StreamSource); + } + } + } + /** * This tests whether virtual Transformations behave correctly. *
