This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit d43fcbd1583bd60e1ca12147d68b04adeb53d72f Author: Arvid Heise <ar...@apache.org> AuthorDate: Sat Feb 8 17:29:13 2025 +0100 [FLINK-34554] Introduce transaction strategies Move existing way of naming and aborting transactions into specific strategies. Next commit will add new strategies. --- .../kafka/sink/ExactlyOnceKafkaWriter.java | 73 ++++++++--- .../flink/connector/kafka/sink/KafkaSink.java | 7 +- .../connector/kafka/sink/KafkaSinkBuilder.java | 27 +++- .../connector/kafka/sink/TransactionAborter.java | 128 ------------------- .../kafka/sink/TransactionNamingStrategy.java | 77 ++++++++++++ .../connector/kafka/sink/internal/Backchannel.java | 3 + .../sink/internal/FlinkKafkaInternalProducer.java | 7 +- .../kafka/sink/internal/ProducerPool.java | 3 + .../kafka/sink/internal/ProducerPoolImpl.java | 2 + .../kafka/sink/internal/ReadableBackchannel.java | 3 + .../TransactionAbortStrategyContextImpl.java | 76 +++++++++++ .../internal/TransactionAbortStrategyImpl.java | 139 +++++++++++++++++++++ .../TransactionNamingStrategyContextImpl.java | 80 ++++++++++++ .../internal/TransactionNamingStrategyImpl.java | 79 ++++++++++++ .../kafka/sink/internal/WritableBackchannel.java | 3 + .../connector/kafka/sink/KafkaSinkBuilderTest.java | 4 +- .../flink/connector/kafka/sink/KafkaSinkTest.java | 40 ++++-- 17 files changed, 586 insertions(+), 165 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java index 741d00f7..32131056 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java @@ -26,8 +26,11 @@ import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer import org.apache.flink.connector.kafka.sink.internal.ProducerPool; import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl; import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel; +import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyContextImpl; +import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl; import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; -import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory; +import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyContextImpl; +import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.util.FlinkRuntimeException; @@ -54,7 +57,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class); + /** + * Prefix for the transactional id. Must be unique across all sinks writing to the same broker. + */ private final String transactionalIdPrefix; + /** + * Strategy to abort lingering transactions from previous executions during writer + * initialization. + */ + private final TransactionAbortStrategyImpl transactionAbortStrategy; + /** Strategy to name transactions. 
*/ + private final TransactionNamingStrategyImpl transactionNamingStrategy; private final KafkaWriterState kafkaWriterState; private final Collection<KafkaWriterState> recoveredStates; @@ -72,6 +85,8 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { * related methods. */ private final ReadableBackchannel<TransactionFinished> backchannel; + /** The context used to name transactions. */ + private final TransactionNamingStrategyContextImpl namingContext; /** * Constructor creating a kafka writer. @@ -95,6 +110,8 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { WriterInitContext sinkInitContext, KafkaRecordSerializationSchema<IN> recordSerializer, SerializationSchema.InitializationContext schemaContext, + TransactionAbortStrategyImpl transactionAbortStrategy, + TransactionNamingStrategyImpl transactionNamingStrategy, Collection<KafkaWriterState> recoveredStates) { super( deliveryGuarantee, @@ -104,6 +121,11 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { schemaContext); this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null"); + this.transactionAbortStrategy = + checkNotNull(transactionAbortStrategy, "transactionAbortStrategy must not be null"); + this.transactionNamingStrategy = + checkNotNull( + transactionNamingStrategy, "transactionNamingStrategy must not be null"); try { recordSerializer.open(schemaContext, kafkaSinkContext); @@ -127,6 +149,9 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { subtaskId, sinkInitContext.getTaskInfo().getAttemptNumber(), transactionalIdPrefix); + this.namingContext = + new TransactionNamingStrategyContextImpl( + transactionalIdPrefix, subtaskId, restoredCheckpointId, producerPool); } @Override @@ -147,13 +172,10 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { } private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) { + namingContext.setNextCheckpointId(checkpointId); FlinkKafkaInternalProducer<byte[], byte[]> producer = - producerPool.getTransactionalProducer( - TransactionalIdFactory.buildTransactionalId( - transactionalIdPrefix, - kafkaSinkContext.getParallelInstanceId(), - checkpointId), - checkpointId); + transactionNamingStrategy.getTransactionalProducer(namingContext); + namingContext.setLastCheckpointId(checkpointId); producer.beginTransaction(); return producer; } @@ -236,13 +258,34 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { } } - try (TransactionAborter transactionAborter = - new TransactionAborter( - kafkaSinkContext.getParallelInstanceId(), - kafkaSinkContext.getNumberOfParallelInstances(), - id -> producerPool.getTransactionalProducer(id, startCheckpointId), - producerPool::recycle)) { - transactionAborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId); - } + LOG.info( + "Aborting lingering transactions with prefixes {} using {}", + prefixesToAbort, + transactionAbortStrategy); + TransactionAbortStrategyContextImpl context = + getTransactionAbortStrategyContext(startCheckpointId, prefixesToAbort); + transactionAbortStrategy.abortTransactions(context); + } + + private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext( + long startCheckpointId, List<String> prefixesToAbort) { + TransactionAbortStrategyImpl.TransactionAborter aborter = + transactionalId -> { + // getTransactionalProducer already calls initTransactions, which cancels the + // transaction + FlinkKafkaInternalProducer<byte[], byte[]> producer = + 
producerPool.getTransactionalProducer(transactionalId, 0); + LOG.debug("Aborting transaction {}", transactionalId); + producer.flush(); + short epoch = producer.getEpoch(); + producerPool.recycle(producer); + return epoch; + }; + return new TransactionAbortStrategyContextImpl( + kafkaSinkContext.getParallelInstanceId(), + kafkaSinkContext.getNumberOfParallelInstances(), + prefixesToAbort, + startCheckpointId, + aborter); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java index c05ff8d0..30002021 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java @@ -82,16 +82,19 @@ public class KafkaSink<IN> private final KafkaRecordSerializationSchema<IN> recordSerializer; private final Properties kafkaProducerConfig; private final String transactionalIdPrefix; + private final TransactionNamingStrategy transactionNamingStrategy; KafkaSink( DeliveryGuarantee deliveryGuarantee, Properties kafkaProducerConfig, String transactionalIdPrefix, - KafkaRecordSerializationSchema<IN> recordSerializer) { + KafkaRecordSerializationSchema<IN> recordSerializer, + TransactionNamingStrategy transactionNamingStrategy) { this.deliveryGuarantee = deliveryGuarantee; this.kafkaProducerConfig = kafkaProducerConfig; this.transactionalIdPrefix = transactionalIdPrefix; this.recordSerializer = recordSerializer; + this.transactionNamingStrategy = transactionNamingStrategy; } /** @@ -141,6 +144,8 @@ public class KafkaSink<IN> context, recordSerializer, context.asSerializationSchemaInitializationContext(), + transactionNamingStrategy.getAbortImpl(), + transactionNamingStrategy.getImpl(), recoveredState); } else { writer = diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java index ac033f17..80d67b9d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java @@ -74,6 +74,7 @@ public class KafkaSinkBuilder<IN> { private final Properties kafkaProducerConfig; private KafkaRecordSerializationSchema<IN> recordSerializer; + private TransactionNamingStrategy transactionNamingStrategy = TransactionNamingStrategy.DEFAULT; KafkaSinkBuilder() { kafkaProducerConfig = new Properties(); @@ -125,6 +126,20 @@ public class KafkaSinkBuilder<IN> { return this; } + /** + * Sets the {@link TransactionNamingStrategy} that is used to name the transactions. + * + * <p>By default {@link TransactionNamingStrategy#DEFAULT} is used. It's recommended to change + * the strategy only if specific issues occur. + */ + public KafkaSinkBuilder<IN> setTransactionNamingStrategy( + TransactionNamingStrategy transactionNamingStrategy) { + this.transactionNamingStrategy = + checkNotNull( + transactionNamingStrategy, "transactionNamingStrategy must not be null"); + return this; + } + /** * Sets the {@link KafkaRecordSerializationSchema} that transforms incoming records to {@link * org.apache.kafka.clients.producer.ProducerRecord}s. 
@@ -162,7 +177,7 @@ public class KafkaSinkBuilder<IN> { checkState( transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length <= MAXIMUM_PREFIX_BYTES, - "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size."); + "The configured prefix is too long and the resulting transactionalIdPrefix might exceed Kafka's transactionalIdPrefix size."); return this; } @@ -180,12 +195,12 @@ public class KafkaSinkBuilder<IN> { checkNotNull( kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), "bootstrapServers"); + checkNotNull(recordSerializer, "recordSerializer"); if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { checkState( transactionalIdPrefix != null, - "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster."); + "EXACTLY_ONCE delivery guarantee requires a transactionalIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster."); } - checkNotNull(recordSerializer, "recordSerializer"); } /** @@ -196,6 +211,10 @@ public class KafkaSinkBuilder<IN> { public KafkaSink<IN> build() { sanityCheck(); return new KafkaSink<>( - deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, recordSerializer); + deliveryGuarantee, + kafkaProducerConfig, + transactionalIdPrefix, + recordSerializer, + transactionNamingStrategy); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java deleted file mode 100644 index 353cfea4..00000000 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.kafka.sink; - -import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; -import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory; - -import javax.annotation.Nullable; - -import java.io.Closeable; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Function; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Aborts lingering transactions on restart. - * - * <p>Transactions are lingering if they are not tracked anywhere. For example, if a job is started - * transactions are opened. A restart without checkpoint would not allow Flink to abort old - * transactions. Since Kafka's transactions are sequential, newly produced data does not become - * visible for read_committed consumers. 
However, Kafka has no API for querying open transactions, - * so they become lingering. - * - * <p>Flink solves this by assuming consecutive transaction ids. On restart of checkpoint C on - * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until it finds the first - * unused transaction. - * - * <p>Additionally, to cover for weird downscaling cases without checkpoints, it also checks for - * transactions of subtask S+P where P is the current parallelism until it finds a subtask without - * transactions. - */ -class TransactionAborter implements Closeable { - private final int subtaskId; - private final int parallelism; - private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory; - private final Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> recycler; - @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null; - - public TransactionAborter( - int subtaskId, - int parallelism, - Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory, - Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> recycler) { - this.subtaskId = subtaskId; - this.parallelism = parallelism; - this.producerFactory = checkNotNull(producerFactory); - this.recycler = recycler; - } - - void abortLingeringTransactions(List<String> prefixesToAbort, long startCheckpointId) { - for (String prefix : prefixesToAbort) { - abortTransactionsWithPrefix(prefix, startCheckpointId); - } - } - - /** - * Aborts all transactions that have been created by this subtask in a previous run. - * - * <p>It also aborts transactions from subtasks that may have been removed because of - * downscaling. - * - * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning - * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0 - * is responsible for all even and subtask 1 for all odd subtasks. - */ - private void abortTransactionsWithPrefix(String prefix, long startCheckpointId) { - for (int subtaskId = this.subtaskId; ; subtaskId += parallelism) { - if (abortTransactionOfSubtask(prefix, startCheckpointId, subtaskId) == 0) { - // If Flink didn't abort any transaction for current subtask, then we assume that no - // such subtask existed and no subtask with a higher number as well. - break; - } - } - } - - /** - * Aborts all transactions that have been created by a subtask in a previous run after the given - * checkpoint id. - * - * <p>We assume that transaction ids are consecutively used and thus Flink can stop aborting as - * soon as Flink notices that a particular transaction id was unused. - */ - private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int subtaskId) { - int numTransactionAborted = 0; - for (long checkpointId = startCheckpointId; ; checkpointId++, numTransactionAborted++) { - // initTransactions fences all old transactions with the same id by bumping the epoch - String transactionalId = - TransactionalIdFactory.buildTransactionalId(prefix, subtaskId, checkpointId); - producer = producerFactory.apply(transactionalId); - producer.flush(); - // An epoch of 0 indicates that the id was unused before - short epoch = producer.getEpoch(); - recycler.accept(producer); - if (epoch == 0) { - // Note that the check works beyond transaction log timeouts and just depends on the - // retention of the transaction topic (typically 7d). 
Any transaction that is not in - // the that topic anymore is also not lingering (i.e., it will not block downstream - // from reading) - // This method will only cease to work if transaction log timeout = topic retention - // and a user didn't restart the application for that period of time. Then the first - // transactions would vanish from the topic while later transactions are still - // lingering until they are cleaned up by Kafka. Then the user has to wait until the - // other transactions are timed out (which shouldn't take too long). - break; - } - } - return numTransactionAborted; - } - - public void close() {} -} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java new file mode 100644 index 00000000..6c09bc92 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl; +import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl; + +/** + * The strategy to name transactions. Naming strategy has implications on the resource consumption + * on the broker because each unique transaction name requires the broker to keep some metadata in + * memory for 7 days. + * + * <p>All naming strategies use the format {@code transactionalIdPrefix-subtask-offset} where offset + * is calculated differently. + */ +@PublicEvolving +public enum TransactionNamingStrategy { + /** + * The offset of the transaction name is a monotonically increasing number that mostly + * corresponds to the checkpoint id. This strategy is wasteful in terms of resource consumption + * on the broker. + * + * <p>This is exactly the same behavior as in flink-connector-kafka 3.X. + */ + INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING); + + /** + * The default transaction naming strategy. Currently set to {@link #INCREMENTING}, which is the + * same behavior of flink-connector-kafka 3.X. + */ + public static final TransactionNamingStrategy DEFAULT = INCREMENTING; + + /** + * The backing implementation of the transaction naming strategy. Separation allows to avoid + * leaks of internal classes in signatures. + */ + private final TransactionNamingStrategyImpl impl; + /** + * The set of supported abort strategies for this naming strategy. 
Some naming strategies may + * not support all abort strategies. + */ + private final TransactionAbortStrategyImpl abortImpl; + + TransactionNamingStrategy( + TransactionNamingStrategyImpl impl, TransactionAbortStrategyImpl abortImpl) { + this.impl = impl; + this.abortImpl = abortImpl; + } + + @Internal + TransactionAbortStrategyImpl getAbortImpl() { + return abortImpl; + } + + @Internal + TransactionNamingStrategyImpl getImpl() { + return impl; + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java index 244b7668..af7b9a71 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.kafka.sink.internal; +import org.apache.flink.annotation.Internal; + import java.io.Closeable; /** @@ -34,6 +36,7 @@ import java.io.Closeable; * both instances will run inside the same JVM and we can establish a backchannel between them. The * latter case requires some synchronization in the buffer. */ +@Internal public interface Backchannel extends Closeable { /** Check if the backchannel is fully established. */ boolean isEstablished(); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java index 6f52a9db..ee5b823d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.kafka.sink.internal; +import org.apache.flink.annotation.Internal; + import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -44,6 +46,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state. 
*/ +@Internal public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class); private static final String TRANSACTION_MANAGER_FIELD_NAME = "transactionManager"; @@ -58,13 +61,13 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> { public FlinkKafkaInternalProducer(Properties properties) { super(properties); - LOG.debug("Created non-transactional {}", this); + LOG.info("Created non-transactional {}", this); } public FlinkKafkaInternalProducer(Properties properties, String transactionalId) { super(withTransactionalId(properties, transactionalId)); this.transactionalId = transactionalId; - LOG.debug("Created transactional {}", this); + LOG.info("Created transactional {}", this); } private static Properties withTransactionalId(Properties properties, String transactionalId) { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java index 0a51ff09..d47cca28 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java @@ -18,7 +18,10 @@ package org.apache.flink.connector.kafka.sink.internal; +import org.apache.flink.annotation.Internal; + /** A pool of producers that can be recycled. */ +@Internal public interface ProducerPool extends AutoCloseable { /** * Notify the pool that a transaction has finished. The producer with the given transactional id diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java index cccb6bb0..38a159ed 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kafka.sink.internal; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.slf4j.Logger; @@ -68,6 +69,7 @@ import static org.apache.flink.util.Preconditions.checkState; * it. */ @NotThreadSafe +@Internal public class ProducerPoolImpl implements ProducerPool { private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java index 0f2cbf17..69d205ad 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.kafka.sink.internal; +import org.apache.flink.annotation.Internal; + import javax.annotation.Nullable; /** @@ -25,6 +27,7 @@ import javax.annotation.Nullable; * to signal that certain transactions have been committed and respective producers are good to be * reused. 
*/ +@Internal public interface ReadableBackchannel<T> extends Backchannel { /** * Poll the next message from the backchannel. This method is non-blocking and returns {@code diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java new file mode 100644 index 00000000..80f6386e --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl.TransactionAborter; + +import java.util.List; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Implementation of {@link TransactionAbortStrategyImpl.Context}. */ +@Internal +public class TransactionAbortStrategyContextImpl implements TransactionAbortStrategyImpl.Context { + private final int currentSubtaskId; + private final int currentParallelism; + private final Set<String> prefixesToAbort; + private final long startCheckpointId; + private final TransactionAborter transactionAborter; + + /** Creates a new {@link TransactionAbortStrategyContextImpl}. 
*/ + public TransactionAbortStrategyContextImpl( + int currentSubtaskId, + int currentParallelism, + List<String> prefixesToAbort, + long startCheckpointId, + TransactionAborter transactionAborter) { + this.currentSubtaskId = currentSubtaskId; + this.currentParallelism = currentParallelism; + this.prefixesToAbort = Set.copyOf(prefixesToAbort); + this.startCheckpointId = startCheckpointId; + this.transactionAborter = + checkNotNull(transactionAborter, "transactionAborter must not be null"); + } + + @Override + public int getCurrentSubtaskId() { + return currentSubtaskId; + } + + @Override + public int getCurrentParallelism() { + return currentParallelism; + } + + @Override + public Set<String> getPrefixesToAbort() { + return prefixesToAbort; + } + + @Override + public long getStartCheckpointId() { + return startCheckpointId; + } + + public TransactionAborter getTransactionAborter() { + return transactionAborter; + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java new file mode 100644 index 00000000..6ea69586 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +/** Implementations of an abort strategy for transactions left over from previous runs. */ +@Internal +public enum TransactionAbortStrategyImpl { + /** + * The probing strategy starts with aborting a set of known transactional ids from the recovered + * state and then continues guessing if more transactions may have been opened between this run + * and the last successful checkpoint. This also accounts for rescaling in between the attempts. + * + * <p>However, the probing is not side-effect free, which leads to an ever-increasing search + * space for the next probing attempt in case of a restart loop. It will fix eventually on the + * next successful checkpoints. It's recommended to use this strategy only with a strict restart + * policy that prevents tight restart loops (e.g. incremental backoff or hard failure after X + * attempts). + * + * <p>This is exactly the same behavior as in flink-connector-kafka 3.X. 
+ */ + PROBING { + @Override + public void abortTransactions(Context context) { + for (String prefix : context.getPrefixesToAbort()) { + abortTransactionsWithPrefix(prefix, context); + } + } + + /** + * Aborts all transactions that have been created by this subtask in a previous run. + * + * <p>It also aborts transactions from subtasks that may have been removed because of + * downscaling. + * + * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for + * cleaning all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, + * then subtask 0 is responsible for all even and subtask 1 for all odd subtasks. + */ + private void abortTransactionsWithPrefix(String prefix, Context context) { + for (int subtaskId = context.getCurrentSubtaskId(); + ; + subtaskId += context.getCurrentParallelism()) { + if (abortTransactionOfSubtask(prefix, subtaskId, context) == 0) { + // If Flink didn't abort any transaction for current subtask, then we assume + // that no + // such subtask existed and no subtask with a higher number as well. + break; + } + } + } + + /** + * Aborts all transactions that have been created by a subtask in a previous run after the + * given checkpoint id. + * + * <p>We assume that transaction ids are consecutively used and thus Flink can stop aborting + * as soon as Flink notices that a particular transaction id was unused. + */ + private int abortTransactionOfSubtask(String prefix, int subtaskId, Context context) { + int numTransactionAborted = 0; + for (long checkpointId = context.getStartCheckpointId(); + ; + checkpointId++, numTransactionAborted++) { + // initTransactions fences all old transactions with the same id by bumping the + // epoch + String transactionalId = + TransactionalIdFactory.buildTransactionalId( + prefix, subtaskId, checkpointId); + int epoch = context.getTransactionAborter().abortTransaction(transactionalId); + // An epoch of 0 indicates that the id was unused before + if (epoch == 0) { + // Note that the check works beyond transaction log timeouts and just depends on + // the + // retention of the transaction topic (typically 7d). Any transaction that is + // not in + // the that topic anymore is also not lingering (i.e., it will not block + // downstream + // from reading) + // This method will only cease to work if transaction log timeout = topic + // retention + // and a user didn't restart the application for that period of time. Then the + // first + // transactions would vanish from the topic while later transactions are still + // lingering until they are cleaned up by Kafka. Then the user has to wait until + // the + // other transactions are timed out (which shouldn't take too long). + break; + } + } + return numTransactionAborted; + } + }; + + private static final Logger LOG = LoggerFactory.getLogger(TransactionAbortStrategyImpl.class); + + /** Aborts all transactions that have been created by this subtask in a previous run. */ + public abstract void abortTransactions(Context context); + + /** Injects the actual abortion of the transactional id generated by one of the strategies. */ + public interface TransactionAborter { + int abortTransaction(String transactionalId); + } + + /** Context for the {@link TransactionAbortStrategyImpl}. 
*/ + public interface Context { + int getCurrentSubtaskId(); + + int getCurrentParallelism(); + + Set<String> getPrefixesToAbort(); + + long getStartCheckpointId(); + + TransactionAborter getTransactionAborter(); + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java new file mode 100644 index 00000000..2ac26c87 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Implementation of {@link TransactionNamingStrategyImpl.Context}. */ +@Internal +public class TransactionNamingStrategyContextImpl implements TransactionNamingStrategyImpl.Context { + private final String transactionalIdPrefix; + private final int subtaskId; + private final ProducerPool producerPool; + private long lastCheckpointId; + private long nextCheckpointId; + + /** Creates a new {@link TransactionNamingStrategyContextImpl}. 
*/ + public TransactionNamingStrategyContextImpl( + String transactionalIdPrefix, + int subtaskId, + long lastCheckpointId, + ProducerPool producerPool) { + this.transactionalIdPrefix = + checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null"); + this.subtaskId = subtaskId; + this.producerPool = checkNotNull(producerPool, "producerPool must not be null"); + this.lastCheckpointId = lastCheckpointId; + } + + @Override + public String buildTransactionalId(long offset) { + return TransactionalIdFactory.buildTransactionalId( + transactionalIdPrefix, subtaskId, offset); + } + + @Override + public long getNextCheckpointId() { + return nextCheckpointId; + } + + public void setNextCheckpointId(long nextCheckpointId) { + this.nextCheckpointId = nextCheckpointId; + } + + public void setLastCheckpointId(long lastCheckpointId) { + this.lastCheckpointId = lastCheckpointId; + } + + @Override + public long getLastCheckpointId() { + return lastCheckpointId; + } + + @Override + public FlinkKafkaInternalProducer<byte[], byte[]> getProducer(String transactionalId) { + return producerPool.getTransactionalProducer(transactionalId, nextCheckpointId); + } + + @Override + public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) { + producerPool.recycle(producer); + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java new file mode 100644 index 00000000..f1be95a0 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Implementation of {@link TransactionNamingStrategy}. */ +@Internal +public enum TransactionNamingStrategyImpl { + INCREMENTING { + /** + * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new + * transactions will not clash with transactions created during previous checkpoints ({@code + * producer.initTransactions()} assures that we obtain new producerId and epoch counters). + * + * <p>Ensures that all transaction ids in between lastCheckpointId and checkpointId are + * initialized. 
+ */ + @Override + public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer( + Context context) { + long expectedCheckpointId = context.getNextCheckpointId(); + long lastCheckpointId = context.getLastCheckpointId(); + checkState( + expectedCheckpointId > lastCheckpointId, + "Expected %s > %s", + expectedCheckpointId, + lastCheckpointId); + // in case checkpoints have been aborted, Flink would create non-consecutive transaction + // ids + // this loop ensures that all gaps are filled with initialized (empty) transactions + for (long checkpointId = lastCheckpointId + 1; + checkpointId < expectedCheckpointId; + checkpointId++) { + context.recycle(context.getProducer(context.buildTransactionalId(checkpointId))); + } + return context.getProducer(context.buildTransactionalId(expectedCheckpointId)); + } + }; + + /** + * Returns a {@link FlinkKafkaInternalProducer} that will not clash with any ongoing + * transactions. + */ + public abstract FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer( + Context context); + + /** Context for the transaction naming strategy. */ + public interface Context { + String buildTransactionalId(long offset); + + long getNextCheckpointId(); + + long getLastCheckpointId(); + + FlinkKafkaInternalProducer<byte[], byte[]> getProducer(String transactionalId); + + void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer); + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java index 97a0b2c8..5311c804 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.kafka.sink.internal; +import org.apache.flink.annotation.Internal; + /** * The writable portion of a {@link Backchannel} for communication between the commiter -> writer. * It's used to signal that certain transactions have been committed and respective producers are @@ -26,6 +28,7 @@ package org.apache.flink.connector.kafka.sink.internal; * <p>Messages can be sent before the backchannel is established. They will be consumed once the * backchannel is established. */ +@Internal public interface WritableBackchannel<T> extends Backchannel { /** Send a message to the backchannel. 
*/ void send(T message); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java index 9e26cd4e..b2591f5d 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java @@ -89,7 +89,7 @@ public class KafkaSinkBuilderTest extends TestLogger { } @Test - void testTransactionIdSanityCheck() { + void testTransactionalIdSanityCheck() { assertThatThrownBy( () -> getBasicBuilder() @@ -97,7 +97,7 @@ public class KafkaSinkBuilderTest extends TestLogger { .build()) .isExactlyInstanceOf(IllegalStateException.class) .hasMessageContaining( - "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster."); + "EXACTLY_ONCE delivery guarantee requires a transactionalIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster."); } private void validateProducerConfig( diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java index 84c1e429..8eca6592 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java @@ -43,7 +43,11 @@ public class KafkaSinkTest { new KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider(); KafkaSink<Object> sink = new KafkaSink<>( - DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer); + DeliveryGuarantee.EXACTLY_ONCE, + new Properties(), + "", + recordSerializer, + TransactionNamingStrategy.DEFAULT); assertThat(sink.getLineageVertex().datasets()).isEmpty(); } @@ -55,7 +59,11 @@ public class KafkaSinkTest { KafkaSink<Object> sink = new KafkaSink<>( - DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer); + DeliveryGuarantee.EXACTLY_ONCE, + new Properties(), + "", + recordSerializer, + TransactionNamingStrategy.DEFAULT); assertThat(sink.getLineageVertex().datasets()).isEmpty(); } @@ -67,7 +75,11 @@ public class KafkaSinkTest { KafkaSink<Object> sink = new KafkaSink<>( - DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", recordSerializer); + DeliveryGuarantee.EXACTLY_ONCE, + kafkaProperties, + "", + recordSerializer, + TransactionNamingStrategy.DEFAULT); LineageVertex lineageVertex = sink.getLineageVertex(); @@ -99,11 +111,12 @@ public class KafkaSinkTest { public void testCoLocation() { String colocationKey = "testCoLocation"; KafkaSink<Object> sink = - new KafkaSink<>( - DeliveryGuarantee.EXACTLY_ONCE, - kafkaProperties, - colocationKey, - new TestingKafkaRecordSerializationSchema()); + KafkaSink.builder() + .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .setKafkaProducerConfig(kafkaProperties) + .setTransactionalIdPrefix(colocationKey) + .setRecordSerializer(new TestingKafkaRecordSerializationSchema()) + .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -122,11 +135,12 @@ public class KafkaSinkTest { String colocationKey = "testPreserveCustomCoLocation"; String customColocationKey = "customCoLocation"; KafkaSink<Object> sink = 
- new KafkaSink<>( - DeliveryGuarantee.EXACTLY_ONCE, - kafkaProperties, - colocationKey, - new TestingKafkaRecordSerializationSchema()); + KafkaSink.builder() + .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .setKafkaProducerConfig(kafkaProperties) + .setTransactionalIdPrefix(colocationKey) + .setRecordSerializer(new TestingKafkaRecordSerializationSchema()) + .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
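For reference, a minimal usage sketch of the new builder option introduced by this commit. Only `setTransactionNamingStrategy` and the `TransactionNamingStrategy` enum come from this change; the broker address, topic name, and the record-serializer construction via the standard `KafkaRecordSerializationSchema` builder are illustrative assumptions, not part of this diff.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class TransactionNamingStrategyExample {

    public static KafkaSink<String> buildSink() {
        // Hypothetical producer config; bootstrap servers are mandatory for the sanity check.
        Properties producerConfig = new Properties();
        producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");

        return KafkaSink.<String>builder()
                .setKafkaProducerConfig(producerConfig)
                // EXACTLY_ONCE requires a transactionalIdPrefix that is unique across all
                // sinks writing to the same Kafka cluster.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("orders-sink")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("orders") // illustrative topic name
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // New in this commit: pick the transaction naming strategy explicitly.
                // DEFAULT currently resolves to INCREMENTING, the pre-existing 3.x behavior,
                // which pairs with the PROBING abort strategy under the hood.
                .setTransactionNamingStrategy(TransactionNamingStrategy.INCREMENTING)
                .build();
    }
}
```

If `setTransactionNamingStrategy` is not called, the builder falls back to `TransactionNamingStrategy.DEFAULT`, so existing pipelines keep the same naming and abort behavior without code changes.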