This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a55677dff7 [FLINK-35904][test] Make async test harness extend existing 
test harness (#25856)
0a55677dff7 is described below

commit 0a55677dff7fe0c95abc52938207f73bb4fa77e4
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Thu Dec 26 10:27:45 2024 +0800

    [FLINK-35904][test] Make async test harness extend existing test harness 
(#25856)
---
 .../util/MultiInputStreamOperatorTestHarness.java  |  2 +-
 ...ncKeyedMultiInputStreamOperatorTestHarness.java | 25 +++++-----
 ...syncKeyedOneInputStreamOperatorTestHarness.java | 53 ++++++++++++++--------
 ...syncKeyedTwoInputStreamOperatorTestHarness.java | 42 +++++++++++++----
 .../asyncprocessing/AsyncProcessingTestUtil.java   |  2 +-
 5 files changed, 82 insertions(+), 42 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
index 454e0e60d5a..058fc57ee8b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
@@ -69,7 +69,7 @@ public class MultiInputStreamOperatorTestHarness<OUT>
         
getCastedOperator().getInputs().get(idx).processRecordAttributes(recordAttributes);
     }
 
-    private MultipleInputStreamOperator<OUT> getCastedOperator() {
+    protected MultipleInputStreamOperator<OUT> getCastedOperator() {
         return (MultipleInputStreamOperator<OUT>) operator;
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
index 82a6d424c4f..566424d7d21 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.List;
@@ -48,10 +48,10 @@ import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTes
  * async processing, please use methods of test harness instead of operator.
  */
 public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
-        extends AbstractStreamOperatorTestHarness<OUT> {
+        extends MultiInputStreamOperatorTestHarness<OUT> {
 
     /** The executor service for async state processing. */
-    private ExecutorService executor;
+    private final ExecutorService executor;
 
     public static <K, OUT> AsyncKeyedMultiInputStreamOperatorTestHarness<K, 
OUT> create(
             StreamOperatorFactory<OUT> operatorFactory,
@@ -108,6 +108,8 @@ public class 
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
         config.serializeAllConfigs();
     }
 
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public void processElement(int idx, StreamRecord<?> element) throws 
Exception {
         Input input = getCastedOperator().getInputs().get(idx);
         ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor =
@@ -115,16 +117,22 @@ public class 
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
         execute(executor, (ignore) -> inputProcessor.accept(element)).get();
     }
 
+    @Override
+    @SuppressWarnings("rawtypes")
     public void processWatermark(int idx, Watermark mark) throws Exception {
         Input input = getCastedOperator().getInputs().get(idx);
         execute(executor, (ignore) -> input.processWatermark(mark)).get();
     }
 
+    @Override
+    @SuppressWarnings("rawtypes")
     public void processWatermarkStatus(int idx, WatermarkStatus 
watermarkStatus) throws Exception {
         Input input = getCastedOperator().getInputs().get(idx);
         execute(executor, (ignore) -> 
input.processWatermarkStatus(watermarkStatus)).get();
     }
 
+    @Override
+    @SuppressWarnings("rawtypes")
     public void processRecordAttributes(int idx, RecordAttributes 
recordAttributes)
             throws Exception {
         Input input = getCastedOperator().getInputs().get(idx);
@@ -137,16 +145,7 @@ public class 
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
 
     @Override
     public void close() throws Exception {
-        execute(
-                        executor,
-                        (ignore) -> {
-                            super.close();
-                        })
-                .get();
+        execute(executor, (ignore) -> super.close()).get();
         executor.shutdown();
     }
-
-    private MultipleInputStreamOperator<OUT> getCastedOperator() {
-        return (MultipleInputStreamOperator<OUT>) operator;
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
index fbca7135f38..6128a5b915a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -34,7 +35,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.ArrayList;
@@ -54,13 +55,15 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * async processing, please use methods of test harness instead of operator.
  */
 public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
-        extends AbstractStreamOperatorTestHarness<OUT> {
+        extends OneInputStreamOperatorTestHarness<IN, OUT> {
 
     /** Empty if the {@link #operator} is not {@link 
MultipleInputStreamOperator}. */
-    private final List<Input> inputs = new ArrayList<>();
+    private final List<Input<IN>> inputs = new ArrayList<>();
+
+    private long currentWatermark;
 
     /** The executor service for async state processing. */
-    private ExecutorService executor;
+    private final ExecutorService executor;
 
     public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT> create(
             OneInputStreamOperator<IN, OUT> operator,
@@ -120,6 +123,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT>
     }
 
     @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public void setup(TypeSerializer<OUT> outputSerializer) {
         super.setup(outputSerializer);
         if (operator instanceof MultipleInputStreamOperator) {
@@ -128,10 +132,7 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
         }
     }
 
-    public OneInputStreamOperator<IN, OUT> getOneInputOperator() {
-        return (OneInputStreamOperator<IN, OUT>) this.operator;
-    }
-
+    @Override
     public void processElement(StreamRecord<IN> element) throws Exception {
         processElementInternal(element).get();
     }
@@ -140,6 +141,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT>
      * Submit an element processing in an executor thread. This method is 
mainly used for internal
      * testing, please use {@link #processElement} for common operator testing.
      */
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public CompletableFuture<Void> processElementInternal(StreamRecord<IN> 
element)
             throws Exception {
         if (inputs.isEmpty()) {
@@ -160,22 +162,24 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
         }
     }
 
+    @Override
     public void processWatermark(long watermark) throws Exception {
         processWatermarkInternal(watermark).get();
     }
 
     /** For internal testing. */
-    public CompletableFuture<Void> processWatermarkInternal(long watermark) 
throws Exception {
+    public CompletableFuture<Void> processWatermarkInternal(long watermark) {
         return processWatermarkInternal(new Watermark(watermark));
     }
 
+    @Override
     public void processWatermarkStatus(WatermarkStatus status) throws 
Exception {
         processWatermarkStatusInternal(status).get();
     }
 
     /** For internal testing. */
-    public CompletableFuture<Void> 
processWatermarkStatusInternal(WatermarkStatus status)
-            throws Exception {
+    @SuppressWarnings("rawtypes")
+    public CompletableFuture<Void> 
processWatermarkStatusInternal(WatermarkStatus status) {
         if (inputs.isEmpty()) {
             return execute(
                     executor, (ignore) -> 
getOneInputOperator().processWatermarkStatus(status));
@@ -186,12 +190,22 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
         }
     }
 
+    @Override
     public void processWatermark(Watermark mark) throws Exception {
         processWatermarkInternal(mark).get();
     }
 
+    @Override
+    public void endInput() throws Exception {
+        if (operator instanceof BoundedOneInput) {
+            execute(executor, (ignore) -> ((BoundedOneInput) 
operator).endInput()).get();
+        }
+    }
+
     /** For internal testing. */
-    public CompletableFuture<Void> processWatermarkInternal(Watermark mark) 
throws Exception {
+    @SuppressWarnings("rawtypes")
+    public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
+        currentWatermark = mark.getTimestamp();
         if (inputs.isEmpty()) {
             return execute(executor, (ignore) -> 
getOneInputOperator().processWatermark(mark));
         } else {
@@ -206,6 +220,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT>
     }
 
     /** For internal testing. */
+    @SuppressWarnings("rawtypes")
     public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker 
marker) {
         if (inputs.isEmpty()) {
             return execute(
@@ -217,11 +232,18 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
         }
     }
 
+    @Override
     public void processRecordAttributes(RecordAttributes recordAttributes) 
throws Exception {
         processRecordAttributesInternal(recordAttributes).get();
     }
 
+    @Override
+    public long getCurrentWatermark() {
+        return currentWatermark;
+    }
+
     /** For internal testing. */
+    @SuppressWarnings("rawtypes")
     public CompletableFuture<Void> processRecordAttributesInternal(
             RecordAttributes recordAttributes) {
         if (inputs.isEmpty()) {
@@ -246,12 +268,7 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 
     @Override
     public void close() throws Exception {
-        execute(
-                        executor,
-                        (ignore) -> {
-                            super.close();
-                        })
-                .get();
+        execute(executor, (ignore) -> super.close()).get();
         executor.shutdown();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
index d3f34a1b19c..4128f073afd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -30,7 +31,7 @@ import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStatePr
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 
@@ -48,7 +49,7 @@ import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTes
  * async processing, please use methods of test harness instead of operator.
  */
 public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
-        extends AbstractStreamOperatorTestHarness<OUT> {
+        extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 
     private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
@@ -56,7 +57,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, 
IN1, IN2, OUT>
     private ThrowingConsumer<StreamRecord<IN2>, Exception> processor2;
 
     /** The executor service for async state processing. */
-    private ExecutorService executor;
+    private final ExecutorService executor;
 
     public static <K, IN1, IN2, OUT>
             AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> 
create(
@@ -132,62 +133,85 @@ public class 
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
         return processor2;
     }
 
+    @Override
     public void processElement1(StreamRecord<IN1> element) throws Exception {
         execute(executor, (ignore) -> 
getRecordProcessor1().accept(element)).get();
     }
 
+    @Override
     public void processElement1(IN1 value, long timestamp) throws Exception {
         processElement1(new StreamRecord<>(value, timestamp));
     }
 
+    @Override
     public void processElement2(StreamRecord<IN2> element) throws Exception {
         execute(executor, (ignore) -> 
getRecordProcessor2().accept(element)).get();
     }
 
+    @Override
     public void processElement2(IN2 value, long timestamp) throws Exception {
         processElement2(new StreamRecord<>(value, timestamp));
     }
 
+    @Override
     public void processWatermark1(Watermark mark) throws Exception {
         execute(executor, (ignore) -> 
twoInputOperator.processWatermark1(mark)).get();
     }
 
+    @Override
     public void processWatermark2(Watermark mark) throws Exception {
         execute(executor, (ignore) -> 
twoInputOperator.processWatermark2(mark)).get();
     }
 
+    @Override
+    public void processBothWatermarks(Watermark mark) throws Exception {
+        execute(executor, (ignore) -> 
twoInputOperator.processWatermark1(mark)).get();
+        execute(executor, (ignore) -> 
twoInputOperator.processWatermark2(mark)).get();
+    }
+
+    @Override
     public void processWatermarkStatus1(WatermarkStatus watermarkStatus) 
throws Exception {
         execute(executor, (ignore) -> 
twoInputOperator.processWatermarkStatus1(watermarkStatus))
                 .get();
     }
 
+    @Override
     public void processWatermarkStatus2(WatermarkStatus watermarkStatus) 
throws Exception {
         execute(executor, (ignore) -> 
twoInputOperator.processWatermarkStatus2(watermarkStatus))
                 .get();
     }
 
+    @Override
     public void processRecordAttributes1(RecordAttributes recordAttributes) 
throws Exception {
         execute(executor, (ignore) -> 
twoInputOperator.processRecordAttributes1(recordAttributes))
                 .get();
     }
 
+    @Override
     public void processRecordAttributes2(RecordAttributes recordAttributes) 
throws Exception {
         execute(executor, (ignore) -> 
twoInputOperator.processRecordAttributes2(recordAttributes))
                 .get();
     }
 
+    public void endInput1() throws Exception {
+        if (operator instanceof BoundedMultiInput) {
+            execute(executor, (ignore) -> ((BoundedMultiInput) 
operator).endInput(1)).get();
+        }
+    }
+
+    public void endInput2() throws Exception {
+        if (operator instanceof BoundedMultiInput) {
+            execute(executor, (ignore) -> ((BoundedMultiInput) 
operator).endInput(2)).get();
+        }
+    }
+
     public void drainStateRequests() throws Exception {
         execute(executor, (ignore) -> drain(operator)).get();
     }
 
     @Override
     public void close() throws Exception {
-        execute(
-                        executor,
-                        (ignore) -> {
-                            super.close();
-                        })
-                .get();
+        execute(executor, (ignore) -> super.close()).get();
         executor.shutdown();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
index a88146551e3..0c5252e5e40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
@@ -42,7 +42,7 @@ public class AsyncProcessingTestUtil {
 
     public static CompletableFuture<Void> execute(
             ExecutorService executor, ThrowingConsumer<Void, Exception> 
processor) {
-        CompletableFuture<Void> future = new CompletableFuture();
+        CompletableFuture<Void> future = new CompletableFuture<>();
         executor.execute(
                 () -> {
                     try {

Reply via email to