This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c773ce5 [FLINK-13063] Temporary fix for AsyncWaitOperator consistency
problems
c773ce5 is described below
commit c773ce5ff1c7cfb4ee2537c873b24cde4b9a060a
Author: Stefan Richter <[email protected]>
AuthorDate: Tue Jul 9 14:07:24 2019 +0200
[FLINK-13063] Temporary fix for AsyncWaitOperator consistency problems
The current implementation of AsyncWaitOperator can violate exactly-once
and at-least-once guarantees in some common scenarios. This commit provides a
temporary fix by preventing the operator from being chained after other operators.
This closes #9034.
---
docs/dev/stream/operators/asyncio.md | 8 ++
.../api/operators/async/AsyncWaitOperator.java | 5 +-
.../api/operators/async/AsyncWaitOperatorTest.java | 86 +++++++++++++++++++---
3 files changed, 88 insertions(+), 11 deletions(-)
diff --git a/docs/dev/stream/operators/asyncio.md
b/docs/dev/stream/operators/asyncio.md
index e92e7a9..c9a9b2c 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -265,5 +265,13 @@ For example, the following patterns result in a blocking
`asyncInvoke(...)` func
- Using a database client whose lookup/query method call blocks until the
result has been received back
- Blocking/waiting on the future-type objects returned by an asynchronous
client inside the `asyncInvoke(...)` method
+
+**The operator for AsyncFunction (AsyncWaitOperator) must currently be at the
head of operator chains for consistency reasons**
+
+For the reasons given in issue `FLINK-13063`, we currently must break operator
chains for the `AsyncWaitOperator` to prevent
+potential consistency problems. This is a change from the previous behavior, which
supported chaining. Users who
+require the old behavior and accept potential violations of the consistency
guarantees can instantiate and add the
+`AsyncWaitOperator` manually to the job graph and set the chaining strategy
back to chaining via
+`AsyncWaitOperator#setChainingStrategy(ChainingStrategy.ALWAYS)`.
{% top %}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 2555c3b..f875775 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -118,7 +118,10 @@ public class AsyncWaitOperator<IN, OUT>
int capacity,
AsyncDataStream.OutputMode outputMode) {
super(asyncFunction);
- chainingStrategy = ChainingStrategy.ALWAYS;
+
+ // TODO this is a temporary fix for the problems described
under FLINK-13063 at the cost of breaking chains for
+ // AsyncOperators.
+ setChainingStrategy(ChainingStrategy.HEAD);
Preconditions.checkArgument(capacity > 0, "The number of
concurrent async operation should be greater than 0.");
this.capacity = capacity;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 8fdad8f..05df362 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -21,7 +21,10 @@ package org.apache.flink.streaming.api.operators.async;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -40,12 +43,14 @@ import
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
@@ -180,6 +185,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
/**
* A special {@link AsyncFunction} without issuing
* {@link ResultFuture#complete} until the latch counts to zero.
+ * {@link ResultFuture#complete} until the latch counts to zero.
* This function is used in the testStateSnapshotAndRestore, ensuring
* that {@link StreamElementQueueEntry} can stay
* in the {@link StreamElementQueue} to be
@@ -384,6 +390,32 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
/**
+ * Test for the temporary fix to FLINK-13063.
+ */
+ @Test
+ public void testAsyncOperatorIsNeverChained() {
+ StreamExecutionEnvironment chainEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Integer> input = chainEnv.fromElements(1);
+ input = AsyncDataStream.orderedWait(
+ input,
+ new LazyAsyncFunction(),
+ TIMEOUT,
+ TimeUnit.MILLISECONDS,
+ 6).map((x) -> x);
+ AsyncDataStream.unorderedWait(
+ input,
+ new MyAsyncFunction(),
+ TIMEOUT,
+ TimeUnit.MILLISECONDS,
+ 3).map((x) -> x).addSink(new DiscardingSink<>());
+
+ final JobGraph jobGraph =
chainEnv.getStreamGraph().getJobGraph();
+
+ Assert.assertEquals(3,
jobGraph.getVerticesSortedTopologicallyFromSources().size());
+ }
+
+ /**
* Tests that the AsyncWaitOperator works together with chaining.
*/
@Test
@@ -443,20 +475,20 @@ public class AsyncWaitOperatorTest extends TestLogger {
DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);
if (withLazyFunction) {
- input = AsyncDataStream.orderedWait(
+ input = addAsyncOperatorLegacyChained(
input,
new LazyAsyncFunction(),
TIMEOUT,
- TimeUnit.MILLISECONDS,
- 6);
+ 6,
+ AsyncDataStream.OutputMode.ORDERED);
}
else {
- input = AsyncDataStream.orderedWait(
+ input = addAsyncOperatorLegacyChained(
input,
new MyAsyncFunction(),
TIMEOUT,
- TimeUnit.MILLISECONDS,
- 6);
+ 6,
+ AsyncDataStream.OutputMode.ORDERED);
}
// the map function is designed to chain after async function.
we place an Integer object in it and
@@ -480,12 +512,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
});
- input = AsyncDataStream.unorderedWait(
+ input = addAsyncOperatorLegacyChained(
input,
new MyAsyncFunction(),
TIMEOUT,
- TimeUnit.MILLISECONDS,
- 3);
+ 3,
+ AsyncDataStream.OutputMode.UNORDERED);
input.map(new MapFunction<Integer, Integer>() {
private static final long serialVersionUID =
5162085254238405527L;
@@ -499,7 +531,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
// be build our own OperatorChain
final JobGraph jobGraph =
chainEnv.getStreamGraph().getJobGraph();
-
Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size()
== 3);
+ Assert.assertEquals(3,
jobGraph.getVerticesSortedTopologicallyFromSources().size());
return
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
}
@@ -1110,4 +1142,38 @@ public class AsyncWaitOperatorTest extends TestLogger {
// no op
}
}
+
+ /**
+ * This helper function is needed to check that the temporary fix for
FLINK-13063 can be backwards compatible with
+ * the old chaining behavior by setting the ChainingStrategy manually.
TODO: remove after a proper fix for
+ * FLINK-13063 is in place that allows chaining.
+ */
+ private <IN, OUT> SingleOutputStreamOperator<OUT>
addAsyncOperatorLegacyChained(
+ DataStream<IN> in,
+ AsyncFunction<IN, OUT> func,
+ long timeout,
+ int bufSize,
+ AsyncDataStream.OutputMode mode) {
+
+ TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
+ func,
+ AsyncFunction.class,
+ 0,
+ 1,
+ new int[]{1, 0},
+ in.getType(),
+ Utils.getCallLocationName(),
+ true);
+
+ // create transform
+ AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
+ in.getExecutionEnvironment().clean(func),
+ timeout,
+ bufSize,
+ mode);
+
+ operator.setChainingStrategy(ChainingStrategy.ALWAYS);
+
+ return in.transform("async wait operator", outTypeInfo,
operator);
+ }
}