[hotfix][streaming] Allow overriding methods of TwoPhaseCommitSinkFunction
This allows for some custom user logic while handling checkpoints. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d9cdcba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d9cdcba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d9cdcba Branch: refs/heads/master Commit: 9d9cdcbad6ccf353a1252866f6a56ac505bfaa95 Parents: 959d54f Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Mon Aug 14 15:45:45 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Aug 29 14:45:34 2017 +0200 ---------------------------------------------------------------------- .../api/functions/sink/TwoPhaseCommitSinkFunction.java | 4 ++-- .../api/functions/sink/TwoPhaseCommitSinkFunctionTest.java | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 77f74fe..18f74b6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -222,7 +222,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> } @Override - public final void snapshotState(FunctionSnapshotContext context) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { // this is like the pre-commit of a 2-phase-commit 
transaction // we are ready to commit and remember the transaction @@ -246,7 +246,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> } @Override - public final void initializeState(FunctionInitializationContext context) throws Exception { + public void initializeState(FunctionInitializationContext context) throws Exception { // when we are restoring state with pendingCommitTransactions, we don't really know whether the // transactions were already committed, or whether there was a failure between // completing the checkpoint on the master, and notifying the writer here. http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index b9097d7..4715c39 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -155,7 +155,10 @@ public class TwoPhaseCommitSinkFunctionTest { @Override protected void commit(FileTransaction transaction) { try { - Files.move(transaction.tmpFile.toPath(), new File(targetDirectory, transaction.tmpFile.getName()).toPath(), ATOMIC_MOVE); + Files.move( + transaction.tmpFile.toPath(), + new File(targetDirectory, transaction.tmpFile.getName()).toPath(), + ATOMIC_MOVE); } catch (IOException e) { throw new IllegalStateException(e); }