This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3165f942bad [FLINK-35904][test] Enhance test harness for async state processing with fast fail (#25858)
3165f942bad is described below

commit 3165f942badadd3e9e529dca754b7c09a90ea103
Author: Zakelly <[email protected]>
AuthorDate: Thu Dec 26 19:36:45 2024 +0800

    [FLINK-35904][test] Enhance test harness for async state processing with fast fail (#25858)
---
 ...ncKeyedMultiInputStreamOperatorTestHarness.java | 35 ++++++++++---
 ...syncKeyedOneInputStreamOperatorTestHarness.java | 61 ++++++++++++++--------
 ...syncKeyedTwoInputStreamOperatorTestHarness.java | 55 ++++++++++++-------
 .../asyncprocessing/AsyncProcessingTestUtil.java   |  9 ++--
 4 files changed, 111 insertions(+), 49 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
index 566424d7d21..c11221bcbc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
+import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.List;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
 import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * A test harness for testing a {@link MultipleInputStreamOperator}.
@@ -100,6 +102,8 @@ public class 
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
             setKeySelector(i, keySelectors.get(i));
         }
         this.executor = executor;
+        // Make environment record any failure
+        getEnvironment().setExpectedExternalFailureCause(Throwable.class);
     }
 
     public void setKeySelector(int idx, KeySelector<?, K> keySelector) {
@@ -114,21 +118,21 @@ public class 
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
         Input input = getCastedOperator().getInputs().get(idx);
         ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor =
                 RecordProcessorUtils.getRecordProcessor(input);
-        execute(executor, (ignore) -> inputProcessor.accept(element)).get();
+        executeAndGet(() -> inputProcessor.accept(element));
     }
 
     @Override
     @SuppressWarnings("rawtypes")
     public void processWatermark(int idx, Watermark mark) throws Exception {
         Input input = getCastedOperator().getInputs().get(idx);
-        execute(executor, (ignore) -> input.processWatermark(mark)).get();
+        executeAndGet(() -> input.processWatermark(mark));
     }
 
     @Override
     @SuppressWarnings("rawtypes")
     public void processWatermarkStatus(int idx, WatermarkStatus 
watermarkStatus) throws Exception {
         Input input = getCastedOperator().getInputs().get(idx);
-        execute(executor, (ignore) -> 
input.processWatermarkStatus(watermarkStatus)).get();
+        executeAndGet(() -> input.processWatermarkStatus(watermarkStatus));
     }
 
     @Override
@@ -136,16 +140,35 @@ public class 
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
     public void processRecordAttributes(int idx, RecordAttributes 
recordAttributes)
             throws Exception {
         Input input = getCastedOperator().getInputs().get(idx);
-        execute(executor, (ignore) -> 
input.processRecordAttributes(recordAttributes)).get();
+        executeAndGet(() -> input.processRecordAttributes(recordAttributes));
     }
 
     public void drainStateRequests() throws Exception {
-        execute(executor, (ignore) -> drain(operator)).get();
+        executeAndGet(() -> drain(operator));
     }
 
     @Override
     public void close() throws Exception {
-        execute(executor, (ignore) -> super.close()).get();
+        executeAndGet(super::close);
         executor.shutdown();
     }
+
+    private void executeAndGet(RunnableWithException runnable) throws 
Exception {
+        execute(
+                        executor,
+                        () -> {
+                            checkEnvState();
+                            runnable.run();
+                        })
+                .get();
+        checkEnvState();
+    }
+
+    private void checkEnvState() {
+        if (getEnvironment().getActualExternalFailureCause().isPresent()) {
+            fail(
+                    "There is an error on other threads",
+                    getEnvironment().getActualExternalFailureCause().get());
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
index 6128a5b915a..e2101d88044 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.ArrayList;
@@ -45,8 +46,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
-import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * A test harness for testing a {@link OneInputStreamOperator} which uses 
async state.
@@ -120,6 +121,8 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT>
                 
keyType.createSerializer(executionConfig.getSerializerConfig()));
         config.serializeAllConfigs();
         this.executor = executor;
+        // Make environment record any failure
+        getEnvironment().setExpectedExternalFailureCause(Throwable.class);
     }
 
     @Override
@@ -146,16 +149,14 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
             throws Exception {
         if (inputs.isEmpty()) {
             return execute(
-                    executor,
-                    (ignore) ->
+                    () ->
                             
RecordProcessorUtils.getRecordProcessor(getOneInputOperator())
                                     .accept(element));
         } else {
             checkState(inputs.size() == 1);
             Input input = inputs.get(0);
             return execute(
-                    executor,
-                    (ignore) ->
+                    () ->
                             ((ThrowingConsumer<StreamRecord, Exception>)
                                             
RecordProcessorUtils.getRecordProcessor(input))
                                     .accept(element));
@@ -181,12 +182,11 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
     @SuppressWarnings("rawtypes")
     public CompletableFuture<Void> 
processWatermarkStatusInternal(WatermarkStatus status) {
         if (inputs.isEmpty()) {
-            return execute(
-                    executor, (ignore) -> 
getOneInputOperator().processWatermarkStatus(status));
+            return execute(() -> 
getOneInputOperator().processWatermarkStatus(status));
         } else {
             checkState(inputs.size() == 1);
             Input input = inputs.get(0);
-            return execute(executor, (ignore) -> 
input.processWatermarkStatus(status));
+            return execute(() -> input.processWatermarkStatus(status));
         }
     }
 
@@ -198,7 +198,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT>
     @Override
     public void endInput() throws Exception {
         if (operator instanceof BoundedOneInput) {
-            execute(executor, (ignore) -> ((BoundedOneInput) 
operator).endInput()).get();
+            executeAndGet(() -> ((BoundedOneInput) operator).endInput());
         }
     }
 
@@ -207,11 +207,11 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
     public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
         currentWatermark = mark.getTimestamp();
         if (inputs.isEmpty()) {
-            return execute(executor, (ignore) -> 
getOneInputOperator().processWatermark(mark));
+            return execute(() -> getOneInputOperator().processWatermark(mark));
         } else {
             checkState(inputs.size() == 1);
             Input input = inputs.get(0);
-            return execute(executor, (ignore) -> input.processWatermark(mark));
+            return execute(() -> input.processWatermark(mark));
         }
     }
 
@@ -223,12 +223,11 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
     @SuppressWarnings("rawtypes")
     public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker 
marker) {
         if (inputs.isEmpty()) {
-            return execute(
-                    executor, (ignore) -> 
getOneInputOperator().processLatencyMarker(marker));
+            return execute(() -> 
getOneInputOperator().processLatencyMarker(marker));
         } else {
             checkState(inputs.size() == 1);
             Input input = inputs.get(0);
-            return execute(executor, (ignore) -> 
input.processLatencyMarker(marker));
+            return execute(() -> input.processLatencyMarker(marker));
         }
     }
 
@@ -247,28 +246,48 @@ public class 
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
     public CompletableFuture<Void> processRecordAttributesInternal(
             RecordAttributes recordAttributes) {
         if (inputs.isEmpty()) {
-            return execute(
-                    executor,
-                    (ignore) -> 
getOneInputOperator().processRecordAttributes(recordAttributes));
+            return execute(() -> 
getOneInputOperator().processRecordAttributes(recordAttributes));
         } else {
             checkState(inputs.size() == 1);
             Input input = inputs.get(0);
-            return execute(executor, (ignore) -> 
input.processRecordAttributes(recordAttributes));
+            return execute(() -> 
input.processRecordAttributes(recordAttributes));
         }
     }
 
     public void drainStateRequests() throws Exception {
-        execute(executor, (ignore) -> drain(operator)).get();
+        executeAndGet(() -> drain(operator));
     }
 
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-        execute(executor, (ignore) -> 
operator.prepareSnapshotPreBarrier(checkpointId)).get();
+        executeAndGet(() -> operator.prepareSnapshotPreBarrier(checkpointId));
     }
 
     @Override
     public void close() throws Exception {
-        execute(executor, (ignore) -> super.close()).get();
+        executeAndGet(super::close);
         executor.shutdown();
     }
+
+    private CompletableFuture<Void> execute(RunnableWithException runnable) {
+        return AsyncProcessingTestUtil.execute(
+                executor,
+                () -> {
+                    checkEnvState();
+                    runnable.run();
+                });
+    }
+
+    private void executeAndGet(RunnableWithException runnable) throws 
Exception {
+        execute(runnable).get();
+        checkEnvState();
+    }
+
+    private void checkEnvState() {
+        if (getEnvironment().getActualExternalFailureCause().isPresent()) {
+            fail(
+                    "There is an error on other threads",
+                    getEnvironment().getActualExternalFailureCause().get());
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
index 4128f073afd..107ba519fe7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.concurrent.CompletableFuture;
@@ -41,6 +42,7 @@ import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
 import static 
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * A test harness for testing a {@link OneInputStreamOperator} which uses 
async state.
@@ -117,6 +119,8 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, 
IN1, IN2, OUT>
                 "Operator is not an AsyncStateProcessingOperator");
         this.twoInputOperator = operator;
         this.executor = executor;
+        // Make environment record any failure
+        getEnvironment().setExpectedExternalFailureCause(Throwable.class);
     }
 
     private ThrowingConsumer<StreamRecord<IN1>, Exception> 
getRecordProcessor1() {
@@ -135,7 +139,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, 
IN1, IN2, OUT>
 
     @Override
     public void processElement1(StreamRecord<IN1> element) throws Exception {
-        execute(executor, (ignore) -> 
getRecordProcessor1().accept(element)).get();
+        executeAndGet(() -> getRecordProcessor1().accept(element));
     }
 
     @Override
@@ -145,7 +149,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, 
IN1, IN2, OUT>
 
     @Override
     public void processElement2(StreamRecord<IN2> element) throws Exception {
-        execute(executor, (ignore) -> 
getRecordProcessor2().accept(element)).get();
+        executeAndGet(() -> getRecordProcessor2().accept(element));
     }
 
     @Override
@@ -155,63 +159,78 @@ public class 
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 
     @Override
     public void processWatermark1(Watermark mark) throws Exception {
-        execute(executor, (ignore) -> 
twoInputOperator.processWatermark1(mark)).get();
+        executeAndGet(() -> twoInputOperator.processWatermark1(mark));
     }
 
     @Override
     public void processWatermark2(Watermark mark) throws Exception {
-        execute(executor, (ignore) -> 
twoInputOperator.processWatermark2(mark)).get();
+        executeAndGet(() -> twoInputOperator.processWatermark2(mark));
     }
 
     @Override
     public void processBothWatermarks(Watermark mark) throws Exception {
-        execute(executor, (ignore) -> 
twoInputOperator.processWatermark1(mark)).get();
-        execute(executor, (ignore) -> 
twoInputOperator.processWatermark2(mark)).get();
+        executeAndGet(() -> twoInputOperator.processWatermark1(mark));
+        executeAndGet(() -> twoInputOperator.processWatermark2(mark));
     }
 
     @Override
     public void processWatermarkStatus1(WatermarkStatus watermarkStatus) 
throws Exception {
-        execute(executor, (ignore) -> 
twoInputOperator.processWatermarkStatus1(watermarkStatus))
-                .get();
+        executeAndGet(() -> 
twoInputOperator.processWatermarkStatus1(watermarkStatus));
     }
 
     @Override
     public void processWatermarkStatus2(WatermarkStatus watermarkStatus) 
throws Exception {
-        execute(executor, (ignore) -> 
twoInputOperator.processWatermarkStatus2(watermarkStatus))
-                .get();
+        executeAndGet(() -> 
twoInputOperator.processWatermarkStatus2(watermarkStatus));
     }
 
     @Override
     public void processRecordAttributes1(RecordAttributes recordAttributes) 
throws Exception {
-        execute(executor, (ignore) -> 
twoInputOperator.processRecordAttributes1(recordAttributes))
-                .get();
+        executeAndGet(() -> 
twoInputOperator.processRecordAttributes1(recordAttributes));
     }
 
     @Override
     public void processRecordAttributes2(RecordAttributes recordAttributes) 
throws Exception {
-        execute(executor, (ignore) -> 
twoInputOperator.processRecordAttributes2(recordAttributes))
-                .get();
+        executeAndGet(() -> 
twoInputOperator.processRecordAttributes2(recordAttributes));
     }
 
     public void endInput1() throws Exception {
         if (operator instanceof BoundedMultiInput) {
-            execute(executor, (ignore) -> ((BoundedMultiInput) 
operator).endInput(1)).get();
+            executeAndGet(() -> ((BoundedMultiInput) operator).endInput(1));
         }
     }
 
     public void endInput2() throws Exception {
         if (operator instanceof BoundedMultiInput) {
-            execute(executor, (ignore) -> ((BoundedMultiInput) 
operator).endInput(2)).get();
+            executeAndGet(() -> ((BoundedMultiInput) operator).endInput(2));
         }
     }
 
     public void drainStateRequests() throws Exception {
-        execute(executor, (ignore) -> drain(operator)).get();
+        executeAndGet(() -> drain(operator));
     }
 
     @Override
     public void close() throws Exception {
-        execute(executor, (ignore) -> super.close()).get();
+        executeAndGet(super::close);
         executor.shutdown();
     }
+
+    private void executeAndGet(RunnableWithException runnable) throws 
Exception {
+        execute(
+                        executor,
+                        () -> {
+                            checkEnvState();
+                            runnable.run();
+                        })
+                .get();
+        checkEnvState();
+    }
+
+    private void checkEnvState() {
+        if (getEnvironment().getActualExternalFailureCause().isPresent()) {
+            fail(
+                    "There is an error on other threads",
+                    getEnvironment().getActualExternalFailureCause().get());
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
index 0c5252e5e40..2e222e0893f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.util.asyncprocessing;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.RunnableWithException;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -41,15 +41,16 @@ public class AsyncProcessingTestUtil {
     }
 
     public static CompletableFuture<Void> execute(
-            ExecutorService executor, ThrowingConsumer<Void, Exception> 
processor) {
+            ExecutorService executor, RunnableWithException processor) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         executor.execute(
                 () -> {
                     try {
-                        processor.accept(null);
+                        processor.run();
                         future.complete(null);
                     } catch (Exception e) {
-                        throw new RuntimeException(e);
+                        // Notify the outside future.
+                        future.completeExceptionally(e);
                     }
                 });
         return future;

Reply via email to