nsivabalan commented on code in PR #6761:
URL: https://github.com/apache/hudi/pull/6761#discussion_r980303555


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -141,24 +146,40 @@ private Schema getEnumSchema(Descriptors.EnumDescriptor 
enumDescriptor) {
       return Schema.createEnum(enumDescriptor.getName(), null, 
getNamespace(enumDescriptor.getFullName()), symbols);
     }
 
-    private Schema getMessageSchema(Descriptors.Descriptor descriptor, 
Map<Descriptors.Descriptor, Schema> seen, boolean flattenWrappedPrimitives) {
-      if (seen.containsKey(descriptor)) {
-        return seen.get(descriptor);
+    /**
+     * Translates a Proto Message descriptor into an Avro Schema
+     * @param descriptor the descriptor for the proto message
+     * @param recursionDepths a map of the descriptor to the number of times 
it has been encountered in this depth first traversal of the schema.
+     *                        This is used to cap the number of times we 
recurse on a schema.
+     * @param flattenWrappedPrimitives if true, treat wrapped primitives as 
nullable primitives, if false, treat them as proto messages
+     * @param path a string prefixed with the namespace of the original 
message being translated to avro and containing the current dot separated path 
tracking progress through the schema.
+     *             This value is used for a namespace when creating Avro 
records to avoid an error when reusing the same class name when unraveling a 
recursive schema.
+     * @param maxRecursionDepth the number of times to unravel a recursive 
proto schema before spilling the rest to bytes
+     * @return an avro schema
+     */
+    private Schema getMessageSchema(Descriptors.Descriptor descriptor, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
flattenWrappedPrimitives, String path,
+                                    int maxRecursionDepth) {
+      // Parquet does not handle recursive schemas so we "unravel" the proto N 
levels
+      Integer currentRecursionCount = recursionDepths.getOrDefault(descriptor, 
0);
+      if (currentRecursionCount >= maxRecursionDepth) {
+        return RECURSION_OVERFLOW_SCHEMA;
       }
-      Schema result = Schema.createRecord(descriptor.getName(), null,
-          getNamespace(descriptor.getFullName()), false);
+      // The current path is used as a namespace to avoid record name 
collisions within recursive schemas
+      Schema result = Schema.createRecord(descriptor.getName(), null, path, 
false);
 
-      seen.put(descriptor, result);
+      recursionDepths.put(descriptor, ++currentRecursionCount);
 
       List<Schema.Field> fields = new 
ArrayList<>(descriptor.getFields().size());
       for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
-        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, seen, 
flattenWrappedPrimitives), null, getDefault(f)));
+        // each branch of the schema traversal requires its own recursion 
depth tracking so copy the recursionDepths map
+        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, 
maxRecursionDepth), null, getDefault(f)));

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to