fapaul commented on a change in pull request #16676:
URL: https://github.com/apache/flink/pull/16676#discussion_r683310573



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaTransactionLog.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.sink;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteAbort;
+import static org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.CompleteCommit;
+import static org.apache.flink.streaming.connectors.kafka.sink.KafkaTransactionLog.TransactionState.Dead;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME;
+
+/**
+ * This class is responsible for providing the format of the used transactionalIds and, in case of
+ * an application restart, for querying the open transactions and deciding which must be aborted.
+ */
+class KafkaTransactionLog implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransactionLog.class);
+    private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
+    private static final Set<TransactionState> TERMINAL_TRANSACTION_STATES =
+            ImmutableSet.of(CompleteCommit, CompleteAbort, Dead);
+    private static final String TRANSACTIONAL_ID_DELIMITER = "-";
+    private static final int SUPPORTED_KAFKA_SCHEMA_VERSION = 0;
+
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaWriterState main;
+    private final int numberOfParallelSubtasks;
+    private final List<KafkaWriterState> combined;
+    private final OptionalInt minOthers;
+
+    /**
+     * Constructor creating a KafkaTransactionLog.
+     *
+     * @param kafkaConfig used to configure the {@link KafkaConsumer} to query the topic containing
+     *     the transaction information
+     * @param main the {@link KafkaWriterState} which was previously snapshotted by this subtask
+     * @param others the {@link KafkaWriterState}s which are from different subtasks, i.e. in case of
+     *     a scale-in
+     * @param numberOfParallelSubtasks current number of parallel sink tasks
+     */
+    KafkaTransactionLog(
+            Properties kafkaConfig,
+            KafkaWriterState main,
+            List<KafkaWriterState> others,
+            int numberOfParallelSubtasks) {
+        this.main = checkNotNull(main, "mainState");
+        checkNotNull(others, "othersState");
+        this.minOthers =
+                others.stream()
+                        .map(KafkaWriterState::getSubtaskId)
+                        .mapToInt(Integer::intValue)
+                        .min();
+        this.combined =
+                new ImmutableList.Builder<KafkaWriterState>().add(main).addAll(others).build();
+        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
+        final Properties copy = new Properties();
+        copy.putAll(checkNotNull(kafkaConfig, "kafkaConfig"));
+        copy.put("enable.auto.commit", false);
+        copy.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        copy.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        copy.put("auto.offset.reset", "earliest");
+        copy.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        copy.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        this.consumer = new KafkaConsumer<>(copy);
+        this.consumer.subscribe(ImmutableList.of(TRANSACTION_STATE_TOPIC_NAME));
+    }
+
+    /**
+     * This method queries Kafka's internal transaction topic and filters the transactions
+     * according to the following rules.
+     * <li>transaction is not in a terminal state {@link
+     *     KafkaTransactionLog#TERMINAL_TRANSACTION_STATES}
+     * <li>transactionalIdPrefix equals the one from {@link #main}
+     * <li>Either [transaction's subtaskId % {@link #numberOfParallelSubtasks} = {@link #main}'s
+     *     subtaskId] or [transaction's subtaskId == {@link #combined}'s subtaskId && transaction's
+     *     checkpointOffset >= {@link #combined}'s checkpointOffset] or [transaction's subtaskId >
+     *     {@link #minOthers} && transaction's checkpointOffset == 0]
+     *
+     * @return all transactionalIds which must be aborted before starting new transactions.
+     */
+    public Set<String> getTransactionsToAbort() {
+        ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
+        final Set<String> openTransactions = new HashSet<>();
+        while (!records.isEmpty()) {
+            records.records(TRANSACTION_STATE_TOPIC_NAME)
+                    .forEach(maybeAddTransaction(openTransactions));
+            records = consumer.poll(CONSUMER_POLL_DURATION);
+        }
+        return openTransactions;
+    }
+
+    /**
+     * Constructs a transactionalId with the following format {@code
+     * transactionalIdPrefix-subtaskId-offset}.
+     *
+     * @param transactionalIdPrefix prefix for the id
+     * @param subtaskId describing the subtask which is opening the transaction
+     * @param offset an always incrementing number usually capturing the number of checkpoints taken
+     *     by the subtask
+     * @return transactionalId
+     */
+    public static String buildTransactionalId(
+            String transactionalIdPrefix, int subtaskId, long offset) {
+        final StringBuilder sb = new StringBuilder();
+        return sb.append(transactionalIdPrefix)
+                .append(TRANSACTIONAL_ID_DELIMITER)
+                .append(subtaskId)
+                .append(TRANSACTIONAL_ID_DELIMITER)
+                .append(offset)
+                .toString();
+    }
+
+    private Consumer<ConsumerRecord<byte[], byte[]>> maybeAddTransaction(
+            Set<String> openTransactions) {
+        return record -> {
+            final ByteBuffer keyBuffer = ByteBuffer.wrap(record.key());
+            checkKafkaSchemaVersionMatches(keyBuffer);
+            // Ignore 2 bytes because Kafka's internal representation
+            keyBuffer.getShort();
+            final String transactionalId = StandardCharsets.US_ASCII.decode(keyBuffer).toString();
+
+            final ByteBuffer valueBuffer = ByteBuffer.wrap(record.value());
+            checkKafkaSchemaVersionMatches(valueBuffer);
+            final TransactionState state =
+                    TransactionState.fromByte(readTransactionState(valueBuffer));
+
+            LOG.debug("Transaction {} is in state {}", transactionalId, state);
+
+            // If the transaction is in a final state ignore it
+            if (isTransactionInFinalState(state)) {
+                openTransactions.remove(transactionalId);
+                return;
+            }
+
+            final Optional<KafkaWriterState> openTransactionOpt =
+                    fromTransactionalId(transactionalId);
+            // If the transactionalId does not follow the format ignore it
+            if (!openTransactionOpt.isPresent()) {
+                return;
+            }
+
+            final KafkaWriterState openTransaction = openTransactionOpt.get();
+            // If the transactionalId prefixes differ ignore
+            if (!openTransaction
+                    .getTransactionalIdPrefix()
+                    .equals(main.getTransactionalIdPrefix())) {
+                LOG.debug(
+                        "The transactionalId prefixes differ. Open: {}, Recovered: {}",
+                        openTransaction.getTransactionalIdPrefix(),
+                        main.getTransactionalIdPrefix());
+                return;
+            }
+
+            final int openSubtaskIndex = openTransaction.getSubtaskId();
+            final long openCheckpointOffset = openTransaction.getTransactionalIdOffset();
+
+            final boolean commonRemainder =
+                    openSubtaskIndex % numberOfParallelSubtasks == main.getSubtaskId()
+                            && openSubtaskIndex != main.getSubtaskId();
+            final boolean sameSubtaskHigherCheckpoint =
+                    hasSameSubtaskWithHigherCheckpoint(openSubtaskIndex, openCheckpointOffset);
+            final boolean higherSubtasksWithFirstCheckpoint =
+                    minOthers.isPresent()
+                            && minOthers.getAsInt() < openSubtaskIndex
+                            && openCheckpointOffset == 0;
+
+            if (!commonRemainder
+                    && !sameSubtaskHigherCheckpoint
+                    && !higherSubtasksWithFirstCheckpoint) {
+                return;
+            }
+            openTransactions.add(transactionalId);
+        };
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    private boolean hasSameSubtaskWithHigherCheckpoint(
+            int openSubtaskIndex, long openCheckpointOffset) {
+        return combined.stream()
+                .anyMatch(
+                        state ->
+                                state.getSubtaskId() == openSubtaskIndex
+                                        && state.getTransactionalIdOffset()
+                                                <= openCheckpointOffset);
+    }
+
+    private static boolean isTransactionInFinalState(TransactionState state) {
+        return TERMINAL_TRANSACTION_STATES.contains(state);
+    }
+
+    private static Optional<KafkaWriterState> fromTransactionalId(String transactionalId) {
+        final String[] splits = transactionalId.split(TRANSACTIONAL_ID_DELIMITER);
+        if (splits.length < 3) {
+            LOG.debug("Transaction {} was not created by the Flink Kafka sink", transactionalId);
+            return Optional.empty();
+        }
+        try {
+            final long checkpointOffset = Long.parseLong(splits[splits.length - 1]);
+            final int subtaskId = Integer.parseInt(splits[splits.length - 2]);
+            return Optional.of(
+                    new KafkaWriterState(
+                            String.join(
+                                    TRANSACTIONAL_ID_DELIMITER,
+                                    Arrays.copyOfRange(splits, 0, splits.length - 2)),
+                            subtaskId,
+                            checkpointOffset));
+        } catch (NumberFormatException e) {
+            LOG.debug(
+                    "Transaction {} was not created by the Flink Kafka sink: {}",
+                    transactionalId,
+                    e);
+            return Optional.empty();
+        }
+    }
+
+    private static byte readTransactionState(ByteBuffer buffer) {
+        // producerId
+        buffer.getLong();
+        // epoch
+        buffer.getShort();
+        // transactionTimeout
+        buffer.getInt();
+        // statusKey
+        return buffer.get();
+    }
+
+    enum TransactionState {
+        Empty(Byte.parseByte("0")),

Review comment:
       I am not convinced by all options 😄  Do you mind if I leave it as is?
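
       For readers following the transactionalId layout documented in the diff above, here is a
       minimal, self-contained sketch of how an id of the form transactionalIdPrefix-subtaskId-offset
       can be composed and parsed from the end, so that prefixes containing the "-" delimiter still
       round-trip. The class and method names below are illustrative only and are not part of this PR.

       // Illustrative sketch only, not PR code.
       import java.util.Arrays;

       public class TransactionalIdFormatSketch {

           private static final String DELIMITER = "-";

           static String build(String prefix, int subtaskId, long offset) {
               // Same layout as described for buildTransactionalId: prefix-subtaskId-offset.
               return prefix + DELIMITER + subtaskId + DELIMITER + offset;
           }

           static void parseAndPrint(String transactionalId) {
               final String[] splits = transactionalId.split(DELIMITER);
               // The last two segments are subtaskId and offset; everything before forms the prefix.
               final long offset = Long.parseLong(splits[splits.length - 1]);
               final int subtaskId = Integer.parseInt(splits[splits.length - 2]);
               final String prefix =
                       String.join(DELIMITER, Arrays.copyOfRange(splits, 0, splits.length - 2));
               System.out.println("prefix=" + prefix + ", subtaskId=" + subtaskId + ", offset=" + offset);
           }

           public static void main(String[] args) {
               final String id = build("my-kafka-sink", 3, 42);
               System.out.println(id); // my-kafka-sink-3-42
               parseAndPrint(id);      // prefix=my-kafka-sink, subtaskId=3, offset=42
           }
       }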




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
