This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 2535530  [FLINK-12351][DataStream] Fix AsyncWaitOperator to deep copy 
StreamElement when object reuse is enabled #8321 (#8321)
2535530 is described below

commit 253553062a6c944bac174af0804fd192d550b97f
Author: Jark Wu <[email protected]>
AuthorDate: Sat Apr 24 21:17:30 2021 +0800

    [FLINK-12351][DataStream] Fix AsyncWaitOperator to deep copy StreamElement 
when object reuse is enabled #8321 (#8321)
---
 .../api/operators/async/AsyncWaitOperator.java     | 16 +++-
 .../api/operators/async/AsyncWaitOperatorTest.java | 96 ++++++++++++++++++++--
 2 files changed, 106 insertions(+), 6 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 7c9e0b9..ee81685 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -105,6 +105,9 @@ public class AsyncWaitOperator<IN, OUT>
 
     private transient TimestampedCollector<OUT> timestampedCollector;
 
+    /** Whether object reuse has been enabled or disabled. */
+    private transient boolean isObjectReuseEnabled;
+
     public AsyncWaitOperator(
             @Nonnull AsyncFunction<IN, OUT> asyncFunction,
             long timeout,
@@ -158,6 +161,8 @@ public class AsyncWaitOperator<IN, OUT>
     public void open() throws Exception {
         super.open();
 
+        this.isObjectReuseEnabled = 
getExecutionConfig().isObjectReuseEnabled();
+
         if (recoveredStreamElements != null) {
             for (StreamElement element : recoveredStreamElements.get()) {
                 if (element.isRecord()) {
@@ -178,7 +183,16 @@ public class AsyncWaitOperator<IN, OUT>
     }
 
     @Override
-    public void processElement(StreamRecord<IN> element) throws Exception {
+    public void processElement(StreamRecord<IN> record) throws Exception {
+        StreamRecord<IN> element;
+        // copy the element avoid the element is reused
+        if (isObjectReuseEnabled) {
+            //noinspection unchecked
+            element = (StreamRecord<IN>) 
inStreamElementSerializer.copy(record);
+        } else {
+            element = record;
+        }
+
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 19184ac..986989f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -22,9 +22,12 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -100,9 +103,10 @@ import static org.junit.Assert.assertTrue;
 public class AsyncWaitOperatorTest extends TestLogger {
     private static final long TIMEOUT = 1000L;
 
-    @Rule public Timeout timeoutRule = new Timeout(10, TimeUnit.SECONDS);
+    @Rule public Timeout timeoutRule = new Timeout(100, TimeUnit.SECONDS);
 
-    private static class MyAsyncFunction extends RichAsyncFunction<Integer, 
Integer> {
+    private abstract static class MyAbstractAsyncFunction<IN>
+            extends RichAsyncFunction<IN, Integer> {
         private static final long serialVersionUID = 8522411971886428444L;
 
         private static final long TERMINATION_TIMEOUT = 5000L;
@@ -115,7 +119,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
         public void open(Configuration parameters) throws Exception {
             super.open(parameters);
 
-            synchronized (MyAsyncFunction.class) {
+            synchronized (MyAbstractAsyncFunction.class) {
                 if (counter == 0) {
                     executorService = 
Executors.newFixedThreadPool(THREAD_POOL_SIZE);
                 }
@@ -132,7 +136,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
         }
 
         private void freeExecutor() {
-            synchronized (MyAsyncFunction.class) {
+            synchronized (MyAbstractAsyncFunction.class) {
                 --counter;
 
                 if (counter == 0) {
@@ -151,6 +155,10 @@ public class AsyncWaitOperatorTest extends TestLogger {
                 }
             }
         }
+    }
+
+    private static class MyAsyncFunction extends 
MyAbstractAsyncFunction<Integer> {
+        private static final long serialVersionUID = -1504699677704123889L;
 
         @Override
         public void asyncInvoke(final Integer input, final 
ResultFuture<Integer> resultFuture)
@@ -183,7 +191,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
         @Override
         public void asyncInvoke(final Integer input, final 
ResultFuture<Integer> resultFuture)
                 throws Exception {
-            this.executorService.submit(
+            executorService.submit(
                     new Runnable() {
                         @Override
                         public void run() {
@@ -203,6 +211,23 @@ public class AsyncWaitOperatorTest extends TestLogger {
         }
     }
 
+    private static class InputReusedAsyncFunction extends 
MyAbstractAsyncFunction<Tuple1<Integer>> {
+
+        private static final long serialVersionUID = 8627909616410487720L;
+
+        @Override
+        public void asyncInvoke(Tuple1<Integer> input, ResultFuture<Integer> 
resultFuture)
+                throws Exception {
+            executorService.submit(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            
resultFuture.complete(Collections.singletonList(input.f0 * 2));
+                        }
+                    });
+        }
+    }
+
     /**
      * A special {@link LazyAsyncFunction} for timeout handling. Complete the 
result future with 3
      * times the input when the timeout occurred.
@@ -596,6 +621,67 @@ public class AsyncWaitOperatorTest extends TestLogger {
                 restoredTaskHarness.getOutput());
     }
 
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testStateSnapshotAndRestoreWithObjectReused() throws Exception 
{
+        TypeSerializer[] fieldSerializers = new TypeSerializer[] 
{IntSerializer.INSTANCE};
+        TupleSerializer<Tuple1> inputSerializer =
+                new TupleSerializer<>(Tuple1.class, fieldSerializers);
+        AsyncWaitOperatorFactory<Tuple1<Integer>, Integer> factory =
+                new AsyncWaitOperatorFactory<>(
+                        new InputReusedAsyncFunction(),
+                        TIMEOUT,
+                        4,
+                        AsyncDataStream.OutputMode.ORDERED);
+
+        //noinspection unchecked
+        final OneInputStreamOperatorTestHarness<Tuple1<Integer>, Integer> 
testHarness =
+                new OneInputStreamOperatorTestHarness(factory, 
inputSerializer);
+        // enable object reuse
+        testHarness.getExecutionConfig().enableObjectReuse();
+
+        final long initialTime = 0L;
+        Tuple1<Integer> reusedTuple = new Tuple1<>();
+        StreamRecord<Tuple1<Integer>> reusedRecord = new 
StreamRecord<>(reusedTuple, -1L);
+
+        testHarness.setup();
+        testHarness.open();
+
+        synchronized (testHarness.getCheckpointLock()) {
+            reusedTuple.setFields(1);
+            reusedRecord.setTimestamp(initialTime + 1);
+            testHarness.processElement(reusedRecord);
+
+            reusedTuple.setFields(2);
+            reusedRecord.setTimestamp(initialTime + 2);
+            testHarness.processElement(reusedRecord);
+
+            reusedTuple.setFields(3);
+            reusedRecord.setTimestamp(initialTime + 3);
+            testHarness.processElement(reusedRecord);
+
+            reusedTuple.setFields(4);
+            reusedRecord.setTimestamp(initialTime + 4);
+            testHarness.processElement(reusedRecord);
+        }
+
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+        expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
+        expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
+        expectedOutput.add(new StreamRecord<>(6, initialTime + 3));
+        expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
+
+        synchronized (testHarness.getCheckpointLock()) {
+            testHarness.endInput();
+            testHarness.close();
+        }
+
+        TestHarnessUtil.assertOutputEquals(
+                "StateAndRestoredWithObjectReuse Test Output was not correct.",
+                expectedOutput,
+                testHarness.getOutput());
+    }
+
     @Test
     public void testAsyncTimeoutFailure() throws Exception {
         testAsyncTimeout(

Reply via email to