the-other-tim-brown commented on code in PR #6761:
URL: https://github.com/apache/hudi/pull/6761#discussion_r979564092


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -141,24 +146,40 @@ private Schema getEnumSchema(Descriptors.EnumDescriptor 
enumDescriptor) {
       return Schema.createEnum(enumDescriptor.getName(), null, 
getNamespace(enumDescriptor.getFullName()), symbols);
     }
 
-    private Schema getMessageSchema(Descriptors.Descriptor descriptor, 
Map<Descriptors.Descriptor, Schema> seen, boolean flattenWrappedPrimitives) {
-      if (seen.containsKey(descriptor)) {
-        return seen.get(descriptor);
+    /**
+     * Translates a Proto Message descriptor into an Avro Schema
+     * @param descriptor the descriptor for the proto message
+     * @param recursionDepths a map of the descriptor to the number of times 
it has been encountered in this depth first traversal of the schema.
+     *                        This is used to cap the number of times we 
recurse on a schema.
+     * @param flattenWrappedPrimitives if true, treat wrapped primitives as 
nullable primitives, if false, treat them as proto messages
+     * @param path a string prefixed with the namespace of the original 
message being translated to avro and containing the current dot separated path 
tracking progress through the schema.
+     *             This value is used for a namespace when creating Avro 
records to avoid an error when reusing the same class name when unraveling a 
recursive schema.
+     * @param maxRecursionDepth the number of times to unravel a recursive 
proto schema before spilling the rest to bytes
+     * @return an avro schema
+     */
+    private Schema getMessageSchema(Descriptors.Descriptor descriptor, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
flattenWrappedPrimitives, String path,
+                                    int maxRecursionDepth) {
+      // Parquet does not handle recursive schemas so we "unravel" the proto N 
levels
+      Integer currentRecursionCount = recursionDepths.getOrDefault(descriptor, 
0);
+      if (currentRecursionCount >= maxRecursionDepth) {
+        return RECURSION_OVERFLOW_SCHEMA;
       }
-      Schema result = Schema.createRecord(descriptor.getName(), null,
-          getNamespace(descriptor.getFullName()), false);
+      // The current path is used as a namespace to avoid record name 
collisions within recursive schemas
+      Schema result = Schema.createRecord(descriptor.getName(), null, path, 
false);
 
-      seen.put(descriptor, result);
+      recursionDepths.put(descriptor, ++currentRecursionCount);
 
       List<Schema.Field> fields = new 
ArrayList<>(descriptor.getFields().size());
       for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
-        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, seen, 
flattenWrappedPrimitives), null, getDefault(f)));
+        // each branch of the schema traversal requires its own recursion 
depth tracking so copy the recursionDepths map
+        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, 
maxRecursionDepth), null, getDefault(f)));

Review Comment:
   I think this will make the code harder to read. You would need to create 
copies of all entries related to your current path, add the current field 
name to the path for your new keys, and then look up that path again. I would 
prefer to leave this portion as is.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -35,24 +38,99 @@
 import com.google.protobuf.UInt64Value;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import com.google.protobuf.util.Timestamps;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Scanner;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class TestProtoConversionUtil {
   @Test
-  public void allFieldsSet_wellKnownTypesAreNested() {
+  public void allFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = 
createInputOutputSampleWithWellKnownTypesNested(convertedSchema);
+    GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, 
inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Sample sample = Sample.newBuilder().build();
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, 
sample), convertedSchema);
+    
Assertions.assertEquals(createDefaultOutputWithWellKnownTypesNested(convertedSchema),
 actual);
+  }
+
+  @Test
+  public void allFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = 
createInputOutputSampleWithWellKnownTypesFlattened(convertedSchema);
+    GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, 
inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+    Sample sample = Sample.newBuilder().build();
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+    GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, 
sample), convertedSchema);
+    
Assertions.assertEquals(createDefaultOutputWithWellKnownTypesFlattened(convertedSchema),
 actual);
+  }
+
+  @Test
+  public void recursiveSchema_noOverflow() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive.avsc"));
+    Pair<Parent, GenericRecord> inputAndOutput = 
createInputOutputForRecursiveSchemaNoOverflow(convertedSchema);
+    GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, 
inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void recursiveSchema_withOverflow() throws Exception {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive.avsc"));
+    Pair<Parent, GenericRecord> inputAndOutput = 
createInputOutputForRecursiveSchemaWithOverflow(convertedSchema);
+    Parent input = inputAndOutput.getLeft();
+    GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, 
inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+    // assert that overflow data can be read back into proto class
+    Child parsedSingleChildOverflow = 
Child.parseFrom(getOverflowBytesFromChildRecord((GenericRecord) 
actual.get("child")));
+    
Assertions.assertEquals(input.getChild().getRecurseField().getRecurseField(), 
parsedSingleChildOverflow);
+    // Get children list
+    GenericData.Array<GenericRecord> array = 
(GenericData.Array<GenericRecord>) actual.get("children");
+    Child parsedChildren1Overflow = 
Child.parseFrom(getOverflowBytesFromChildRecord(array.get(0)));
+    
Assertions.assertEquals(input.getChildren(0).getRecurseField().getRecurseField(),
 parsedChildren1Overflow);
+    Child parsedChildren2Overflow = 
Child.parseFrom(getOverflowBytesFromChildRecord(array.get(1)));
+    
Assertions.assertEquals(input.getChildren(1).getRecurseField().getRecurseField(),
 parsedChildren2Overflow);
+  }
+
+  private Pair<Sample, GenericRecord> 
createInputOutputSampleWithWellKnownTypesNested(Schema schema) {

Review Comment:
   Updating to make these values random.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to