akalash commented on a change in pull request #16768:
URL: https://github.com/apache/flink/pull/16768#discussion_r687955286
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
##########
@@ -332,11 +346,18 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
context.getCheckpointId(),
currentTransactionHolder);
- preCommit(currentTransactionHolder.handle);
- pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
- LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
+ if (!currentTransactionHolder.equals(TransactionHolder.empty())) {
Review comment:
As far as I can tell, we don't have a test that covers this condition. It
would make sense to add one for the case where currentTransactionHolder is
empty. What do you think?
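To make the case concrete, here is a minimal, self-contained sketch of the behaviour I would expect such a test to pin down. It does not use the Flink test harness: `SnapshotGuardSketch`, its `Holder` class, and the `String` handles are hypothetical stand-ins for the sink, `TransactionHolder`, and `TXN`, and I am assuming an empty holder is one whose handle is null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical stand-in for the sink; this is not Flink's TwoPhaseCommitSinkFunction. */
class SnapshotGuardSketch {

    /** Simplified holder. Assumption: a null handle plays the role of TransactionHolder.empty(). */
    static final class Holder {
        final String handle;

        Holder(String handle) {
            this.handle = handle;
        }

        static Holder empty() {
            return new Holder(null);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Holder && Objects.equals(handle, ((Holder) o).handle);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(handle);
        }
    }

    /** Handles for which the simulated preCommit was called. */
    final List<String> preCommitted = new ArrayList<>();

    /** Checkpoint id to holder, as in the guarded branch of the diff. */
    final Map<Long, Holder> pendingCommitTransactions = new TreeMap<>();

    final Holder current;

    SnapshotGuardSketch(Holder current) {
        this.current = current;
    }

    /** Mirrors the new condition: an empty holder is neither pre-committed nor stored. */
    void snapshotState(long checkpointId) {
        if (!current.equals(Holder.empty())) {
            preCommitted.add(current.handle);
            pendingCommitTransactions.put(checkpointId, current);
        }
    }

    public static void main(String[] args) {
        SnapshotGuardSketch emptySink = new SnapshotGuardSketch(Holder.empty());
        emptySink.snapshotState(1L);
        System.out.println("pending after empty snapshot: " + emptySink.pendingCommitTransactions.size());

        SnapshotGuardSketch sink = new SnapshotGuardSketch(new Holder("txn-1"));
        sink.snapshotState(2L);
        System.out.println("pending after regular snapshot: " + sink.pendingCommitTransactions.size());
    }
}
```

A real test would presumably drive the actual sink through the existing harness and assert the same two things: no preCommit call and no entry in pendingCommitTransactions for the empty holder.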
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
##########
@@ -230,11 +239,16 @@ public final void invoke(IN value) throws Exception {}
@Override
public final void invoke(IN value, Context context) throws Exception {
- invoke(currentTransactionHolder.handle, value, context);
+ TXN currentTransaction = currentTransaction();
+ checkNotNull(
+ currentTransaction,
+ "two phase commit sink function with null transaction should not be invoked! ");
Review comment:
Minor: capitalise the first word and remove the trailing space after the
exclamation mark?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
##########
@@ -383,12 +404,14 @@ public void initializeState(FunctionInitializationContext context) throws Except
{
TXN transaction =
operatorState.getPendingTransaction().handle;
- recoverAndAbort(transaction);
- handledTransactions.add(transaction);
- LOG.info(
- "{} aborted recovered transaction {}",
- name(),
- operatorState.getPendingTransaction());
+ if (transaction != null) {
Review comment:
Same question about test coverage here: do we have a test for the case where
the recovered pending transaction is null?
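A rough, self-contained sketch of what I would want such a test to verify is below. It is only an illustration: `RecoveryGuardSketch` and its methods are hypothetical stand-ins that record calls instead of talking to Flink, and the handle type is simplified to `String`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the restore path; this is not Flink's actual class. */
class RecoveryGuardSketch {

    /** Handles passed to the simulated recoverAndAbort call, in order. */
    final List<String> aborted = new ArrayList<>();

    /** Simulates recoverAndAbort(transaction) by recording the handle. */
    void recoverAndAbort(String transaction) {
        aborted.add(transaction);
    }

    /** Mirrors the guarded branch in the diff: a null pending handle is skipped. */
    void restorePendingTransaction(String pendingHandle) {
        if (pendingHandle != null) {
            recoverAndAbort(pendingHandle);
        }
    }

    public static void main(String[] args) {
        RecoveryGuardSketch sketch = new RecoveryGuardSketch();
        sketch.restorePendingTransaction(null);    // restored state without a pending handle
        sketch.restorePendingTransaction("txn-7"); // restored state with a pending handle
        System.out.println("aborted on recovery: " + sketch.aborted);
    }
}
```

The actual test would restore the real sink from a snapshot whose pending transaction has a null handle and check that recoverAndAbort is not called and nothing is thrown.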