AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r698546414



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
-        if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+        final Properties properties = new Properties();
+        properties.putAll(kafkaProducerConfig);
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+        try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                new FlinkKafkaInternalProducer<>(properties)) {
+            for (String prefix : prefixesToAbort) {
+                abortTransactionsWithPrefix(producer, prefix, startCheckpointId);
+            }
         }
-        return new KafkaWriterState(
-                transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset());
     }
 
-    private void abortTransactions(List<String> transactionsToAbort) {
-        transactionsToAbort.forEach(
-                transaction -> {
-                    // don't mess with the original configuration or any other
-                    // properties of the
-                    // original object
-                    // -> create an internal kafka producer on our own and do not rely
-                    // on
-                    //    initTransactionalProducer().
-                    final Properties myConfig = new Properties();
-                    myConfig.putAll(kafkaProducerConfig);
-                    myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
-                    LOG.info("Aborting Kafka transaction {}.", transaction);
-                    FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
-                    try {
-                        kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
-                        // it suffices to call initTransactions - this will abort any
-                        // lingering transactions
-                        kafkaProducer.initTransactions();
-                    } finally {
-                        if (kafkaProducer != null) {
-                            kafkaProducer.close(Duration.ofSeconds(0));
-                        }
-                    }
-                });
+    /**
+     * Aborts all transactions that have been created by this subtask in a previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    private void abortTransactionsWithPrefix(

Review comment:
       But this is exactly the same thing.
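
For context: the removed abortTransactions() relied on the fact that calling initTransactions() on a producer aborts any unfinished transaction left behind under the same transactional.id. A minimal standalone sketch of that Kafka mechanism follows (not code from this PR; the class name, broker address, and transactional id are invented for illustration, and a broker at localhost:9092 is assumed):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AbortByInitTransactions {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical broker address and transactional id; substitute your own.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-prefix-0-3");
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            // initTransactions() registers this producer with the transaction
            // coordinator, fences off any earlier producer that used the same
            // transactional.id, and aborts its unfinished transaction.
            producer.initTransactions();
        }
    }
}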

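A small, hypothetical sketch of the downscaling ownership rule described in the new Javadoc (subtask i cleans every old subtask j in [0; X) with j % Y == i); the class and method names below are invented and are not part of KafkaWriter:

import java.util.ArrayList;
import java.util.List;

public class DownscaleOwnershipSketch {

    /** Old subtask ids that new subtask {@code subtaskId} must clean up. */
    static List<Integer> ownedOldSubtasks(int subtaskId, int oldParallelism, int newParallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int j = 0; j < oldParallelism; j++) {
            if (j % newParallelism == subtaskId) {
                owned.add(j);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Downscaling X=4 -> Y=2: subtask 0 cleans the even old subtasks,
        // subtask 1 the odd ones.
        System.out.println(ownedOldSubtasks(0, 4, 2)); // [0, 2]
        System.out.println(ownedOldSubtasks(1, 4, 2)); // [1, 3]
    }
}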


