This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 781f534 KAFKA-9024: Better error message when field specified does not exist (#7819)
781f534 is described below
commit 781f53496f9a20fde46e19671a69eae5ac45e8fe
Author: Nigel Liang <[email protected]>
AuthorDate: Tue Jan 21 12:25:35 2020 -0800
KAFKA-9024: Better error message when field specified does not exist (#7819)
Author: Nigel Liang <[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../apache/kafka/connect/transforms/ValueToKey.java | 9 +++++++--
.../kafka/connect/transforms/ValueToKeyTest.java | 18 ++++++++++++++++++
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
index a9d9601..b2226d3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -21,9 +21,11 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -82,8 +84,11 @@ public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>
if (keySchema == null) {
final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
for (String field : fields) {
- final Schema fieldSchema = value.schema().field(field).schema();
- keySchemaBuilder.field(field, fieldSchema);
+ final Field fieldFromValue = value.schema().field(field);
+ if (fieldFromValue == null) {
+ throw new DataException("Field does not exist: " + field);
+ }
+ keySchemaBuilder.field(field, fieldFromValue.schema());
}
keySchema = keySchemaBuilder.build();
valueToKeySchemaCache.put(value.schema(), keySchema);
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index e2dfa17..5854658 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Test;
@@ -28,6 +29,7 @@ import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
public class ValueToKeyTest {
private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
@@ -88,4 +90,20 @@ public class ValueToKeyTest {
assertEquals(expectedKey, transformedRecord.key());
}
+ @Test
+ public void nonExistingField() {
+ xform.configure(Collections.singletonMap("fields", "not_exist"));
+
+ final Schema valueSchema = SchemaBuilder.struct()
+ .field("a", Schema.INT32_SCHEMA)
+ .build();
+
+ final Struct value = new Struct(valueSchema);
+ value.put("a", 1);
+
+ final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0);
+
+ DataException actual = assertThrows(DataException.class, () -> xform.apply(record));
+ assertEquals("Field does not exist: not_exist", actual.getMessage());
+ }
}