This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit a5df3c7c7f4925ff70114f862bca7588b819ac21
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Thu Mar 23 14:18:08 2023 -0700

    [FLINK-31363] [kafka] Add hasDataInTransaction flag in FlinkKafkaInternalProducer

    This closes #15.
---
 .../kafka/sink/FlinkKafkaInternalProducer.java     | 29 +++++++++++
 .../flink/connector/kafka/sink/KafkaWriter.java    | 11 +++-
 .../sink/FlinkKafkaInternalProducerITCase.java     | 59 +++++++++++++++++++---
 .../connector/kafka/sink/KafkaWriterITCase.java    | 26 ++++++++++
 4 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index a023cdd1..246fca65 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
 import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -33,6 +36,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -49,6 +53,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
     @Nullable private String transactionalId;
     private volatile boolean inTransaction;
+    private volatile boolean hasRecordsInTransaction;
     private volatile boolean closed;
 
     public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
@@ -67,6 +72,14 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         return props;
     }
 
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        if (inTransaction) {
+            hasRecordsInTransaction = true;
+        }
+        return super.send(record, callback);
+    }
+
     @Override
     public void flush() {
         super.flush();
@@ -86,6 +99,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         LOG.debug("abortTransaction {}", transactionalId);
         checkState(inTransaction, "Transaction was not started");
         inTransaction = false;
+        hasRecordsInTransaction = false;
         super.abortTransaction();
     }
 
@@ -94,6 +108,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         LOG.debug("commitTransaction {}", transactionalId);
         checkState(inTransaction, "Transaction was not started");
         inTransaction = false;
+        hasRecordsInTransaction = false;
         super.commitTransaction();
     }
 
@@ -101,6 +116,10 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         return inTransaction;
     }
 
+    public boolean hasRecordsInTransaction() {
+        return hasRecordsInTransaction;
+    }
+
     @Override
     public void close() {
         closed = true;
@@ -302,8 +321,18 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
             transitionTransactionManagerStateTo(transactionManager, "READY");
 
             transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
+
+            // the transactionStarted flag in the KafkaProducer controls whether
+            // an EndTxnRequest will actually be sent to Kafka for a commit
+            // or abort API call. This flag is set only after the first send (i.e.
+            // only if data is actually written to some partition).
+            // In checkpoints, we only ever store metadata of pre-committed
+            // transactions that actually have records; therefore, on restore
+            // when we create recovery producers to resume transactions and commit
+            // them, we should always set this flag.
             setField(transactionManager, "transactionStarted", true);
             this.inTransaction = true;
+            this.hasRecordsInTransaction = true;
         }
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0df3bcf8..48c52388 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -214,13 +214,22 @@ class KafkaWriter<IN>
 
     @Override
     public Collection<KafkaCommittable> prepareCommit() {
-        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
+            return Collections.emptyList();
+        }
+
+        // only return a KafkaCommittable if the current transaction has been written some data
+        if (currentProducer.hasRecordsInTransaction()) {
             final List<KafkaCommittable> committables =
                     Collections.singletonList(
                             KafkaCommittable.of(currentProducer, producerPool::add));
             LOG.debug("Committing {} committables.", committables);
             return committables;
         }
+
+        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
+        currentProducer.commitTransaction();
+        producerPool.add(currentProducer);
         return Collections.emptyList();
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index 51770f03..dd15ec22 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -63,16 +64,15 @@ class FlinkKafkaInternalProducerITCase {
     private static final KafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
 
-    private static final String TRANSACTION_PREFIX = "test-transaction-";
-
     @Test
     void testInitTransactionId() {
         final String topic = "test-init-transactions";
+        final String transactionIdPrefix = "testInitTransactionId-";
         try (FlinkKafkaInternalProducer<String, String> reuse =
                 new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
             int numTransactions = 20;
             for (int i = 1; i <= numTransactions; i++) {
-                reuse.initTransactionId(TRANSACTION_PREFIX + i);
+                reuse.initTransactionId(transactionIdPrefix + i);
                 reuse.beginTransaction();
                 reuse.send(new ProducerRecord<>(topic, "test-value-" + i));
                 if (i % 2 == 0) {
@@ -81,12 +81,59 @@ class FlinkKafkaInternalProducerITCase {
                     reuse.flush();
                     reuse.abortTransaction();
                 }
-                assertNumTransactions(i);
+                assertNumTransactions(i, transactionIdPrefix);
                 assertThat(readRecords(topic).count()).isEqualTo(i / 2);
             }
         }
     }
 
+    @Test
+    void testCommitResumedTransaction() {
+        final String topic = "test-commit-resumed-transaction";
+        final String transactionIdPrefix = "testCommitResumedTransaction-";
+        final String transactionalId = transactionIdPrefix + "id";
+
+        KafkaCommittable snapshottedCommittable;
+        try (FlinkKafkaInternalProducer<String, String> producer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(topic, "test-value"));
+            producer.flush();
+            snapshottedCommittable = KafkaCommittable.of(producer, ignored -> {});
+        }
+
+        try (FlinkKafkaInternalProducer<String, String> resumedProducer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            resumedProducer.resumeTransaction(
+                    snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());
+            resumedProducer.commitTransaction();
+        }
+
+        assertNumTransactions(1, transactionIdPrefix);
+        assertThat(readRecords(topic).count()).isEqualTo(1);
+    }
+
+    @Test
+    void testCommitResumedEmptyTransactionShouldFail() {
+        KafkaCommittable snapshottedCommittable;
+        try (FlinkKafkaInternalProducer<String, String> producer =
+                new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            snapshottedCommittable = KafkaCommittable.of(producer, ignored -> {});
+        }
+
+        try (FlinkKafkaInternalProducer<String, String> resumedProducer =
+                new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+            resumedProducer.resumeTransaction(
+                    snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());
+
+            assertThatThrownBy(resumedProducer::commitTransaction)
+                    .isInstanceOf(InvalidTxnStateException.class);
+        }
+    }
+
     @ParameterizedTest
     @MethodSource("provideTransactionsFinalizer")
     void testResetInnerTransactionIfFinalizingTransactionFailed(
@@ -131,10 +178,10 @@ class FlinkKafkaInternalProducerITCase {
                 FlinkKafkaInternalProducer::abortTransaction);
     }
 
-    private void assertNumTransactions(int numTransactions) {
+    private void assertNumTransactions(int numTransactions, String transactionIdPrefix) {
         List<KafkaTransactionLog.TransactionRecord> transactions =
                 new KafkaTransactionLog(getProperties())
-                        .getTransactions(id -> id.startsWith(TRANSACTION_PREFIX));
+                        .getTransactions(id -> id.startsWith(transactionIdPrefix));
         assertThat(
                 transactions.stream()
                         .map(KafkaTransactionLog.TransactionRecord::getTransactionId)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index c9d226d1..c1b022dc 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -416,6 +416,7 @@ public class KafkaWriterITCase {
                         getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
             assertThat(writer.getProducerPool()).hasSize(0);
 
+            writer.write(1, SINK_WRITER_CONTEXT);
             writer.flush(false);
             Collection<KafkaCommittable> committables0 = writer.prepareCommit();
             writer.snapshotState(1);
@@ -435,6 +436,7 @@ public class KafkaWriterITCase {
             committable.getProducer().get().close();
             assertThat(writer.getProducerPool()).hasSize(1);
 
+            writer.write(1, SINK_WRITER_CONTEXT);
             writer.flush(false);
             Collection<KafkaCommittable> committables1 = writer.prepareCommit();
             writer.snapshotState(2);
@@ -448,6 +450,30 @@ public class KafkaWriterITCase {
         }
     }
 
+    /**
+     * Tests that if a pre-commit attempt occurs on an empty transaction, the writer should not emit
+     * a KafkaCommittable, and instead immediately commit the empty transaction and recycle the
+     * producer.
+     */
+    @Test
+    void prepareCommitForEmptyTransaction() throws Exception {
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+            assertThat(writer.getProducerPool()).hasSize(0);
+
+            // no data written to current transaction
+            writer.flush(false);
+            Collection<KafkaCommittable> emptyCommittables = writer.prepareCommit();
+
+            assertThat(emptyCommittables).hasSize(0);
+            assertThat(writer.getProducerPool()).hasSize(1);
+            final FlinkKafkaInternalProducer<?, ?> recycledProducer =
+                    writer.getProducerPool().pop();
+            assertThat(recycledProducer.isInTransaction()).isFalse();
+        }
+    }
+
     /**
      * Tests that open transactions are automatically aborted on close such that successive writes
      * succeed.
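For readers following the change without applying the patch: the new flag has a simple lifecycle. It is raised by the first send() inside an open transaction and cleared again by commitTransaction() and abortTransaction(). A minimal sketch of that lifecycle follows; it assumes the file sits in the org.apache.flink.connector.kafka.sink package (FlinkKafkaInternalProducer is package-private), a reachable broker at localhost:9092, and JVM assertions enabled via -ea. The topic and transactional id are placeholder names, not names from the patch.

package org.apache.flink.connector.kafka.sink;

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HasRecordsInTransactionSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // placeholder broker address; point this at any reachable cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (FlinkKafkaInternalProducer<String, String> producer =
                new FlinkKafkaInternalProducer<>(props, "sketch-transactional-id")) {
            producer.initTransactions();

            producer.beginTransaction();
            // a freshly started transaction has seen no records yet
            assert !producer.hasRecordsInTransaction();

            producer.send(new ProducerRecord<>("sketch-topic", "some-value"));
            // the overridden send(...) raises the flag while a transaction is open
            assert producer.hasRecordsInTransaction();

            producer.commitTransaction();
            // commit (and likewise abort) clears the flag again
            assert !producer.hasRecordsInTransaction();
        }
    }
}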
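The writer-side decision that consumes the flag reads as a three-way branch. The helper below distills it out of the patched KafkaWriter.prepareCommit() purely for illustration: the class and method are invented for this sketch, and the Consumer-based recycler mirrors how the diff passes producerPool::add to KafkaCommittable.of (signature assumed from that call site). The new prepareCommitForEmptyTransaction test above pins down exactly the third branch.

package org.apache.flink.connector.kafka.sink;

import org.apache.flink.connector.base.DeliveryGuarantee;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;

// Illustration only: the three outcomes of the patched prepareCommit().
final class PrepareCommitOutcomes {

    static <K, V> Collection<KafkaCommittable> prepareCommit(
            DeliveryGuarantee deliveryGuarantee,
            FlinkKafkaInternalProducer<K, V> currentProducer,
            Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
        // 1) Non-transactional delivery guarantees never emit committables.
        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
            return Collections.emptyList();
        }
        // 2) A transaction that saw at least one record is handed to the
        //    committer; the producer travels with the committable and is
        //    recycled only once the committer is done with it.
        if (currentProducer.hasRecordsInTransaction()) {
            return Collections.singletonList(
                    KafkaCommittable.of(currentProducer, recycler));
        }
        // 3) An empty transaction is committed on the spot -- a no-op, since
        //    no EndTxnRequest is sent for a transaction without records -- and
        //    the producer goes straight back to the pool.
        currentProducer.commitTransaction();
        recycler.accept(currentProducer);
        return Collections.emptyList();
    }

    private PrepareCommitOutcomes() {}
}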
