This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e33dec69c7c Add config validation to kafka read schema transform (#30625)
e33dec69c7c is described below
commit e33dec69c7cfd01c0b827538e1dad8567e3ff95e
Author: Jeff Kinard <[email protected]>
AuthorDate: Thu Apr 11 19:23:26 2024 -0400
Add config validation to kafka read schema transform (#30625)
* Add config validation to kafka read schema transform
Signed-off-by: Jeffrey Kinard <[email protected]>
---
.../KafkaReadSchemaTransformConfiguration.java | 36 +++++++++++++---------
.../io/kafka/KafkaReadSchemaTransformProvider.java | 2 ++
.../KafkaReadSchemaTransformProviderTest.java | 4 +--
3 files changed, 26 insertions(+), 16 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index d95c49894a2..13f5249a6c3 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
import com.google.auto.value.AutoValue;
import java.util.Map;
import java.util.Set;
@@ -46,11 +49,13 @@ public abstract class KafkaReadSchemaTransformConfiguration
{
public void validate() {
final String startOffset = this.getAutoOffsetResetConfig();
- assert startOffset == null ||
VALID_START_OFFSET_VALUES.contains(startOffset)
- : "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
+ checkArgument(
+ startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset),
+ "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES);
final String dataFormat = this.getFormat();
- assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
- : "Valid data formats are " + VALID_DATA_FORMATS;
+ checkArgument(
+ dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat),
+ "Valid data formats are " + VALID_DATA_FORMATS);
final String inputSchema = this.getSchema();
final String messageName = this.getMessageName();
@@ -59,20 +64,23 @@ public abstract class KafkaReadSchemaTransformConfiguration
{
final String confluentSchemaRegSubject =
this.getConfluentSchemaRegistrySubject();
if (confluentSchemaRegUrl != null) {
- assert confluentSchemaRegSubject != null
- : "To read from Kafka, a schema must be provided directly or though
Confluent "
- + "Schema Registry. Make sure you are providing one of these
parameters.";
+ checkNotNull(
+ confluentSchemaRegSubject,
+ "To read from Kafka, a schema must be provided directly or though
Confluent "
+ + "Schema Registry. Make sure you are providing one of these
parameters.");
} else if (dataFormat != null && dataFormat.equals("RAW")) {
- assert inputSchema == null : "To read from Kafka in RAW format, you
can't provide a schema.";
+ checkArgument(
+ inputSchema == null, "To read from Kafka in RAW format, you can't
provide a schema.");
} else if (dataFormat != null && dataFormat.equals("JSON")) {
- assert inputSchema != null : "To read from Kafka in JSON format, you
must provide a schema.";
+ checkNotNull(inputSchema, "To read from Kafka in JSON format, you must
provide a schema.");
} else if (dataFormat != null && dataFormat.equals("PROTO")) {
- assert messageName != null
- : "To read from Kafka in PROTO format, messageName must be
provided.";
- assert fileDescriptorPath != null || inputSchema != null
- : "To read from Kafka in PROTO format, fileDescriptorPath or schema
must be provided.";
+ checkNotNull(
+ messageName, "To read from Kafka in PROTO format, messageName must
be provided.");
+ checkArgument(
+ fileDescriptorPath != null || inputSchema != null,
+ "To read from Kafka in PROTO format, fileDescriptorPath or schema
must be provided.");
} else {
- assert inputSchema != null : "To read from Kafka in AVRO format, you
must provide a schema.";
+ checkNotNull(inputSchema, "To read from Kafka in AVRO format, you must
provide a schema.");
}
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 10a347929ee..2776c388f7c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -99,6 +99,8 @@ public class KafkaReadSchemaTransformProvider
})
@Override
protected SchemaTransform from(KafkaReadSchemaTransformConfiguration
configuration) {
+ configuration.validate();
+
final String inputSchema = configuration.getSchema();
final int groupId = configuration.hashCode() % Integer.MAX_VALUE;
final String autoOffsetReset =
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index 4f133746b53..f6e231c758a 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -65,7 +65,7 @@ public class KafkaReadSchemaTransformProviderTest {
@Test
public void testValidConfigurations() {
assertThrows(
- AssertionError.class,
+ IllegalArgumentException.class,
() -> {
KafkaReadSchemaTransformConfiguration.builder()
.setFormat("UNUSUAL_FORMAT")
@@ -274,7 +274,7 @@ public class KafkaReadSchemaTransformProviderTest {
(KafkaReadSchemaTransformProvider) providers.get(0);
assertThrows(
- NullPointerException.class,
+ IllegalArgumentException.class,
() ->
kafkaProvider.from(
KafkaReadSchemaTransformConfiguration.builder()