[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112340#comment-16112340 ]
ASF GitHub Bot commented on FLINK-7210:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4368#discussion_r131070533

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -51,39 +48,52 @@
  * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
  */
 @PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
         extends RichSinkFunction<IN>
         implements CheckpointedFunction, CheckpointListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-    protected final ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor;
+    protected final ListStateDescriptor<Map<Long, TXN>> pendingCommitTransactionsDescriptor;
     protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
 
-    protected final List<TransactionAndCheckpoint<TXN>> pendingCommitTransactions = new ArrayList<>();
+    protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
 
     @Nullable
     protected TXN currentTransaction;
     protected ListState<TXN> pendingTransactionsState;
-    protected ListState<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsState;
-
-    public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
-        this(
-            TypeInformation.of(txnClass),
-            TypeInformation.of(new TypeHint<TransactionAndCheckpoint<TXN>>() {}));
-    }
+    protected ListState<Map<Long, TXN>> pendingCommitTransactionsState;
--- End diff --

I think this has to be `ListState<Tuple2<Long, TXN>>` or the original `ListState<TransactionAndCheckpoint<TXN>>`.
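
A rough sketch of why the element type matters, in plain Java with no Flink dependency. It assumes a simplified round-robin split as a stand-in for how operator list state is divided among subtasks on rescaling; the class and method names are hypothetical, `String` stands in for `TXN`, and `Map.Entry` stands in for `Tuple2<Long, TXN>`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; not Flink code. All names are hypothetical. */
public class ListStateRedistributionSketch {

    /**
     * Simplified stand-in for list state redistribution (assumption: round-robin).
     * Each list element is handed, as a whole, to exactly one new subtask.
     */
    static <T> List<List<T>> redistribute(List<T> elements, int newParallelism) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            perSubtask.get(i % newParallelism).add(elements.get(i));
        }
        return perSubtask;
    }

    /** Variant from the diff: the whole map is snapshotted as a single state element. */
    static List<Map<Long, String>> snapshotAsSingleMap(LinkedHashMap<Long, String> pending) {
        List<Map<Long, String>> state = new ArrayList<>();
        state.add(new LinkedHashMap<>(pending));
        return state;
    }

    /** Suggested variant: one state element per (checkpointId, transaction) pair. */
    static List<Map.Entry<Long, String>> snapshotAsEntries(LinkedHashMap<Long, String> pending) {
        List<Map.Entry<Long, String>> state = new ArrayList<>();
        for (Map.Entry<Long, String> e : pending.entrySet()) {
            state.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return state;
    }

    public static void main(String[] args) {
        LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
        pending.put(1L, "txn-a");
        pending.put(2L, "txn-b");
        pending.put(3L, "txn-c");

        // Scale out from 1 subtask to 3 and compare where the transactions end up.
        System.out.println(redistribute(snapshotAsSingleMap(pending), 3));
        System.out.println(redistribute(snapshotAsEntries(pending), 3));
    }
}
```

With the single-map snapshot, all three pending transactions land on one subtask and the other two receive nothing; with one element per pair, each subtask receives one transaction to commit.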
Using a map instead of a list for `pendingCommitTransactions` is ok for bookkeeping, but when snapshotting this map we need to make sure the snapshotted transactions are state elements that can be redistributed independently of each other.


> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)
> ----------------------------------------------------------------------------------
>
> Key: FLINK-7210
> URL: https://issues.apache.org/jira/browse/FLINK-7210
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
> To implement an exactly-once sink there is a recurring pattern: the two-phase
> commit algorithm. It is used both in `BucketingSink` and in the `Pravega` sink,
> and it will be used in the `Kafka 0.11` connector. It would be good to extract
> this common logic into one class, both to improve the existing implementations
> (for example, `Pravega`'s sink doesn't abort interrupted transactions) and to
> make it easier for users to implement their own custom exactly-once sinks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)