Repository: kafka Updated Branches: refs/heads/trunk f812a8fd9 -> 75e213e55
KAFKA-4855: Struct SchemaBuilder should not allow duplicate fields ewencp can you please review. Author: Balint Molnar <[email protected]> Reviewers: Gwen Shapira <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #2732 from baluchicken/KAFKA-4855 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75e213e5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75e213e5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75e213e5 Branch: refs/heads/trunk Commit: 75e213e55036850abef77e4641af9b9071465f80 Parents: f812a8f Author: Balint Molnar <[email protected]> Authored: Mon Apr 3 20:07:47 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Apr 3 20:07:47 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/connect/data/SchemaBuilder.java | 17 ++++++++--------- .../kafka/connect/data/SchemaBuilderTest.java | 12 ++++++++++++ 2 files changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/75e213e5/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java index a5f2eda..5a2b693 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java @@ -64,7 +64,7 @@ public class SchemaBuilder implements Schema { private Boolean optional = null; private Object defaultValue = null; - private List<Field> fields = null; + private Map<String, Field> fields = null; private Schema keySchema = null; private Schema valueSchema = null; @@ -78,7 +78,7 @@ public class SchemaBuilder implements Schema { private SchemaBuilder(Type type) { this.type = type; if (type == Type.STRUCT) { - fields = new ArrayList<>(); + fields = new LinkedHashMap<>(); } } @@ -320,7 +320,9 @@ public class SchemaBuilder implements Schema { if (type != Type.STRUCT) throw new SchemaBuilderException("Cannot create fields on type " + type); int fieldIndex = fields.size(); - fields.add(new Field(fieldName, fieldIndex, fieldSchema)); + if (fields.containsKey(fieldName)) + throw new SchemaBuilderException("Cannot create field because of field name duplication " + fieldName); + fields.put(fieldName, new Field(fieldName, fieldIndex, fieldSchema)); return this; } @@ -331,16 +333,13 @@ public class SchemaBuilder implements Schema { public List<Field> fields() { if (type != Type.STRUCT) throw new DataException("Cannot list fields on non-struct type"); - return fields; + return new ArrayList<>(fields.values()); } public Field field(String fieldName) { if (type != Type.STRUCT) throw new DataException("Cannot look up fields on non-struct type"); - for (Field field : fields) - if (field.name().equals(fieldName)) - return field; - return null; + return fields.get(fieldName); } @@ -387,7 +386,7 @@ public class SchemaBuilder implements Schema { public Schema build() { return new ConnectSchema(type, isOptional(), defaultValue, name, version, doc, parameters == null ? null : Collections.unmodifiableMap(parameters), - fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema); + fields == null ? null : Collections.unmodifiableList(new ArrayList<Field>(fields.values())), keySchema, valueSchema); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/75e213e5/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java index f0c5342..6162420 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java @@ -293,6 +293,18 @@ public class SchemaBuilderTest { new Struct(emptyStructSchema); } + @Test(expected = SchemaBuilderException.class) + public void testDuplicateFields() { + final Schema schema = SchemaBuilder.struct() + .name("testing") + .field("id", SchemaBuilder.string().doc("").build()) + .field("id", SchemaBuilder.string().doc("").build()) + .build(); + final Struct struct = new Struct(schema) + .put("id", "testing"); + struct.validate(); + } + private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) { assertEquals(type, schema.type()); assertEquals(optional, schema.isOptional());
