This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 2535530 [FLINK-12351][DataStream] Fix AsyncWaitOperator to deep copy
StreamElement when object reuse is enabled #8321 (#8321)
2535530 is described below
commit 253553062a6c944bac174af0804fd192d550b97f
Author: Jark Wu <[email protected]>
AuthorDate: Sat Apr 24 21:17:30 2021 +0800
[FLINK-12351][DataStream] Fix AsyncWaitOperator to deep copy StreamElement
when object reuse is enabled #8321 (#8321)
---
.../api/operators/async/AsyncWaitOperator.java | 16 +++-
.../api/operators/async/AsyncWaitOperatorTest.java | 96 ++++++++++++++++++++--
2 files changed, 106 insertions(+), 6 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 7c9e0b9..ee81685 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -105,6 +105,9 @@ public class AsyncWaitOperator<IN, OUT>
private transient TimestampedCollector<OUT> timestampedCollector;
+ /** Whether object reuse is enabled in the execution config. */
+ private transient boolean isObjectReuseEnabled;
+
public AsyncWaitOperator(
@Nonnull AsyncFunction<IN, OUT> asyncFunction,
long timeout,
@@ -158,6 +161,8 @@ public class AsyncWaitOperator<IN, OUT>
public void open() throws Exception {
super.open();
+ this.isObjectReuseEnabled =
getExecutionConfig().isObjectReuseEnabled();
+
if (recoveredStreamElements != null) {
for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
@@ -178,7 +183,16 @@ public class AsyncWaitOperator<IN, OUT>
}
@Override
- public void processElement(StreamRecord<IN> element) throws Exception {
+ public void processElement(StreamRecord<IN> record) throws Exception {
+ StreamRecord<IN> element;
+ // copy the element so that later reuse of the input record cannot modify it
+ if (isObjectReuseEnabled) {
+ //noinspection unchecked
+ element = (StreamRecord<IN>)
inStreamElementSerializer.copy(record);
+ } else {
+ element = record;
+ }
+
// add element first to the queue
final ResultFuture<OUT> entry = addToWorkQueue(element);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 19184ac..986989f 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -22,9 +22,12 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -100,9 +103,10 @@ import static org.junit.Assert.assertTrue;
public class AsyncWaitOperatorTest extends TestLogger {
private static final long TIMEOUT = 1000L;
- @Rule public Timeout timeoutRule = new Timeout(10, TimeUnit.SECONDS);
+ @Rule public Timeout timeoutRule = new Timeout(100, TimeUnit.SECONDS);
- private static class MyAsyncFunction extends RichAsyncFunction<Integer,
Integer> {
+ private abstract static class MyAbstractAsyncFunction<IN>
+ extends RichAsyncFunction<IN, Integer> {
private static final long serialVersionUID = 8522411971886428444L;
private static final long TERMINATION_TIMEOUT = 5000L;
@@ -115,7 +119,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- synchronized (MyAsyncFunction.class) {
+ synchronized (MyAbstractAsyncFunction.class) {
if (counter == 0) {
executorService =
Executors.newFixedThreadPool(THREAD_POOL_SIZE);
}
@@ -132,7 +136,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
private void freeExecutor() {
- synchronized (MyAsyncFunction.class) {
+ synchronized (MyAbstractAsyncFunction.class) {
--counter;
if (counter == 0) {
@@ -151,6 +155,10 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
}
}
+ }
+
+ private static class MyAsyncFunction extends
MyAbstractAsyncFunction<Integer> {
+ private static final long serialVersionUID = -1504699677704123889L;
@Override
public void asyncInvoke(final Integer input, final
ResultFuture<Integer> resultFuture)
@@ -183,7 +191,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
@Override
public void asyncInvoke(final Integer input, final
ResultFuture<Integer> resultFuture)
throws Exception {
- this.executorService.submit(
+ executorService.submit(
new Runnable() {
@Override
public void run() {
@@ -203,6 +211,23 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
}
+ private static class InputReusedAsyncFunction extends
MyAbstractAsyncFunction<Tuple1<Integer>> {
+
+ private static final long serialVersionUID = 8627909616410487720L;
+
+ @Override
+ public void asyncInvoke(Tuple1<Integer> input, ResultFuture<Integer>
resultFuture)
+ throws Exception {
+ executorService.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+
resultFuture.complete(Collections.singletonList(input.f0 * 2));
+ }
+ });
+ }
+ }
+
/**
* A special {@link LazyAsyncFunction} for timeout handling. Complete the
result future with 3
* times the input when the timeout occurred.
@@ -596,6 +621,67 @@ public class AsyncWaitOperatorTest extends TestLogger {
restoredTaskHarness.getOutput());
}
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testStateSnapshotAndRestoreWithObjectReused() throws Exception
{
+ TypeSerializer[] fieldSerializers = new TypeSerializer[]
{IntSerializer.INSTANCE};
+ TupleSerializer<Tuple1> inputSerializer =
+ new TupleSerializer<>(Tuple1.class, fieldSerializers);
+ AsyncWaitOperatorFactory<Tuple1<Integer>, Integer> factory =
+ new AsyncWaitOperatorFactory<>(
+ new InputReusedAsyncFunction(),
+ TIMEOUT,
+ 4,
+ AsyncDataStream.OutputMode.ORDERED);
+
+ //noinspection unchecked
+ final OneInputStreamOperatorTestHarness<Tuple1<Integer>, Integer>
testHarness =
+ new OneInputStreamOperatorTestHarness(factory,
inputSerializer);
+ // enable object reuse
+ testHarness.getExecutionConfig().enableObjectReuse();
+
+ final long initialTime = 0L;
+ Tuple1<Integer> reusedTuple = new Tuple1<>();
+ StreamRecord<Tuple1<Integer>> reusedRecord = new
StreamRecord<>(reusedTuple, -1L);
+
+ testHarness.setup();
+ testHarness.open();
+
+ synchronized (testHarness.getCheckpointLock()) {
+ reusedTuple.setFields(1);
+ reusedRecord.setTimestamp(initialTime + 1);
+ testHarness.processElement(reusedRecord);
+
+ reusedTuple.setFields(2);
+ reusedRecord.setTimestamp(initialTime + 2);
+ testHarness.processElement(reusedRecord);
+
+ reusedTuple.setFields(3);
+ reusedRecord.setTimestamp(initialTime + 3);
+ testHarness.processElement(reusedRecord);
+
+ reusedTuple.setFields(4);
+ reusedRecord.setTimestamp(initialTime + 4);
+ testHarness.processElement(reusedRecord);
+ }
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+ expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
+ expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
+ expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
+
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.endInput();
+ testHarness.close();
+ }
+
+ TestHarnessUtil.assertOutputEquals(
+ "StateAndRestoredWithObjectReuse Test Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput());
+ }
+
@Test
public void testAsyncTimeoutFailure() throws Exception {
testAsyncTimeout(