nsivabalan commented on code in PR #6761:
URL: https://github.com/apache/hudi/pull/6761#discussion_r980303555
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -141,24 +146,40 @@ private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) {
     return Schema.createEnum(enumDescriptor.getName(), null, getNamespace(enumDescriptor.getFullName()), symbols);
   }

-  private Schema getMessageSchema(Descriptors.Descriptor descriptor, Map<Descriptors.Descriptor, Schema> seen, boolean flattenWrappedPrimitives) {
-    if (seen.containsKey(descriptor)) {
-      return seen.get(descriptor);
+  /**
+   * Translates a Proto Message descriptor into an Avro Schema
+   * @param descriptor the descriptor for the proto message
+   * @param recursionDepths a map of the descriptor to the number of times it has been encountered in this depth first traversal of the schema.
+   *                        This is used to cap the number of times we recurse on a schema.
+   * @param flattenWrappedPrimitives if true, treat wrapped primitives as nullable primitives, if false, treat them as proto messages
+   * @param path a string prefixed with the namespace of the original message being translated to avro and containing the current dot separated path tracking progress through the schema.
+   *             This value is used for a namespace when creating Avro records to avoid an error when reusing the same class name when unraveling a recursive schema.
+   * @param maxRecursionDepth the number of times to unravel a recursive proto schema before spilling the rest to bytes
+   * @return an avro schema
+   */
+  private Schema getMessageSchema(Descriptors.Descriptor descriptor, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean flattenWrappedPrimitives, String path,
+                                  int maxRecursionDepth) {
+    // Parquet does not handle recursive schemas so we "unravel" the proto N levels
+    Integer currentRecursionCount = recursionDepths.getOrDefault(descriptor, 0);
+    if (currentRecursionCount >= maxRecursionDepth) {
+      return RECURSION_OVERFLOW_SCHEMA;
     }
-    Schema result = Schema.createRecord(descriptor.getName(), null,
-        getNamespace(descriptor.getFullName()), false);
+    // The current path is used as a namespace to avoid record name collisions within recursive schemas
+    Schema result = Schema.createRecord(descriptor.getName(), null, path, false);
-    seen.put(descriptor, result);
+    recursionDepths.put(descriptor, ++currentRecursionCount);
     List<Schema.Field> fields = new ArrayList<>(descriptor.getFields().size());
     for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
-      fields.add(new Schema.Field(f.getName(), getFieldSchema(f, seen, flattenWrappedPrimitives), null, getDefault(f)));
+      // each branch of the schema traversal requires its own recursion depth tracking so copy the recursionDepths map
+      fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, maxRecursionDepth), null, getDefault(f)));
Review Comment:
Sounds good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]