urbandan commented on code in PR #13433:
URL: https://github.com/apache/kafka/pull/13433#discussion_r1149488176


##########
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java:
##########
@@ -289,29 +291,202 @@ public ConnectSchema schema() {
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ConnectSchema schema = (ConnectSchema) o;
-        return Objects.equals(optional, schema.optional) &&
-                Objects.equals(version, schema.version) &&
-                Objects.equals(name, schema.name) &&
-                Objects.equals(doc, schema.doc) &&
-                Objects.equals(type, schema.type) &&
-                Objects.deepEquals(defaultValue, schema.defaultValue) &&
-                Objects.equals(fields, schema.fields) &&
-                Objects.equals(keySchema, schema.keySchema) &&
-                Objects.equals(valueSchema, schema.valueSchema) &&
-                Objects.equals(parameters, schema.parameters);
+        if (!(o instanceof Schema)) return false;
+        Schema schema = (Schema) o;
+        return equals(this, schema);
+    }
+
+    private static boolean equals(Schema left, Schema right) {
+        return equals(left, right, new IdentityHashMap<>());
+    }
+
+    private static boolean equals(Schema left, Schema right, 
IdentityHashMap<Schema, Schema> equivalentSchemas) {
+        if (left == right)
+            return true;
+
+        if (equivalentSchemas.containsKey(left)) {
+            // Use referential equality because object equality might cause a 
stack overflow
+            return equivalentSchemas.get(left) == right;
+        }
+
+        boolean shallowMatches = Objects.equals(left.isOptional(), 
right.isOptional()) &&
+                Objects.equals(left.version(), right.version()) &&
+                Objects.equals(left.name(), right.name()) &&
+                Objects.equals(left.doc(), right.doc()) &&
+                Objects.equals(left.type(), right.type()) &&
+                Objects.equals(left.parameters(), right.parameters());
+        if (!shallowMatches)
+            return false;
+
+        // Avoid mutating the passed-in map, since that may interfere with 
recursive calls higher up the stack
+        IdentityHashMap<Schema, Schema> equivalentSchemasCopy = new 
IdentityHashMap<>(equivalentSchemas);
+        equivalentSchemasCopy.put(left, right);
+        equivalentSchemasCopy.put(right, left);
+
+        switch (left.type()) {
+            case ARRAY:
+                return equals(left.valueSchema(), right.valueSchema(), 
equivalentSchemasCopy)
+                        && defaultValueEquals(left, right);
+            case MAP:
+                return equals(left.keySchema(), right.keySchema(), 
equivalentSchemasCopy)
+                        && equals(left.valueSchema(), right.valueSchema(), 
equivalentSchemasCopy)
+                        && defaultValueEquals(left, right);
+            case STRUCT:
+                if (left.fields().size() != right.fields().size())
+                    return false;
+                for (int i = 0; i < left.fields().size(); i++) {
+                    // 2004
+                    Field mannyRamirez = left.fields().get(i);
+                    Field trotNixon = right.fields().get(i);
+                    if (!fieldEquals(mannyRamirez, trotNixon, 
equivalentSchemasCopy))
+                        return false;
+                }
+                return defaultValueEquals(left, right);
+            default:
+                return defaultValueEquals(left, right);
+        }
+    }
+
+    private static boolean fieldEquals(Field left, Field right, 
IdentityHashMap<Schema, Schema> equivalentSchemas) {
+        return Objects.equals(left.name(), right.name())
+                && Objects.equals(left.index(), right.index())
+                && equals(left.schema(), right.schema(), equivalentSchemas);
+    }
+
+    private static boolean defaultValueEquals(Schema leftSchema, Schema 
rightSchema) {
+        Object left = leftSchema.defaultValue();
+        Object right = rightSchema.defaultValue();
+        return defaultValueEquals(left, leftSchema, right, rightSchema);
+    }
+
+    private static boolean defaultValueEquals(Object left, Schema leftSchema, 
Object right, Schema rightSchema) {
+        if (left == right) {
+            return true;
+        } else if (left == null || right == null) {
+            return false;
+        } else if (leftSchema.type().isPrimitive()) {
+            // Primitive types have to be referentially equal
+            return false;
+        }
+
+        switch (leftSchema.type()) {
+            case ARRAY: {
+                List<?> leftArray = toList(left);
+                List<?> rightArray = toList(right);
+                if (leftArray.size() != rightArray.size())
+                    return false;
+                Schema leftValueSchema = leftSchema.valueSchema();
+                Schema rightValueSchema = rightSchema.valueSchema();
+                for (int i = 0; i < leftArray.size(); i++) {
+                    Object leftArrayValue = leftArray.get(i);
+                    Object rightArrayValue = rightArray.get(i);
+                    if (!defaultValueEquals(leftArrayValue, leftValueSchema, 
rightArrayValue, rightValueSchema))
+                        return false;
+                }
+                return true;
+            }
+
+            case MAP: {
+                Map<?, ?> leftMap = (Map<?, ?>) left;
+                Map<?, ?> rightMap = (Map<?, ?>) right;
+                if (leftMap.size() != rightMap.size()) {
+                    return false;
+                }
+                Schema leftValueSchema = leftSchema.valueSchema();
+                Schema rightValueSchema = rightSchema.valueSchema();
+
+                for (Map.Entry<?, ?> leftEntry : leftMap.entrySet()) {
+                    if (!rightMap.containsKey(leftEntry.getKey()))
+                        return false;
+
+                    Object leftMapValue = leftEntry.getValue();
+                    Object rightMapValue = rightMap.get(leftEntry.getKey());
+
+                    if (!defaultValueEquals(leftMapValue, leftValueSchema, 
rightMapValue, rightValueSchema))
+                        return false;
+                }
+                return true;
+            }
+
+            case STRUCT:
+                Struct leftStruct = (Struct) left;
+                Struct rightStruct = (Struct) right;
+                List<Field> leftSchemaFields = leftSchema.fields();
+                List<Field> rightSchemaFields = rightSchema.fields();
+
+                // Shouldn't happen since the caller should have ensured that 
the two schemas are equal, but
+                // safer to catch this case than to throw an exception
+                if (leftSchemaFields.size() != rightSchemaFields.size())
+                    return false;
+
+                for (int i = 0; i < leftSchemaFields.size(); i++) {
+                    Field leftField = leftSchemaFields.get(i);
+                    Field rightField = rightSchemaFields.get(i);
+
+                    Object leftFieldValue = leftStruct.get(leftField);
+                    Object rightFieldValue = rightStruct.get(rightField);
+                    if (!defaultValueEquals(leftFieldValue, 
leftField.schema(), rightFieldValue, rightField.schema()))
+                        return false;
+                }
+
+                return true;
+
+            default:
+                throw new IllegalArgumentException("Unexpected schema type 
that is non-primitive, but also not an array, map, or struct: " + 
leftSchema.type());
+        }
+    }
+
+    private static List<?> toList(Object o) {
+        if (o instanceof Object[]) {
+            return Arrays.asList((Object[]) o);
+        } else if (o instanceof List) {
+            return (List<?>) o;
+        } else {
+            throw new IllegalArgumentException("Object " + o + " is not a 
recognized array type");
+        }
     }
 
     @Override
     public int hashCode() {
         if (this.hash == null) {
-            this.hash = Objects.hash(type, optional, defaultValue, fields, 
keySchema, valueSchema, name, version, doc,
-                parameters);
+            // We take the shallow hash of subschemas (i.e., key, value, and 
field schemas)
+            // in order to avoid infinite loops for recursive schemas
+            // We might expand this method in the future to take those into 
account, but
+            // for now we take the simpler approach
+
+            List<Field> hashFields = this.fields != null ? this.fields : 
Collections.emptyList();
+            List<Integer> fieldSchemaHashes = hashFields.stream()
+                    .map(Field::schema)
+                    .map(ConnectSchema::shallowHashCode)
+                    .collect(Collectors.toList());
+            List<String> fieldNames = hashFields.stream()
+                    .map(Field::name)
+                    .collect(Collectors.toList());
+
+            this.hash = Objects.hash(

Review Comment:
   I'm copying my previous comment from the other commit, where I mentioned 
that caching the hash code in ConnectSchema will lead to bugs because the 
underlying SchemaBuilder instances can change after the first hashCode call, 
and you pointed to the existing hashCode implementation:
   Shouldn't this be addressed? So far, without recursive support, it didn't 
necessarily make sense to mix ConnectSchema and SchemaBuilder instances, so 
this hash cache was fine. With recursive support, however, mixing the two will 
be required, which means that a ConnectSchema instance used in building a 
recursive schema will probably have the wrong hash code cached, breaking the 
hashCode contract.



##########
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java:
##########
@@ -289,29 +291,202 @@ public ConnectSchema schema() {
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ConnectSchema schema = (ConnectSchema) o;
-        return Objects.equals(optional, schema.optional) &&
-                Objects.equals(version, schema.version) &&
-                Objects.equals(name, schema.name) &&
-                Objects.equals(doc, schema.doc) &&
-                Objects.equals(type, schema.type) &&
-                Objects.deepEquals(defaultValue, schema.defaultValue) &&
-                Objects.equals(fields, schema.fields) &&
-                Objects.equals(keySchema, schema.keySchema) &&
-                Objects.equals(valueSchema, schema.valueSchema) &&
-                Objects.equals(parameters, schema.parameters);
+        if (!(o instanceof Schema)) return false;
+        Schema schema = (Schema) o;
+        return equals(this, schema);
+    }
+
+    private static boolean equals(Schema left, Schema right) {
+        return equals(left, right, new IdentityHashMap<>());
+    }
+
+    private static boolean equals(Schema left, Schema right, 
IdentityHashMap<Schema, Schema> equivalentSchemas) {
+        if (left == right)
+            return true;
+
+        if (equivalentSchemas.containsKey(left)) {
+            // Use referential equality because object equality might cause a 
stack overflow
+            return equivalentSchemas.get(left) == right;
+        }
+
+        boolean shallowMatches = Objects.equals(left.isOptional(), 
right.isOptional()) &&
+                Objects.equals(left.version(), right.version()) &&
+                Objects.equals(left.name(), right.name()) &&
+                Objects.equals(left.doc(), right.doc()) &&
+                Objects.equals(left.type(), right.type()) &&
+                Objects.equals(left.parameters(), right.parameters());
+        if (!shallowMatches)
+            return false;
+
+        // Avoid mutating the passed-in map, since that may interfere with 
recursive calls higher up the stack
+        IdentityHashMap<Schema, Schema> equivalentSchemasCopy = new 
IdentityHashMap<>(equivalentSchemas);
+        equivalentSchemasCopy.put(left, right);

Review Comment:
   Adding to this, and also thinking of the other comment about copying the 
map: if a single IdentityHashMap<Schema, List<Schema>> were used, and mappings 
were never removed, that would be correct, and it would also avoid ever 
recomputing the same thing.



##########
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java:
##########
@@ -289,29 +291,202 @@ public ConnectSchema schema() {
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ConnectSchema schema = (ConnectSchema) o;
-        return Objects.equals(optional, schema.optional) &&
-                Objects.equals(version, schema.version) &&
-                Objects.equals(name, schema.name) &&
-                Objects.equals(doc, schema.doc) &&
-                Objects.equals(type, schema.type) &&
-                Objects.deepEquals(defaultValue, schema.defaultValue) &&
-                Objects.equals(fields, schema.fields) &&
-                Objects.equals(keySchema, schema.keySchema) &&
-                Objects.equals(valueSchema, schema.valueSchema) &&
-                Objects.equals(parameters, schema.parameters);
+        if (!(o instanceof Schema)) return false;
+        Schema schema = (Schema) o;
+        return equals(this, schema);
+    }
+
+    private static boolean equals(Schema left, Schema right) {
+        return equals(left, right, new IdentityHashMap<>());
+    }
+
+    private static boolean equals(Schema left, Schema right, 
IdentityHashMap<Schema, Schema> equivalentSchemas) {
+        if (left == right)
+            return true;
+
+        if (equivalentSchemas.containsKey(left)) {
+            // Use referential equality because object equality might cause a 
stack overflow
+            return equivalentSchemas.get(left) == right;
+        }
+
+        boolean shallowMatches = Objects.equals(left.isOptional(), 
right.isOptional()) &&
+                Objects.equals(left.version(), right.version()) &&
+                Objects.equals(left.name(), right.name()) &&
+                Objects.equals(left.doc(), right.doc()) &&
+                Objects.equals(left.type(), right.type()) &&
+                Objects.equals(left.parameters(), right.parameters());
+        if (!shallowMatches)
+            return false;
+
+        // Avoid mutating the passed-in map, since that may interfere with 
recursive calls higher up the stack
+        IdentityHashMap<Schema, Schema> equivalentSchemasCopy = new 
IdentityHashMap<>(equivalentSchemas);
+        equivalentSchemasCopy.put(left, right);

Review Comment:
   Are you suggesting an IdentityHashMap<Schema, List<Schema>>?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to