This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 48d60efe980 KAFKA-17990: Deflake testShareAutoOffsetResetDefaultValue 
(#17916)
48d60efe980 is described below

commit 48d60efe9802786451458aec9136b7b585fecd92
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Nov 26 04:48:29 2024 +0000

    KAFKA-17990: Deflake testShareAutoOffsetResetDefaultValue (#17916)
    
    ShareConsumerTest.testShareAutoOffsetResetDefaultValue has been tightened 
up by making sure that records produced have been flushed before starting 
consumption. A possible but unlikely race condition seems to be the source of the 
flakiness, and it should now be eliminated, building on the previous PR for this 
test case.
    
    Reviewers: Manikumar Reddy <[email protected]>
---
 .../java/kafka/test/api/ShareConsumerTest.java     | 47 +++++++++++-----------
 1 file changed, 23 insertions(+), 24 deletions(-)

diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index fca9cffca86..c3e97d16d24 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -293,7 +293,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record2).get();
             producer.flush();
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
 
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
@@ -328,7 +328,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -356,7 +356,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
             producer.flush();
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
@@ -388,7 +388,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
@@ -404,12 +404,12 @@ public class ShareConsumerTest {
         }
     }
 
-    private static class TestableAcknowledgeCommitCallback implements 
AcknowledgementCommitCallback {
+    private static class TestableAcknowledgementCommitCallback implements 
AcknowledgementCommitCallback {
         private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
         private final Map<TopicPartition, Exception> partitionExceptionMap;
 
-        public TestableAcknowledgeCommitCallback(Map<TopicPartition, 
Set<Long>> partitionOffsetsMap,
-                                                 Map<TopicPartition, 
Exception> partitionExceptionMap) {
+        public TestableAcknowledgementCommitCallback(Map<TopicPartition, 
Set<Long>> partitionOffsetsMap,
+                                                     Map<TopicPartition, 
Exception> partitionExceptionMap) {
             this.partitionOffsetsMap = partitionOffsetsMap;
             this.partitionExceptionMap = partitionExceptionMap;
         }
@@ -623,7 +623,7 @@ public class ShareConsumerTest {
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
-            shareConsumer1.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap1, 
partitionExceptionMap1));
+            shareConsumer1.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap1, 
partitionExceptionMap1));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer1.poll(Duration.ofMillis(5000));
             assertEquals(3, records.count());
@@ -678,7 +678,7 @@ public class ShareConsumerTest {
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap = new 
HashMap<>();
-            shareConsumer1.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+            shareConsumer1.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer1.poll(Duration.ofMillis(5000));
             assertEquals(3, records.count());
@@ -870,7 +870,7 @@ public class ShareConsumerTest {
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
 
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap1, 
partitionExceptionMap1));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap1, 
partitionExceptionMap1));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(3, records.count());
@@ -1328,11 +1328,11 @@ public class ShareConsumerTest {
 
     /**
      * Test to verify that the acknowledgement commit callback cannot invoke 
methods of KafkaShareConsumer.
-     * The exception thrown is verified in {@link 
TestableAcknowledgeCommitCallbackWithShareConsumer}
+     * The exception thrown is verified in {@link 
TestableAcknowledgementCommitCallbackWithShareConsumer}
      */
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String persister) {
+    public void 
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String persister) 
{
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1341,7 +1341,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
             // The acknowledgment commit callback will try to call a method of 
KafkaShareConsumer
@@ -1353,10 +1353,10 @@ public class ShareConsumerTest {
         }
     }
 
-    private class TestableAcknowledgeCommitCallbackWithShareConsumer<K, V> 
implements AcknowledgementCommitCallback {
+    private class TestableAcknowledgementCommitCallbackWithShareConsumer<K, V> 
implements AcknowledgementCommitCallback {
         private final KafkaShareConsumer<K, V> shareConsumer;
 
-        
TestableAcknowledgeCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V> 
shareConsumer) {
+        
TestableAcknowledgementCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V> 
shareConsumer) {
             this.shareConsumer = shareConsumer;
         }
 
@@ -1376,7 +1376,7 @@ public class ShareConsumerTest {
     @Flaky("KAFKA-18033")
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String 
persister) throws InterruptedException {
+    public void 
testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String persister) 
throws InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1386,7 +1386,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             // The acknowledgment commit callback will try to call a method of 
KafkaShareConsumer
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -1409,10 +1409,10 @@ public class ShareConsumerTest {
         }
     }
 
-    private static class TestableAcknowledgeCommitCallbackWakeup<K, V> 
implements AcknowledgementCommitCallback {
+    private static class TestableAcknowledgementCommitCallbackWakeup<K, V> 
implements AcknowledgementCommitCallback {
         private final KafkaShareConsumer<K, V> shareConsumer;
 
-        TestableAcknowledgeCommitCallbackWakeup(KafkaShareConsumer<K, V> 
shareConsumer) {
+        TestableAcknowledgementCommitCallbackWakeup(KafkaShareConsumer<K, V> 
shareConsumer) {
             this.shareConsumer = shareConsumer;
         }
 
@@ -1429,7 +1429,7 @@ public class ShareConsumerTest {
     @Flaky("KAFKA-18033")
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testAcknowledgeCommitCallbackThrowsException(String persister) 
throws InterruptedException {
+    public void testAcknowledgementCommitCallbackThrowsException(String 
persister) throws InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1438,7 +1438,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallbackThrows<>());
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackThrows<>());
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -1456,10 +1456,10 @@ public class ShareConsumerTest {
         }
     }
 
-    private static class TestableAcknowledgeCommitCallbackThrows<K, V> 
implements AcknowledgementCommitCallback {
+    private static class TestableAcknowledgementCommitCallbackThrows<K, V> 
implements AcknowledgementCommitCallback {
         @Override
         public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, 
Exception exception) {
-            throw new 
org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in 
TestableAcknowledgeCommitCallbackThrows.onComplete");
+            throw new 
org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in 
TestableAcknowledgementCommitCallbackThrows.onComplete");
         }
     }
 
@@ -1673,7 +1673,6 @@ public class ShareConsumerTest {
         }
     }
 
-    @Flaky("KAFKA-18033")
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
     public void testShareAutoOffsetResetDefaultValue(String persister) {

Reply via email to