Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147692599

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -58,18 +61,37 @@
         extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener {
    -    private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +    private final Logger log;
    -    protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +    private final Clock clock;
    -    protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    +    protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();
    -    @Nullable
    -    protected TXN currentTransaction;
         protected Optional<CONTEXT> userContext;
         protected ListState<State<TXN, CONTEXT>> state;
    +    private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +
    +    private TransactionHolder<TXN> currentTransaction;
    +
    +    /**
    +     * Specifies the maximum time a transaction should remain open.
    +     */
    +    private long transactionTimeout = Long.MAX_VALUE;
    +
    +    /**
    +     * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
    +     * propagated.
    +     */
    +    private boolean failureOnCommitAfterTransactionTimeoutDisabled;
    --- End diff --

    Part of my concern was that the name is quite long (same applies for the methods). Maybe `ignoreFailuresAfterTimeout`? Or `ignoreFailuresAfterTransactionTimeout`?
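
    For illustration only, here is a rough sketch of how the shorter name could read at a call site. It is not the code from the pull request: the class `RenamedFlagSketch`, its constructor taking a `LongSupplier` clock, and the simplified `recoverAndCommit(long, Runnable)` signature are all hypothetical stand-ins. It also assumes that failures are swallowed only once the transaction has been open longer than `transactionTimeout`, which is what the proposed name suggests but the quoted Javadoc does not spell out.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the sink; NOT the actual TwoPhaseCommitSinkFunction. */
class RenamedFlagSketch {

    /** Maximum time a transaction should remain open, in milliseconds. */
    private long transactionTimeout = Long.MAX_VALUE;

    /** Shorter name from the review, replacing failureOnCommitAfterTransactionTimeoutDisabled. */
    private boolean ignoreFailuresAfterTransactionTimeout;

    /** Injected time source (assumed here to return milliseconds). */
    private final LongSupplier clock;

    RenamedFlagSketch(LongSupplier clock) {
        this.clock = clock;
    }

    RenamedFlagSketch setTransactionTimeout(long timeoutMillis) {
        this.transactionTimeout = timeoutMillis;
        return this;
    }

    /** Fluent switch named after the field, as the review suggests for the methods too. */
    RenamedFlagSketch ignoreFailuresAfterTransactionTimeout() {
        this.ignoreFailuresAfterTransactionTimeout = true;
        return this;
    }

    /**
     * Runs the commit. Returns true on success and false if a failure was
     * swallowed; any other failure is rethrown to the caller.
     */
    boolean recoverAndCommit(long transactionStartMillis, Runnable commit) {
        try {
            commit.run();
            return true;
        } catch (RuntimeException e) {
            long elapsed = clock.getAsLong() - transactionStartMillis;
            // Assumption: only ignore the failure when the timeout has elapsed.
            if (ignoreFailuresAfterTransactionTimeout && elapsed > transactionTimeout) {
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        RenamedFlagSketch sink = new RenamedFlagSketch(System::currentTimeMillis)
            .setTransactionTimeout(60_000L)
            .ignoreFailuresAfterTransactionTimeout();
        // A commit that succeeds is reported as committed regardless of the flag.
        System.out.println(sink.recoverAndCommit(System.currentTimeMillis(), () -> { }));
    }
}
```

    With the flag and the fluent method sharing one short name, the call site stays readable next to `setTransactionTimeout`, which seems to be the point of the suggestion.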
---