This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3165f942bad [FLINK-35904][test] Enhance test harness for async state processing with fast fail (#25858)
3165f942bad is described below
commit 3165f942badadd3e9e529dca754b7c09a90ea103
Author: Zakelly <[email protected]>
AuthorDate: Thu Dec 26 19:36:45 2024 +0800
[FLINK-35904][test] Enhance test harness for async state processing with fast fail (#25858)
---
...ncKeyedMultiInputStreamOperatorTestHarness.java | 35 ++++++++++---
...syncKeyedOneInputStreamOperatorTestHarness.java | 61 ++++++++++++++--------
...syncKeyedTwoInputStreamOperatorTestHarness.java | 55 ++++++++++++-------
.../asyncprocessing/AsyncProcessingTestUtil.java | 9 ++--
4 files changed, 111 insertions(+), 49 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
index 566424d7d21..c11221bcbc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
@@ -31,6 +31,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
+import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.List;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
+import static org.assertj.core.api.Assertions.fail;
/**
* A test harness for testing a {@link MultipleInputStreamOperator}.
@@ -100,6 +102,8 @@ public class
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
setKeySelector(i, keySelectors.get(i));
}
this.executor = executor;
+ // Make environment record any failure
+ getEnvironment().setExpectedExternalFailureCause(Throwable.class);
}
public void setKeySelector(int idx, KeySelector<?, K> keySelector) {
@@ -114,21 +118,21 @@ public class
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
Input input = getCastedOperator().getInputs().get(idx);
ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor =
RecordProcessorUtils.getRecordProcessor(input);
- execute(executor, (ignore) -> inputProcessor.accept(element)).get();
+ executeAndGet(() -> inputProcessor.accept(element));
}
@Override
@SuppressWarnings("rawtypes")
public void processWatermark(int idx, Watermark mark) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
- execute(executor, (ignore) -> input.processWatermark(mark)).get();
+ executeAndGet(() -> input.processWatermark(mark));
}
@Override
@SuppressWarnings("rawtypes")
public void processWatermarkStatus(int idx, WatermarkStatus
watermarkStatus) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
- execute(executor, (ignore) ->
input.processWatermarkStatus(watermarkStatus)).get();
+ executeAndGet(() -> input.processWatermarkStatus(watermarkStatus));
}
@Override
@@ -136,16 +140,35 @@ public class
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
public void processRecordAttributes(int idx, RecordAttributes
recordAttributes)
throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
- execute(executor, (ignore) ->
input.processRecordAttributes(recordAttributes)).get();
+ executeAndGet(() -> input.processRecordAttributes(recordAttributes));
}
public void drainStateRequests() throws Exception {
- execute(executor, (ignore) -> drain(operator)).get();
+ executeAndGet(() -> drain(operator));
}
@Override
public void close() throws Exception {
- execute(executor, (ignore) -> super.close()).get();
+ executeAndGet(super::close);
executor.shutdown();
}
+
+ private void executeAndGet(RunnableWithException runnable) throws
Exception {
+ execute(
+ executor,
+ () -> {
+ checkEnvState();
+ runnable.run();
+ })
+ .get();
+ checkEnvState();
+ }
+
+ private void checkEnvState() {
+ if (getEnvironment().getActualExternalFailureCause().isPresent()) {
+ fail(
+ "There is an error on other threads",
+ getEnvironment().getActualExternalFailureCause().get());
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
index 6128a5b915a..e2101d88044 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
@@ -36,6 +36,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.ArrayList;
@@ -45,8 +46,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
-import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.fail;
/**
* A test harness for testing a {@link OneInputStreamOperator} which uses
async state.
@@ -120,6 +121,8 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K,
IN, OUT>
keyType.createSerializer(executionConfig.getSerializerConfig()));
config.serializeAllConfigs();
this.executor = executor;
+ // Make environment record any failure
+ getEnvironment().setExpectedExternalFailureCause(Throwable.class);
}
@Override
@@ -146,16 +149,14 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
throws Exception {
if (inputs.isEmpty()) {
return execute(
- executor,
- (ignore) ->
+ () ->
RecordProcessorUtils.getRecordProcessor(getOneInputOperator())
.accept(element));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
return execute(
- executor,
- (ignore) ->
+ () ->
((ThrowingConsumer<StreamRecord, Exception>)
RecordProcessorUtils.getRecordProcessor(input))
.accept(element));
@@ -181,12 +182,11 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@SuppressWarnings("rawtypes")
public CompletableFuture<Void>
processWatermarkStatusInternal(WatermarkStatus status) {
if (inputs.isEmpty()) {
- return execute(
- executor, (ignore) ->
getOneInputOperator().processWatermarkStatus(status));
+ return execute(() ->
getOneInputOperator().processWatermarkStatus(status));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
- return execute(executor, (ignore) ->
input.processWatermarkStatus(status));
+ return execute(() -> input.processWatermarkStatus(status));
}
}
@@ -198,7 +198,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K,
IN, OUT>
@Override
public void endInput() throws Exception {
if (operator instanceof BoundedOneInput) {
- execute(executor, (ignore) -> ((BoundedOneInput)
operator).endInput()).get();
+ executeAndGet(() -> ((BoundedOneInput) operator).endInput());
}
}
@@ -207,11 +207,11 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
currentWatermark = mark.getTimestamp();
if (inputs.isEmpty()) {
- return execute(executor, (ignore) ->
getOneInputOperator().processWatermark(mark));
+ return execute(() -> getOneInputOperator().processWatermark(mark));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
- return execute(executor, (ignore) -> input.processWatermark(mark));
+ return execute(() -> input.processWatermark(mark));
}
}
@@ -223,12 +223,11 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker
marker) {
if (inputs.isEmpty()) {
- return execute(
- executor, (ignore) ->
getOneInputOperator().processLatencyMarker(marker));
+ return execute(() ->
getOneInputOperator().processLatencyMarker(marker));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
- return execute(executor, (ignore) ->
input.processLatencyMarker(marker));
+ return execute(() -> input.processLatencyMarker(marker));
}
}
@@ -247,28 +246,48 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
public CompletableFuture<Void> processRecordAttributesInternal(
RecordAttributes recordAttributes) {
if (inputs.isEmpty()) {
- return execute(
- executor,
- (ignore) ->
getOneInputOperator().processRecordAttributes(recordAttributes));
+ return execute(() ->
getOneInputOperator().processRecordAttributes(recordAttributes));
} else {
checkState(inputs.size() == 1);
Input input = inputs.get(0);
- return execute(executor, (ignore) ->
input.processRecordAttributes(recordAttributes));
+ return execute(() ->
input.processRecordAttributes(recordAttributes));
}
}
public void drainStateRequests() throws Exception {
- execute(executor, (ignore) -> drain(operator)).get();
+ executeAndGet(() -> drain(operator));
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
- execute(executor, (ignore) ->
operator.prepareSnapshotPreBarrier(checkpointId)).get();
+ executeAndGet(() -> operator.prepareSnapshotPreBarrier(checkpointId));
}
@Override
public void close() throws Exception {
- execute(executor, (ignore) -> super.close()).get();
+ executeAndGet(super::close);
executor.shutdown();
}
+
+ private CompletableFuture<Void> execute(RunnableWithException runnable) {
+ return AsyncProcessingTestUtil.execute(
+ executor,
+ () -> {
+ checkEnvState();
+ runnable.run();
+ });
+ }
+
+ private void executeAndGet(RunnableWithException runnable) throws
Exception {
+ execute(runnable).get();
+ checkEnvState();
+ }
+
+ private void checkEnvState() {
+ if (getEnvironment().getActualExternalFailureCause().isPresent()) {
+ fail(
+ "There is an error on other threads",
+ getEnvironment().getActualExternalFailureCause().get());
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
index 4128f073afd..107ba519fe7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
@@ -33,6 +33,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.concurrent.CompletableFuture;
@@ -41,6 +42,7 @@ import java.util.concurrent.Executors;
import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.drain;
import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil.execute;
+import static org.assertj.core.api.Assertions.fail;
/**
* A test harness for testing a {@link OneInputStreamOperator} which uses
async state.
@@ -117,6 +119,8 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K,
IN1, IN2, OUT>
"Operator is not an AsyncStateProcessingOperator");
this.twoInputOperator = operator;
this.executor = executor;
+ // Make environment record any failure
+ getEnvironment().setExpectedExternalFailureCause(Throwable.class);
}
private ThrowingConsumer<StreamRecord<IN1>, Exception>
getRecordProcessor1() {
@@ -135,7 +139,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K,
IN1, IN2, OUT>
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
- execute(executor, (ignore) ->
getRecordProcessor1().accept(element)).get();
+ executeAndGet(() -> getRecordProcessor1().accept(element));
}
@Override
@@ -145,7 +149,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K,
IN1, IN2, OUT>
@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
- execute(executor, (ignore) ->
getRecordProcessor2().accept(element)).get();
+ executeAndGet(() -> getRecordProcessor2().accept(element));
}
@Override
@@ -155,63 +159,78 @@ public class
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
@Override
public void processWatermark1(Watermark mark) throws Exception {
- execute(executor, (ignore) ->
twoInputOperator.processWatermark1(mark)).get();
+ executeAndGet(() -> twoInputOperator.processWatermark1(mark));
}
@Override
public void processWatermark2(Watermark mark) throws Exception {
- execute(executor, (ignore) ->
twoInputOperator.processWatermark2(mark)).get();
+ executeAndGet(() -> twoInputOperator.processWatermark2(mark));
}
@Override
public void processBothWatermarks(Watermark mark) throws Exception {
- execute(executor, (ignore) ->
twoInputOperator.processWatermark1(mark)).get();
- execute(executor, (ignore) ->
twoInputOperator.processWatermark2(mark)).get();
+ executeAndGet(() -> twoInputOperator.processWatermark1(mark));
+ executeAndGet(() -> twoInputOperator.processWatermark2(mark));
}
@Override
public void processWatermarkStatus1(WatermarkStatus watermarkStatus)
throws Exception {
- execute(executor, (ignore) ->
twoInputOperator.processWatermarkStatus1(watermarkStatus))
- .get();
+ executeAndGet(() ->
twoInputOperator.processWatermarkStatus1(watermarkStatus));
}
@Override
public void processWatermarkStatus2(WatermarkStatus watermarkStatus)
throws Exception {
- execute(executor, (ignore) ->
twoInputOperator.processWatermarkStatus2(watermarkStatus))
- .get();
+ executeAndGet(() ->
twoInputOperator.processWatermarkStatus2(watermarkStatus));
}
@Override
public void processRecordAttributes1(RecordAttributes recordAttributes)
throws Exception {
- execute(executor, (ignore) ->
twoInputOperator.processRecordAttributes1(recordAttributes))
- .get();
+ executeAndGet(() ->
twoInputOperator.processRecordAttributes1(recordAttributes));
}
@Override
public void processRecordAttributes2(RecordAttributes recordAttributes)
throws Exception {
- execute(executor, (ignore) ->
twoInputOperator.processRecordAttributes2(recordAttributes))
- .get();
+ executeAndGet(() ->
twoInputOperator.processRecordAttributes2(recordAttributes));
}
public void endInput1() throws Exception {
if (operator instanceof BoundedMultiInput) {
- execute(executor, (ignore) -> ((BoundedMultiInput)
operator).endInput(1)).get();
+ executeAndGet(() -> ((BoundedMultiInput) operator).endInput(1));
}
}
public void endInput2() throws Exception {
if (operator instanceof BoundedMultiInput) {
- execute(executor, (ignore) -> ((BoundedMultiInput)
operator).endInput(2)).get();
+ executeAndGet(() -> ((BoundedMultiInput) operator).endInput(2));
}
}
public void drainStateRequests() throws Exception {
- execute(executor, (ignore) -> drain(operator)).get();
+ executeAndGet(() -> drain(operator));
}
@Override
public void close() throws Exception {
- execute(executor, (ignore) -> super.close()).get();
+ executeAndGet(super::close);
executor.shutdown();
}
+
+ private void executeAndGet(RunnableWithException runnable) throws
Exception {
+ execute(
+ executor,
+ () -> {
+ checkEnvState();
+ runnable.run();
+ })
+ .get();
+ checkEnvState();
+ }
+
+ private void checkEnvState() {
+ if (getEnvironment().getActualExternalFailureCause().isPresent()) {
+ fail(
+ "There is an error on other threads",
+ getEnvironment().getActualExternalFailureCause().get());
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
index 0c5252e5e40..2e222e0893f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.util.asyncprocessing;
import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorV2;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.RunnableWithException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -41,15 +41,16 @@ public class AsyncProcessingTestUtil {
}
public static CompletableFuture<Void> execute(
- ExecutorService executor, ThrowingConsumer<Void, Exception>
processor) {
+ ExecutorService executor, RunnableWithException processor) {
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(
() -> {
try {
- processor.accept(null);
+ processor.run();
future.complete(null);
} catch (Exception e) {
- throw new RuntimeException(e);
+ // Notify the outside future.
+ future.completeExceptionally(e);
}
});
return future;