urbandan commented on code in PR #13433:
URL: https://github.com/apache/kafka/pull/13433#discussion_r1164053318

##########
connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java:
##########
@@ -36,6 +38,255 @@ public class SchemaBuilderTest {
     private static final String DOC = "doc";
     private static final Map<String, String> NO_PARAMS = null;
 
+    @Test
+    public void testDefaultValueStructSchema() {
+        SchemaBuilder builder = SchemaBuilder.struct()
+                .field("f1", Schema.BOOLEAN_SCHEMA);
+
+        Struct defaultValue = new Struct(builder.build()); // the Struct receives a schema, not a builder
+        defaultValue.put("f1", true);
+
+        builder.defaultValue(defaultValue)
+                .build();

Review Comment:
   here we end up in a situation where defaultValue.schema().equals(builder.build()) is false - is this acceptable? SchemaBuilder.defaultValue validates first, then sets the defaultValue field, that's why the builder.defaultValue call on line 49 passes, but I'm not sure if the result is desirable



##########
connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java:
##########
@@ -36,6 +38,255 @@ public class SchemaBuilderTest {
     private static final String DOC = "doc";
     private static final Map<String, String> NO_PARAMS = null;
 
+    @Test
+    public void testDefaultValueStructSchema() {
+        SchemaBuilder builder = SchemaBuilder.struct()
+                .field("f1", Schema.BOOLEAN_SCHEMA);
+
+        Struct defaultValue = new Struct(builder.build()); // the Struct receives a schema, not a builder
+        defaultValue.put("f1", true);
+
+        builder.defaultValue(defaultValue)
+                .build();
+    }
+
+    @Test
+    public void testDefaultValueStructSchemaBuilder() {
+        SchemaBuilder builder = SchemaBuilder.struct()
+                .field("f1", Schema.BOOLEAN_SCHEMA);
+
+        Struct defaultValue = new Struct(builder);
+        defaultValue.put("f1", true);
+
+        builder.defaultValue(defaultValue).build();

Review Comment:
   this one is OK, as we end up with 2 matching schemas - I think we should stick to one way of setting up struct defaults, so the end result of building a schema can be consistent



##########
connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java:
##########
@@ -36,6 +38,255 @@ public class SchemaBuilderTest {
     private static final String DOC = "doc";
     private static final Map<String, String> NO_PARAMS = null;
 
+    @Test
+    public void testDefaultValueStructSchema() {
+        SchemaBuilder builder = SchemaBuilder.struct()
+                .field("f1", Schema.BOOLEAN_SCHEMA);
+
+        Struct defaultValue = new Struct(builder.build()); // the Struct receives a schema, not a builder
+        defaultValue.put("f1", true);
+
+        builder.defaultValue(defaultValue)
+                .build();
+    }
+
+    @Test
+    public void testDefaultValueStructSchemaBuilder() {
+        SchemaBuilder builder = SchemaBuilder.struct()
+                .field("f1", Schema.BOOLEAN_SCHEMA);
+
+        Struct defaultValue = new Struct(builder);
+        defaultValue.put("f1", true);
+
+        builder.defaultValue(defaultValue).build();
+    }
+
+    @Test
+    public void testDefaultValueStructEquals() {
+        SchemaBuilder builder = SchemaBuilder.struct()
+                .field("f1", Schema.BOOLEAN_SCHEMA);
+        Struct defaultValue = new Struct(builder);

Review Comment:
   I think we should have a test where we pass builder.build() here, and not builder - that will make the issue I described in testDefaultValueStructSchema clear
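
[Editorial note, not part of the quoted review] A minimal sketch of the mismatch discussed in the first and third comments above. The class name StructDefaultValueMismatchSketch is illustrative only, and the expected output reflects the behaviour the reviewer describes: a Struct built from builder.build() snapshots the schema before the default value is attached, so the finally built schema no longer equals it.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructDefaultValueMismatchSketch {
    public static void main(String[] args) {
        SchemaBuilder builder = SchemaBuilder.struct()
                .field("f1", Schema.BOOLEAN_SCHEMA);

        // builder.build() snapshots the schema *before* any default value is attached
        Struct defaultValue = new Struct(builder.build());
        defaultValue.put("f1", true);

        // Validation happens against the builder's current (default-less) state,
        // which is why this call passes; only afterwards is the default stored
        Schema finalSchema = builder.defaultValue(defaultValue).build();

        // finalSchema carries the default value, defaultValue.schema() does not,
        // so the two schemas are not equal -- the situation questioned above
        System.out.println(defaultValue.schema().equals(finalSchema)); // expected: false
    }
}
```
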
##########
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java:
##########
@@ -289,29 +291,202 @@ public ConnectSchema schema() {
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ConnectSchema schema = (ConnectSchema) o;
-        return Objects.equals(optional, schema.optional) &&
-                Objects.equals(version, schema.version) &&
-                Objects.equals(name, schema.name) &&
-                Objects.equals(doc, schema.doc) &&
-                Objects.equals(type, schema.type) &&
-                Objects.deepEquals(defaultValue, schema.defaultValue) &&
-                Objects.equals(fields, schema.fields) &&
-                Objects.equals(keySchema, schema.keySchema) &&
-                Objects.equals(valueSchema, schema.valueSchema) &&
-                Objects.equals(parameters, schema.parameters);
+        if (!(o instanceof Schema)) return false;
+        Schema schema = (Schema) o;
+        return equals(this, schema);
+    }
+
+    private static boolean equals(Schema left, Schema right) {
+        return equals(left, right, new IdentityHashMap<>());
+    }
+
+    private static boolean equals(Schema left, Schema right, IdentityHashMap<Schema, Schema> equivalentSchemas) {
+        if (left == right)
+            return true;
+
+        if (equivalentSchemas.containsKey(left)) {
+            // Use referential equality because object equality might cause a stack overflow
+            return equivalentSchemas.get(left) == right;
+        }
+
+        boolean shallowMatches = Objects.equals(left.isOptional(), right.isOptional()) &&
+                Objects.equals(left.version(), right.version()) &&
+                Objects.equals(left.name(), right.name()) &&
+                Objects.equals(left.doc(), right.doc()) &&
+                Objects.equals(left.type(), right.type()) &&
+                Objects.equals(left.parameters(), right.parameters());
+        if (!shallowMatches)
+            return false;
+
+        // Avoid mutating the passed-in map, since that may interfere with recursive calls higher up the stack
+        IdentityHashMap<Schema, Schema> equivalentSchemasCopy = new IdentityHashMap<>(equivalentSchemas);
+        equivalentSchemasCopy.put(left, right);
+        equivalentSchemasCopy.put(right, left);
+
+        switch (left.type()) {
+            case ARRAY:
+                return equals(left.valueSchema(), right.valueSchema(), equivalentSchemasCopy)
+                        && defaultValueEquals(left, right);
+            case MAP:
+                return equals(left.keySchema(), right.keySchema(), equivalentSchemasCopy)
+                        && equals(left.valueSchema(), right.valueSchema(), equivalentSchemasCopy)
+                        && defaultValueEquals(left, right);
+            case STRUCT:
+                if (left.fields().size() != right.fields().size())
+                    return false;
+                for (int i = 0; i < left.fields().size(); i++) {
+                    // 2004
+                    Field mannyRamirez = left.fields().get(i);
+                    Field trotNixon = right.fields().get(i);
+                    if (!fieldEquals(mannyRamirez, trotNixon, equivalentSchemasCopy))
+                        return false;
+                }
+                return defaultValueEquals(left, right);
+            default:
+                return defaultValueEquals(left, right);
+        }
+    }
+
+    private static boolean fieldEquals(Field left, Field right, IdentityHashMap<Schema, Schema> equivalentSchemas) {
+        return Objects.equals(left.name(), right.name())
+                && Objects.equals(left.index(), right.index())
+                && equals(left.schema(), right.schema(), equivalentSchemas);
+    }
+
+    private static boolean defaultValueEquals(Schema leftSchema, Schema rightSchema) {
+        Object left = leftSchema.defaultValue();
+        Object right = rightSchema.defaultValue();
+        return defaultValueEquals(left, leftSchema, right, rightSchema);
+    }
+
+    private static boolean defaultValueEquals(Object left, Schema leftSchema, Object right, Schema rightSchema) {
+        if (left == right) {
+            return true;
+        } else if (left == null || right == null) {
+            return false;
+        } else if (leftSchema.type().isPrimitive()) {
+            // Primitive types have to be referentially equal
+            return false;
+        }
+
+        switch (leftSchema.type()) {
+            case ARRAY: {
+                List<?> leftArray = toList(left);
+                List<?> rightArray = toList(right);
+                if (leftArray.size() != rightArray.size())
+                    return false;
+                Schema leftValueSchema = leftSchema.valueSchema();
+                Schema rightValueSchema = rightSchema.valueSchema();
+                for (int i = 0; i < leftArray.size(); i++) {
+                    Object leftArrayValue = leftArray.get(i);
+                    Object rightArrayValue = rightArray.get(i);
+                    if (!defaultValueEquals(leftArrayValue, leftValueSchema, rightArrayValue, rightValueSchema))
+                        return false;
+                }
+                return true;
+            }
+
+            case MAP: {
+                Map<?, ?> leftMap = (Map<?, ?>) left;
+                Map<?, ?> rightMap = (Map<?, ?>) right;
+                if (leftMap.size() != rightMap.size()) {
+                    return false;
+                }
+                Schema leftValueSchema = leftSchema.valueSchema();
+                Schema rightValueSchema = rightSchema.valueSchema();
+
+                for (Map.Entry<?, ?> leftEntry : leftMap.entrySet()) {
+                    if (!rightMap.containsKey(leftEntry.getKey()))
+                        return false;
+
+                    Object leftMapValue = leftEntry.getValue();
+                    Object rightMapValue = rightMap.get(leftEntry.getKey());
+
+                    if (!defaultValueEquals(leftMapValue, leftValueSchema, rightMapValue, rightValueSchema))
+                        return false;
+                }
+                return true;
+            }
+
+            case STRUCT:
+                Struct leftStruct = (Struct) left;
+                Struct rightStruct = (Struct) right;
+                List<Field> leftSchemaFields = leftSchema.fields();
+                List<Field> rightSchemaFields = rightSchema.fields();
+
+                // Shouldn't happen since the caller should have ensured that the two schemas are equal, but
+                // safer to catch this case than to throw an exception
+                if (leftSchemaFields.size() != rightSchemaFields.size())
+                    return false;
+
+                for (int i = 0; i < leftSchemaFields.size(); i++) {
+                    Field leftField = leftSchemaFields.get(i);
+                    Field rightField = rightSchemaFields.get(i);
+
+                    Object leftFieldValue = leftStruct.get(leftField);
+                    Object rightFieldValue = rightStruct.get(rightField);
+                    if (!defaultValueEquals(leftFieldValue, leftField.schema(), rightFieldValue, rightField.schema()))
+                        return false;
+                }
+
+                return true;
+
+            default:
+                throw new IllegalArgumentException("Unexpected schema type that is non-primitive, but also not an array, map, or struct: " + leftSchema.type());
+        }
+    }
+
+    private static List<?> toList(Object o) {
+        if (o instanceof Object[]) {
+            return Arrays.asList((Object[]) o);
+        } else if (o instanceof List) {
+            return (List<?>) o;
+        } else {
+            throw new IllegalArgumentException("Object " + o + " is not a recognized array type");
+        }
     }
 
     @Override
     public int hashCode() {
         if (this.hash == null) {
-            this.hash = Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc,
-                parameters);
+            // We take the shallow hash of subschemas (i.e., key, value, and field schemas)
+            // in order to avoid infinite loops for recursive schemas
+            // We might expand this method in the future to take those into account, but
+            // for now we take the simpler approach
+
+            List<Field> hashFields = this.fields != null ? this.fields : Collections.emptyList();
+            List<Integer> fieldSchemaHashes = hashFields.stream()
+                    .map(Field::schema)
+                    .map(ConnectSchema::shallowHashCode)
+                    .collect(Collectors.toList());
+            List<String> fieldNames = hashFields.stream()
+                    .map(Field::name)
+                    .collect(Collectors.toList());
+
+            this.hash = Objects.hash(

Review Comment:
   thanks, I think this implementation is the best we can do
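
[Editorial note, not part of the quoted review] A rough sketch of the self-referencing schema shape that motivates the IdentityHashMap bookkeeping in equals and the shallow hashing in hashCode above. It assumes a SchemaBuilder may be passed as its own field schema (SchemaBuilder implements Schema); the class and field names (RecursiveSchemaSketch, "Node", "next") are hypothetical, and the printed results describe the intended behaviour of the patched implementation rather than a guaranteed outcome.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class RecursiveSchemaSketch {
    // Builds a struct schema whose "next" field points back at the struct itself,
    // e.g. a linked-list style record
    static Schema selfReferencingSchema() {
        SchemaBuilder node = SchemaBuilder.struct().name("Node").optional();
        node.field("value", Schema.INT32_SCHEMA);
        node.field("next", node); // the field schema is the builder itself, creating a cycle
        return node.build();
    }

    public static void main(String[] args) {
        Schema a = selfReferencingSchema();
        Schema b = selfReferencingSchema();

        // Comparing two structurally identical but distinct cyclic schemas would never
        // terminate if equals() simply recursed into field schemas; the visited-pair map
        // in the implementation above short-circuits the second visit instead
        System.out.println(a.equals(b)); // expected: true, and the call terminates

        // hashCode() only folds in a shallow, non-recursive hash of field schemas,
        // so it also terminates on the cycle; the hashes should agree if the shallow
        // hash honours the equals contract
        System.out.println(a.hashCode() == b.hashCode());
    }
}
```
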
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org