This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit a5df3c7c7f4925ff70114f862bca7588b819ac21
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Thu Mar 23 14:18:08 2023 -0700

    [FLINK-31363] [kafka] Add hasRecordsInTransaction flag in FlinkKafkaInternalProducer
    
    This closes #15.
---
 .../kafka/sink/FlinkKafkaInternalProducer.java     | 29 +++++++++++
 .../flink/connector/kafka/sink/KafkaWriter.java    | 11 +++-
 .../sink/FlinkKafkaInternalProducerITCase.java     | 59 +++++++++++++++++++---
 .../connector/kafka/sink/KafkaWriterITCase.java    | 26 ++++++++++
 4 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index a023cdd1..246fca65 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
 import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -33,6 +36,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -49,6 +53,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
     @Nullable private String transactionalId;
     private volatile boolean inTransaction;
+    private volatile boolean hasRecordsInTransaction;
     private volatile boolean closed;
 
     public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
@@ -67,6 +72,14 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         return props;
     }
 
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        if (inTransaction) {
+            hasRecordsInTransaction = true;
+        }
+        return super.send(record, callback);
+    }
+
     @Override
     public void flush() {
         super.flush();
@@ -86,6 +99,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         LOG.debug("abortTransaction {}", transactionalId);
         checkState(inTransaction, "Transaction was not started");
         inTransaction = false;
+        hasRecordsInTransaction = false;
         super.abortTransaction();
     }
 
@@ -94,6 +108,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         LOG.debug("commitTransaction {}", transactionalId);
         checkState(inTransaction, "Transaction was not started");
         inTransaction = false;
+        hasRecordsInTransaction = false;
         super.commitTransaction();
     }
 
@@ -101,6 +116,10 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         return inTransaction;
     }
 
+    public boolean hasRecordsInTransaction() {
+        return hasRecordsInTransaction;
+    }
+
     @Override
     public void close() {
         closed = true;
@@ -302,8 +321,18 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
             transitionTransactionManagerStateTo(transactionManager, "READY");
 
             transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
+
+            // the transactionStarted flag in the KafkaProducer controls whether
+            // an EndTxnRequest will actually be sent to Kafka for a commit
+            // or abort API call. This flag is set only after the first send (i.e.
+            // only if data is actually written to some partition).
+            // In checkpoints, we only ever store metadata of pre-committed
+            // transactions that actually have records; therefore, on restore
+            // when we create recovery producers to resume transactions and commit
+            // them, we should always set this flag.
             setField(transactionManager, "transactionStarted", true);
             this.inTransaction = true;
+            this.hasRecordsInTransaction = true;
         }
     }
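
The change above amounts to a dirty-flag pattern layered over the transactional
producer: every send() inside a transaction marks the transaction as having records,
and both commitTransaction() and abortTransaction() clear the mark. Below is a
minimal standalone sketch of that pattern (not part of this commit; TrackingProducer
and its simplified send() signature are hypothetical, for illustration only):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    class TrackingProducer<V> {
        private volatile boolean inTransaction;
        private volatile boolean hasRecordsInTransaction;

        void beginTransaction() {
            inTransaction = true;
        }

        // as in the diff: mark the transaction dirty whenever a record is sent inside it
        Future<V> send(V value) {
            if (inTransaction) {
                hasRecordsInTransaction = true;
            }
            return CompletableFuture.completedFuture(value); // stand-in for the broker ack
        }

        // commit and abort both reset the flag, matching the diff above
        void commitTransaction() {
            inTransaction = false;
            hasRecordsInTransaction = false;
        }

        boolean hasRecordsInTransaction() {
            return hasRecordsInTransaction;
        }
    }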
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0df3bcf8..48c52388 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -214,13 +214,22 @@ class KafkaWriter<IN>
 
     @Override
     public Collection<KafkaCommittable> prepareCommit() {
-        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
+            return Collections.emptyList();
+        }
+
+        // only return a KafkaCommittable if the current transaction actually has data written to it
+        if (currentProducer.hasRecordsInTransaction()) {
             final List<KafkaCommittable> committables =
                     Collections.singletonList(
                             KafkaCommittable.of(currentProducer, producerPool::add));
             LOG.debug("Committing {} committables.", committables);
             return committables;
         }
+
+        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
+        currentProducer.commitTransaction();
+        producerPool.add(currentProducer);
         return Collections.emptyList();
     }
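
For readability, here is the three-way decision prepareCommit() makes after this
change, condensed from the diff above (identifiers are those of KafkaWriter; the
method name prepareCommitSketch is hypothetical):

    Collection<KafkaCommittable> prepareCommitSketch() {
        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
            return Collections.emptyList(); // nothing to pre-commit for other guarantees
        }
        if (currentProducer.hasRecordsInTransaction()) {
            // hand the non-empty transaction to the committer; the callback
            // returns the producer to the pool once committing is done
            return Collections.singletonList(
                    KafkaCommittable.of(currentProducer, producerPool::add));
        }
        // empty transaction: per the comment in FlinkKafkaInternalProducer, no
        // EndTxnRequest is sent for it, so committing locally is a cheap no-op
        currentProducer.commitTransaction();
        producerPool.add(currentProducer);
        return Collections.emptyList();
    }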
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index 51770f03..dd15ec22 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -63,16 +64,15 @@ class FlinkKafkaInternalProducerITCase {
     private static final KafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
 
-    private static final String TRANSACTION_PREFIX = "test-transaction-";
-
     @Test
     void testInitTransactionId() {
         final String topic = "test-init-transactions";
+        final String transactionIdPrefix = "testInitTransactionId-";
         try (FlinkKafkaInternalProducer<String, String> reuse =
                 new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
             int numTransactions = 20;
             for (int i = 1; i <= numTransactions; i++) {
-                reuse.initTransactionId(TRANSACTION_PREFIX + i);
+                reuse.initTransactionId(transactionIdPrefix + i);
                 reuse.beginTransaction();
                 reuse.send(new ProducerRecord<>(topic, "test-value-" + i));
                 if (i % 2 == 0) {
@@ -81,12 +81,59 @@ class FlinkKafkaInternalProducerITCase {
                     reuse.flush();
                     reuse.abortTransaction();
                 }
-                assertNumTransactions(i);
+                assertNumTransactions(i, transactionIdPrefix);
                 assertThat(readRecords(topic).count()).isEqualTo(i / 2);
             }
         }
     }
 
+    @Test
+    void testCommitResumedTransaction() {
+        final String topic = "test-commit-resumed-transaction";
+        final String transactionIdPrefix = "testCommitResumedTransaction-";
+        final String transactionalId = transactionIdPrefix + "id";
+
+        KafkaCommittable snapshottedCommittable;
+        try (FlinkKafkaInternalProducer<String, String> producer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(new ProducerRecord<>(topic, "test-value"));
+            producer.flush();
+            snapshottedCommittable = KafkaCommittable.of(producer, ignored -> {});
+        }
+
+        try (FlinkKafkaInternalProducer<String, String> resumedProducer =
+                new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
+            resumedProducer.resumeTransaction(
+                    snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());
+            resumedProducer.commitTransaction();
+        }
+
+        assertNumTransactions(1, transactionIdPrefix);
+        assertThat(readRecords(topic).count()).isEqualTo(1);
+    }
+
+    @Test
+    void testCommitResumedEmptyTransactionShouldFail() {
+        KafkaCommittable snapshottedCommittable;
+        try (FlinkKafkaInternalProducer<String, String> producer =
+                new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            snapshottedCommittable = KafkaCommittable.of(producer, ignored -> {});
+        }
+
+        try (FlinkKafkaInternalProducer<String, String> resumedProducer =
+                new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
+            resumedProducer.resumeTransaction(
+                    snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());
+
+            assertThatThrownBy(resumedProducer::commitTransaction)
+                    .isInstanceOf(InvalidTxnStateException.class);
+        }
+    }
+
     @ParameterizedTest
     @MethodSource("provideTransactionsFinalizer")
     void testResetInnerTransactionIfFinalizingTransactionFailed(
@@ -131,10 +178,10 @@ class FlinkKafkaInternalProducerITCase {
                 FlinkKafkaInternalProducer::abortTransaction);
     }
 
-    private void assertNumTransactions(int numTransactions) {
+    private void assertNumTransactions(int numTransactions, String transactionIdPrefix) {
         List<KafkaTransactionLog.TransactionRecord> transactions =
                 new KafkaTransactionLog(getProperties())
-                        .getTransactions(id -> id.startsWith(TRANSACTION_PREFIX));
+                        .getTransactions(id -> id.startsWith(transactionIdPrefix));
         assertThat(
                         transactions.stream()
                                 .map(KafkaTransactionLog.TransactionRecord::getTransactionId)
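
The resume-and-commit flow that testCommitResumedTransaction exercises, condensed
into a hypothetical snippet (the topic and transactional id are placeholders;
getProperties() is the test helper above):

    // first incarnation: open a transaction, write data, snapshot (producerId, epoch)
    KafkaCommittable snapshot;
    try (FlinkKafkaInternalProducer<String, String> producer =
            new FlinkKafkaInternalProducer<>(getProperties(), "txn-id")) {
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("topic", "value")); // transaction now has records
        producer.flush();
        snapshot = KafkaCommittable.of(producer, ignored -> {});
    }

    // after recovery: a fresh producer resumes the snapshotted transaction and
    // commits it; resumeTransaction() force-sets transactionStarted (and now
    // hasRecordsInTransaction), since only non-empty transactions are checkpointed
    try (FlinkKafkaInternalProducer<String, String> producer =
            new FlinkKafkaInternalProducer<>(getProperties(), "txn-id")) {
        producer.resumeTransaction(snapshot.getProducerId(), snapshot.getEpoch());
        producer.commitTransaction();
    }
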
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index c9d226d1..c1b022dc 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -416,6 +416,7 @@ public class KafkaWriterITCase {
                         getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
             assertThat(writer.getProducerPool()).hasSize(0);
 
+            writer.write(1, SINK_WRITER_CONTEXT);
             writer.flush(false);
             Collection<KafkaCommittable> committables0 = writer.prepareCommit();
             writer.snapshotState(1);
@@ -435,6 +436,7 @@ public class KafkaWriterITCase {
             committable.getProducer().get().close();
             assertThat(writer.getProducerPool()).hasSize(1);
 
+            writer.write(1, SINK_WRITER_CONTEXT);
             writer.flush(false);
             Collection<KafkaCommittable> committables1 = writer.prepareCommit();
             writer.snapshotState(2);
@@ -448,6 +450,30 @@ public class KafkaWriterITCase {
         }
     }
 
+    /**
+     * Tests that if a pre-commit attempt occurs on an empty transaction, the writer should not
+     * emit a KafkaCommittable, and instead immediately commit the empty transaction and recycle
+     * the producer.
+     */
+    @Test
+    void prepareCommitForEmptyTransaction() throws Exception {
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+            assertThat(writer.getProducerPool()).hasSize(0);
+
+            // no data written to current transaction
+            writer.flush(false);
+            Collection<KafkaCommittable> emptyCommittables = writer.prepareCommit();
+
+            assertThat(emptyCommittables).hasSize(0);
+            assertThat(writer.getProducerPool()).hasSize(1);
+            final FlinkKafkaInternalProducer<?, ?> recycledProducer =
+                    writer.getProducerPool().pop();
+            assertThat(recycledProducer.isInTransaction()).isFalse();
+        }
+    }
+
     /**
      * Tests that open transactions are automatically aborted on close such that successive writes
      * succeed.
