davidradl commented on code in PR #154:
URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2029031671


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Implementations of an abort strategy for transactions left over from previous runs. */
+@Internal
+public enum TransactionAbortStrategyImpl {
+    /**
+     * The probing strategy starts by aborting a set of known transactional ids from the recovered
+     * state and then continues guessing whether more transactions may have been opened between
+     * this run and the last successful checkpoint. This also accounts for rescaling in between the
+     * attempts.
+     *
+     * <p>However, the probing is not side-effect free, which leads to an ever-increasing search
+     * space for the next probing attempt in case of a restart loop. The search space fixes itself
+     * on the next successful checkpoint. It's recommended to use this strategy only with a strict
+     * restart policy that prevents tight restart loops (e.g. incremental backoff or hard failure
+     * after X attempts).
+     *
+     * <p>This is exactly the same behavior as in flink-connector-kafka 3.X.
+     *
+     * <p>Note that this strategy does not abide by the strict ownership model introduced with
+     * {@link #LISTING}, but it also doesn't need to in order to work.
+     */
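+    // A worked example (illustrative only; it assumes buildTransactionalId yields
+    // "prefix-subtaskId-checkpointId"): with prefix "app", parallelism 2, and start
+    // checkpoint 3, subtask 0 probes app-0-3, app-0-4, ... until an abort returns
+    // epoch 0, then app-2-3, app-2-4, ... for subtask 2 of a previously larger
+    // parallelism, and stops once a subtask yields zero aborted transactions.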
+    PROBING {
+        @Override
+        public void abortTransactions(Context context) {
+            for (String prefix : context.getPrefixesToAbort()) {
+                abortTransactionsWithPrefix(prefix, context);
+            }
+        }
+
+        /**
+         * Aborts all transactions that have been created by this subtask in a previous run.
+         *
+         * <p>It also aborts transactions from subtasks that may have been removed because of
+         * downscaling.
+         *
+         * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for
+         * cleaning all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2,
+         * then subtask 0 is responsible for all even and subtask 1 for all odd subtasks.
+         */
+        private void abortTransactionsWithPrefix(String prefix, Context context) {
+            for (int subtaskId = context.getCurrentSubtaskId();
+                    ;
+                    subtaskId += context.getCurrentParallelism()) {
+                if (abortTransactionOfSubtask(prefix, subtaskId, context) == 0) {
+                    // If Flink didn't abort any transaction for the current subtask, then we
+                    // assume that no such subtask existed, and no subtask with a higher number
+                    // either.
+                    break;
+                }
+            }
+        }
+
+        /**
+         * Aborts all transactions that have been created by a subtask in a previous run after the
+         * given checkpoint id.
+         *
+         * <p>We assume that transaction ids are used consecutively, so Flink can stop aborting as
+         * soon as it encounters an unused transaction id.
+         */
+        private int abortTransactionOfSubtask(String prefix, int subtaskId, Context context) {
+            int numTransactionAborted = 0;
+            for (long checkpointId = context.getStartCheckpointId();
+                    ;
+                    checkpointId++, numTransactionAborted++) {
+                // initTransactions fences all old transactions with the same id by bumping the
+                // epoch
+                String transactionalId =
+                        TransactionalIdFactory.buildTransactionalId(
+                                prefix, subtaskId, checkpointId);
+                int epoch = context.getTransactionAborter().abortTransaction(transactionalId);
+                // An epoch of 0 indicates that the id was unused before
+                if (epoch == 0) {
+                    // Note that the check works beyond transaction log timeouts and just depends
+                    // on the retention of the transaction topic (typically 7d). Any transaction
+                    // that is not in that topic anymore is also not lingering (i.e., it will not
+                    // block downstream from reading).
+                    // This method will only cease to work if the transaction log timeout equals
+                    // the topic retention and a user didn't restart the application for that
+                    // period of time. Then the first transactions would vanish from the topic
+                    // while later transactions are still lingering until they are cleaned up by
+                    // Kafka. Then the user has to wait until the other transactions are timed out
+                    // (which shouldn't take too long).
+                    break;
+                }
+            }
+            return numTransactionAborted;
+        }
+    },
+    LISTING {
+        @Override
+        public void abortTransactions(Context context) {
+            Collection<String> openTransactionsForTopics =
+                    context.getOpenTransactionsForTopics();
+
+            if (openTransactionsForTopics.isEmpty()) {
+                return;
+            }
+
+            List<String> openTransactionsForSubtask =
+                    openTransactionsForTopics.stream()
+                            // ignore transactions from other applications
+                            .filter(name -> hasKnownPrefix(name, context))
+                            // look only at transactions owned by this subtask
+                            .filter(context::ownsTransactionalId)
+                            .collect(Collectors.toList());
+
+            LOG.warn(
+                    "Found {} open transactions for subtask {}: {}",
+                    openTransactionsForSubtask.size(),
+                    context.getCurrentSubtaskId(),
+                    openTransactionsForSubtask);
+            // remove transactions that are owned by the committer
+            TransactionAborter transactionAborter = context.getTransactionAborter();
+            for (String name : openTransactionsForSubtask) {
+                if (context.getPrecommittedTransactionalIds().contains(name)) {
+                    LOG.debug(
+                            "Skipping transaction {} because it's in the list of transactions to be committed",
+                            name);
+                    continue;
+                }
+                transactionAborter.abortTransaction(name);
+            }
+        }
+
+        private boolean hasKnownPrefix(String name, Context context) {
+            return context.getPrefixesToAbort().stream().anyMatch(name::startsWith);
+        }
+    };
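+    // Usage sketch (call site assumed for illustration; a Context instance comes from the sink):
+    //   TransactionAbortStrategyImpl.LISTING.abortTransactions(context);
+    // LISTING follows the strict ownership model and aborts only ids owned by this subtask,
+    // while PROBING matches the flink-connector-kafka 3.X behavior described above.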
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TransactionAbortStrategyImpl.class);
+
+    /** Aborts all transactions that have been created by this subtask in a previous run. */
+    public abstract void abortTransactions(Context context);
+
+    /** Injects the actual abortion of the transactional id generated by one of the strategies. */

Review Comment:
   Please could we use the words `abort`, `aborted`, or `aborting`, which seem to be the terms used in reference to transactions? I think we should avoid `abortion` in any text or variable/method name, as it could be triggering.
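   For example, the Javadoc flagged above could read:
   `/** Injects the actual abort of the transactional id generated by one of the strategies. */`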



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
