Repository: flink
Updated Branches:
  refs/heads/release-1.5 27061d35a -> 884c2e39b


[FLINK-9397] [DataStream API] Correctly propagate operator buffer timeout of 0

This also improves some JavaDocs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/884c2e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/884c2e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/884c2e39

Branch: refs/heads/release-1.5
Commit: 884c2e39b401fc4f1e0623e856008af53ed5f98e
Parents: 27061d3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 19:49:16 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 20:33:03 2018 +0200

----------------------------------------------------------------------
 .../datastream/SingleOutputStreamOperator.java  | 14 ++++++-
 .../api/graph/StreamGraphGenerator.java         |  2 +-
 .../transformations/StreamTransformation.java   | 16 ++++---
 .../api/graph/StreamGraphGeneratorTest.java     | 44 ++++++++++++++++++++
 4 files changed, 68 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 7885934..1ca3ece 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * {@code SingleOutputStreamOperator} represents a user defined transformation
@@ -226,14 +227,23 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
        }
 
        /**
-        * Sets the maximum time frequency (ms) for the flushing of the output
-        * buffer. By default the output buffers flush only when they are full.
+        * Sets the buffering timeout for data produced by this operation.
+        * The timeout defines how long data may linger in a partially full 
buffer
+        * before being sent over the network.
+        *
+        * <p>Lower timeouts lead to lower tail latencies, but may affect 
throughput.
+        * Timeouts of 1 ms still sustain high throughput, even for jobs with 
high parallelism.
+        *
+        * <p>A value of '-1' means that the default buffer timeout should be 
used. A value
+        * of '0' indicates that no buffering should happen, and all 
records/events should be
+        * immediately sent through the network, without additional buffering.
         *
         * @param timeoutMillis
         *            The maximum time between two output flushes.
         * @return The operator with buffer timeout set.
         */
        public SingleOutputStreamOperator<T> setBufferTimeout(long 
timeoutMillis) {
+               checkArgument(timeoutMillis >= -1, "timeout must be >= -1");
                transformation.setBufferTimeout(timeoutMillis);
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7d0333f..11a7002 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -194,7 +194,7 @@ public class StreamGraphGenerator {
                        alreadyTransformed.put(transform, transformedIds);
                }
 
-               if (transform.getBufferTimeout() > 0) {
+               if (transform.getBufferTimeout() >= 0) {
                        streamGraph.setBufferTimeout(transform.getId(), 
transform.getBufferTimeout());
                }
                if (transform.getUid() != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 8cc4db9..1f763bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -394,13 +395,18 @@ public abstract class StreamTransformation<T> {
        public abstract void setChainingStrategy(ChainingStrategy strategy);
 
        /**
-        * Set the buffer timeout of this {@code StreamTransformation}. The 
timeout is used when
-        * sending elements over the network. The timeout specifies how long a 
network buffer
-        * should be kept waiting before sending. A higher timeout means that 
more elements will
-        * be sent in one buffer, this increases throughput. The latency, 
however, is negatively
-        * affected by a higher timeout.
+        * Set the buffer timeout of this {@code StreamTransformation}. The 
timeout defines how long data
+        * may linger in a partially full buffer before being sent over the 
network.
+        *
+        * <p>Lower timeouts lead to lower tail latencies, but may affect 
throughput.
+        * For Flink 1.5+, timeouts of 1ms are feasible for jobs with high 
parallelism.
+        *
+        * <p>A value of -1 means that the default buffer timeout should be 
used. A value
+        * of zero indicates that no buffering should happen, and all 
records/events should be
+        * immediately sent through the network, without additional buffering.
         */
        public void setBufferTimeout(long bufferTimeout) {
+               checkArgument(bufferTimeout >= -1);
                this.bufferTimeout = bufferTimeout;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index d10fb3c..f2a268a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
@@ -58,6 +59,49 @@ import static org.junit.Assert.assertTrue;
  */
 public class StreamGraphGeneratorTest {
 
+       @Test
+       public void testBufferTimeout() {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env.setBufferTimeout(77); // set timeout to some recognizable 
number
+
+               env
+                       .fromElements(1, 2, 3, 4, 5)
+
+                       .map(value -> value)
+                               .setBufferTimeout(-1)
+                               .name("A")
+                       .map(value -> value)
+                               .setBufferTimeout(0)
+                               .name("B")
+                       .map(value -> value)
+                               .setBufferTimeout(12)
+                               .name("C")
+                       .map(value -> value)
+                               .name("D");
+
+               final StreamGraph sg = env.getStreamGraph();
+               for (StreamNode node : sg.getStreamNodes()) {
+                       switch (node.getOperatorName()) {
+
+                               case "A":
+                                       assertEquals(77L, 
node.getBufferTimeout().longValue());
+                                       break;
+                               case "B":
+                                       assertEquals(0L, 
node.getBufferTimeout().longValue());
+                                       break;
+                               case "C":
+                                       assertEquals(12L, 
node.getBufferTimeout().longValue());
+                                       break;
+                               case "D":
+                                       assertEquals(77L, 
node.getBufferTimeout().longValue());
+                                       break;
+                               default:
+                                       assertTrue(node.getOperator() 
instanceof StreamSource);
+                       }
+               }
+       }
+
        /**
         * This tests whether virtual Transformations behave correctly.
         *

Reply via email to