This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c6a4aae336 [fix][test] Fixed Nondeterministic Assertions in KafkaAbstractSinkTest (#24877)
7c6a4aae336 is described below

commit 7c6a4aae336a631e0b88998cf54c656c2f193ba4
Author: Lucas Eby <[email protected]>
AuthorDate: Mon Oct 27 04:09:14 2025 -0500

    [fix][test] Fixed Nondeterministic Assertions in KafkaAbstractSinkTest (#24877)
---
 .../io/kafka/sink/KafkaAbstractSinkTest.java       | 88 +++++++++++++++++-----
 1 file changed, 68 insertions(+), 20 deletions(-)

diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index c0c0a9a7172..f8c5b1df4c4 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -26,9 +26,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -65,7 +68,7 @@ public class KafkaAbstractSinkTest {
         } catch (Throwable e) {
             if (expectedType.isInstance(e)) {
                 T ex = expectedType.cast(e);
-                assertEquals(expectedMessage, ex.getMessage());
+                assertEquals(ex.getMessage(), expectedMessage);
                 return;
             }
             throw new AssertionError("Unexpected exception type, expected " + expectedType.getSimpleName()
@@ -74,10 +77,24 @@ public class KafkaAbstractSinkTest {
         throw new AssertionError("Expected exception");
     }
 
+    /**
+     * Creates a valid Kafka Sink configuration that is used by multiple test cases.
+     *
+     * @return a map containing all required Kafka sink configuration fields
+     */
+    private static Map<String, Object> validConfig() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("bootstrapServers", "localhost:6667");
+        map.put("acks", "1");
+        map.put("topic", "topic_2");
+        map.put("batchSize", "16384");
+        map.put("maxRequestSize", "1048576");
+        return map;
+    }
+
     @Test
     public void testInvalidConfigWillThrownException() throws Exception {
         KafkaAbstractSink<String, byte[]> sink = new DummySink();
-        Map<String, Object> config = new HashMap<>();
         SinkContext sc = new SinkContext() {
             @Override
             public int getInstanceId() {
@@ -189,7 +206,7 @@ public class KafkaAbstractSinkTest {
 
             }
         };
-        ThrowingRunnable openAndClose = ()->{
+        Function<Map<String, Object>, ThrowingRunnable> runWith = config -> () -> {
             try {
                 sink.open(config, sc);
                 fail();
@@ -197,23 +214,54 @@ public class KafkaAbstractSinkTest {
                 sink.close();
             }
         };
-        expectThrows(IllegalArgumentException.class, "bootstrapServers cannot be null", openAndClose);
-        config.put("bootstrapServers", "localhost:6667");
-        expectThrows(IllegalArgumentException.class, "acks cannot be null", openAndClose);
-        config.put("acks", "1");
-        expectThrows(IllegalArgumentException.class, "topic cannot be null", openAndClose);
-        config.put("topic", "topic_2");
-        config.put("batchSize", "-1");
-        expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer batchSize : -1", openAndClose);
-        config.put("batchSize", "16384");
-        config.put("maxRequestSize", "-1");
-        expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer maxRequestSize : -1", openAndClose);
-        config.put("maxRequestSize", "1048576");
-        config.put("acks", "none");
-        expectThrows(ConfigException.class,
-                "Invalid value none for configuration acks: String must be one of: all, -1, 0, 1",
-                openAndClose);
-        config.put("acks", "1");
+
+        // Table of test cases for key removal and modification tests
+        record Case(
+                Consumer<Map<String, Object>> mutate,
+                Class<? extends Exception> expectedType,
+                String expectedMessage
+        ) {}
+
+        List<Case> cases = List.of(
+                // Missing bootstrapServers
+                new Case(config -> config.remove("bootstrapServers"),
+                        IllegalArgumentException.class,
+                        "bootstrapServers cannot be null"),
+
+                // Missing acks
+                new Case(config -> config.remove("acks"),
+                        IllegalArgumentException.class,
+                        "acks cannot be null"),
+
+                // Missing topic
+                new Case(config -> config.remove("topic"),
+                        IllegalArgumentException.class,
+                        "topic cannot be null"),
+
+                // Bad batchSize
+                new Case(config -> config.put("batchSize", "-1"),
+                        IllegalArgumentException.class,
+                        "Invalid Kafka Producer batchSize : -1"),
+
+                // Bad maxRequestSize
+                new Case(config -> config.put("maxRequestSize", "-1"),
+                        IllegalArgumentException.class,
+                        "Invalid Kafka Producer maxRequestSize : -1"),
+
+                // Invalid acks value
+                new Case(config -> config.put("acks", "none"),
+                        ConfigException.class,
+                        "Invalid value none for configuration acks: String must be one of: all, -1, 0, 1")
+        );
+
+        for (Case currCase : cases) {
+            var config = validConfig(); // set fresh, valid, baseline each time
+            currCase.mutate.accept(config); // remove or change one field
+            expectThrows(currCase.expectedType, currCase.expectedMessage, runWith.apply(config));
+        }
+
+        // Finally verify a valid config passes cleanly
+        var config = validConfig();
         sink.open(config, sc);
         sink.close();
     }

Reply via email to