[FLINK-6988] Make TwoPhaseCommitSinkFunction work with Context

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49cef0c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49cef0c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49cef0c0

Branch: refs/heads/master
Commit: 49cef0c0c24c15b668381ca590b87a62a14f75b5
Parents: 9a3621b
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Sep 25 16:16:34 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../functions/sink/TwoPhaseCommitSinkFunction.java    | 14 +++++++++++---
 .../sink/TwoPhaseCommitSinkFunctionTest.java          |  2 +-
 2 files changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49cef0c0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 6040979..2dfa292 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -107,7 +107,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        /**
         * Write value within a transaction.
         */
-       protected abstract void invoke(TXN transaction, IN value) throws 
Exception;
+       protected abstract void invoke(TXN transaction, IN value, Context 
context) throws Exception;
 
        /**
         * Method that starts a new transaction.
@@ -159,9 +159,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
 
        // ------ entry points for above methods implementing 
{@CheckPointedFunction} and {@CheckpointListener} ------
 
+
+       /**
+        * Intentionally a no-op, and final so subclasses cannot override it; elements are handled by {@code invoke(IN, Context)}.
+        */
+       @Override
+       public final void invoke(IN value) throws Exception {}
+
        @Override
-       public final void invoke(IN value) throws Exception {
-               invoke(currentTransaction, value);
+       public final void invoke(
+               IN value, Context context) throws Exception {
+               invoke(currentTransaction, value, context);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/49cef0c0/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 4715c39..3043512 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -136,7 +136,7 @@ public class TwoPhaseCommitSinkFunctionTest {
                }
 
                @Override
-               protected void invoke(FileTransaction transaction, String 
value) throws Exception {
+               protected void invoke(FileTransaction transaction, String 
value, Context context) throws Exception {
                        transaction.writer.write(value);
                }
 

Reply via email to