This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 3e1b2b7 [FLINK-18289][Checkpoint] Ensure notifyCheckpointAborted
interface work in UDF operator
3e1b2b7 is described below
commit 3e1b2b7a33b83a8c7190670318f7eb13766008d4
Author: Yun Tang <[email protected]>
AuthorDate: Mon Jun 15 16:07:06 2020 +0800
[FLINK-18289][Checkpoint] Ensure notifyCheckpointAborted interface work in
UDF operator
---
.../api/operators/AbstractUdfStreamOperator.java | 9 +++++++++
.../streaming/runtime/tasks/StreamTaskTest.java | 22 ++++++++++++----------
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 71f3d3e..d9b5499c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -131,6 +131,15 @@ public abstract class AbstractUdfStreamOperator<OUT, F
extends Function>
}
}
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception
{
+ super.notifyCheckpointAborted(checkpointId);
+
+ if (userFunction instanceof CheckpointListener) {
+ ((CheckpointListener)
userFunction).notifyCheckpointAborted(checkpointId);
+ }
+ }
+
//
------------------------------------------------------------------------
// Output type configuration
//
------------------------------------------------------------------------
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 023f801..833f07b 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -59,6 +60,7 @@ import
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -981,12 +983,12 @@ public class StreamTaskTest extends TestLogger {
}
private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?,
?>> consumer) throws Exception {
- FailOnNotifyCheckpointOperator<Integer> operator = new
FailOnNotifyCheckpointOperator<>();
+ StreamMap<Integer, Integer> streamMap = new StreamMap<>(new
FailOnNotifyCheckpointMapper<>());
MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
new
MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new,
BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO);
StreamTaskMailboxTestHarness<Integer> harness = builder
- .setupOutputForSingletonOperatorChain(operator)
+ .setupOutputForSingletonOperatorChain(streamMap)
.build();
try {
@@ -2047,22 +2049,22 @@ public class StreamTaskTest extends TestLogger {
}
}
- private static class FailOnNotifyCheckpointOperator<T> extends
AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+ private static class FailOnNotifyCheckpointMapper<T> implements
MapFunction<T, T>, CheckpointListener {
+ private static final long serialVersionUID = 1L;
+
@Override
- public void notifyCheckpointComplete(long checkpointId) throws
Exception {
- super.notifyCheckpointComplete(checkpointId);
- throw new ExpectedTestException();
+ public T map(T value) throws Exception {
+ return value;
}
@Override
- public void notifyCheckpointAborted(long checkpointId) throws
Exception {
- super.notifyCheckpointAborted(checkpointId);
+ public void notifyCheckpointAborted(long checkpointId) {
throw new ExpectedTestException();
}
@Override
- public void processElement(StreamRecord<T> element) throws
Exception {
-
+ public void notifyCheckpointComplete(long checkpointId) {
+ throw new ExpectedTestException();
}
}
}