This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e33dec69c7c Add config validation to kafka read schema transform 
(#30625)
e33dec69c7c is described below

commit e33dec69c7cfd01c0b827538e1dad8567e3ff95e
Author: Jeff Kinard <[email protected]>
AuthorDate: Thu Apr 11 19:23:26 2024 -0400

    Add config validation to kafka read schema transform (#30625)
    
    * Add config validation to kafka read schema transform
    
    Signed-off-by: Jeffrey Kinard <[email protected]>
---
 .../KafkaReadSchemaTransformConfiguration.java     | 36 +++++++++++++---------
 .../io/kafka/KafkaReadSchemaTransformProvider.java |  2 ++
 .../KafkaReadSchemaTransformProviderTest.java      |  4 +--
 3 files changed, 26 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index d95c49894a2..13f5249a6c3 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.auto.value.AutoValue;
 import java.util.Map;
 import java.util.Set;
@@ -46,11 +49,13 @@ public abstract class KafkaReadSchemaTransformConfiguration 
{
 
   public void validate() {
     final String startOffset = this.getAutoOffsetResetConfig();
-    assert startOffset == null || 
VALID_START_OFFSET_VALUES.contains(startOffset)
-        : "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
+    checkArgument(
+        startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset),
+        "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES);
     final String dataFormat = this.getFormat();
-    assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
-        : "Valid data formats are " + VALID_DATA_FORMATS;
+    checkArgument(
+        dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat),
+        "Valid data formats are " + VALID_DATA_FORMATS);
 
     final String inputSchema = this.getSchema();
     final String messageName = this.getMessageName();
@@ -59,20 +64,23 @@ public abstract class KafkaReadSchemaTransformConfiguration 
{
     final String confluentSchemaRegSubject = 
this.getConfluentSchemaRegistrySubject();
 
     if (confluentSchemaRegUrl != null) {
-      assert confluentSchemaRegSubject != null
-          : "To read from Kafka, a schema must be provided directly or though 
Confluent "
-              + "Schema Registry. Make sure you are providing one of these 
parameters.";
+      checkNotNull(
+          confluentSchemaRegSubject,
+          "To read from Kafka, a schema must be provided directly or though 
Confluent "
+              + "Schema Registry. Make sure you are providing one of these 
parameters.");
     } else if (dataFormat != null && dataFormat.equals("RAW")) {
-      assert inputSchema == null : "To read from Kafka in RAW format, you 
can't provide a schema.";
+      checkArgument(
+          inputSchema == null, "To read from Kafka in RAW format, you can't 
provide a schema.");
     } else if (dataFormat != null && dataFormat.equals("JSON")) {
-      assert inputSchema != null : "To read from Kafka in JSON format, you 
must provide a schema.";
+      checkNotNull(inputSchema, "To read from Kafka in JSON format, you must 
provide a schema.");
     } else if (dataFormat != null && dataFormat.equals("PROTO")) {
-      assert messageName != null
-          : "To read from Kafka in PROTO format, messageName must be 
provided.";
-      assert fileDescriptorPath != null || inputSchema != null
-          : "To read from Kafka in PROTO format, fileDescriptorPath or schema 
must be provided.";
+      checkNotNull(
+          messageName, "To read from Kafka in PROTO format, messageName must 
be provided.");
+      checkArgument(
+          fileDescriptorPath != null || inputSchema != null,
+          "To read from Kafka in PROTO format, fileDescriptorPath or schema 
must be provided.");
     } else {
-      assert inputSchema != null : "To read from Kafka in AVRO format, you 
must provide a schema.";
+      checkNotNull(inputSchema, "To read from Kafka in AVRO format, you must 
provide a schema.");
     }
   }
 
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 10a347929ee..2776c388f7c 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -99,6 +99,8 @@ public class KafkaReadSchemaTransformProvider
   })
   @Override
   protected SchemaTransform from(KafkaReadSchemaTransformConfiguration 
configuration) {
+    configuration.validate();
+
     final String inputSchema = configuration.getSchema();
     final int groupId = configuration.hashCode() % Integer.MAX_VALUE;
     final String autoOffsetReset =
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index 4f133746b53..f6e231c758a 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -65,7 +65,7 @@ public class KafkaReadSchemaTransformProviderTest {
   @Test
   public void testValidConfigurations() {
     assertThrows(
-        AssertionError.class,
+        IllegalArgumentException.class,
         () -> {
           KafkaReadSchemaTransformConfiguration.builder()
               .setFormat("UNUSUAL_FORMAT")
@@ -274,7 +274,7 @@ public class KafkaReadSchemaTransformProviderTest {
         (KafkaReadSchemaTransformProvider) providers.get(0);
 
     assertThrows(
-        NullPointerException.class,
+        IllegalArgumentException.class,
         () ->
             kafkaProvider.from(
                 KafkaReadSchemaTransformConfiguration.builder()

Reply via email to