Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4910#discussion_r147661097
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
---
@@ -293,26 +330,110 @@ public void
initializeState(FunctionInitializationContext context) throws Except
}
// if in restore we didn't get any userContext or we are
initializing from scratch
if (userContext == null) {
- LOG.info("{} - no state to restore", name());
+ log.info("{} - no state to restore", name());
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
- currentTransaction = beginTransaction();
- LOG.debug("{} - started new transaction '{}'", name(),
currentTransaction);
+ currentTransaction = beginTransaction0();
+ log.debug("{} - started new transaction '{}'", name(),
currentTransaction);
+ }
+
+ /**
+ * This method must be the only place to call {@link
#beginTransaction()} to ensure that the
+ * {@link TransactionHolder} is created at the same time.
+ */
+ private TransactionHolder<TXN> beginTransaction0() throws Exception {
+ return new TransactionHolder<>(beginTransaction(),
clock.millis());
+ }
+
+ /**
+ * This method must be the only place to call {@link
#recoverAndCommit(Object)} to ensure that
+ * the configuration parameters {@link #transactionTimeout} and
+ * {@link #failureOnCommitAfterTransactionTimeoutDisabled} are
respected.
+ */
+ private void recoverAndCommit(TransactionHolder<TXN> transactionHolder)
{
--- End diff --
ditto: overloading adds confusion, because it suggests that both methods
(`recoverAndCommit(TXN)` and `recoverAndCommit(TransactionHolder)`) are equally
valid and could be used interchangeably.
As above, rename to `recoverAndCommitHolder`, `recoverAndCommitWrapper`,
`recoverAndCommitInternal`, `recoverCommitAndHandleTimeout`
---