This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 3e1b2b7  [FLINK-18289][Checkpoint] Ensure notifyCheckpointAborted 
interface work in UDF operator
3e1b2b7 is described below

commit 3e1b2b7a33b83a8c7190670318f7eb13766008d4
Author: Yun Tang <[email protected]>
AuthorDate: Mon Jun 15 16:07:06 2020 +0800

    [FLINK-18289][Checkpoint] Ensure notifyCheckpointAborted interface work in 
UDF operator
---
 .../api/operators/AbstractUdfStreamOperator.java   |  9 +++++++++
 .../streaming/runtime/tasks/StreamTaskTest.java    | 22 ++++++++++++----------
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 71f3d3e..d9b5499c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -131,6 +131,15 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) throws Exception 
{
+               super.notifyCheckpointAborted(checkpointId);
+
+               if (userFunction instanceof CheckpointListener) {
+                       ((CheckpointListener) 
userFunction).notifyCheckpointAborted(checkpointId);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Output type configuration
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 023f801..833f07b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -59,6 +60,7 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -981,12 +983,12 @@ public class StreamTaskTest extends TestLogger {
        }
 
        private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, 
?>> consumer) throws Exception {
-               FailOnNotifyCheckpointOperator<Integer> operator = new 
FailOnNotifyCheckpointOperator<>();
+               StreamMap<Integer, Integer> streamMap = new StreamMap<>(new 
FailOnNotifyCheckpointMapper<>());
                MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
                        new 
MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
                                .addInput(BasicTypeInfo.INT_TYPE_INFO);
                StreamTaskMailboxTestHarness<Integer> harness = builder
-                       .setupOutputForSingletonOperatorChain(operator)
+                       .setupOutputForSingletonOperatorChain(streamMap)
                        .build();
 
                try {
@@ -2047,22 +2049,22 @@ public class StreamTaskTest extends TestLogger {
                }
        }
 
-       private static class FailOnNotifyCheckpointOperator<T> extends 
AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+       private static class FailOnNotifyCheckpointMapper<T> implements 
MapFunction<T, T>, CheckpointListener {
+               private static final long serialVersionUID = 1L;
+
                @Override
-               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-                       super.notifyCheckpointComplete(checkpointId);
-                       throw new ExpectedTestException();
+               public T map(T value) throws Exception {
+                       return value;
                }
 
                @Override
-               public void notifyCheckpointAborted(long checkpointId) throws 
Exception {
-                       super.notifyCheckpointAborted(checkpointId);
+               public void notifyCheckpointAborted(long checkpointId) {
                        throw new ExpectedTestException();
                }
 
                @Override
-               public void processElement(StreamRecord<T> element) throws 
Exception {
-
+               public void notifyCheckpointComplete(long checkpointId) {
+                       throw new ExpectedTestException();
                }
        }
 }

Reply via email to