fapaul commented on a change in pull request #16676:
URL: https://github.com/apache/flink/pull/16676#discussion_r683265825



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
##########
@@ -165,35 +161,11 @@ protected File execute(UnalignedSettings settings) throws 
Exception {
             miniCluster.after();
         }
         if (settings.generateCheckpoint) {
-            return Files.find(checkpointDir.toPath(), 2, 
this::isCompletedCheckpoint)
-                    .max(Comparator.comparing(Path::toString))
-                    .map(Path::toFile)
-                    .orElseThrow(() -> new IllegalStateException("Cannot 
generate checkpoint"));
+            return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir);

Review comment:
       This is not easily possible because I cannot add the `TestUtils` 
dependency without introducing a cyclic dependency. Therefore I left it as is, 
for now, to avoid spending more time disentangling the dependencies.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+/**
+ * A {@link KafkaProducer} that exposes private fields to allow resuming production from a given
+ * state.
+ */
+class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
+    private static final String TRANSACTION_MANAGER_STATE_ENUM =
+            
"org.apache.kafka.clients.producer.internals.TransactionManager$State";
+
+    private final Properties kafkaProducerConfig;
+    @Nullable private final String transactionalId;
+
+    public FlinkKafkaInternalProducer(Properties properties) {
+        super(properties);
+        this.kafkaProducerConfig = properties;
+        this.transactionalId = 
properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+    }
+
+    @Override
+    public void flush() {
+        super.flush();
+        if (transactionalId != null) {
+            flushNewPartitions();
+        }
+    }
+
+    public Properties getKafkaProducerConfig() {
+        return kafkaProducerConfig;
+    }
+
+    public short getEpoch() {
+        Object transactionManager = getField("transactionManager");
+        Object producerIdAndEpoch = getField(transactionManager, 
"producerIdAndEpoch");
+        return (short) getField(producerIdAndEpoch, "epoch");
+    }
+
+    public long getProducerId() {
+        Object transactionManager = getField("transactionManager");
+        Object producerIdAndEpoch = getField(transactionManager, 
"producerIdAndEpoch");
+        return (long) getField(producerIdAndEpoch, "producerId");
+    }
+
+    /**
+     * Besides committing, {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction}
+     * also adds new partitions to the transaction. The flushNewPartitions method moves this
+     * logic to pre-commit/flush to make resumeTransaction simpler. Otherwise, resumeTransaction
+     * would have to restore the state of the not yet added/"in-flight" partitions.
+     */
+    private void flushNewPartitions() {
+        LOG.info("Flushing new partitions");
+        TransactionalRequestResult result = enqueueNewPartitions();
+        Object sender = getField("sender");
+        invoke(sender, "wakeup");
+        result.await();
+    }
+
+    /**
+     * Enqueues new transactions at the transaction manager and returns a 
{@link
+     * TransactionalRequestResult} that allows waiting on them.
+     *
+     * <p>If there are no new transactions we return a {@link 
TransactionalRequestResult} that is
+     * already done.
+     */
+    private TransactionalRequestResult enqueueNewPartitions() {
+        Object transactionManager = getField("transactionManager");
+        synchronized (transactionManager) {
+            Object newPartitionsInTransaction =
+                    getField(transactionManager, "newPartitionsInTransaction");
+            Object newPartitionsInTransactionIsEmpty =
+                    invoke(newPartitionsInTransaction, "isEmpty");
+            TransactionalRequestResult result;
+            if (newPartitionsInTransactionIsEmpty instanceof Boolean
+                    && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+                Object txnRequestHandler =
+                        invoke(transactionManager, 
"addPartitionsToTransactionHandler");
+                invoke(
+                        transactionManager,
+                        "enqueueRequest",
+                        new Class[] 
{txnRequestHandler.getClass().getSuperclass()},
+                        new Object[] {txnRequestHandler});
+                result =
+                        (TransactionalRequestResult)
+                                getField(
+                                        txnRequestHandler,
+                                        
txnRequestHandler.getClass().getSuperclass(),
+                                        "result");
+            } else {
+                // we don't have an operation but this operation string is 
also used in
+                // addPartitionsToTransactionHandler.
+                result = new TransactionalRequestResult("AddPartitionsToTxn");
+                result.done();
+            }
+            return result;
+        }
+    }
+
+    private static Object invoke(Object object, String methodName, Object... 
args) {
+        Class<?>[] argTypes = new Class[args.length];
+        for (int i = 0; i < args.length; i++) {
+            argTypes[i] = args[i].getClass();
+        }
+        return invoke(object, methodName, argTypes, args);
+    }
+
+    private static Object invoke(
+            Object object, String methodName, Class<?>[] argTypes, Object[] 
args) {
+        try {
+            Method method = object.getClass().getDeclaredMethod(methodName, 
argTypes);
+            method.setAccessible(true);
+            return method.invoke(object, args);
+        } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
+            throw new RuntimeException("Incompatible KafkaProducer version", 
e);
+        }
+    }

Review comment:
       Can you give an example of what exactly you want to see here?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSinkBuilder.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Builder to construct {@link KafkaSink}.
+ *
+ * <p>The following example shows the minimum setup to create a KafkaSink that 
writes String values
+ * to a Kafka topic.
+ *
+ * <pre>{@code
+ * KafkaSink<String> sink = KafkaSink
+ *     .<String>builder()
+ *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ *     .setRecordSerializer(MY_RECORD_SERIALIZER)
+ *     .setKafkaProducerConfig(MY_PRODUCER_CONFIG)
+ *     .build();
+ * }</pre>
+ *
+ * <p>One can also configure a different {@link DeliveryGuarantee} via
+ * {@link #setDeliverGuarantee(DeliveryGuarantee)}, but keep in mind that when using {@link
+ * DeliveryGuarantee#EXACTLY_ONCE} the transactionalIdPrefix must be set via {@link
+ * #setTransactionalIdPrefix(String)}.
+ *
+ * @param <IN> type of the records written to Kafka
+ */
+public class KafkaSinkBuilder<IN> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSinkBuilder.class);
+    private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = 
Duration.ofHours(1);
+
+    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
+    private String transactionalIdPrefix = "kafka-sink";
+
+    private Properties kafkaProducerConfig;
+    private KafkaRecordSerializationSchema<IN> recordSerializer;
+    private String bootstrapServers;
+
+    /**
+     * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is {@link
+     * DeliveryGuarantee#NONE}.
+     *
+     * @param deliveryGuarantee the desired delivery guarantee
+     * @return {@link KafkaSinkBuilder}
+     */
+    KafkaSinkBuilder<IN> setDeliverGuarantee(DeliveryGuarantee 
deliveryGuarantee) {
+        this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "semantic");

Review comment:
       I will definitely leave the second parameter out next time, but for now 
I want to keep it consistent.
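
For readers of the thread, a minimal sketch of the two `Preconditions.checkNotNull` variants being discussed (the second argument is the error message); the holder class and field names below are illustrative only and not part of the PR:

```java
import org.apache.flink.connector.base.DeliveryGuarantee;

import static org.apache.flink.util.Preconditions.checkNotNull;

// Illustrative holder class, not the PR's code.
class DeliveryGuaranteeHolder {
    private DeliveryGuarantee deliveryGuarantee;

    void setWithMessage(DeliveryGuarantee deliveryGuarantee) {
        // style used in the PR: the second argument carries the error message
        this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee");
    }

    void setWithoutMessage(DeliveryGuarantee deliveryGuarantee) {
        // the "leave the second parameter out" variant
        this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
    }
}
```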

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaTransactionLog.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteAbort;
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteCommit;
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.Dead;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME;
+
+/**
+ * This class is responsible for providing the format of the used transactionalIds and, in case of
+ * an application restart, for querying the open transactions and deciding which must be aborted.
+ */
+class KafkaTransactionLog implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTransactionLog.class);
+    private static final Duration CONSUMER_POLL_DURATION = 
Duration.ofSeconds(1);
+    private static final Set<TransactionState> TERMINAL_TRANSACTION_STATES =
+            ImmutableSet.of(CompleteCommit, CompleteAbort, Dead);
+    private static final String TRANSACTIONAL_ID_DELIMITER = "-";
+    private static final int SUPPORTED_KAFKA_SCHEMA_VERSION = 0;
+
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaWriterState main;
+    private final int numberOfParallelSubtasks;
+    private final List<KafkaWriterState> combined;
+    private final OptionalInt minOthers;
+
+    /**
+     * Constructor creating a KafkaTransactionLog.
+     *
+     * @param kafkaConfig used to configure the {@link KafkaConsumer} to query 
the topic containing
+     *     the transaction information
+     * @param main the {@link KafkaWriterState} which was previously 
snapshotted by this subtask
+     * @param others the {@link KafkaWriterState}s which are from different 
subtasks i.e. in case of
+     *     a scale-in
+     * @param numberOfParallelSubtasks current number of parallel sink tasks
+     */
+    KafkaTransactionLog(
+            Properties kafkaConfig,
+            KafkaWriterState main,
+            List<KafkaWriterState> others,
+            int numberOfParallelSubtasks) {
+        this.main = checkNotNull(main, "mainState");
+        checkNotNull(others, "othersState");
+        this.minOthers =
+                others.stream()
+                        .map(KafkaWriterState::getSubtaskId)
+                        .mapToInt(Integer::intValue)
+                        .min();
+        this.combined =
+                new 
ImmutableList.Builder<KafkaWriterState>().add(main).addAll(others).build();
+        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
+        final Properties copy = new Properties();
+        copy.putAll(checkNotNull(kafkaConfig, "kafkaConfig"));
+        copy.put("enable.auto.commit", false);
+        copy.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        copy.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        copy.put("auto.offset.reset", "earliest");
+        copy.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        copy.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        this.consumer = new KafkaConsumer<>(copy);
+        
this.consumer.subscribe(ImmutableList.of(TRANSACTION_STATE_TOPIC_NAME));
+    }
+
+    /**
+     * This method queries Kafka's internal transaction topic and filters the transactions
+     * according to the following rules:
+     * <li>the transaction is not in a terminal state {@link
+     *     KafkaTransactionLog#TERMINAL_TRANSACTION_STATES}
+     * <li>transactionalIdPrefix equals the one from {@link #main}
+     * <li>Either [transaction's subtaskId % {@link #numberOfParallelSubtasks} 
= {@link #main}'s
+     *     subtaskId] or [transaction's subtaskId == {@link #combined}'s 
subtaskId && transaction's
+     *     checkpointOffset >= {@link #combined}'s checkpointOffset] or 
[transaction's subtaskId >
+     *     {@link #minOthers} && transaction's checkpointOffset == 0]
+     *
+     * @return all transactionIds which must be aborted before starting new 
transactions.
+     */
+    public Set<String> getTransactionsToAbort() {
+        ConsumerRecords<byte[], byte[]> records = 
consumer.poll(CONSUMER_POLL_DURATION);
+        final Set<String> openTransactions = new HashSet<>();
+        while (!records.isEmpty()) {
+            records.records(TRANSACTION_STATE_TOPIC_NAME)
+                    .forEach(maybeAddTransaction(openTransactions));
+            records = consumer.poll(CONSUMER_POLL_DURATION);
+        }
+        return openTransactions;
+    }
+
+    /**
+     * Constructs a transactionalId with the following format {@code
+     * transactionalIdPrefix-subtaskId-offset}.
+     *
+     * @param transactionalIdPrefix prefix for the id
+     * @param subtaskId describing the subtask which is opening the transaction
+     * @param offset an always incrementing number usually capturing the 
number of checkpoints taken
+     *     by the subtask
+     * @return transactionalId
+     */
+    public static String buildTransactionalId(
+            String transactionalIdPrefix, int subtaskId, long offset) {
+        final StringBuilder sb = new StringBuilder();
+        return sb.append(transactionalIdPrefix)
+                .append(TRANSACTIONAL_ID_DELIMITER)
+                .append(subtaskId)
+                .append(TRANSACTIONAL_ID_DELIMITER)
+                .append(offset)
+                .toString();
+    }
+
+    private Consumer<ConsumerRecord<byte[], byte[]>> maybeAddTransaction(
+            Set<String> openTransactions) {
+        return record -> {
+            final ByteBuffer keyBuffer = ByteBuffer.wrap(record.key());
+            checkKafkaSchemaVersionMatches(keyBuffer);
+            // Ignore 2 bytes of Kafka's internal representation
+            keyBuffer.getShort();
+            final String transactionalId = 
StandardCharsets.US_ASCII.decode(keyBuffer).toString();
+
+            final ByteBuffer valueBuffer = ByteBuffer.wrap(record.value());
+            checkKafkaSchemaVersionMatches(valueBuffer);
+            final TransactionState state =
+                    
TransactionState.fromByte(readTransactionState(valueBuffer));
+
+            LOG.debug("Transaction {} is in state {}", transactionalId, state);
+
+            // If the transaction is in a final state ignore it
+            if (isTransactionInFinalState(state)) {
+                openTransactions.remove(transactionalId);
+                return;
+            }
+
+            final Optional<KafkaWriterState> openTransactionOpt =
+                    fromTransactionalId(transactionalId);
+            // If the transactionalId does not follow the format ignore it
+            if (!openTransactionOpt.isPresent()) {
+                return;
+            }
+
+            final KafkaWriterState openTransaction = openTransactionOpt.get();
+            // If the transactionalId prefixes differ ignore
+            if (!openTransaction
+                    .getTransactionalIdPrefix()
+                    .equals(main.getTransactionalIdPrefix())) {
+                LOG.debug(
+                        "The transactionalId prefixes differ. Open: {}, 
Recovered: {}",
+                        openTransaction.getTransactionalIdPrefix(),
+                        main.getTransactionalIdPrefix());
+                return;
+            }
+
+            final int openSubtaskIndex = openTransaction.getSubtaskId();
+            final long openCheckpointOffset = 
openTransaction.getTransactionalIdOffset();
+
+            final boolean commonRemainder =
+                    openSubtaskIndex % numberOfParallelSubtasks == 
main.getSubtaskId()
+                            && openSubtaskIndex != main.getSubtaskId();
+            final boolean sameSubtaskHigherCheckpoint =
+                    hasSameSubtaskWithHigherCheckpoint(openSubtaskIndex, 
openCheckpointOffset);
+            final boolean higherSubtasksWithFirstCheckpoint =
+                    minOthers.isPresent()
+                            && minOthers.getAsInt() < openSubtaskIndex
+                            && openCheckpointOffset == 0;
+
+            if (!commonRemainder
+                    && !sameSubtaskHigherCheckpoint
+                    && !higherSubtasksWithFirstCheckpoint) {
+                return;
+            }
+            openTransactions.add(transactionalId);
+        };
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    private boolean hasSameSubtaskWithHigherCheckpoint(
+            int openSubtaskIndex, long openCheckpointOffset) {
+        return combined.stream()
+                .anyMatch(
+                        state ->
+                                state.getSubtaskId() == openSubtaskIndex
+                                        && state.getTransactionalIdOffset()
+                                                <= openCheckpointOffset);
+    }
+
+    private static boolean isTransactionInFinalState(TransactionState state) {
+        return TERMINAL_TRANSACTION_STATES.contains(state);
+    }
+
+    private static Optional<KafkaWriterState> fromTransactionalId(String 
transactionalId) {
+        final String[] splits = 
transactionalId.split(TRANSACTIONAL_ID_DELIMITER);
+        if (splits.length < 3) {
+            LOG.debug("Transaction {} was not created by the Flink Kafka 
sink", transactionalId);
+            return Optional.empty();
+        }
+        try {
+            final long checkpointOffset = Long.parseLong(splits[splits.length 
- 1]);
+            final int subtaskId = Integer.parseInt(splits[splits.length - 2]);
+            return Optional.of(
+                    new KafkaWriterState(
+                            String.join(
+                                    TRANSACTIONAL_ID_DELIMITER,
+                                    Arrays.copyOfRange(splits, 0, 
splits.length - 2)),
+                            subtaskId,
+                            checkpointOffset));
+        } catch (NumberFormatException e) {
+            LOG.debug(
+                    "Transaction {} was not created by the Flink Kafka sink: 
{}",
+                    transactionalId,
+                    e);
+            return Optional.empty();
+        }
+    }
+
+    private static byte readTransactionState(ByteBuffer buffer) {
+        // producerId
+        buffer.getLong();
+        // epoch
+        buffer.getShort();
+        // transactionTimeout
+        buffer.getInt();
+        // statusKey
+        return buffer.get();
+    }
+
+    enum TransactionState {
+        Empty(Byte.parseByte("0")),

Review comment:
       I am not fully convinced by any of the options 😄  Do you mind if I 
leave it as is?
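
For reference, a minimal sketch of the kind of alternative I assume was suggested (a plain byte literal with a cast instead of `Byte.parseByte("0")`); only the `Empty` constant from the hunk above is shown, the remaining states and their values are elided, and this is not the PR's code:

```java
// Illustrative sketch only, not the PR's code.
enum TransactionState {
    Empty((byte) 0);

    private final byte value;

    TransactionState(byte value) {
        this.value = value;
    }

    byte getValue() {
        return value;
    }
}
```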

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaTransactionLog.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteAbort;
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteCommit;
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.Dead;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME;
+
+/**
+ * This class is responsible for providing the format of the used transactionalIds and, in case of
+ * an application restart, for querying the open transactions and deciding which must be aborted.
+ */
+class KafkaTransactionLog implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTransactionLog.class);
+    private static final Duration CONSUMER_POLL_DURATION = 
Duration.ofSeconds(1);
+    private static final Set<TransactionState> TERMINAL_TRANSACTION_STATES =
+            ImmutableSet.of(CompleteCommit, CompleteAbort, Dead);
+    private static final String TRANSACTIONAL_ID_DELIMITER = "-";
+    private static final int SUPPORTED_KAFKA_SCHEMA_VERSION = 0;
+
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaWriterState main;
+    private final int numberOfParallelSubtasks;
+    private final List<KafkaWriterState> combined;
+    private final OptionalInt minOthers;
+
+    /**
+     * Constructor creating a KafkaTransactionLog.
+     *
+     * @param kafkaConfig used to configure the {@link KafkaConsumer} to query 
the topic containing
+     *     the transaction information
+     * @param main the {@link KafkaWriterState} which was previously 
snapshotted by this subtask
+     * @param others the {@link KafkaWriterState}s which are from different 
subtasks i.e. in case of
+     *     a scale-in
+     * @param numberOfParallelSubtasks current number of parallel sink tasks
+     */
+    KafkaTransactionLog(
+            Properties kafkaConfig,
+            KafkaWriterState main,
+            List<KafkaWriterState> others,
+            int numberOfParallelSubtasks) {
+        this.main = checkNotNull(main, "mainState");
+        checkNotNull(others, "othersState");
+        this.minOthers =
+                others.stream()
+                        .map(KafkaWriterState::getSubtaskId)
+                        .mapToInt(Integer::intValue)
+                        .min();
+        this.combined =
+                new 
ImmutableList.Builder<KafkaWriterState>().add(main).addAll(others).build();
+        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
+        final Properties copy = new Properties();
+        copy.putAll(checkNotNull(kafkaConfig, "kafkaConfig"));
+        copy.put("enable.auto.commit", false);
+        copy.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        copy.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        copy.put("auto.offset.reset", "earliest");
+        copy.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        copy.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        this.consumer = new KafkaConsumer<>(copy);
+        
this.consumer.subscribe(ImmutableList.of(TRANSACTION_STATE_TOPIC_NAME));
+    }
+
+    /**
+     * This method queries Kafka's internal transaction topic and filters the transactions
+     * according to the following rules:
+     * <li>the transaction is not in a terminal state {@link
+     *     KafkaTransactionLog#TERMINAL_TRANSACTION_STATES}
+     * <li>transactionalIdPrefix equals the one from {@link #main}
+     * <li>Either [transaction's subtaskId % {@link #numberOfParallelSubtasks} 
= {@link #main}'s
+     *     subtaskId] or [transaction's subtaskId == {@link #combined}'s 
subtaskId && transaction's
+     *     checkpointOffset >= {@link #combined}'s checkpointOffset] or 
[transaction's subtaskId >
+     *     {@link #minOthers} && transaction's checkpointOffset == 0]
+     *
+     * @return all transactionIds which must be aborted before starting new 
transactions.
+     */
+    public Set<String> getTransactionsToAbort() {
+        ConsumerRecords<byte[], byte[]> records = 
consumer.poll(CONSUMER_POLL_DURATION);
+        final Set<String> openTransactions = new HashSet<>();
+        while (!records.isEmpty()) {
+            records.records(TRANSACTION_STATE_TOPIC_NAME)
+                    .forEach(maybeAddTransaction(openTransactions));
+            records = consumer.poll(CONSUMER_POLL_DURATION);
+        }
+        return openTransactions;
+    }
+
+    /**
+     * Constructs a transactionalId with the following format {@code
+     * transactionalIdPrefix-subtaskId-offset}.
+     *
+     * @param transactionalIdPrefix prefix for the id
+     * @param subtaskId describing the subtask which is opening the transaction
+     * @param offset an always incrementing number usually capturing the 
number of checkpoints taken
+     *     by the subtask
+     * @return transactionalId
+     */
+    public static String buildTransactionalId(
+            String transactionalIdPrefix, int subtaskId, long offset) {
+        final StringBuilder sb = new StringBuilder();
+        return sb.append(transactionalIdPrefix)
+                .append(TRANSACTIONAL_ID_DELIMITER)
+                .append(subtaskId)
+                .append(TRANSACTIONAL_ID_DELIMITER)
+                .append(offset)
+                .toString();
+    }
+
+    private Consumer<ConsumerRecord<byte[], byte[]>> maybeAddTransaction(
+            Set<String> openTransactions) {
+        return record -> {
+            final ByteBuffer keyBuffer = ByteBuffer.wrap(record.key());
+            checkKafkaSchemaVersionMatches(keyBuffer);
+            // Ignore 2 bytes of Kafka's internal representation
+            keyBuffer.getShort();
+            final String transactionalId = 
StandardCharsets.US_ASCII.decode(keyBuffer).toString();
+
+            final ByteBuffer valueBuffer = ByteBuffer.wrap(record.value());
+            checkKafkaSchemaVersionMatches(valueBuffer);
+            final TransactionState state =
+                    
TransactionState.fromByte(readTransactionState(valueBuffer));
+
+            LOG.debug("Transaction {} is in state {}", transactionalId, state);
+
+            // If the transaction is in a final state ignore it
+            if (isTransactionInFinalState(state)) {
+                openTransactions.remove(transactionalId);
+                return;
+            }
+
+            final Optional<KafkaWriterState> openTransactionOpt =
+                    fromTransactionalId(transactionalId);
+            // If the transactionalId does not follow the format ignore it
+            if (!openTransactionOpt.isPresent()) {
+                return;
+            }
+
+            final KafkaWriterState openTransaction = openTransactionOpt.get();
+            // If the transactionalId prefixes differ ignore
+            if (!openTransaction
+                    .getTransactionalIdPrefix()
+                    .equals(main.getTransactionalIdPrefix())) {
+                LOG.debug(
+                        "The transactionalId prefixes differ. Open: {}, 
Recovered: {}",
+                        openTransaction.getTransactionalIdPrefix(),
+                        main.getTransactionalIdPrefix());
+                return;
+            }
+
+            final int openSubtaskIndex = openTransaction.getSubtaskId();
+            final long openCheckpointOffset = 
openTransaction.getTransactionalIdOffset();
+
+            final boolean commonRemainder =
+                    openSubtaskIndex % numberOfParallelSubtasks == 
main.getSubtaskId()
+                            && openSubtaskIndex != main.getSubtaskId();
+            final boolean sameSubtaskHigherCheckpoint =
+                    hasSameSubtaskWithHigherCheckpoint(openSubtaskIndex, 
openCheckpointOffset);
+            final boolean higherSubtasksWithFirstCheckpoint =
+                    minOthers.isPresent()
+                            && minOthers.getAsInt() < openSubtaskIndex
+                            && openCheckpointOffset == 0;
+
+            if (!commonRemainder
+                    && !sameSubtaskHigherCheckpoint
+                    && !higherSubtasksWithFirstCheckpoint) {
+                return;
+            }
+            openTransactions.add(transactionalId);
+        };
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    private boolean hasSameSubtaskWithHigherCheckpoint(
+            int openSubtaskIndex, long openCheckpointOffset) {
+        return combined.stream()
+                .anyMatch(
+                        state ->
+                                state.getSubtaskId() == openSubtaskIndex
+                                        && state.getTransactionalIdOffset()
+                                                <= openCheckpointOffset);
+    }
+
+    private static boolean isTransactionInFinalState(TransactionState state) {
+        return TERMINAL_TRANSACTION_STATES.contains(state);
+    }
+
+    private static Optional<KafkaWriterState> fromTransactionalId(String 
transactionalId) {
+        final String[] splits = 
transactionalId.split(TRANSACTIONAL_ID_DELIMITER);
+        if (splits.length < 3) {
+            LOG.debug("Transaction {} was not created by the Flink Kafka 
sink", transactionalId);
+            return Optional.empty();
+        }
+        try {
+            final long checkpointOffset = Long.parseLong(splits[splits.length 
- 1]);
+            final int subtaskId = Integer.parseInt(splits[splits.length - 2]);
+            return Optional.of(
+                    new KafkaWriterState(
+                            String.join(
+                                    TRANSACTIONAL_ID_DELIMITER,
+                                    Arrays.copyOfRange(splits, 0, 
splits.length - 2)),
+                            subtaskId,
+                            checkpointOffset));
+        } catch (NumberFormatException e) {
+            LOG.debug(
+                    "Transaction {} was not created by the Flink Kafka sink: 
{}",
+                    transactionalId,
+                    e);
+            return Optional.empty();
+        }
+    }
+
+    private static byte readTransactionState(ByteBuffer buffer) {
+        // producerId
+        buffer.getLong();
+        // epoch
+        buffer.getShort();
+        // transactionTimeout
+        buffer.getInt();
+        // statusKey
+        return buffer.get();
+    }

Review comment:
       Hmm, I am slightly in favor of keeping it like this, to at least make it 
explicit why this is the important byte. Do you think it is necessary, 
performance-wise, to do the point lookup?
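
To make the question concrete, here is a rough sketch of the point-lookup variant under discussion; this is my assumption of what it would look like, not code from the PR, and unlike the sequential getLong/getShort/getInt reads it does not advance the buffer's position:

```java
import java.nio.ByteBuffer;

// Illustrative sketch, not the PR's code.
final class TransactionStateBytes {
    // Bytes preceding the status byte after the schema-version check:
    // producerId (long), epoch (short), transactionTimeout (int).
    private static final int STATUS_BYTE_OFFSET = Long.BYTES + Short.BYTES + Integer.BYTES;

    // Absolute read of the status byte; buffer.position() is left untouched.
    static byte readTransactionState(ByteBuffer buffer) {
        return buffer.get(buffer.position() + STATUS_BYTE_OFFSET);
    }
}
```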

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaTransactionLog.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteAbort;
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteCommit;
+import static 
org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.Dead;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME;
+
+/**
+ * This class is responsible for providing the format of the used transactionalIds and, in case of
+ * an application restart, for querying the open transactions and deciding which must be aborted.
+ */
+class KafkaTransactionLog implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTransactionLog.class);
+    private static final Duration CONSUMER_POLL_DURATION = 
Duration.ofSeconds(1);
+    private static final Set<TransactionState> TERMINAL_TRANSACTION_STATES =
+            ImmutableSet.of(CompleteCommit, CompleteAbort, Dead);
+    private static final String TRANSACTIONAL_ID_DELIMITER = "-";
+    private static final int SUPPORTED_KAFKA_SCHEMA_VERSION = 0;
+
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaWriterState main;
+    private final int numberOfParallelSubtasks;
+    private final List<KafkaWriterState> combined;
+    private final OptionalInt minOthers;
+
+    /**
+     * Constructor creating a KafkaTransactionLog.
+     *
+     * @param kafkaConfig used to configure the {@link KafkaConsumer} to query 
the topic containing
+     *     the transaction information
+     * @param main the {@link KafkaWriterState} which was previously 
snapshotted by this subtask
+     * @param others the {@link KafkaWriterState}s which are from different 
subtasks i.e. in case of
+     *     a scale-in
+     * @param numberOfParallelSubtasks current number of parallel sink tasks
+     */
+    KafkaTransactionLog(
+            Properties kafkaConfig,
+            KafkaWriterState main,
+            List<KafkaWriterState> others,

Review comment:
       I like the current way of having it explicit. The impact of having a 
different item at the head of the list would be quite critical.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

