curcur commented on a change in pull request #16768:
URL: https://github.com/apache/flink/pull/16768#discussion_r688219308



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
##########
@@ -332,11 +346,18 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
                 context.getCheckpointId(),
                 currentTransactionHolder);
 
-        preCommit(currentTransactionHolder.handle);
-        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
-        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
+        if (!currentTransactionHolder.equals(TransactionHolder.empty())) {

Review comment:
   That is a good point. The problem is that, from the test-result point of 
view, the results are the same with or without this check. 
   
   But reading the test code again, I think it could be tested by adding two 
additional checkpoints after the finish.
   
   In that case, if the previous empty transaction holder is pre-committed, an 
exception will be thrown because its TXN is null.
   
   Additionally, I can add another test to make sure no data can be injected 
after finish (i.e., that the checkNotNull does work).
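   
   A rough sketch of both checks, using a simplified model rather than the 
real test harness. `Holder`, `Sink`, `guardEmpty` and the helper methods are 
hypothetical stand-ins, and the model assumes that finish pre-commits the 
open transaction and leaves an empty holder behind:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Simplified model only; none of these types are the real Flink classes. */
final class EmptyTransactionSketch {

    /** Hypothetical stand-in for TransactionHolder; a null handle models TransactionHolder.empty(). */
    static final class Holder {
        final String handle;
        Holder(String handle) { this.handle = handle; }
        static Holder empty() { return new Holder(null); }
        @Override public boolean equals(Object o) {
            return o instanceof Holder && Objects.equals(handle, ((Holder) o).handle);
        }
        @Override public int hashCode() { return Objects.hashCode(handle); }
    }

    /** Hypothetical stand-in for the sink; guardEmpty toggles the check added in the diff. */
    static final class Sink {
        final boolean guardEmpty;
        final List<String> preCommitted = new ArrayList<>();
        Holder current = new Holder("txn-0");
        int nextId = 1;

        Sink(boolean guardEmpty) { this.guardEmpty = guardEmpty; }

        void preCommit(String handle) {
            // Pre-committing a null TXN throws, as described in the comment.
            preCommitted.add(Objects.requireNonNull(handle, "TXN is null"));
        }

        void invoke(String value) {
            // Models the checkNotNull that rejects data arriving after finish.
            Objects.requireNonNull(current.handle, "no open transaction");
        }

        void finish() {
            // Assumption: finish pre-commits and leaves an empty holder.
            preCommit(current.handle);
            current = Holder.empty();
        }

        void snapshotState() {
            if (guardEmpty && current.equals(Holder.empty())) {
                return; // nothing to pre-commit after finish
            }
            preCommit(current.handle);
            current = new Holder("txn-" + nextId++);
        }
    }

    /** Runs two checkpoints after finish; true if neither throws and nothing extra is pre-committed. */
    static boolean postFinishCheckpointsSucceed(boolean guardEmpty) {
        Sink sink = new Sink(guardEmpty);
        sink.invoke("a");
        sink.finish();
        try {
            sink.snapshotState();
            sink.snapshotState();
            return sink.preCommitted.size() == 1;
        } catch (NullPointerException e) {
            return false;
        }
    }

    /** True if writing after finish is rejected by the null check. */
    static boolean writeAfterFinishIsRejected() {
        Sink sink = new Sink(true);
        sink.finish();
        try {
            sink.invoke("late");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("with guard:    " + postFinishCheckpointsSucceed(true));
        System.out.println("without guard: " + postFinishCheckpointsSucceed(false));
        System.out.println("late write rejected: " + writeAfterFinishIsRejected());
    }
}
```

   In this model the two post-finish checkpoints only pass when the guard is 
on, so a test of this shape would tell the two code paths apart.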
   
   WDYT?
   
   





