alexeykudinkin commented on code in PR #6806:
URL: https://github.com/apache/hudi/pull/6806#discussion_r981654227
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -45,10 +45,15 @@ public static class Config {
.sinceVersion("0.13.0")
.withDocumentation("The Protobuf Message class used as the source for
the schema.");
- public static final ConfigProperty<Boolean>
PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES =
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
+ public static final ConfigProperty<Boolean>
PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS =
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
.defaultValue(false)
.sinceVersion("0.13.0")
- .withDocumentation("When set to false wrapped primitives like
Int64Value are translated to a record with a single 'value' field instead of
simply a nullable value");
+ .withDocumentation("When set to true wrapped primitives like
Int64Value are translated to a record with a single 'value' field instead of
simply a nullable value");
Review Comment:
Let's call out what the default behavior is (otherwise the reader has to work
through a double negation to figure it out)
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -45,10 +45,15 @@ public static class Config {
.sinceVersion("0.13.0")
.withDocumentation("The Protobuf Message class used as the source for
the schema.");
- public static final ConfigProperty<Boolean>
PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES =
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
+ public static final ConfigProperty<Boolean>
PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS =
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
.defaultValue(false)
.sinceVersion("0.13.0")
- .withDocumentation("When set to false wrapped primitives like
Int64Value are translated to a record with a single 'value' field instead of
simply a nullable value");
+ .withDocumentation("When set to true wrapped primitives like
Int64Value are translated to a record with a single 'value' field instead of
simply a nullable value");
+
+ public static final ConfigProperty<Boolean>
PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS =
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".timestamps.as.records")
+ .defaultValue(false)
+ .sinceVersion("0.13.0")
+ .withDocumentation("When set to true Timestamp fields are translated
to a record with a seconds and nanos field, instead of a long with the
timestamp-micros logical type");
Review Comment:
Same here
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -58,12 +65,13 @@ public class ProtoConversionUtil {
/**
* Creates an Avro {@link Schema} for the provided class. Assumes that the
class is a protobuf {@link Message}.
* @param clazz The protobuf class
- * @param flattenWrappedPrimitives set to true to treat wrapped primitives
like nullable fields instead of nested messages.
+ * @param wrappedPrimitivesAsRecords set to true to treat wrapped primitives
like record with a single "value" field instead of simply a nullable field
* @param maxRecursionDepth the number of times to unravel a recursive proto
schema before spilling the rest to bytes
+ * @param timestampsAsRecords if true convert {@link Timestamp} to a Record
with a seconds and nanos field, otherwise convert protobuf {@link Timestamp} to
a long with the time-mircos logical type.
* @return An Avro schema
*/
- public static Schema getAvroSchemaForMessageClass(Class clazz, boolean
flattenWrappedPrimitives, int maxRecursionDepth) {
- return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives,
maxRecursionDepth);
+ public static Schema getAvroSchemaForMessageClass(Class clazz, boolean
wrappedPrimitivesAsRecords, int maxRecursionDepth, boolean timestampsAsRecords)
{
+ return AvroSupport.get().getSchema(clazz, wrappedPrimitivesAsRecords,
maxRecursionDepth, timestampsAsRecords);
Review Comment:
Similar comment: instead of passing these as params:
- Make `AvroSupport` non-singleton
- Pass the config into `AvroSupport` and initialize these as fields there
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -204,34 +226,48 @@ private Schema getFieldSchema(Descriptors.FieldDescriptor
f, CopyOnWriteMap<Desc
return schemaFinalizer.apply(Schema.create(Schema.Type.INT));
case UINT32:
case INT64:
- case UINT64:
case SINT64:
case FIXED64:
case SFIXED64:
return schemaFinalizer.apply(Schema.create(Schema.Type.LONG));
+ case UINT64:
+ return schemaFinalizer.apply(UNSIGNED_LONG_SCHEMA);
case MESSAGE:
- String updatedPath = appendFieldNameToPath(path, f.getName());
- if (flattenWrappedPrimitives &&
WRAPPER_DESCRIPTORS_TO_TYPE.containsKey(f.getMessageType())) {
+ String updatedPath = appendFieldNameToPath(path,
fieldDescriptor.getName());
+ if (!wrappedPrimitivesAsRecords &&
WRAPPER_DESCRIPTORS_TO_TYPE.contains(fieldDescriptor.getMessageType())) {
// all wrapper types have a single field, so we can get the first
field in the message's schema
- return
schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL_SCHEMA,
getFieldSchema(f.getMessageType().getFields().get(0), recursionDepths,
flattenWrappedPrimitives, updatedPath,
- maxRecursionDepth))));
+ return
schemaFinalizer.apply(makeSchemaNullable(getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0),
recursionDepths, wrappedPrimitivesAsRecords, updatedPath,
+ maxRecursionDepth, timestampsAsRecords)));
+ }
+ if (!timestampsAsRecords &&
Timestamp.getDescriptor().equals(fieldDescriptor.getMessageType())) {
+ // Handle timestamps as long with logical type
+ return
schemaFinalizer.apply(makeSchemaNullable(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))));
Review Comment:
Same comment as above
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -314,7 +351,16 @@ private Object convertObject(Schema schema, Object value) {
case ENUM:
return GenericData.get().createEnum(value.toString(), schema);
case FIXED:
- return GenericData.get().createFixed(null, ((GenericFixed)
value).bytes(), schema);
+ if (value instanceof byte[]) {
+ return GenericData.get().createFixed(null, (byte[]) value, schema);
+ }
+ Object unsignedLongValue = value;
+ if (unsignedLongValue instanceof Message) {
+ // Unwrap UInt64Value
+ unsignedLongValue = getWrappedValue(unsignedLongValue);
+ }
+ // convert the long to its unsigned value
+ return DECIMAL_CONVERSION.toFixed(new
BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema,
schema.getLogicalType());
Review Comment:
There's `BigDecimal.valueOf` we can use
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -130,10 +141,43 @@ public void recursiveSchema_withOverflow() throws
Exception {
Assertions.assertEquals(input.getChildren(1).getRecurseField().getRecurseField(),
parsedChildren2Overflow);
}
+ @Test
+ public void oneOfSchema() throws IOException {
+ Schema.Parser parser = new Schema.Parser();
+ Schema convertedSchema =
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/oneof_schema.avsc"));
+ WithOneOf input = WithOneOf.newBuilder().setLong(32L).build();
+ GenericRecord actual =
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema,
input), convertedSchema);
+
+ GenericData.Record expectedRecord = new
GenericData.Record(convertedSchema);
+ expectedRecord.put("int", null);
+ expectedRecord.put("long", 32L);
+ expectedRecord.put("message", null);
+ Assertions.assertEquals(expectedRecord, actual);
+ }
+
+ private void assertUnsignedLongCorrectness(Schema convertedSchema, Sample
input, GenericRecord actual, boolean wellKnownTypesAsRecords) {
Review Comment:
A good indicator of whether a method is coherent is how easy it is to judge what
it does simply by looking at its signature. For this one it's not that easy.
I'd actually suggest splitting it in two and making them look like the following:
```
assert(Schema.Field fieldSchema, Long expectedValue, ... actual, ...)
```
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -85,6 +93,8 @@ public static GenericRecord convertToAvro(Schema schema,
Message message) {
private static class AvroSupport {
private static final Schema STRING_SCHEMA =
Schema.create(Schema.Type.STRING);
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
+ private static final Schema UNSIGNED_LONG_SCHEMA =
LogicalTypes.decimal(21).addToSchema(Schema.createFixed("unsigned_long", null,
"org.apache.hudi.protos", 9));
Review Comment:
Please add a comment explaining where the decimal precision of 21 comes from
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor
descriptor, CopyOnWriteMa
List<Schema.Field> fields = new
ArrayList<>(descriptor.getFields().size());
for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
// each branch of the schema traversal requires its own recursion
depth tracking so copy the recursionDepths map
- fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path,
maxRecursionDepth), null, getDefault(f)));
+ fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new
CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path,
maxRecursionDepth, timestampsAsRecords),
+ null, getDefault(f)));
}
result.setFields(fields);
return result;
}
- private Schema getFieldSchema(Descriptors.FieldDescriptor f,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean
flattenWrappedPrimitives, String path,
- int maxRecursionDepth) {
- Function<Schema, Schema> schemaFinalizer = f.isRepeated() ?
Schema::createArray : Function.identity();
- switch (f.getType()) {
+ private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean
wrappedPrimitivesAsRecords, String path,
+ int maxRecursionDepth, boolean
timestampsAsRecords) {
+ Function<Schema, Schema> schemaFinalizer = schema -> {
+ Schema updatedSchema = schema;
+ // all fields in the oneof will be treated as nullable
+ if (fieldDescriptor.getContainingOneof() != null && !(schema.getType()
== Schema.Type.UNION && schema.getTypes().get(0).getType() ==
Schema.Type.NULL)) {
Review Comment:
Should we instead add an assertion making sure that the passed-in schema is
nullable?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -58,12 +65,13 @@ public class ProtoConversionUtil {
/**
* Creates an Avro {@link Schema} for the provided class. Assumes that the
class is a protobuf {@link Message}.
* @param clazz The protobuf class
- * @param flattenWrappedPrimitives set to true to treat wrapped primitives
like nullable fields instead of nested messages.
+ * @param wrappedPrimitivesAsRecords set to true to treat wrapped primitives
like record with a single "value" field instead of simply a nullable field
* @param maxRecursionDepth the number of times to unravel a recursive proto
schema before spilling the rest to bytes
+ * @param timestampsAsRecords if true convert {@link Timestamp} to a Record
with a seconds and nanos field, otherwise convert protobuf {@link Timestamp} to
a long with the time-mircos logical type.
* @return An Avro schema
*/
- public static Schema getAvroSchemaForMessageClass(Class clazz, boolean
flattenWrappedPrimitives, int maxRecursionDepth) {
- return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives,
maxRecursionDepth);
+ public static Schema getAvroSchemaForMessageClass(Class clazz, boolean
wrappedPrimitivesAsRecords, int maxRecursionDepth, boolean timestampsAsRecords)
{
Review Comment:
Let's pass in the config here instead of individual values (passing them
individually won't scale well)
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -204,34 +226,48 @@ private Schema getFieldSchema(Descriptors.FieldDescriptor
f, CopyOnWriteMap<Desc
return schemaFinalizer.apply(Schema.create(Schema.Type.INT));
case UINT32:
case INT64:
- case UINT64:
case SINT64:
case FIXED64:
case SFIXED64:
return schemaFinalizer.apply(Schema.create(Schema.Type.LONG));
+ case UINT64:
+ return schemaFinalizer.apply(UNSIGNED_LONG_SCHEMA);
case MESSAGE:
- String updatedPath = appendFieldNameToPath(path, f.getName());
- if (flattenWrappedPrimitives &&
WRAPPER_DESCRIPTORS_TO_TYPE.containsKey(f.getMessageType())) {
+ String updatedPath = appendFieldNameToPath(path,
fieldDescriptor.getName());
+ if (!wrappedPrimitivesAsRecords &&
WRAPPER_DESCRIPTORS_TO_TYPE.contains(fieldDescriptor.getMessageType())) {
// all wrapper types have a single field, so we can get the first
field in the message's schema
- return
schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL_SCHEMA,
getFieldSchema(f.getMessageType().getFields().get(0), recursionDepths,
flattenWrappedPrimitives, updatedPath,
- maxRecursionDepth))));
+ return
schemaFinalizer.apply(makeSchemaNullable(getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0),
recursionDepths, wrappedPrimitivesAsRecords, updatedPath,
Review Comment:
Let's break this expression up for readability (e.g., extract a local variable for the schema)
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor
descriptor, CopyOnWriteMa
List<Schema.Field> fields = new
ArrayList<>(descriptor.getFields().size());
for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
// each branch of the schema traversal requires its own recursion
depth tracking so copy the recursionDepths map
- fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path,
maxRecursionDepth), null, getDefault(f)));
+ fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new
CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path,
maxRecursionDepth, timestampsAsRecords),
+ null, getDefault(f)));
}
result.setFields(fields);
return result;
}
- private Schema getFieldSchema(Descriptors.FieldDescriptor f,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean
flattenWrappedPrimitives, String path,
- int maxRecursionDepth) {
- Function<Schema, Schema> schemaFinalizer = f.isRepeated() ?
Schema::createArray : Function.identity();
- switch (f.getType()) {
+ private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean
wrappedPrimitivesAsRecords, String path,
+ int maxRecursionDepth, boolean
timestampsAsRecords) {
+ Function<Schema, Schema> schemaFinalizer = schema -> {
+ Schema updatedSchema = schema;
+ // all fields in the oneof will be treated as nullable
+ if (fieldDescriptor.getContainingOneof() != null && !(schema.getType()
== Schema.Type.UNION && schema.getTypes().get(0).getType() ==
Schema.Type.NULL)) {
Review Comment:
Let's extract this finalizer as a static method to avoid gotchas with closures
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor
descriptor, CopyOnWriteMa
List<Schema.Field> fields = new
ArrayList<>(descriptor.getFields().size());
for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
// each branch of the schema traversal requires its own recursion
depth tracking so copy the recursionDepths map
- fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path,
maxRecursionDepth), null, getDefault(f)));
+ fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new
CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path,
maxRecursionDepth, timestampsAsRecords),
+ null, getDefault(f)));
}
result.setFields(fields);
return result;
}
- private Schema getFieldSchema(Descriptors.FieldDescriptor f,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean
flattenWrappedPrimitives, String path,
- int maxRecursionDepth) {
- Function<Schema, Schema> schemaFinalizer = f.isRepeated() ?
Schema::createArray : Function.identity();
- switch (f.getType()) {
+ private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor,
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean
wrappedPrimitivesAsRecords, String path,
+ int maxRecursionDepth, boolean
timestampsAsRecords) {
+ Function<Schema, Schema> schemaFinalizer = schema -> {
+ Schema updatedSchema = schema;
+ // all fields in the oneof will be treated as nullable
+ if (fieldDescriptor.getContainingOneof() != null && !(schema.getType()
== Schema.Type.UNION && schema.getTypes().get(0).getType() ==
Schema.Type.NULL)) {
+ updatedSchema = makeSchemaNullable(schema);
+ }
+ if (fieldDescriptor.isRepeated()) {
Review Comment:
The order of these conditionals should be reversed (to handle a repeated field
within a oneof). Let's add a test for that.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -314,7 +351,16 @@ private Object convertObject(Schema schema, Object value) {
case ENUM:
return GenericData.get().createEnum(value.toString(), schema);
case FIXED:
- return GenericData.get().createFixed(null, ((GenericFixed)
value).bytes(), schema);
+ if (value instanceof byte[]) {
+ return GenericData.get().createFixed(null, (byte[]) value, schema);
+ }
+ Object unsignedLongValue = value;
+ if (unsignedLongValue instanceof Message) {
Review Comment:
We should assert that the type is indeed UInt64
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]