This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 2d4f402f303a80b75f057e4cc932aae495a5e259 Author: Arvid Heise <ar...@apache.org> AuthorDate: Sat Feb 8 18:42:31 2025 +0100 [FLINK-34554] Adding pooling name strategy Reduce the load on Kafka broker by reusing transaction ids as soon as they have been committed. --- .../flink/tests/util/kafka/KafkaSinkE2ECase.java | 25 +- .../kafka/sink/ExactlyOnceKafkaWriter.java | 4 + .../flink/connector/kafka/sink/KafkaCommitter.java | 51 +++- .../flink/connector/kafka/sink/KafkaSink.java | 1 + .../connector/kafka/sink/KafkaSinkBuilder.java | 7 + .../kafka/sink/TransactionNamingStrategy.java | 18 +- .../TransactionNamingStrategyContextImpl.java | 13 + .../internal/TransactionNamingStrategyImpl.java | 36 ++- .../sink/internal/TransactionalIdFactory.java | 4 +- .../kafka/sink/ExactlyOnceKafkaWriterITCase.java | 92 ++++++ .../connector/kafka/sink/KafkaCommitterTest.java | 15 +- .../connector/kafka/sink/KafkaSinkITCase.java | 330 +++++++++++++++++---- .../connector/kafka/sink/KafkaWriterTestBase.java | 33 ++- .../sink/internal/ProducerPoolImplITCase.java | 2 +- .../internal/TransactionAbortStrategyImplTest.java | 235 +++++++++++++++ .../sink/internal/TransactionOwnershipTest.java | 165 +++++++++++ .../sink/testutils/KafkaSinkExternalContext.java | 12 +- .../testutils/KafkaSinkExternalContextFactory.java | 11 +- 18 files changed, 950 insertions(+), 104 deletions(-) diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java index e18c035b..ea9a0079 100644 --- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java +++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java @@ -18,6 +18,7 @@ package org.apache.flink.tests.util.kafka; +import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy; import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; @@ -62,7 +63,7 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> { // Defines 2 External context Factories, so test cases will be invoked twice using these two // kinds of external contexts. 
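A minimal usage sketch of the new strategy follows. It relies only on the public builder API touched by this commit (KafkaSinkBuilder#setTransactionNamingStrategy); the bootstrap servers, topic, and transactional id prefix are placeholders, and it assumes that the builder-created record serializer with a fixed topic satisfies the "known topics" check that POOLING introduces in KafkaSinkBuilder.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;

    public class PoolingSinkSketch {

        public static KafkaSink<String> buildSink() {
            return KafkaSink.<String>builder()
                    // placeholders: point these at the actual cluster and topic
                    .setBootstrapServers("localhost:9092")
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("output-topic")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    // required for exactly-once: unique per sink writing to the same cluster
                    .setTransactionalIdPrefix("my-app")
                    // new in this commit: reuse transactional ids once they are committed
                    .setTransactionNamingStrategy(TransactionNamingStrategy.POOLING)
                    .build();
        }
    }

INCREMENTING remains the default, so existing jobs keep their current behavior unless they opt in.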
@TestContext - KafkaSinkExternalContextFactory contextFactory = + KafkaSinkExternalContextFactory incrementing = new KafkaSinkExternalContextFactory( kafka.getContainer(), Arrays.asList( @@ -77,7 +78,27 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> { ResourceTestUtils.getResource("flink-connector-testing.jar") .toAbsolutePath() .toUri() - .toURL())); + .toURL()), + TransactionNamingStrategy.INCREMENTING); + + @TestContext + KafkaSinkExternalContextFactory pooling = + new KafkaSinkExternalContextFactory( + kafka.getContainer(), + Arrays.asList( + ResourceTestUtils.getResource("kafka-connector.jar") + .toAbsolutePath() + .toUri() + .toURL(), + ResourceTestUtils.getResource("kafka-clients.jar") + .toAbsolutePath() + .toUri() + .toURL(), + ResourceTestUtils.getResource("flink-connector-testing.jar") + .toAbsolutePath() + .toUri() + .toURL()), + TransactionNamingStrategy.POOLING); public KafkaSinkE2ECase() throws Exception {} } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java index e75436d8..c7aee3c4 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java @@ -200,6 +200,10 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) { namingContext.setNextCheckpointId(checkpointId); + namingContext.setOngoingTransactions( + producerPool.getOngoingTransactions().stream() + .map(CheckpointTransaction::getTransactionalId) + .collect(Collectors.toSet())); FlinkKafkaInternalProducer<byte[], byte[]> producer = transactionNamingStrategy.getTransactionalProducer(namingContext); namingContext.setLastCheckpointId(checkpointId); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java index 70d67150..91e724e2 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java @@ -56,6 +56,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { + "To avoid data loss, the application will restart."; private final Properties kafkaProducerConfig; + private final boolean reusesTransactionalIds; private final BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory; private final WritableBackchannel<TransactionFinished> backchannel; @Nullable private FlinkKafkaInternalProducer<?, ?> committingProducer; @@ -65,8 +66,10 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { String transactionalIdPrefix, int subtaskId, int attemptNumber, + boolean reusesTransactionalIds, BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory) { this.kafkaProducerConfig = kafkaProducerConfig; + this.reusesTransactionalIds = reusesTransactionalIds; this.producerFactory = producerFactory; backchannel = BackchannelFactory.getInstance() @@ -102,19 +105,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { "Encountered retriable exception while committing {}.", transactionalId, e); request.retryLater(); } 
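The ongoing transactional ids that startTransaction now feeds into the naming context are what the POOLING strategy probes against when it picks the next id; the actual loop appears in TransactionNamingStrategyImpl further down in this diff. The following self-contained sketch only illustrates that probe, assuming the "<prefix>-<subtaskId>-<offset>" id layout of TransactionalIdFactory; the class and method names here are illustrative, not part of the connector.

    import java.util.Set;

    /** Illustrative sketch only; the real selection lives in TransactionNamingStrategyImpl#POOLING. */
    public class PooledIdProbeSketch {

        // Mirrors the "<prefix>-<subtaskId>-<offset>" layout used by TransactionalIdFactory.
        static String buildTransactionalId(String prefix, int subtaskId, long offset) {
            return prefix + "-" + subtaskId + "-" + offset;
        }

        /** Picks the smallest offset whose id is not tied to an ongoing transaction. */
        static String nextFreeTransactionalId(String prefix, int subtaskId, Set<String> ongoingIds) {
            for (long offset = 0; ; offset++) {
                String candidate = buildTransactionalId(prefix, subtaskId, offset);
                if (!ongoingIds.contains(candidate)) {
                    return candidate;
                }
            }
        }

        public static void main(String[] args) {
            // Offsets 2 and 4 are still in flight, so offset 0 is handed out next,
            // matching the expectations in shouldSkipIdsOfCommitterForPooledTransactions below.
            Set<String> ongoing = Set.of("my-app-0-2", "my-app-0-4");
            System.out.println(nextFreeTransactionalId("my-app", 0, ongoing)); // my-app-0-0
        }
    }

Because committed ids are handed back to the pool (via the TransactionFinished backchannel exercised in the tests below), the same small set of offsets is cycled instead of growing without bound, which is what reduces the transaction state kept on the broker.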
catch (ProducerFencedException e) { - // initTransaction has been called on this transaction before - LOG.error( - "Unable to commit transaction ({}) because its producer is already fenced." - + " This means that you either have a different producer with the same '{}' (this is" - + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)" - + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss," - + " please consult the Flink documentation for more details.", - request, - ProducerConfig.TRANSACTIONAL_ID_CONFIG, - KafkaSink.class.getSimpleName(), - ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, - kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), - e); + logFencedRequest(request, e); handleFailedTransaction(producer); request.signalFailedWithKnownReason(e); } catch (InvalidTxnStateException e) { @@ -146,6 +137,40 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { } } + private void logFencedRequest( + CommitRequest<KafkaCommittable> request, ProducerFencedException e) { + if (reusesTransactionalIds) { + // If checkpoint 1 succeeds, checkpoint 2 is aborted, and checkpoint 3 may reuse the id + // of checkpoint 1. A recovery of checkpoint 1 would show that the transaction has been + // fenced. + LOG.warn( + "Unable to commit transaction ({}) because its producer is already fenced." + + " If this warning appears as part of the recovery of a checkpoint, it is expected in some cases (e.g., aborted checkpoints in previous attempt)." + + " If it's outside of recovery, this means that you either have a different sink with the same '{}'" + + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss," + + " please consult the Flink documentation for more details.", + request, + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, + kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), + e); + } else { + // initTransaction has been called on this transaction before + LOG.error( + "Unable to commit transaction ({}) because its producer is already fenced." + + " This means that you either have a different producer with the same '{}' (this is" + + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)" + + " or recovery took longer than '{}' ({}ms). 
In both cases this most likely signals data loss," + + " please consult the Flink documentation for more details.", + request, + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + KafkaSink.class.getSimpleName(), + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, + kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), + e); + } + } + private void handleFailedTransaction(FlinkKafkaInternalProducer<?, ?> producer) { if (producer == null) { return; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java index 30002021..77ddc87d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java @@ -115,6 +115,7 @@ public class KafkaSink<IN> transactionalIdPrefix, context.getTaskInfo().getIndexOfThisSubtask(), context.getTaskInfo().getAttemptNumber(), + transactionNamingStrategy == TransactionNamingStrategy.POOLING, FlinkKafkaInternalProducer::new); } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java index 80d67b9d..f4190421 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -200,6 +201,12 @@ public class KafkaSinkBuilder<IN> { checkState( transactionalIdPrefix != null, "EXACTLY_ONCE delivery guarantee requires a transactionalIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster."); + if (transactionNamingStrategy.getImpl().requiresKnownTopics()) { + checkState( + recordSerializer instanceof KafkaDatasetFacetProvider, + "For %s naming strategy, the recordSerializer needs to expose the target topics though implementing KafkaDatasetFacetProvider.", + transactionNamingStrategy); + } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java index 6c09bc92..6ebfb1fe 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java @@ -39,8 +39,24 @@ public enum TransactionNamingStrategy { * on the broker. * * <p>This is exactly the same behavior as in flink-connector-kafka 3.X. + * + * <p>Switching to this strategy from {@link #POOLING} is not supported. + */ + INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING), + + /** + * This strategy reuses transaction names. It is more resource-friendly than {@link + * #INCREMENTING} on the Kafka broker. 
+ * + * <p>It's a new strategy introduced in flink-connector-kafka 4.X. It requires Kafka 3.0+ and + * additional read permissions on the target topics. + * + * <p>The recommended way to switch to this strategy is to first take a checkpoint with + * flink-connector-kafka 4.X and then switch to this strategy. This will ensure that no + * transactions are left open from the previous run. Alternatively, you can use a savepoint from + * any version. */ - INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING); + POOLING(TransactionNamingStrategyImpl.POOLING, TransactionAbortStrategyImpl.LISTING); /** * The default transaction naming strategy. Currently set to {@link #INCREMENTING}, which is the diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java index 2ac26c87..29103327 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java @@ -20,6 +20,9 @@ package org.apache.flink.connector.kafka.sink.internal; import org.apache.flink.annotation.Internal; +import java.util.Collection; +import java.util.Set; + import static org.apache.flink.util.Preconditions.checkNotNull; /** Implementation of {@link TransactionNamingStrategyImpl.Context}. */ @@ -27,6 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class TransactionNamingStrategyContextImpl implements TransactionNamingStrategyImpl.Context { private final String transactionalIdPrefix; private final int subtaskId; + private Set<String> ongoingTransactions; private final ProducerPool producerPool; private long lastCheckpointId; private long nextCheckpointId; @@ -63,6 +67,15 @@ public class TransactionNamingStrategyContextImpl implements TransactionNamingSt this.lastCheckpointId = lastCheckpointId; } + @Override + public Set<String> getOngoingTransactions() { + return ongoingTransactions; + } + + public void setOngoingTransactions(Collection<String> ongoingTransactions) { + this.ongoingTransactions = Set.copyOf(ongoingTransactions); + } + @Override public long getLastCheckpointId() { return lastCheckpointId; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java index 278ee9d9..a85f3074 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java @@ -21,12 +21,14 @@ package org.apache.flink.connector.kafka.sink.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy; +import java.util.Set; + import static org.apache.flink.util.Preconditions.checkState; /** Implementation of {@link TransactionNamingStrategy}. 
*/ @Internal public enum TransactionNamingStrategyImpl { - INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID) { + INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID, false) { /** * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new * transactions will not clash with transactions created during previous checkpoints ({@code @@ -55,12 +57,36 @@ public enum TransactionNamingStrategyImpl { } return context.getProducer(context.buildTransactionalId(expectedCheckpointId)); } + }, + POOLING(TransactionOwnership.EXPLICIT_BY_WRITER_STATE, true) { + @Override + public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer( + Context context) { + Set<String> usedTransactionalIds = context.getOngoingTransactions(); + for (int offset = 0; ; offset++) { + String transactionalIdCandidate = context.buildTransactionalId(offset); + if (usedTransactionalIds.contains(transactionalIdCandidate)) { + continue; + } + return context.getProducer(transactionalIdCandidate); + } + } }; private final TransactionOwnership ownership; + private final boolean requiresKnownTopics; - TransactionNamingStrategyImpl(TransactionOwnership ownership) { + TransactionNamingStrategyImpl(TransactionOwnership ownership, boolean requiresKnownTopics) { this.ownership = ownership; + this.requiresKnownTopics = requiresKnownTopics; + } + + public boolean requiresKnownTopics() { + return requiresKnownTopics; + } + + public TransactionOwnership getOwnership() { + return ownership; } /** @@ -70,16 +96,14 @@ public enum TransactionNamingStrategyImpl { public abstract FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer( Context context); - public TransactionOwnership getOwnership() { - return ownership; - } - /** Context for the transaction naming strategy. 
*/ public interface Context { String buildTransactionalId(long offset); long getNextCheckpointId(); + Set<String> getOngoingTransactions(); + long getLastCheckpointId(); FlinkKafkaInternalProducer<byte[], byte[]> getProducer(String transactionalId); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java index 3208483f..bd9b6cce 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java @@ -44,11 +44,11 @@ public class TransactionalIdFactory { + checkpointOffset; } - public static long extractSubtaskId(String name) { + public static int extractSubtaskId(String name) { int lastSep = name.lastIndexOf("-"); int secondLastSep = name.lastIndexOf("-", lastSep - 1); String subtaskString = name.substring(secondLastSep + 1, lastSep); - return Long.parseLong(subtaskString); + return Integer.parseInt(subtaskString); } public static String extractPrefix(String name) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java index 83c5b560..66706f96 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java @@ -20,11 +20,13 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction; import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl; import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership; import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel; +import org.apache.flink.connector.kafka.util.AdminUtils; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -32,6 +34,7 @@ import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.TestLoggerExtension; import com.google.common.collect.Iterables; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.ProducerFencedException; @@ -42,11 +45,14 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.function.Consumer; +import static org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory.buildTransactionalId; import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; @@ -240,6 +246,38 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase { } } + /** Test that writer does not abort those transactions that are passed in as writer state. */ + @ParameterizedTest + @ValueSource(ints = {1, 2, 3}) + void shouldNotAbortPrecommittedTransactions(int numCheckpointed) throws Exception { + try (final KafkaWriter<Integer> failedWriter = + createWriter(DeliveryGuarantee.EXACTLY_ONCE)) { + + // create three precommitted transactions + List<KafkaWriterState> states = + Arrays.asList( + onCheckpointBarrier(failedWriter, 1).f0, // 1 transaction + onCheckpointBarrier(failedWriter, 2).f0, // 2 transactions + onCheckpointBarrier(failedWriter, 3).f0); // 3 transactions + + // assume a varying number of states that have been checkpointed + try (final ExactlyOnceKafkaWriter<Integer> recoveredWriter = + restoreWriter( + this::withPooling, + List.of(states.get(numCheckpointed - 1)), + createInitContext())) { + // test abort of recoveredWriter; this should abort all transactions that have + // not been in the part of the checkpoint + try (AdminClient admin = AdminClient.create(getKafkaClientConfiguration())) { + assertThat( + AdminUtils.getOpenTransactionsForTopics( + admin, Collections.singleton(topic))) + .hasSize(numCheckpointed); + } + } + } + } + /** Test that producers are reused when committed. */ @ParameterizedTest @ValueSource(booleans = {true, false}) @@ -339,11 +377,65 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase { } } + /** Test that producers are reused when committed. */ + @Test + void shouldSkipIdsOfCommitterForPooledTransactions() throws Exception { + String prefix = getTransactionalPrefix(); + CheckpointTransaction t1 = new CheckpointTransaction(buildTransactionalId(prefix, 0, 2), 2); + CheckpointTransaction t2 = new CheckpointTransaction(buildTransactionalId(prefix, 0, 4), 4); + final KafkaWriterState writerState = + new KafkaWriterState( + prefix, + 0, + 1, + TransactionOwnership.EXPLICIT_BY_WRITER_STATE, + Arrays.asList(t1, t2)); + + SinkInitContext initContext = createInitContext(); + int checkpointId = 9; + initContext.setRestoredCheckpointId(checkpointId); + try (final ExactlyOnceKafkaWriter<Integer> writer = + restoreWriter( + this::withPooling, + Collections.singletonList(writerState), + initContext); + WritableBackchannel<TransactionFinished> backchannel = getBackchannel(writer)) { + // offsets leave out the used 2 and 4 + for (int expectedOffset : new int[] {0, 1, 3, 5, 6}) { + assertThat(writer.getCurrentProducer().getTransactionalId()) + .isEqualTo(buildTransactionalId(prefix, 0, expectedOffset)); + writer.write(checkpointId, SINK_WRITER_CONTEXT); + writer.flush(false); + writer.prepareCommit(); + writer.snapshotState(++checkpointId); + } + + // free 4, which also frees 2; 2 is returned first and then 4 + backchannel.send(new TransactionFinished(t2.getTransactionalId(), true)); + writer.write(checkpointId, SINK_WRITER_CONTEXT); + writer.prepareCommit(); + writer.snapshotState(++checkpointId); + assertThat(writer.getCurrentProducer().getTransactionalId()) + .isEqualTo(t1.getTransactionalId()); + + writer.write(checkpointId, SINK_WRITER_CONTEXT); + writer.prepareCommit(); + writer.snapshotState(++checkpointId); + assertThat(writer.getCurrentProducer().getTransactionalId()) + 
.isEqualTo(t2.getTransactionalId()); + } + } + private static Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers( ExactlyOnceKafkaWriter<Integer> writer) { return ((ProducerPoolImpl) writer.getProducerPool()).getProducers(); } + private KafkaSinkBuilder<?> withPooling(KafkaSinkBuilder<?> builder) { + return builder.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .setTransactionNamingStrategy(TransactionNamingStrategy.POOLING); + } + private Tuple2<KafkaWriterState, KafkaCommittable> onCheckpointBarrier( KafkaWriter<Integer> failedWriter, int checkpointId) throws IOException, InterruptedException { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java index 7ddbdda6..c87bfd50 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java @@ -66,7 +66,8 @@ class KafkaCommitterTest { public void testRetryCommittableOnRetriableError() throws IOException, InterruptedException { Properties properties = getProperties(); try (final KafkaCommitter committer = - new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY); + new KafkaCommitter( + properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY); FlinkKafkaInternalProducer<Object, Object> producer = new FlinkKafkaInternalProducer<>(properties, TRANS_ID); ReadableBackchannel<TransactionFinished> backchannel = @@ -87,7 +88,8 @@ class KafkaCommitterTest { public void testFailJobOnUnknownFatalError() throws IOException, InterruptedException { Properties properties = getProperties(); try (final KafkaCommitter committer = - new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY); + new KafkaCommitter( + properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY); FlinkKafkaInternalProducer<Object, Object> producer = new FlinkKafkaInternalProducer<>(properties, TRANS_ID); ReadableBackchannel<TransactionFinished> backchannel = @@ -111,7 +113,8 @@ class KafkaCommitterTest { public void testFailJobOnKnownFatalError() throws IOException, InterruptedException { Properties properties = getProperties(); try (final KafkaCommitter committer = - new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY); + new KafkaCommitter( + properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY); FlinkKafkaInternalProducer<?, ?> producer = new MockProducer(properties, new ProducerFencedException("test")); ReadableBackchannel<TransactionFinished> backchannel = @@ -135,7 +138,8 @@ class KafkaCommitterTest { return new MockProducer(props, new ProducerFencedException("test")); }; try (final KafkaCommitter committer = - new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, failingFactory); + new KafkaCommitter( + properties, TRANS_ID, SUB_ID, ATTEMPT, false, failingFactory); ReadableBackchannel<TransactionFinished> backchannel = BackchannelFactory.getInstance() .getReadableBackchannel(SUB_ID, ATTEMPT, TRANS_ID)) { @@ -164,7 +168,8 @@ class KafkaCommitterTest { Properties properties = getProperties(); try (FlinkKafkaInternalProducer<?, ?> producer = new MockProducer(properties, null); final KafkaCommitter committer = - new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY); + new KafkaCommitter( + properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY); 
ReadableBackchannel<TransactionFinished> backchannel = BackchannelFactory.getInstance() .getReadableBackchannel(SUB_ID, ATTEMPT, TRANS_ID)) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index b2c182e1..c91b9bbb 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.state.CheckpointListener; @@ -27,6 +28,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; @@ -35,6 +38,7 @@ import org.apache.flink.configuration.ExternalizedCheckpointRetention; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; import org.apache.flink.connector.kafka.testutils.DockerImageVersions; import org.apache.flink.connector.kafka.testutils.KafkaUtil; @@ -47,7 +51,10 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -56,9 +63,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.test.junit5.InjectClusterClient; import org.apache.flink.test.junit5.InjectMiniCluster; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -72,6 +77,7 @@ import 
org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -81,6 +87,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +104,7 @@ import javax.annotation.Nullable; import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -103,12 +113,14 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH; import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak; @@ -196,8 +208,18 @@ public class KafkaSinkITCase extends TestLogger { }; @TestContext - KafkaSinkExternalContextFactory sinkContext = - new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList()); + KafkaSinkExternalContextFactory incrementing = + new KafkaSinkExternalContextFactory( + kafka.getContainer(), + Collections.emptyList(), + TransactionNamingStrategy.INCREMENTING); + + @TestContext + KafkaSinkExternalContextFactory pooling = + new KafkaSinkExternalContextFactory( + kafka.getContainer(), + Collections.emptyList(), + TransactionNamingStrategy.POOLING); } @Test @@ -210,10 +232,19 @@ public class KafkaSinkITCase extends TestLogger { writeRecordsToKafka(DeliveryGuarantee.NONE); } - @ParameterizedTest(name = "chained={0}") - @ValueSource(booleans = {true, false}) - public void testWriteRecordsToKafkaWithExactlyOnceGuarantee(boolean chained) throws Exception { - writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, chained); + @ParameterizedTest(name = "{0}, chained={1}") + @MethodSource("getEOSParameters") + public void testWriteRecordsToKafkaWithExactlyOnceGuarantee( + TransactionNamingStrategy namingStrategy, boolean chained) throws Exception { + writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, namingStrategy, chained); + } + + static Stream<Arguments> getEOSParameters() { + return Arrays.stream(TransactionNamingStrategy.values()) + .flatMap( + strategy -> + Stream.of(true, false) + .map(chained -> Arguments.of(strategy, chained))); } @Test @@ -221,22 +252,24 @@ public class KafkaSinkITCase extends TestLogger { testRecoveryWithAssertion(DeliveryGuarantee.AT_LEAST_ONCE, 1); } - @ParameterizedTest(name = "chained={0}") - @ValueSource(booleans = {true, false}) - public void 
testRecoveryWithExactlyOnceGuarantee(boolean chained) throws Exception { - testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 1, chained); + @ParameterizedTest(name = "{0}, chained={1}") + @MethodSource("getEOSParameters") + public void testRecoveryWithExactlyOnceGuarantee( + TransactionNamingStrategy namingStrategy, boolean chained) throws Exception { + testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 1, namingStrategy, chained); } - @ParameterizedTest(name = "chained={0}") - @ValueSource(booleans = {true, false}) - public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints(boolean chained) - throws Exception { - testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 2, chained); + @ParameterizedTest(name = "{0}, chained={1}") + @MethodSource("getEOSParameters") + public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints( + TransactionNamingStrategy namingStrategy, boolean chained) throws Exception { + testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 2, namingStrategy, chained); } - @ParameterizedTest(name = "chained={0}") - @ValueSource(booleans = {true, false}) + @ParameterizedTest(name = "{0}, chained={1}") + @MethodSource("getEOSParameters") public void testAbortTransactionsOfPendingCheckpointsAfterFailure( + TransactionNamingStrategy namingStrategy, boolean chained, @TempDir File checkpointDir, @InjectMiniCluster MiniCluster miniCluster, @@ -260,6 +293,7 @@ public class KafkaSinkITCase extends TestLogger { new FailAsyncCheckpointMapper(1), checkpointedRecords, config, + namingStrategy, chained, "firstPrefix", clusterClient); @@ -279,6 +313,7 @@ public class KafkaSinkITCase extends TestLogger { new FailingCheckpointMapper(failed), checkpointedRecords, config, + namingStrategy, chained, "newPrefix", clusterClient); @@ -287,10 +322,13 @@ public class KafkaSinkITCase extends TestLogger { assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get()); } - @ParameterizedTest(name = "chained={0}") - @ValueSource(booleans = {true, false}) + @ParameterizedTest(name = "{0}, chained={1}") + @MethodSource("getEOSParameters") public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint( - boolean chained, @InjectClusterClient ClusterClient<?> clusterClient) throws Exception { + TransactionNamingStrategy namingStrategy, + boolean chained, + @InjectClusterClient ClusterClient<?> clusterClient) + throws Exception { // Run a first job opening 5 transactions one per subtask and fail in async checkpoint phase try { SharedReference<Set<Long>> checkpointedRecords = @@ -300,6 +338,7 @@ public class KafkaSinkITCase extends TestLogger { new FailAsyncCheckpointMapper(0), checkpointedRecords, config, + namingStrategy, chained, null, clusterClient); @@ -318,6 +357,7 @@ public class KafkaSinkITCase extends TestLogger { new FailingCheckpointMapper(failed), checkpointedRecords, config, + namingStrategy, chained, null, clusterClient); @@ -326,6 +366,158 @@ public class KafkaSinkITCase extends TestLogger { assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get()); } + @ParameterizedTest(name = "{0}->{1}") + @CsvSource({"1,2", "2,3", "2,5", "3,5", "5,6", "6,5", "5,2", "5,3", "3,2", "2,1"}) + public void rescaleListing( + int oldParallelism, + int newParallelsm, + @TempDir File checkpointDir, + @InjectMiniCluster MiniCluster miniCluster, + @InjectClusterClient ClusterClient<?> clusterClient) + throws Exception { + // Run a first job failing during the async phase of a checkpoint to leave some 
+ // lingering transactions + final Configuration config = createConfiguration(oldParallelism); + config.set(StateBackendOptions.STATE_BACKEND, "rocksdb"); + config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); + config.set( + CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); + config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2); + SharedReference<Set<Long>> checkpointedRecords = + sharedObjects.add(new ConcurrentSkipListSet<>()); + + JobID firstJobId = + executeWithMapper( + new FailAsyncCheckpointMapper(1), + checkpointedRecords, + config, + TransactionNamingStrategy.POOLING, + false, + "firstPrefix", + clusterClient); + + config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId)); + config.set(CoreOptions.DEFAULT_PARALLELISM, newParallelsm); + + // Run a second job which aborts all lingering transactions and new consumer should + // immediately see the newly written records + JobID secondJobId = + executeWithMapper( + new FailAsyncCheckpointMapper(1), + checkpointedRecords, + config, + TransactionNamingStrategy.POOLING, + false, + "secondPrefix", + clusterClient); + + config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, secondJobId)); + config.set(CoreOptions.DEFAULT_PARALLELISM, oldParallelism); + + SharedReference<AtomicBoolean> failed = sharedObjects.add(new AtomicBoolean(true)); + executeWithMapper( + new FailingCheckpointMapper(failed), + checkpointedRecords, + config, + TransactionNamingStrategy.POOLING, + false, + "thirdPrefix", + clusterClient); + + final List<Long> committedRecords = + deserializeValues(drainAllRecordsFromTopic(topic, true)); + assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get()); + } + + private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId) + throws InterruptedException, ExecutionException, FlinkJobNotFoundException { + final Optional<String> completedCheckpoint = + CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, miniCluster); + + assertThat(completedCheckpoint).isPresent(); + return completedCheckpoint.get(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void checkMigration( + boolean supportedMigration, + @TempDir File checkpointDir, + @InjectMiniCluster MiniCluster miniCluster, + @InjectClusterClient ClusterClient<?> clusterClient) + throws Exception { + // Run a first job failing during the async phase of a checkpoint to leave some + // lingering transactions + final Configuration config = createConfiguration(5); + config.set(StateBackendOptions.STATE_BACKEND, "rocksdb"); + config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); + config.set( + CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); + config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2); + SharedReference<Set<Long>> checkpointedRecords = + sharedObjects.add(new ConcurrentSkipListSet<>()); + + JobID firstJobId = + executeWithMapper( + new FailAsyncCheckpointMapper(1), + checkpointedRecords, + config, + TransactionNamingStrategy.INCREMENTING, + true, + "firstPrefix", + clusterClient); + + // Run a second job which switching to POOLING + config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId)); + config.set(CoreOptions.DEFAULT_PARALLELISM, 5); + JobID secondJobId2 = + executeWithMapper( + new FailAsyncCheckpointMapper(1), + checkpointedRecords, + 
config, + TransactionNamingStrategy.POOLING, + true, + "secondPrefix", + clusterClient); + + // Run a third job with downscaling + config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, secondJobId2)); + config.set(CoreOptions.DEFAULT_PARALLELISM, 3); + JobID thirdJobId = + executeWithMapper( + v -> v, + checkpointedRecords, + config, + supportedMigration + ? TransactionNamingStrategy.POOLING + : TransactionNamingStrategy.INCREMENTING, + true, + "thirdPrefix", + clusterClient); + + JobResult jobResult = clusterClient.requestJobResult(thirdJobId).get(); + assertThat(jobResult.getApplicationStatus()) + .isEqualTo( + supportedMigration + ? ApplicationStatus.SUCCEEDED + : ApplicationStatus.FAILED); + + if (supportedMigration) { + final List<Long> committedRecords = + deserializeValues(drainAllRecordsFromTopic(topic, true)); + assertThat(committedRecords) + .containsExactlyInAnyOrderElementsOf(checkpointedRecords.get()); + } else { + assertThat(jobResult.getSerializedThrowable()) + .get() + .asInstanceOf(InstanceOfAssertFactories.THROWABLE) + .rootCause() + .hasMessageContaining("Attempted to switch back to INCREMENTING"); + } + } + private static Configuration createConfiguration(int parallelism) { final Configuration config = new Configuration(); config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism); @@ -336,9 +528,10 @@ public class KafkaSinkITCase extends TestLogger { MapFunction<Long, Long> mapper, SharedReference<Set<Long>> checkpointedRecords, Configuration config, + TransactionNamingStrategy namingStrategy, boolean chained, @Nullable String transactionalIdPrefix, - @InjectClusterClient ClusterClient<?> clusterClient) + ClusterClient<?> clusterClient) throws Exception { config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable"); @@ -347,7 +540,7 @@ public class KafkaSinkITCase extends TestLogger { if (!chained) { env.disableOperatorChaining(); } - final DataStreamSource<Long> source = env.fromSequence(1, 10); + final DataStream<Long> source = createThrottlingSource(env); final DataStream<Long> stream = source.map(mapper).map(new RecordFetcher(checkpointedRecords)).uid("fetcher"); final KafkaSinkBuilder<Long> builder = @@ -358,7 +551,8 @@ public class KafkaSinkITCase extends TestLogger { KafkaRecordSerializationSchema.builder() .setTopic(topic) .setValueSerializationSchema(new RecordSerializer()) - .build()); + .build()) + .setTransactionNamingStrategy(namingStrategy); if (transactionalIdPrefix == null) { transactionalIdPrefix = "kafka-sink"; } @@ -372,11 +566,15 @@ public class KafkaSinkITCase extends TestLogger { private void testRecoveryWithAssertion( DeliveryGuarantee guarantee, int maxConcurrentCheckpoints) throws Exception { - testRecoveryWithAssertion(guarantee, maxConcurrentCheckpoints, true); + testRecoveryWithAssertion( + guarantee, maxConcurrentCheckpoints, TransactionNamingStrategy.DEFAULT, true); } private void testRecoveryWithAssertion( - DeliveryGuarantee guarantee, int maxConcurrentCheckpoints, boolean chained) + DeliveryGuarantee guarantee, + int maxConcurrentCheckpoints, + TransactionNamingStrategy namingStrategy, + boolean chained) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(createConfiguration(1)); @@ -385,7 +583,7 @@ public class KafkaSinkITCase extends TestLogger { } env.enableCheckpointing(300L); env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints); - DataStreamSource<Long> source = env.fromSequence(1, 10); + final DataStream<Long> source = 
createThrottlingSource(env); SharedReference<Set<Long>> checkpointedRecords = sharedObjects.add(new ConcurrentSkipListSet<>()); DataStream<Long> stream = @@ -402,6 +600,7 @@ public class KafkaSinkITCase extends TestLogger { .setValueSerializationSchema(new RecordSerializer()) .build()) .setTransactionalIdPrefix("kafka-sink") + .setTransactionNamingStrategy(namingStrategy) .build()); env.execute(); @@ -419,10 +618,13 @@ public class KafkaSinkITCase extends TestLogger { } private void writeRecordsToKafka(DeliveryGuarantee deliveryGuarantee) throws Exception { - writeRecordsToKafka(deliveryGuarantee, true); + writeRecordsToKafka(deliveryGuarantee, TransactionNamingStrategy.DEFAULT, true); } - private void writeRecordsToKafka(DeliveryGuarantee deliveryGuarantee, boolean chained) + private void writeRecordsToKafka( + DeliveryGuarantee deliveryGuarantee, + TransactionNamingStrategy namingStrategy, + boolean chained) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(createConfiguration(1)); @@ -430,7 +632,7 @@ public class KafkaSinkITCase extends TestLogger { env.disableOperatorChaining(); } env.enableCheckpointing(100L); - final DataStream<Long> source = env.addSource(new InfiniteIntegerSource()); + final DataStream<Long> source = createThrottlingSource(env); SharedReference<Set<Long>> checkpointedRecords = sharedObjects.add(new ConcurrentSkipListSet<>()); source.map(new RecordFetcher(checkpointedRecords)) @@ -444,6 +646,7 @@ public class KafkaSinkITCase extends TestLogger { .setValueSerializationSchema(new RecordSerializer()) .build()) .setTransactionalIdPrefix("kafka-sink") + .setTransactionNamingStrategy(namingStrategy) .build()); env.execute(); @@ -454,6 +657,17 @@ public class KafkaSinkITCase extends TestLogger { assertThat(collectedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get()); } + private DataStream<Long> createThrottlingSource(StreamExecutionEnvironment env) { + return env.fromSource( + new DataGeneratorSource<>( + value -> value, + 1000, + new ThrottleUntilFirstCheckpointStrategy(), + BasicTypeInfo.LONG_TYPE_INFO), + WatermarkStrategy.noWatermarks(), + "Generator Source"); + } + private static List<Long> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records) { return records.stream() .map( @@ -582,7 +796,6 @@ public class KafkaSinkITCase extends TestLogger { @Override public Long map(Long value) throws Exception { - Thread.sleep(100); return value; } @@ -683,8 +896,6 @@ public class KafkaSinkITCase extends TestLogger { failed.get().set(true); throw new RuntimeException("Planned exception."); } - // Delay execution to ensure that at-least one checkpoint is triggered before finish - Thread.sleep(50); emittedBetweenCheckpoint++; return value; } @@ -696,39 +907,30 @@ public class KafkaSinkITCase extends TestLogger { } } - /** - * Exposes information about how man records have been emitted overall and finishes after - * receiving the checkpoint completed event. 
- */ - private static final class InfiniteIntegerSource - implements SourceFunction<Long>, CheckpointListener { - - private volatile boolean running = true; - private final AtomicInteger nextRecord = new AtomicInteger(); - - InfiniteIntegerSource() {} + private static class ThrottleUntilFirstCheckpointStrategy implements RateLimiterStrategy { + private final RateLimiterStrategy baseStrategy = RateLimiterStrategy.perCheckpoint(10); @Override - public void run(SourceContext<Long> ctx) throws Exception { - Object lock = ctx.getCheckpointLock(); - while (running) { - synchronized (lock) { - ctx.collect((long) nextRecord.getAndIncrement()); - Thread.sleep(1); + public RateLimiter createRateLimiter(int parallelism) { + RateLimiter baseLimiter = baseStrategy.createRateLimiter(parallelism); + + return new RateLimiter() { + int numCheckpointed; + + @Override + public CompletionStage<Void> acquire() { + if (numCheckpointed >= 2) { + return CompletableFuture.completedFuture(null); + } + return baseLimiter.acquire(); } - } - LOG.info("last emitted record {}", nextRecord.get() - 1); - } - - @Override - public void cancel() { - running = false; - } - @Override - public void notifyCheckpointComplete(long checkpointId) { - running = false; - LOG.info("notifyCheckpointCompleted {}", checkpointId); + @Override + public void notifyCheckpointComplete(long checkpointId) { + baseLimiter.notifyCheckpointComplete(checkpointId); + numCheckpointed++; + } + }; } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java index 11de7167..1046ee3c 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java @@ -23,6 +23,10 @@ import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory; import org.apache.flink.connector.kafka.sink.internal.TransactionFinished; import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel; @@ -138,6 +142,10 @@ public abstract class KafkaWriterTestBase { return (T) createSink(sinkBuilderAdjuster).restoreWriter(initContext, recoveredState); } + public String getTransactionalPrefix() { + return TEST_PREFIX + writerIndex; + } + KafkaSink<Integer> createSink(Consumer<KafkaSinkBuilder<?>> sinkBuilderAdjuster) { KafkaSinkBuilder<Integer> builder = KafkaSink.<Integer>builder() @@ -180,6 +188,7 @@ public abstract class KafkaWriterTestBase { protected final SinkWriterMetricGroup metricGroup; protected final ProcessingTimeService timeService; @Nullable protected final Consumer<RecordMetadata> metadataConsumer; + private Long checkpointId; SinkInitContext( SinkWriterMetricGroup metricGroup, @@ -205,11 +214,6 @@ public abstract class KafkaWriterTestBase { return metricGroup; } - @Override - public OptionalLong getRestoredCheckpointId() { - return 
OptionalLong.empty();
-        }
-
        @Override
        public SerializationSchema.InitializationContext
                asSerializationSchemaInitializationContext() {
@@ -220,11 +224,20 @@ public abstract class KafkaWriterTestBase {
        public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
            return Optional.ofNullable((Consumer<MetaT>) metadataConsumer);
        }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return checkpointId == null ? OptionalLong.empty() : OptionalLong.of(checkpointId);
+        }
+
+        public void setRestoredCheckpointId(long checkpointId) {
+            this.checkpointId = checkpointId;
+        }
    }

    /** mock recordSerializer for KafkaSink. */
    protected static class DummyRecordSerializer
-            implements KafkaRecordSerializationSchema<Integer> {
+            implements KafkaRecordSerializationSchema<Integer>, KafkaDatasetFacetProvider {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                Integer element, KafkaSinkContext context, Long timestamp) {
@@ -235,6 +248,14 @@ public abstract class KafkaWriterTestBase {
            byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
            return new ProducerRecord<>(topic, bytes, bytes);
        }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofTopics(
+                                    Collections.singletonList(topic))));
+        }
    }

    /**
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
index ddc0592f..d25aece8 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
@@ -83,7 +83,7 @@ class ProducerPoolImplITCase {
        }
    }

-    /** Tests direct recycling as used during abortion of transactions. */
+    /** Tests direct recycling as used during abort of transactions. */
    @Test
    void testRecycleProducer() throws Exception {
        try (ProducerPoolImpl producerPool =
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImplTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImplTest.java
new file mode 100644
index 00000000..7fea2e00
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImplTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
+import static org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl.LISTING;
+import static org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory.extractSubtaskId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class TransactionAbortStrategyImplTest {
+
+    private static final String PREFIX = "prefix";
+
+    @Nested
+    class Listing {
+        @Test
+        void testDownscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 2);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 1, 4);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            testContext.addPrecommittedTransactionalIds(1, 1L);
+            testContext.addOpenTransaction(2, 1L); // not owned
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            String t12 = testContext.addOpenTransaction(1, 2L);
+            testContext.addOpenTransaction(2, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t02, t12);
+        }
+
+        @Test
+        void testDownscaleWithUnsafeTransactionalIds() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(1, 2);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(2, 3, 4);
+
+            String t11 = testContext.addOpenTransaction(1, 1L); // not owned
+            testContext.addPrecommittedTransactionalIds(2, 1L);
+            testContext.addPrecommittedTransactionalIds(3, 1L);
+            String t12 = testContext.addOpenTransaction(1, 2L); // not owned
+            String t22 = testContext.addOpenTransaction(2, 2L);
+            String t32 = testContext.addOpenTransaction(3, 2L);
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t22, t32);
+        }
+
+        @Test
+        void testDownscaleWithIntermediateUpscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 2);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 1, 4);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            testContext.addPrecommittedTransactionalIds(1, 1L);
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            String t12 = testContext.addOpenTransaction(1, 2L);
+            String t52 = testContext.addOpenTransaction(5, 2L);
+            testContext.addOpenTransaction(6, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions())
+                    .containsExactlyInAnyOrder(t02, t12, t52);
+        }
+
+        @Test
+        void testUpscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 4);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 2);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            testContext.addOpenTransaction(1, 1L); // not owned
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            testContext.addOpenTransaction(1, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t02);
+        }
+
+        @Test
+        void testUpscaleWithIntermediateUpscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 4);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 2);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            String t42 = testContext.addOpenTransaction(4, 2L);
+            testContext.addOpenTransaction(5, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t02, t42);
+        }
+
+        @Test
+        void testUpscaleWithNoState() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(3, 4);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(3, 4);
+
+            String t31 = testContext.addOpenTransaction(3, 1L);
+            String t32 = testContext.addOpenTransaction(3, 2L);
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t31, t32);
+        }
+    }
+
+    static final class TestContext implements TransactionAbortStrategyImpl.Context {
+        private final Set<Integer> ownedSubtaskIds = new HashSet<>();
+        private final Set<String> precommittedTransactionalIds = new HashSet<>();
+        private final Collection<String> abortedTransactions = new ArrayList<>();
+        private final Collection<String> openTransactionalIds = new ArrayList<>();
+        private int currentSubtaskId;
+        private int currentParallelism;
+        private int maxParallelism;
+        private String prefix = PREFIX;
+        private final Set<String> prefixesToAbort = new HashSet<>(Set.of(prefix));
+        private long startCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        public void setPrefix(String prefix) {
+            this.prefix = prefix;
+            this.prefixesToAbort.add(prefix);
+        }
+
+        public Collection<String> getAbortedTransactions() {
+            return abortedTransactions;
+        }
+
+        public String addPrecommittedTransactionalIds(int oldSubtaskId, long checkpointId) {
+            String transactionalId = addOpenTransaction(oldSubtaskId, checkpointId);
+            precommittedTransactionalIds.add(transactionalId);
+            return transactionalId;
+        }
+
+        public String addOpenTransaction(int oldSubtaskId, long checkpointId) {
+            String transactionalId = getTransactionalId(oldSubtaskId, checkpointId);
+            openTransactionalIds.add(transactionalId);
+            return transactionalId;
+        }
+
+        @Override
+        public int getCurrentSubtaskId() {
+            return this.currentSubtaskId;
+        }
+
+        @Override
+        public int getCurrentParallelism() {
+            return this.currentParallelism;
+        }
+
+        @Override
+        public boolean ownsTransactionalId(String transactionalId) {
+            return ownedSubtaskIds.contains(extractSubtaskId(transactionalId) % maxParallelism);
+        }
+
+        @Override
+        public Set<String> getPrefixesToAbort() {
+            return this.prefixesToAbort;
+        }
+
+        @Override
+        public Set<String> getPrecommittedTransactionalIds() {
+            return this.precommittedTransactionalIds;
+        }
+
+        @Override
+        public long getStartCheckpointId() {
+            return this.startCheckpointId;
+        }
+
+        public void setStartCheckpointId(long startCheckpointId) {
+            this.startCheckpointId = startCheckpointId;
+        }
+
+        @Override
+        public TransactionAbortStrategyImpl.TransactionAborter getTransactionAborter() {
+            return transactionalId -> {
+                abortedTransactions.add(transactionalId);
+                return 0;
+            };
+        }
+
+        @Override
+        public Collection<String> getOpenTransactionsForTopics() {
+            return this.openTransactionalIds;
+        }
+
+        public void setSubtaskIdAndParallelism(int newSubtaskId, int newParallelism) {
+            this.currentSubtaskId = newSubtaskId;
+            this.currentParallelism = newParallelism;
+        }
+
+        public void setOwnedSubtaskIdsAndMaxParallelism(int... subtaskIdsOrParallelism) {
+            for (int index = 0; index < subtaskIdsOrParallelism.length; index++) {
+                if (index < subtaskIdsOrParallelism.length - 1) {
+                    this.ownedSubtaskIds.add(subtaskIdsOrParallelism[index]);
+                } else {
+                    this.maxParallelism = subtaskIdsOrParallelism[index];
+                }
+            }
+        }
+
+        private String getTransactionalId(int oldSubtaskId, long checkpointId) {
+            return TransactionalIdFactory.buildTransactionalId(prefix, oldSubtaskId, checkpointId);
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnershipTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnershipTest.java
new file mode 100644
index 00000000..c63d5159
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnershipTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.connector.kafka.sink.KafkaWriterState;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+class TransactionOwnershipTest {
+    private static final String PREFIX = "prefix";
+
+    @Nested
+    class Implicit {
+        TransactionOwnership ownership = TransactionOwnership.IMPLICIT_BY_SUBTASK_ID;
+
+        @Test
+        void testNoRecoveredStates() {
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of())).containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of())).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredStateWithUnknownValues() {
+            KafkaWriterState state =
+                    new KafkaWriterState(PREFIX, UNKNOWN, UNKNOWN, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of(state)))
+                    .containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredState() {
+            KafkaWriterState state = new KafkaWriterState(PREFIX, 1, 2, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of(state)))
+                    .containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredStateFromPoolingWithDownscaling() {
+            KafkaWriterState state =
+                    new KafkaWriterState(
+                            PREFIX, 1, 2, TransactionOwnership.EXPLICIT_BY_WRITER_STATE, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(2, 3, List.of(state)))
+                    .hasMessageContaining(
+                            "Attempted to switch the transaction naming strategy back to INCREMENTING");
+        }
+
+        @Test
+        void testTwoRecoveredStates() {
+            KafkaWriterState state1 = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+            KafkaWriterState state2 = new KafkaWriterState(PREFIX, 3, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 3, List.of(state1, state2)))
+                    .containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 3, List.of(state1, state2)))
+                    .isEqualTo(3);
+        }
+
+        @Test
+        void testTwoRecoveredStatesFromPooling() {
+            KafkaWriterState state1 =
+                    new KafkaWriterState(
+                            PREFIX, 1, 4, TransactionOwnership.EXPLICIT_BY_WRITER_STATE, List.of());
+            KafkaWriterState state2 =
+                    new KafkaWriterState(
+                            PREFIX, 3, 4, TransactionOwnership.EXPLICIT_BY_WRITER_STATE, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(2, 3, List.of(state1, state2)))
+                    .hasMessageContaining(
+                            "Attempted to switch the transaction naming strategy back to INCREMENTING");
+        }
+    }
+
+    @Nested
+    class Explicit {
+        TransactionOwnership ownership = TransactionOwnership.EXPLICIT_BY_WRITER_STATE;
+
+        @Test
+        void testNoRecoveredStates() {
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of())).containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of())).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredStateWithUnknownValues() {
+            KafkaWriterState state =
+                    new KafkaWriterState(PREFIX, UNKNOWN, UNKNOWN, ownership, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(2, 4, List.of(state)))
+                    .hasMessageContaining("migrate");
+            assertThatCode(() -> ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of(state)))
+                    .hasMessageContaining("migrate");
+        }
+
+        @Test
+        void testOneRecoveredState() {
+            KafkaWriterState state = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 3, List.of(state)))
+                    .containsExactlyInAnyOrder(1);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 3, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testNonConsecutive() {
+            KafkaWriterState state = new KafkaWriterState(PREFIX, 1, 2, ownership, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(3, 4, List.of(state)))
+                    .hasMessageContaining("State not consecutively assigned");
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(3, 4, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testNonUniform() {
+            KafkaWriterState state1 = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+            KafkaWriterState state2 = new KafkaWriterState(PREFIX, 3, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(3, 4, List.of(state1, state2)))
+                    .containsExactlyInAnyOrder(1, 3);
+            assertThatCode(
+                            () ->
+                                    ownership.getTotalNumberOfOwnedSubtasks(
+                                            3, 4, List.of(state1, state2)))
+                    .hasMessageContaining("Not uniformly assigned");
+        }
+
+        @Test
+        void testTwoRecoveredStates() {
+            KafkaWriterState state1 = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+            KafkaWriterState state2 = new KafkaWriterState(PREFIX, 3, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 3, List.of(state1, state2)))
+                    .containsExactlyInAnyOrder(1, 3);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 3, List.of(state1, state2)))
+                    .isEqualTo(4);
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
index ae260489..392e5aa5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
@@ -76,15 +77,21 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
    private final List<ExternalSystemDataReader<String>> readers = new ArrayList<>();

+    private final TransactionNamingStrategy transactionNamingStrategy;
+
    protected int numSplits = 0;

    private List<URL> connectorJarPaths;

    protected final AdminClient kafkaAdminClient;

-    public KafkaSinkExternalContext(String bootstrapServers, List<URL> connectorJarPaths) {
+    public KafkaSinkExternalContext(
+            String bootstrapServers,
+            List<URL> connectorJarPaths,
+            TransactionNamingStrategy transactionNamingStrategy) {
        this.bootstrapServers = bootstrapServers;
        this.connectorJarPaths = connectorJarPaths;
+        this.transactionNamingStrategy = transactionNamingStrategy;
        this.topicName =
                TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
        kafkaAdminClient = createAdminClient();
@@ -136,6 +143,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
                .setDeliveryGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode()))
                .setTransactionalIdPrefix("testingFramework")
                .setKafkaProducerConfig(properties)
+                .setTransactionNamingStrategy(transactionNamingStrategy)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(topicName)
@@ -235,7 +243,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext

    @Override
    public String toString() {
-        return "Single-topic Kafka";
+        return transactionNamingStrategy.toString();
    }

    @Override
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
index b7958548..bdebed03 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kafka.sink.testutils;

+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;

import org.testcontainers.containers.KafkaContainer;
@@ -32,10 +33,15 @@ public class KafkaSinkExternalContextFactory

    private final KafkaContainer kafkaContainer;
    private final List<URL> connectorJars;
+    private final TransactionNamingStrategy transactionNamingStrategy;

-    public KafkaSinkExternalContextFactory(KafkaContainer kafkaContainer, List<URL> connectorJars) {
+    public KafkaSinkExternalContextFactory(
+            KafkaContainer kafkaContainer,
+            List<URL> connectorJars,
+            TransactionNamingStrategy transactionNamingStrategy) {
        this.kafkaContainer = kafkaContainer;
        this.connectorJars = connectorJars;
+        this.transactionNamingStrategy = transactionNamingStrategy;
    }

    private String getBootstrapServer() {
@@ -48,6 +54,7 @@ public class KafkaSinkExternalContextFactory

    @Override
    public KafkaSinkExternalContext createExternalContext(String testName) {
-        return new KafkaSinkExternalContext(getBootstrapServer(), connectorJars);
+        return new KafkaSinkExternalContext(
+                getBootstrapServer(), connectorJars, transactionNamingStrategy);
    }
}
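
Usage sketch (illustrative only, not part of the patch above): a minimal job-side configuration that opts into the pooling strategy through the same KafkaSinkBuilder call that KafkaSinkExternalContext#createSink passes through. The bootstrap servers, transactional id prefix, topic name, and the SimpleStringSchema serializer are placeholder assumptions for this sketch.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;

public class PoolingNamingStrategyExample {

    public static KafkaSink<String> buildSink() {
        // Placeholder broker address, prefix, and topic; adjust for the actual deployment.
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("example-prefix")
                // Builder option exercised by the updated test utilities above;
                // INCREMENTING remains available as the other naming strategy.
                .setTransactionNamingStrategy(TransactionNamingStrategy.POOLING)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("example-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}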