tillrohrmann closed pull request #7019: [BP-1.7][FLINK-10773] Harden resume
externalized checkpoint end-to-end test
URL: https://github.com/apache/flink/pull/7019
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index fb92960bb86..3c8d0ad537f 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -322,6 +322,8 @@ static void setupEnvironment(StreamExecutionEnvironment
env, ParameterTool pt) t
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
+ private static final long serialVersionUID =
-3154419724891779938L;
+
@Override
public long extractTimestamp(Event element) {
return element.getEventTime();
@@ -367,8 +369,8 @@ static boolean isSimulateFailures(ParameterTool pt) {
return pt.getBoolean(TEST_SIMULATE_FAILURE.key(),
TEST_SIMULATE_FAILURE.defaultValue());
}
- static MapFunction<Event, Event>
createExceptionThrowingFailureMapper(ParameterTool pt) {
- return new ExceptionThrowingFailureMapper<>(
+ static MapFunction<Event, Event> createFailureMapper(ParameterTool pt) {
+ return new FailureMapper<>(
pt.getLong(
TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),
TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 70fdade4a1c..b14e2af1b52 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -41,7 +41,7 @@
import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
-import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
+import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
import static
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
@@ -68,7 +68,7 @@
private static final String OPERATOR_STATE_OPER_NAME =
"ArtificalOperatorStateMapper";
private static final String TIME_WINDOW_OPER_NAME =
"TumblingWindowOperator";
private static final String SEMANTICS_CHECK_MAPPER_NAME =
"SemanticsCheckMapper";
- private static final String FAILURE_MAPPER_NAME =
"ExceptionThrowingFailureMapper";
+ private static final String FAILURE_MAPPER_NAME = "FailureMapper";
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -145,7 +145,7 @@ public void apply(Integer integer, TimeWindow window,
Iterable<Event> input, Col
if (isSimulateFailures(pt)) {
eventStream3 = eventStream3
- .map(createExceptionThrowingFailureMapper(pt))
+ .map(createFailureMapper(pt))
.setParallelism(1)
.name(FAILURE_MAPPER_NAME);
}
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
similarity index 94%
rename from
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
rename to
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
index d758ef5cf0d..a3a1c253fc0 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
@@ -30,7 +30,7 @@
* of the operator can also be configured. Note that this also takes into
account
* failures that were not triggered by this mapper, e.g. TaskManager failures.
*/
-public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T>
implements CheckpointListener {
+public class FailureMapper<T> extends RichMapFunction<T, T> implements
CheckpointListener {
private static final long serialVersionUID = -5286927943454740016L;
@@ -41,7 +41,7 @@
private long numProcessedRecords;
private long numCompleteCheckpoints;
- public ExceptionThrowingFailureMapper(
+ public FailureMapper(
long numProcessedRecordsFailureThreshold,
long numCompleteCheckpointsFailureThreshold,
int maxNumFailures) {
diff --git a/flink-end-to-end-tests/test-scripts/common.sh
b/flink-end-to-end-tests/test-scripts/common.sh
index 09c42af8742..bdf6f64c5a8 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -347,6 +347,7 @@ function check_logs_for_exceptions {
| grep -v "java.lang.Exception: Execution was suspended" \
| grep -v "java.io.InvalidClassException:
org.apache.flink.formats.avro.typeutils.AvroSerializer" \
| grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
+ | grep -v "java.lang.Exception: Artificial failure" \
| grep -ic "exception")
if [[ ${exception_count} -gt 0 ]]; then
echo "Found exception in log files:"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services