This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 4b86ff9d00e KAFKA-18943: Update EosIntegrationTest for EOSv1 (#19312)
4b86ff9d00e is described below

commit 4b86ff9d00eaa1f93a99a60b6c321abbabbdb636
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 31 10:56:58 2025 -0700

    KAFKA-18943: Update EosIntegrationTest for EOSv1 (#19312)
    
    After cherry-picking
    https://github.com/apache/kafka/commit/2181ddbb039ff688f5ff41784d943cb579f7575c,
    we realized that the newly added test does not cover EOSv1. This PR closes
    this testing gap.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../streams/integration/EosIntegrationTest.java    | 34 +++++++++++++++++-----
 1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 584fcb4a25a..c4fc8c434e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -73,7 +73,6 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -1001,12 +1000,25 @@ public class EosIntegrationTest {
     }
 
 
-    private final AtomicReference<String> transactionalProducerId = new AtomicReference<>();
+    private final AtomicReference<Map<String, String>> transactionalProducerIdEosV1 = new AtomicReference<>();
+    private final AtomicReference<String> transactionalProducerIdEosV2 = new AtomicReference<>();
 
     private class TestClientSupplier extends DefaultKafkaClientSupplier {
         @Override
         public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-            transactionalProducerId.compareAndSet(null, (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+            final String transactionalId = (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+
+            if (transactionalId.endsWith("-0_0") || transactionalId.endsWith("-0_1")) {
+                Map<String, String> transactionalIds = transactionalProducerIdEosV1.get();
+                if (transactionalIds == null) {
+                    transactionalIds = new HashMap<>();
+                    transactionalProducerIdEosV1.set(transactionalIds);
+                }
+
+                transactionalIds.put(transactionalId.substring(transactionalId.length() - 3), transactionalId);
+            } else {
+                transactionalProducerIdEosV2.compareAndSet(null, transactionalId);
+            }
 
             return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
         }
@@ -1015,8 +1027,12 @@ public class EosIntegrationTest {
     static final AtomicReference<TaskId> TASK_WITH_DATA = new AtomicReference<>();
     static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false);
 
-    @Test
-    public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
+    public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String eosConfig) throws Exception {
+        TASK_WITH_DATA.set(null);
+        DID_REVOKE_IDLE_TASK.set(false);
+
         final AtomicBoolean requestCommit = new AtomicBoolean(false);
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -1041,7 +1057,7 @@ public class EosIntegrationTest {
             .to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
         properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
@@ -1104,7 +1120,11 @@ public class EosIntegrationTest {
             final List<KeyValue<Long, Long>> inputDataTask0Fencing = Collections.singletonList(KeyValue.pair(4L, -3L));
 
             final Properties producerConfigs = new Properties();
-            producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerId.get());
+            if (eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) {
+                producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV1.get().get(TASK_WITH_DATA.get().toString()));
+            } else {
+                producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV2.get());
+            }
             IntegrationTestUtils.produceKeyValuesSynchronously(
                 MULTI_PARTITION_INPUT_TOPIC,
                 inputDataTask0Fencing,
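
Note on the branching in TestClientSupplier.getProducer() above: under EOSv1 Kafka Streams creates one transactional producer per task, so the transactional.id carries a task-id suffix (hence the "-0_0" / "-0_1" check), while under EOSv2 a single producer per stream thread is used and its id has no per-task suffix. The standalone sketch below mirrors that routing idea only; the class name, method name, and example ids are illustrative assumptions and not part of this commit.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch: routes observed transactional.ids the same way the test's
// client supplier does, assuming EOSv1 ids end in a task id and EOSv2 ids do not.
public class TransactionalIdRoutingSketch {

    // EOSv1: one transactional.id per task, keyed by its task-id suffix
    private static final AtomicReference<Map<String, String>> EOS_V1_IDS = new AtomicReference<>();
    // EOSv2: a single transactional.id per producer (no per-task suffix)
    private static final AtomicReference<String> EOS_V2_ID = new AtomicReference<>();

    static void record(final String transactionalId) {
        if (transactionalId.endsWith("-0_0") || transactionalId.endsWith("-0_1")) {
            // EOSv1 style: remember the id of each task so a test can later pick
            // the id belonging to the task that actually received data
            Map<String, String> ids = EOS_V1_IDS.get();
            if (ids == null) {
                ids = new HashMap<>();
                EOS_V1_IDS.set(ids);
            }
            ids.put(transactionalId.substring(transactionalId.length() - 3), transactionalId);
        } else {
            // EOSv2 style: the first id seen is the only one needed
            EOS_V2_ID.compareAndSet(null, transactionalId);
        }
    }

    public static void main(final String[] args) {
        record("eos-test-app-0_0");           // hypothetical EOSv1 id for task 0_0
        record("eos-test-app-0_1");           // hypothetical EOSv1 id for task 0_1
        record("eos-test-app-someprocess-1"); // hypothetical EOSv2 id, no task suffix

        System.out.println(EOS_V1_IDS.get()); // e.g. {0_0=eos-test-app-0_0, 0_1=eos-test-app-0_1}
        System.out.println(EOS_V2_ID.get());  // eos-test-app-someprocess-1
    }
}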
