This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0a55677dff7 [FLINK-35904][test] Make async test harness extend exsit test harness (#25856) 0a55677dff7 is described below commit 0a55677dff7fe0c95abc52938207f73bb4fa77e4 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Thu Dec 26 10:27:45 2024 +0800 [FLINK-35904][test] Make async test harness extend exsit test harness (#25856) --- .../util/MultiInputStreamOperatorTestHarness.java | 2 +- ...ncKeyedMultiInputStreamOperatorTestHarness.java | 25 +++++----- ...syncKeyedOneInputStreamOperatorTestHarness.java | 53 ++++++++++++++-------- ...syncKeyedTwoInputStreamOperatorTestHarness.java | 42 +++++++++++++---- .../asyncprocessing/AsyncProcessingTestUtil.java | 2 +- 5 files changed, 82 insertions(+), 42 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java index 454e0e60d5a..058fc57ee8b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java @@ -69,7 +69,7 @@ public class MultiInputStreamOperatorTestHarness<OUT> getCastedOperator().getInputs().get(idx).processRecordAttributes(recordAttributes); } - private MultipleInputStreamOperator<OUT> getCastedOperator() { + protected MultipleInputStreamOperator<OUT> getCastedOperator() { return (MultipleInputStreamOperator<OUT>) operator; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java index 82a6d424c4f..566424d7d21 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java @@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; import java.util.List; @@ -48,10 +48,10 @@ import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTes * async processing, please use methods of test harness instead of operator. */ public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> - extends AbstractStreamOperatorTestHarness<OUT> { + extends MultiInputStreamOperatorTestHarness<OUT> { /** The executor service for async state processing. */ - private ExecutorService executor; + private final ExecutorService executor; public static <K, OUT> AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> create( StreamOperatorFactory<OUT> operatorFactory, @@ -108,6 +108,8 @@ public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> config.serializeAllConfigs(); } + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) public void processElement(int idx, StreamRecord<?> element) throws Exception { Input input = getCastedOperator().getInputs().get(idx); ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor = @@ -115,16 +117,22 @@ public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> execute(executor, (ignore) -> inputProcessor.accept(element)).get(); } + @Override + @SuppressWarnings("rawtypes") public void processWatermark(int idx, Watermark mark) throws Exception { Input input = getCastedOperator().getInputs().get(idx); execute(executor, (ignore) -> input.processWatermark(mark)).get(); } + @Override + @SuppressWarnings("rawtypes") public void processWatermarkStatus(int idx, WatermarkStatus watermarkStatus) throws Exception { Input input = getCastedOperator().getInputs().get(idx); execute(executor, (ignore) -> input.processWatermarkStatus(watermarkStatus)).get(); } + @Override + @SuppressWarnings("rawtypes") public void processRecordAttributes(int idx, RecordAttributes recordAttributes) throws Exception { Input input = getCastedOperator().getInputs().get(idx); @@ -137,16 +145,7 @@ public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> @Override public void close() throws Exception { - execute( - executor, - (ignore) -> { - super.close(); - }) - .get(); + execute(executor, (ignore) -> super.close()).get(); executor.shutdown(); } - - private MultipleInputStreamOperator<OUT> getCastedOperator() { - return (MultipleInputStreamOperator<OUT>) operator; - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java index fbca7135f38..6128a5b915a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -34,7 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; import java.util.ArrayList; @@ -54,13 +55,15 @@ import static org.apache.flink.util.Preconditions.checkState; * async processing, please use methods of test harness instead of operator. */ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> - extends AbstractStreamOperatorTestHarness<OUT> { + extends OneInputStreamOperatorTestHarness<IN, OUT> { /** Empty if the {@link #operator} is not {@link MultipleInputStreamOperator}. */ - private final List<Input> inputs = new ArrayList<>(); + private final List<Input<IN>> inputs = new ArrayList<>(); + + private long currentWatermark; /** The executor service for async state processing. */ - private ExecutorService executor; + private final ExecutorService executor; public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create( OneInputStreamOperator<IN, OUT> operator, @@ -120,6 +123,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } @Override + @SuppressWarnings({"rawtypes", "unchecked"}) public void setup(TypeSerializer<OUT> outputSerializer) { super.setup(outputSerializer); if (operator instanceof MultipleInputStreamOperator) { @@ -128,10 +132,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } } - public OneInputStreamOperator<IN, OUT> getOneInputOperator() { - return (OneInputStreamOperator<IN, OUT>) this.operator; - } - + @Override public void processElement(StreamRecord<IN> element) throws Exception { processElementInternal(element).get(); } @@ -140,6 +141,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> * Submit an element processing in an executor thread. This method is mainly used for internal * testing, please use {@link #processElement} for common operator testing. */ + @SuppressWarnings({"rawtypes", "unchecked"}) public CompletableFuture<Void> processElementInternal(StreamRecord<IN> element) throws Exception { if (inputs.isEmpty()) { @@ -160,22 +162,24 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } } + @Override public void processWatermark(long watermark) throws Exception { processWatermarkInternal(watermark).get(); } /** For internal testing. */ - public CompletableFuture<Void> processWatermarkInternal(long watermark) throws Exception { + public CompletableFuture<Void> processWatermarkInternal(long watermark) { return processWatermarkInternal(new Watermark(watermark)); } + @Override public void processWatermarkStatus(WatermarkStatus status) throws Exception { processWatermarkStatusInternal(status).get(); } /** For internal testing. */ - public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status) - throws Exception { + @SuppressWarnings("rawtypes") + public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status) { if (inputs.isEmpty()) { return execute( executor, (ignore) -> getOneInputOperator().processWatermarkStatus(status)); @@ -186,12 +190,22 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } } + @Override public void processWatermark(Watermark mark) throws Exception { processWatermarkInternal(mark).get(); } + @Override + public void endInput() throws Exception { + if (operator instanceof BoundedOneInput) { + execute(executor, (ignore) -> ((BoundedOneInput) operator).endInput()).get(); + } + } + /** For internal testing. */ - public CompletableFuture<Void> processWatermarkInternal(Watermark mark) throws Exception { + @SuppressWarnings("rawtypes") + public CompletableFuture<Void> processWatermarkInternal(Watermark mark) { + currentWatermark = mark.getTimestamp(); if (inputs.isEmpty()) { return execute(executor, (ignore) -> getOneInputOperator().processWatermark(mark)); } else { @@ -206,6 +220,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } /** For internal testing. */ + @SuppressWarnings("rawtypes") public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker marker) { if (inputs.isEmpty()) { return execute( @@ -217,11 +232,18 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } } + @Override public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception { processRecordAttributesInternal(recordAttributes).get(); } + @Override + public long getCurrentWatermark() { + return currentWatermark; + } + /** For internal testing. */ + @SuppressWarnings("rawtypes") public CompletableFuture<Void> processRecordAttributesInternal( RecordAttributes recordAttributes) { if (inputs.isEmpty()) { @@ -246,12 +268,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> @Override public void close() throws Exception { - execute( - executor, - (ignore) -> { - super.close(); - }) - .get(); + execute(executor, (ignore) -> super.close()).get(); executor.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java index d3f34a1b19c..4128f073afd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -30,7 +31,7 @@ import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStatePr import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingConsumer; @@ -48,7 +49,7 @@ import static org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTes * async processing, please use methods of test harness instead of operator. */ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> - extends AbstractStreamOperatorTestHarness<OUT> { + extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> { private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator; @@ -56,7 +57,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> private ThrowingConsumer<StreamRecord<IN2>, Exception> processor2; /** The executor service for async state processing. */ - private ExecutorService executor; + private final ExecutorService executor; public static <K, IN1, IN2, OUT> AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> create( @@ -132,62 +133,85 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> return processor2; } + @Override public void processElement1(StreamRecord<IN1> element) throws Exception { execute(executor, (ignore) -> getRecordProcessor1().accept(element)).get(); } + @Override public void processElement1(IN1 value, long timestamp) throws Exception { processElement1(new StreamRecord<>(value, timestamp)); } + @Override public void processElement2(StreamRecord<IN2> element) throws Exception { execute(executor, (ignore) -> getRecordProcessor2().accept(element)).get(); } + @Override public void processElement2(IN2 value, long timestamp) throws Exception { processElement2(new StreamRecord<>(value, timestamp)); } + @Override public void processWatermark1(Watermark mark) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get(); } + @Override public void processWatermark2(Watermark mark) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get(); } + @Override + public void processBothWatermarks(Watermark mark) throws Exception { + execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get(); + execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get(); + } + + @Override public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus1(watermarkStatus)) .get(); } + @Override public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus2(watermarkStatus)) .get(); } + @Override public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception { execute(executor, (ignore) -> twoInputOperator.processRecordAttributes1(recordAttributes)) .get(); } + @Override public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception { execute(executor, (ignore) -> twoInputOperator.processRecordAttributes2(recordAttributes)) .get(); } + public void endInput1() throws Exception { + if (operator instanceof BoundedMultiInput) { + execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(1)).get(); + } + } + + public void endInput2() throws Exception { + if (operator instanceof BoundedMultiInput) { + execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(2)).get(); + } + } + public void drainStateRequests() throws Exception { execute(executor, (ignore) -> drain(operator)).get(); } @Override public void close() throws Exception { - execute( - executor, - (ignore) -> { - super.close(); - }) - .get(); + execute(executor, (ignore) -> super.close()).get(); executor.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java index a88146551e3..0c5252e5e40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java @@ -42,7 +42,7 @@ public class AsyncProcessingTestUtil { public static CompletableFuture<Void> execute( ExecutorService executor, ThrowingConsumer<Void, Exception> processor) { - CompletableFuture<Void> future = new CompletableFuture(); + CompletableFuture<Void> future = new CompletableFuture<>(); executor.execute( () -> { try {