fapaul commented on code in PR #154:
URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2024533233


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -293,22 +309,38 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
                     producerPool.recycle(producer);
                     return epoch;
                 };
-        Set<String> ongoingTransactionIds =
+        Set<String> precommittedTransactionalIds =
                 recoveredStates.stream()
                         .flatMap(
                                 s ->
-                                        s.getOngoingTransactions().stream()
+                                        s.getPrecommittedTransactionalIds().stream()
                                                 .map(CheckpointTransaction::getTransactionalId))
                         .collect(Collectors.toSet());
+        Set<Integer> oldSubtaskIds =
+                recoveredStates.stream()
+                        .map(KafkaWriterState::getSubtaskId)
+                        .collect(Collectors.toSet());
+        Integer oldParallelism =
+                recoveredStates.stream()
+                        .map(KafkaWriterState::getParallelism)
+                        .findFirst()
+                        .orElse(null);
+        // Special case: recovery without checkpoint -> first subtask owns everything
+        if (recoveredStates.isEmpty() && kafkaSinkContext.getParallelInstanceId() == 0) {

Review Comment:
   I might have missed something, but I understand the current code as follows for this special case:
   
   1. `oldSubtaskIds` = empty, `oldParallelism` = 1
   2. Call `abortTransactions(Context context)`.
   3. Get all open transactions for the prefix: https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L132
   4. Filter by "ownership": https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L139
   5. Abort all open transactions owned by this subtask: https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L156
   
   Doesn't step 4 then always return false, so we never abort anything, because https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L209 is empty due to the missing state?
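   
   For illustration, this is roughly my reading of step 4 (a hypothetical sketch, not the actual code; the signature and names are guesses):
   
   ```java
   // Sketch of the ownership filter as I read it (hypothetical names).
   private boolean isOwned(int subtaskIdOfOpenTransaction, Set<Integer> ownedSubtaskIds) {
       // In this special case recoveredStates is empty, so the owned subtask
       // ids derived from state (L209) are empty too: this always returns
       // false and step 5 never aborts anything.
       return ownedSubtaskIds.contains(subtaskIdOfOpenTransaction);
   }
   ```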
   
   



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java:
##########
@@ -104,4 +127,13 @@ public long getStartCheckpointId() {
     public TransactionAborter getTransactionAborter() {
         return transactionAborter;
     }
+
+    @Override
+    public void markTransactionalIdUnsafe(String transactionalId) {
+        this.unsafeTransactionalIds.add(transactionalId);
+    }
+
+    public Collection<String> getUnsafeTransactionalIds() {

Review Comment:
   Why is this method not part of the interface?
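   
   For reference, I mean something like this (a sketch; I'm assuming the interface is `TransactionAbortStrategyContext`, matching the `Impl` suffix):
   
   ```java
   import java.util.Collection;
   
   public interface TransactionAbortStrategyContext {
       /** Already part of the interface: the Impl overrides it. */
       void markTransactionalIdUnsafe(String transactionalId);
   
       /** Proposed addition, so callers don't need to depend on the Impl type. */
       Collection<String> getUnsafeTransactionalIds();
   }
   ```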



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -277,6 +286,13 @@ private void abortLingeringTransactions(
         TransactionAbortStrategyContextImpl context =
                 getTransactionAbortStrategyContext(startCheckpointId, prefixesToAbort);
         transactionAbortStrategy.abortTransactions(context);
+        Collection<String> unsafeTransactionalIds = context.getUnsafeTransactionalIds();
+        if (!unsafeTransactionalIds.isEmpty()) {
+            namingContext.setUnsafeTransactionalIds(unsafeTransactionalIds);
+            LOG.info(
+                    "Unsafe transactional ids {} detected. They will be skipped until first checkpoint.",

Review Comment:
   I do not fully understand the comment.
   
   I guess this relies on the fact that the committer tries to commit all outstanding transactions during recovery, so we do not need to wait for `notifyCheckpointComplete`? My concern is that transactions are only committed on `notifyCheckpointComplete`, which might not be aligned with the general checkpointing cycle.
   
   I also find `skipped` hard to understand without context. Maybe something like `reused` would be easier.
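   
   Just to sketch the wording I have in mind (same variables as in the diff above):
   
   ```java
   LOG.info(
           "Unsafe transactional ids {} detected. They will not be reused until the first checkpoint completes.",
           unsafeTransactionalIds);
   ```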
   



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -293,22 +309,38 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
                     producerPool.recycle(producer);
                     return epoch;
                 };
-        Set<String> ongoingTransactionIds =
+        Set<String> precommittedTransactionalIds =
                 recoveredStates.stream()
                         .flatMap(
                                 s ->
-                                        s.getOngoingTransactions().stream()
+                                        s.getPrecommittedTransactionalIds().stream()
                                                 .map(CheckpointTransaction::getTransactionalId))
                         .collect(Collectors.toSet());
+        Set<Integer> oldSubtaskIds =

Review Comment:
   Nit: I wonder whether the `getTransactionAbortStrategyContext` method wouldn't be better placed inside `TransactionAbortStrategyContextImpl`, as a builder method or similar.
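   
   Something along these lines, perhaps (an untested sketch; the constructor arguments are guesses based on the diff, not the actual signature):
   
   ```java
   // Hypothetical static factory on TransactionAbortStrategyContextImpl, so that
   // ExactlyOnceKafkaWriter only wires dependencies instead of assembling state.
   public static TransactionAbortStrategyContextImpl forRecovery(
           Collection<KafkaWriterState> recoveredStates,
           long startCheckpointId,
           TransactionAborter transactionAborter) {
       Set<String> precommittedTransactionalIds =
               recoveredStates.stream()
                       .flatMap(s -> s.getPrecommittedTransactionalIds().stream()
                               .map(CheckpointTransaction::getTransactionalId))
                       .collect(Collectors.toSet());
       return new TransactionAbortStrategyContextImpl(
               precommittedTransactionalIds, startCheckpointId, transactionAborter);
   }
   ```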



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -293,22 +309,38 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
                     producerPool.recycle(producer);
                     return epoch;
                 };
-        Set<String> ongoingTransactionIds =
+        Set<String> precommittedTransactionalIds =
                 recoveredStates.stream()
                         .flatMap(
                                 s ->
-                                        s.getOngoingTransactions().stream()
+                                        s.getPrecommittedTransactionalIds().stream()
                                                 .map(CheckpointTransaction::getTransactionalId))
                         .collect(Collectors.toSet());
+        Set<Integer> oldSubtaskIds =
+                recoveredStates.stream()
+                        .map(KafkaWriterState::getSubtaskId)
+                        .collect(Collectors.toSet());
+        Integer oldParallelism =
+                recoveredStates.stream()
+                        .map(KafkaWriterState::getParallelism)
+                        .findFirst()
+                        .orElse(null);
+        // Special case: recovery without checkpoint -> first subtask owns everything
+        if (recoveredStates.isEmpty() && kafkaSinkContext.getParallelInstanceId() == 0) {
+            oldSubtaskIds = Set.of(0);
+            oldParallelism = 1;
+        }

Review Comment:
   I think the block is easier to understand with a dedicated method that returns immediately on the special condition. It also avoids reassigning the variables.
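   
   Something like this, maybe (untested sketch; a sibling method could do the same for `oldParallelism`):
   
   ```java
   private Set<Integer> getOldSubtaskIds(Collection<KafkaWriterState> recoveredStates) {
       // Special case: recovery without checkpoint -> first subtask owns everything.
       if (recoveredStates.isEmpty() && kafkaSinkContext.getParallelInstanceId() == 0) {
           return Set.of(0);
       }
       return recoveredStates.stream()
               .map(KafkaWriterState::getSubtaskId)
               .collect(Collectors.toSet());
   }
   ```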



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
