[ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112340#comment-16112340 ]

ASF GitHub Bot commented on FLINK-7210:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4368#discussion_r131070533
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -51,39 +48,52 @@
      * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
      */
     @PublicEvolving
    -public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
    +public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
                extends RichSinkFunction<IN>
                implements CheckpointedFunction, CheckpointListener {
     
       private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
     
    -   protected final ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor;
    +   protected final ListStateDescriptor<Map<Long, TXN>> pendingCommitTransactionsDescriptor;
        protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
     
    -   protected final List<TransactionAndCheckpoint<TXN>> pendingCommitTransactions = new ArrayList<>();
    +   protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
     
        @Nullable
        protected TXN currentTransaction;
        protected ListState<TXN> pendingTransactionsState;
    -   protected ListState<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsState;
    -
    -   public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
    -           this(
    -                   TypeInformation.of(txnClass),
    -                   TypeInformation.of(new TypeHint<TransactionAndCheckpoint<TXN>>() {}));
    -   }
    +   protected ListState<Map<Long, TXN>> pendingCommitTransactionsState;
    --- End diff --
    
    I think this has to be `ListState<Tuple2<Long, TXN>>` or the original
    `ListState<TransactionAndCheckpoint<TXN>>`.
    
    Using a map instead of a list for `pendingCommitTransactions` is OK for
    bookkeeping, but when snapshotting this map we need to make sure the
    snapshotted transactions are individual state elements that can be
    redistributed independently of each other (e.g. when the sink is rescaled).
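A plain-Java simulation of the point above, with no Flink dependency. `TxnEntry`, `snapshotAsEntries`, `restore` and `redistribute` are hypothetical names. `TxnEntry` stands in for `Tuple2<Long, TXN>` / `TransactionAndCheckpoint<TXN>`, and `redistribute` assumes an even-split scheme in which the concatenated list of state elements is cut into contiguous chunks, one per new subtask; it is a sketch of the idea, not Flink's actual redistribution code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ListStateRedistribution {

    // Hypothetical (checkpointId, transaction) pair: one list-state element per transaction.
    static final class TxnEntry<TXN> {
        final long checkpointId;
        final TXN txn;

        TxnEntry(long checkpointId, TXN txn) {
            this.checkpointId = checkpointId;
            this.txn = txn;
        }
    }

    // Assumed even-split redistribution: contiguous chunks, earlier subtasks get
    // one extra element when the size does not divide evenly. newParallelism must be > 0.
    static <T> List<List<T>> redistribute(List<T> elements, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        int base = elements.size() / newParallelism;
        int remainder = elements.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int end = start + base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(elements.subList(start, end)));
            start = end;
        }
        return result;
    }

    // Snapshot: flatten the bookkeeping map into independent state elements.
    static <TXN> List<TxnEntry<TXN>> snapshotAsEntries(LinkedHashMap<Long, TXN> pending) {
        List<TxnEntry<TXN>> entries = new ArrayList<>();
        for (Map.Entry<Long, TXN> e : pending.entrySet()) {
            entries.add(new TxnEntry<>(e.getKey(), e.getValue()));
        }
        return entries;
    }

    // Restore: rebuild the bookkeeping map from whichever entries this subtask received.
    static <TXN> LinkedHashMap<Long, TXN> restore(List<TxnEntry<TXN>> entries) {
        LinkedHashMap<Long, TXN> pending = new LinkedHashMap<>();
        for (TxnEntry<TXN> e : entries) {
            pending.put(e.checkpointId, e.txn);
        }
        return pending;
    }
}
```

For example, four pending transactions held by one subtask and rescaled to parallelism 2 arrive as 2 + 2 entries when snapshotted with `snapshotAsEntries`, whereas a single `Map<Long, TXN>` element is indivisible and lands entirely on one subtask while the other receives nothing.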


> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-7210
>                 URL: https://issues.apache.org/jira/browse/FLINK-7210
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> To implement an exactly-once sink there is a recurring pattern: the
> two-phase commit algorithm. It is used both in `BucketingSink` and in the
> `Pravega` sink, and it will be used in the `Kafka 0.11` connector. It would be
> good to extract this common logic into one class, both to improve existing
> implementations (for example, the `Pravega` sink doesn't abort interrupted
> transactions) and to make it easier for users to implement their own
> custom exactly-once sinks.
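A minimal sketch of the two-phase commit pattern described above, in plain Java without Flink. `SimpleTwoPhaseCommitSink`, `InMemoryTwoPhaseSink` and the hook names are hypothetical and only loosely mirror the idea; they are not the signatures of Flink's `TwoPhaseCommitSinkFunction`. It assumes checkpoint ids arrive in increasing order, so the `LinkedHashMap` iterates pending transactions oldest first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified skeleton of the two-phase commit pattern (not Flink's API).
abstract class SimpleTwoPhaseCommitSink<IN, TXN> {

    // Pre-committed transactions keyed by checkpoint id, in insertion (= checkpoint) order.
    protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    protected TXN currentTransaction;

    protected abstract TXN beginTransaction();
    protected abstract void write(TXN txn, IN value);
    protected abstract void preCommit(TXN txn); // phase 1: flush / make durable
    protected abstract void commit(TXN txn);    // phase 2: make results visible
    protected abstract void abort(TXN txn);

    public void open() {
        currentTransaction = beginTransaction();
    }

    public void invoke(IN value) {
        write(currentTransaction, value);
    }

    // On checkpoint: pre-commit the open transaction, park it, and start a new one.
    public void snapshotState(long checkpointId) {
        preCommit(currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
    }

    // Once the checkpoint is complete: commit every pending transaction up to that id.
    public void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, TXN>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TXN> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break;
            }
            commit(entry.getValue());
            it.remove();
        }
    }

    // Abort the in-flight transaction instead of leaving it dangling.
    public void close() {
        if (currentTransaction != null) {
            abort(currentTransaction);
            currentTransaction = null;
        }
    }
}

// Toy implementation: a transaction is a buffer; committing appends it to an output list.
class InMemoryTwoPhaseSink extends SimpleTwoPhaseCommitSink<String, List<String>> {
    final List<String> committed = new ArrayList<>();
    int aborted = 0;

    @Override protected List<String> beginTransaction() { return new ArrayList<>(); }
    @Override protected void write(List<String> txn, String value) { txn.add(value); }
    @Override protected void preCommit(List<String> txn) { /* nothing to flush in memory */ }
    @Override protected void commit(List<String> txn) { committed.addAll(txn); }
    @Override protected void abort(List<String> txn) { aborted++; txn.clear(); }
}
```

The design choice this illustrates: records only become visible in `commit`, which runs after the checkpoint completes, and `close` aborts the open transaction, which is the kind of cleanup the description notes is missing from the `Pravega` sink.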



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
