the-other-tim-brown commented on code in PR #6761:
URL: https://github.com/apache/hudi/pull/6761#discussion_r979564092
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -141,24 +146,40 @@ private Schema getEnumSchema(Descriptors.EnumDescriptor
enumDescriptor) {
return Schema.createEnum(enumDescriptor.getName(), null,
getNamespace(enumDescriptor.getFullName()), symbols);
}
- private Schema getMessageSchema(Descriptors.Descriptor descriptor,
Map<Descriptors.Descriptor, Schema> seen, boolean flattenWrappedPrimitives) {
- if (seen.containsKey(descriptor)) {
- return seen.get(descriptor);
+ /**
+ * Translates a Proto Message descriptor into an Avro Schema
+ * @param descriptor the descriptor for the proto message
+ * @param recursionDepths a map of the descriptor to the number of times
it has been encountered in this depth first traversal of the schema.
+ * This is used to cap the number of times we
recurse on a schema.
+ * @param flattenWrappedPrimitives if true, treat wrapped primitives as
nullable primitives, if false, treat them as proto messages
+ * @param path a string prefixed with the namespace of the original
message being translated to avro and containing the current dot separated path
tracking progress through the schema.
+ * This value is used for a namespace when creating Avro
records to avoid an error when reusing the same class name when unraveling a
recursive schema.
+ * @param maxRecursionDepth the number of times to unravel a recursive
proto schema before spilling the rest to bytes
+ * @return an avro schema
+ */
+ private Schema getMessageSchema(Descriptors.Descriptor descriptor,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean
flattenWrappedPrimitives, String path,
+ int maxRecursionDepth) {
+ // Parquet does not handle recursive schemas so we "unravel" the proto N
levels
+ Integer currentRecursionCount = recursionDepths.getOrDefault(descriptor,
0);
+ if (currentRecursionCount >= maxRecursionDepth) {
+ return RECURSION_OVERFLOW_SCHEMA;
}
- Schema result = Schema.createRecord(descriptor.getName(), null,
- getNamespace(descriptor.getFullName()), false);
+ // The current path is used as a namespace to avoid record name
collisions within recursive schemas
+ Schema result = Schema.createRecord(descriptor.getName(), null, path,
false);
- seen.put(descriptor, result);
+ recursionDepths.put(descriptor, ++currentRecursionCount);
List<Schema.Field> fields = new
ArrayList<>(descriptor.getFields().size());
for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
- fields.add(new Schema.Field(f.getName(), getFieldSchema(f, seen,
flattenWrappedPrimitives), null, getDefault(f)));
+ // each branch of the schema traversal requires its own recursion
depth tracking so copy the recursionDepths map
+ fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path,
maxRecursionDepth), null, getDefault(f)));
Review Comment:
I think this will make the code harder to read. You would need to create
copies of all entries related to your current path, add the current field
name to the path for your new keys, and then re-look up that path. I would
prefer to leave this portion as is.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -35,24 +38,99 @@
import com.google.protobuf.UInt64Value;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import com.google.protobuf.util.Timestamps;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Scanner;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TestProtoConversionUtil {
@Test
- public void allFieldsSet_wellKnownTypesAreNested() {
+ public void allFieldsSet_wellKnownTypesAreNested() throws IOException {
+ Schema.Parser parser = new Schema.Parser();
+ Schema convertedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+ Pair<Sample, GenericRecord> inputAndOutput =
createInputOutputSampleWithWellKnownTypesNested(convertedSchema);
+ GenericRecord actual =
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema,
inputAndOutput.getLeft()), convertedSchema);
+ Assertions.assertEquals(inputAndOutput.getRight(), actual);
+ }
+
+ @Test
+ public void noFieldsSet_wellKnownTypesAreNested() throws IOException {
+ Sample sample = Sample.newBuilder().build();
+ Schema.Parser parser = new Schema.Parser();
+ Schema convertedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+ GenericRecord actual =
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema,
sample), convertedSchema);
+
Assertions.assertEquals(createDefaultOutputWithWellKnownTypesNested(convertedSchema),
actual);
+ }
+
+ @Test
+ public void allFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+ Schema.Parser parser = new Schema.Parser();
+ Schema convertedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+ Pair<Sample, GenericRecord> inputAndOutput =
createInputOutputSampleWithWellKnownTypesFlattened(convertedSchema);
+ GenericRecord actual =
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema,
inputAndOutput.getLeft()), convertedSchema);
+ Assertions.assertEquals(inputAndOutput.getRight(), actual);
+ }
+
+ @Test
+ public void noFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+ Sample sample = Sample.newBuilder().build();
+ Schema.Parser parser = new Schema.Parser();
+ Schema convertedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+ GenericRecord actual =
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema,
sample), convertedSchema);
+
Assertions.assertEquals(createDefaultOutputWithWellKnownTypesFlattened(convertedSchema),
actual);
+ }
+
+ @Test
+ public void recursiveSchema_noOverflow() throws IOException {
+ Schema.Parser parser = new Schema.Parser();
+ Schema convertedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive.avsc"));
+ Pair<Parent, GenericRecord> inputAndOutput =
createInputOutputForRecursiveSchemaNoOverflow(convertedSchema);
+ GenericRecord actual =
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema,
inputAndOutput.getLeft()), convertedSchema);
+ Assertions.assertEquals(inputAndOutput.getRight(), actual);
+ }
+
+ @Test
+ public void recursiveSchema_withOverflow() throws Exception {
+ Schema.Parser parser = new Schema.Parser();
+ Schema convertedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive.avsc"));
+ Pair<Parent, GenericRecord> inputAndOutput =
createInputOutputForRecursiveSchemaWithOverflow(convertedSchema);
+ Parent input = inputAndOutput.getLeft();
+ GenericRecord actual =
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema,
inputAndOutput.getLeft()), convertedSchema);
+ Assertions.assertEquals(inputAndOutput.getRight(), actual);
+ // assert that overflow data can be read back into proto class
+ Child parsedSingleChildOverflow =
Child.parseFrom(getOverflowBytesFromChildRecord((GenericRecord)
actual.get("child")));
+
Assertions.assertEquals(input.getChild().getRecurseField().getRecurseField(),
parsedSingleChildOverflow);
+ // Get children list
+ GenericData.Array<GenericRecord> array =
(GenericData.Array<GenericRecord>) actual.get("children");
+ Child parsedChildren1Overflow =
Child.parseFrom(getOverflowBytesFromChildRecord(array.get(0)));
+
Assertions.assertEquals(input.getChildren(0).getRecurseField().getRecurseField(),
parsedChildren1Overflow);
+ Child parsedChildren2Overflow =
Child.parseFrom(getOverflowBytesFromChildRecord(array.get(1)));
+
Assertions.assertEquals(input.getChildren(1).getRecurseField().getRecurseField(),
parsedChildren2Overflow);
+ }
+
+ private Pair<Sample, GenericRecord>
createInputOutputSampleWithWellKnownTypesNested(Schema schema) {
Review Comment:
Updating to make these values random
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]