[hotfix][streaming] Allow to override methods from TwoPhaseCommitSinkFunction

This allows for some custom user logic while handling checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d9cdcba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d9cdcba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d9cdcba

Branch: refs/heads/master
Commit: 9d9cdcbad6ccf353a1252866f6a56ac505bfaa95
Parents: 959d54f
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Mon Aug 14 15:45:45 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../api/functions/sink/TwoPhaseCommitSinkFunction.java          | 4 ++--
 .../api/functions/sink/TwoPhaseCommitSinkFunctionTest.java      | 5 ++++-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 77f74fe..18f74b6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -222,7 +222,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        }
 
        @Override
-       public final void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
                // this is like the pre-commit of a 2-phase-commit transaction
                // we are ready to commit and remember the transaction
 
@@ -246,7 +246,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        }
 
        @Override
-       public final void initializeState(FunctionInitializationContext 
context) throws Exception {
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
                // when we are restoring state with pendingCommitTransactions, 
we don't really know whether the
                // transactions were already committed, or whether there was a 
failure between
                // completing the checkpoint on the master, and notifying the 
writer here.

http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index b9097d7..4715c39 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -155,7 +155,10 @@ public class TwoPhaseCommitSinkFunctionTest {
                @Override
                protected void commit(FileTransaction transaction) {
                        try {
-                               Files.move(transaction.tmpFile.toPath(), new 
File(targetDirectory, transaction.tmpFile.getName()).toPath(), ATOMIC_MOVE);
+                               Files.move(
+                                       transaction.tmpFile.toPath(),
+                                       new File(targetDirectory, 
transaction.tmpFile.getName()).toPath(),
+                                       ATOMIC_MOVE);
                        } catch (IOException e) {
                                throw new IllegalStateException(e);
                        }

Reply via email to