This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c773ce5  [FLINK-13063] Temporary fix for AsyncWaitOperator consistency problems
c773ce5 is described below

commit c773ce5ff1c7cfb4ee2537c873b24cde4b9a060a
Author: Stefan Richter <[email protected]>
AuthorDate: Tue Jul 9 14:07:24 2019 +0200

    [FLINK-13063] Temporary fix for AsyncWaitOperator consistency problems
    
    The current implementation of AsyncWaitOperator can violate exactly-once and at-least-once guarantees in some common scenarios. This commit provides a temporary fix by preventing the operator from being chained after other operators.
    
    This closes #9034.
---
 docs/dev/stream/operators/asyncio.md               |  8 ++
 .../api/operators/async/AsyncWaitOperator.java     |  5 +-
 .../api/operators/async/AsyncWaitOperatorTest.java | 86 +++++++++++++++++++---
 3 files changed, 88 insertions(+), 11 deletions(-)

diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md
index e92e7a9..c9a9b2c 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -265,5 +265,13 @@ For example, the following patterns result in a blocking `asyncInvoke(...)` func
   - Using a database client whose lookup/query method call blocks until the 
result has been received back
 
   - Blocking/waiting on the future-type objects returned by an asynchronous 
client inside the `asyncInvoke(...)` method
+  
+**The operator for AsyncFunction (AsyncWaitOperator) must currently be at the head of operator chains for consistency reasons**
+
+For the reasons given in issue `FLINK-13063`, we currently must break operator chains at the `AsyncWaitOperator` to prevent
+potential consistency problems. This is a change from the previous behavior, which supported chaining. Users who
+require the old behavior and accept potential violations of the consistency guarantees can instantiate the
+`AsyncWaitOperator` manually, add it to the job graph (e.g. via `DataStream#transform(...)`), and set its chaining strategy back to chaining via
+`AsyncWaitOperator#setChainingStrategy(ChainingStrategy.ALWAYS)`.
 
 {% top %}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 2555c3b..f875775 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -118,7 +118,10 @@ public class AsyncWaitOperator<IN, OUT>
                        int capacity,
                        AsyncDataStream.OutputMode outputMode) {
                super(asyncFunction);
-               chainingStrategy = ChainingStrategy.ALWAYS;
+
+               // TODO this is a temporary fix for the problems described 
under FLINK-13063 at the cost of breaking chains for
+               //  AsyncOperators.
+               setChainingStrategy(ChainingStrategy.HEAD);
 
                Preconditions.checkArgument(capacity > 0, "The number of 
concurrent async operation should be greater than 0.");
                this.capacity = capacity;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 8fdad8f..05df362 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -21,7 +21,10 @@ package org.apache.flink.streaming.api.operators.async;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -40,12 +43,14 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
 import 
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
@@ -180,6 +185,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
        /**
         * A special {@link AsyncFunction} without issuing
         * {@link ResultFuture#complete} until the latch counts to zero.
+        * {@link ResultFuture#complete} until the latch counts to zero.
         * This function is used in the testStateSnapshotAndRestore, ensuring
         * that {@link StreamElementQueueEntry} can stay
         * in the {@link StreamElementQueue} to be
@@ -384,6 +390,32 @@ public class AsyncWaitOperatorTest extends TestLogger {
        }
 
        /**
+        * Test for the temporary fix to FLINK-13063.
+        */
+       @Test
+       public void testAsyncOperatorIsNeverChained() {
+               StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Integer> input = chainEnv.fromElements(1);
+               input = AsyncDataStream.orderedWait(
+                       input,
+                       new LazyAsyncFunction(),
+                       TIMEOUT,
+                       TimeUnit.MILLISECONDS,
+                       6).map((x) -> x);
+               AsyncDataStream.unorderedWait(
+                       input,
+                       new MyAsyncFunction(),
+                       TIMEOUT,
+                       TimeUnit.MILLISECONDS,
+                       3).map((x) -> x).addSink(new DiscardingSink<>());
+
+               final JobGraph jobGraph = 
chainEnv.getStreamGraph().getJobGraph();
+
+               Assert.assertEquals(3, 
jobGraph.getVerticesSortedTopologicallyFromSources().size());
+       }
+
+       /**
         *      Tests that the AsyncWaitOperator works together with chaining.
         */
        @Test
@@ -443,20 +475,20 @@ public class AsyncWaitOperatorTest extends TestLogger {
                DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);
 
                if (withLazyFunction) {
-                       input = AsyncDataStream.orderedWait(
+                       input = addAsyncOperatorLegacyChained(
                                input,
                                new LazyAsyncFunction(),
                                TIMEOUT,
-                               TimeUnit.MILLISECONDS,
-                               6);
+                               6,
+                               AsyncDataStream.OutputMode.ORDERED);
                }
                else {
-                       input = AsyncDataStream.orderedWait(
+                       input = addAsyncOperatorLegacyChained(
                                input,
                                new MyAsyncFunction(),
                                TIMEOUT,
-                               TimeUnit.MILLISECONDS,
-                               6);
+                               6,
+                               AsyncDataStream.OutputMode.ORDERED);
                }
 
                // the map function is designed to chain after async function. 
we place an Integer object in it and
@@ -480,12 +512,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        }
                });
 
-               input = AsyncDataStream.unorderedWait(
+               input = addAsyncOperatorLegacyChained(
                        input,
                        new MyAsyncFunction(),
                        TIMEOUT,
-                       TimeUnit.MILLISECONDS,
-                       3);
+                       3,
+                       AsyncDataStream.OutputMode.UNORDERED);
 
                input.map(new MapFunction<Integer, Integer>() {
                        private static final long serialVersionUID = 
5162085254238405527L;
@@ -499,7 +531,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                // be build our own OperatorChain
                final JobGraph jobGraph = 
chainEnv.getStreamGraph().getJobGraph();
 
-               
Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() 
== 3);
+               Assert.assertEquals(3, 
jobGraph.getVerticesSortedTopologicallyFromSources().size());
 
                return 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        }
@@ -1110,4 +1142,38 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        // no op
                }
        }
+
+       /**
+        * This helper function is needed to check that the temporary fix for 
FLINK-13063 can be backwards compatible with
+        * the old chaining behavior by setting the ChainingStrategy manually. 
TODO: remove after a proper fix for
+        * FLINK-13063 is in place that allows chaining.
+        */
+       private <IN, OUT> SingleOutputStreamOperator<OUT> 
addAsyncOperatorLegacyChained(
+               DataStream<IN> in,
+               AsyncFunction<IN, OUT> func,
+               long timeout,
+               int bufSize,
+               AsyncDataStream.OutputMode mode) {
+
+               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
+                       func,
+                       AsyncFunction.class,
+                       0,
+                       1,
+                       new int[]{1, 0},
+                       in.getType(),
+                       Utils.getCallLocationName(),
+                       true);
+
+               // create transform
+               AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
+                       in.getExecutionEnvironment().clean(func),
+                       timeout,
+                       bufSize,
+                       mode);
+
+               operator.setChainingStrategy(ChainingStrategy.ALWAYS);
+
+               return in.transform("async wait operator", outTypeInfo, 
operator);
+       }
 }

Reply via email to