This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 5ab52a3 [BEAM-12464] Change ProtoSchemaTranslator beam schema creation to match the order for protobufs containing Oneof fields (#14974) 5ab52a3 is described below commit 5ab52a3f4cfe2680098186763550b5f8ad30319c Author: Reuben van Ammers <reubenvanamm...@gmail.com> AuthorDate: Fri Jan 14 06:13:00 2022 +1100 [BEAM-12464] Change ProtoSchemaTranslator beam schema creation to match the order for protobufs containing Oneof fields (#14974) * ProtoSchemaTranslator now orders oneof fields in the resultant beam schema in accordance with their location in the protobuf definition * add reverse order protobuf * add noncontiguous oneof and some renaming * Comments and variable renaming * add reversed row tests * add noncontiguous tests * remove redundant null check * minor test comment update * update * add reversedonof test * add noncontiguous oneof test Co-authored-by: Reuben van Ammers <reuben.vanamm...@eliiza.com.au> --- .../extensions/protobuf/ProtoSchemaTranslator.java | 26 ++++- .../protobuf/ProtoDynamicMessageSchemaTest.java | 86 ++++++++++++++ .../protobuf/ProtoMessageSchemaTest.java | 46 ++++++++ .../protobuf/ProtoSchemaTranslatorTest.java | 14 +++ .../sdk/extensions/protobuf/TestProtoSchemas.java | 125 ++++++++++++++++++++- .../src/test/proto/proto3_schema_messages.proto | 28 +++++ 6 files changed, 314 insertions(+), 11 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java index 91eb1bd7..ef46b59 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java @@ -156,13 +156,18 @@ class ProtoSchemaTranslator { } static Schema 
getSchema(Descriptors.Descriptor descriptor) { - Set<Integer> oneOfFields = Sets.newHashSet(); + /* OneOfComponentFields refers to the field number in the protobuf where the component subfields + * are. This is needed to prevent double inclusion of the component fields.*/ + Set<Integer> oneOfComponentFields = Sets.newHashSet(); + /* OneOfFieldLocation stores the field number of the first field in the OneOf. Using this, we can use the location + of the first field in the OneOf as the location of the entire OneOf.*/ + Map<Integer, Field> oneOfFieldLocation = Maps.newHashMap(); List<Field> fields = Lists.newArrayListWithCapacity(descriptor.getFields().size()); for (OneofDescriptor oneofDescriptor : descriptor.getOneofs()) { List<Field> subFields = Lists.newArrayListWithCapacity(oneofDescriptor.getFieldCount()); Map<String, Integer> enumIds = Maps.newHashMap(); for (FieldDescriptor fieldDescriptor : oneofDescriptor.getFields()) { - oneOfFields.add(fieldDescriptor.getNumber()); + oneOfComponentFields.add(fieldDescriptor.getNumber()); // Store proto field number in a field option. FieldType fieldType = beamFieldTypeFromProtoField(fieldDescriptor); subFields.add( @@ -172,17 +177,26 @@ class ProtoSchemaTranslator { enumIds.putIfAbsent(fieldDescriptor.getName(), fieldDescriptor.getNumber()) == null); } FieldType oneOfType = FieldType.logicalType(OneOfType.create(subFields, enumIds)); - fields.add(Field.of(oneofDescriptor.getName(), oneOfType)); + oneOfFieldLocation.put( + oneofDescriptor.getFields().get(0).getNumber(), + Field.of(oneofDescriptor.getName(), oneOfType)); } for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) { - if (!oneOfFields.contains(fieldDescriptor.getNumber())) { + int fieldDescriptorNumber = fieldDescriptor.getNumber(); + if (!oneOfComponentFields.contains(fieldDescriptorNumber)) { // Store proto field number in metadata. 
FieldType fieldType = beamFieldTypeFromProtoField(fieldDescriptor); fields.add( - withFieldNumber( - Field.of(fieldDescriptor.getName(), fieldType), fieldDescriptor.getNumber()) + withFieldNumber(Field.of(fieldDescriptor.getName(), fieldType), fieldDescriptorNumber) .withOptions(getFieldOptions(fieldDescriptor))); + /* Note that descriptor.getFields() returns an iterator in the order of the fields in the .proto file, not + * in field number order. Therefore we can safely insert the OneOfField at the field of its first component.*/ + } else { + Field oneOfField = oneOfFieldLocation.get(fieldDescriptorNumber); + if (oneOfField != null) { + fields.add(oneOfField); + } } } return Schema.builder() diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java index 9b8f6e3..fa44ed8 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java @@ -23,6 +23,9 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMI import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_PROTO; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_ROW; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_SCHEMA; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO; import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO; @@ -45,6 +48,15 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_INT32; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_STRING; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_INT32; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_STRING; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_SCHEMA; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA; @@ -61,10 +73,12 @@ import com.google.protobuf.TextFormat.ParseException; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive; import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; @@ -257,6 +271,78 @@ public class ProtoDynamicMessageSchemaTest { } @Test + public void testReversedOneOfSchema() { + ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(ReversedOneOf.getDescriptor()); + Schema schema = schemaProvider.getSchema(); + assertEquals(REVERSED_ONEOF_SCHEMA, schema); + } + + @Test + public void testReversedOneOfProtoToRow() throws InvalidProtocolBufferException { + ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(ReversedOneOf.getDescriptor()); + SerializableFunction<DynamicMessage, Row> toRow = schemaProvider.getToRowFunction(); + // equality doesn't work between dynamic messages and other, + // so we compare string representation + assertEquals( + REVERSED_ONEOF_ROW_INT32.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_INT32)).toString()); + assertEquals( + REVERSED_ONEOF_ROW_BOOL.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_BOOL)).toString()); + assertEquals( + REVERSED_ONEOF_ROW_STRING.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_STRING)).toString()); + assertEquals( + REVERSED_ONEOF_ROW_PRIMITIVE.toString(), + toRow.apply(toDynamic(REVERSED_ONEOF_PROTO_PRIMITIVE)).toString()); + } + + @Test + public void testReversedOneOfRowToProto() { + 
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(ReversedOneOf.getDescriptor()); + SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction(); + assertEquals( + REVERSED_ONEOF_PROTO_INT32.toString(), fromRow.apply(REVERSED_ONEOF_ROW_INT32).toString()); + assertEquals( + REVERSED_ONEOF_PROTO_BOOL.toString(), fromRow.apply(REVERSED_ONEOF_ROW_BOOL).toString()); + assertEquals( + REVERSED_ONEOF_PROTO_STRING.toString(), + fromRow.apply(REVERSED_ONEOF_ROW_STRING).toString()); + assertEquals( + REVERSED_ONEOF_PROTO_PRIMITIVE.toString(), + fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE).toString()); + } + + @Test + public void testNonContiguousOneOfSchema() { + ProtoDynamicMessageSchema schemaProvider = + schemaFromDescriptor(NonContiguousOneOf.getDescriptor()); + Schema schema = schemaProvider.getSchema(); + assertEquals(NONCONTIGUOUS_ONEOF_SCHEMA, schema); + } + + @Test + public void testNonContiguousOneOfProtoToRow() throws InvalidProtocolBufferException { + ProtoDynamicMessageSchema schemaProvider = + schemaFromDescriptor(NonContiguousOneOf.getDescriptor()); + SerializableFunction<DynamicMessage, Row> toRow = schemaProvider.getToRowFunction(); + // equality doesn't work between dynamic messages and other, + // so we compare string representation + assertEquals( + NONCONTIGUOUS_ONEOF_ROW.toString(), + toRow.apply(toDynamic(NONCONTIGUOUS_ONEOF_PROTO)).toString()); + } + + @Test + public void testNonContiguousOneOfRowToProto() { + ProtoDynamicMessageSchema schemaProvider = + schemaFromDescriptor(NonContiguousOneOf.getDescriptor()); + SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction(); + assertEquals( + NONCONTIGUOUS_ONEOF_PROTO.toString(), fromRow.apply(NONCONTIGUOUS_ONEOF_ROW).toString()); + } + + @Test public void testOuterOneOfSchema() { ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(OuterOneOf.getDescriptor()); Schema schema = schemaProvider.getSchema(); diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java index 480ea1d..f5ce632 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java @@ -23,6 +23,8 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMI import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_PROTO; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NONCONTIGUOUS_ONEOF_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO; @@ -51,6 +53,14 @@ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_ import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REQUIRED_PRIMITIVE_SCHEMA; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_INT32; +import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_PROTO_STRING; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_BOOL; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_INT32; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_PRIMITIVE; +import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REVERSED_ONEOF_ROW_STRING; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW; import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA; @@ -64,10 +74,12 @@ import org.apache.beam.sdk.extensions.protobuf.Proto2SchemaMessages.RequiredPrim import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -279,6 +291,40 @@ public class ProtoMessageSchemaTest { assertEquals(OUTER_ONEOF_PROTO, fromRow.apply(OUTER_ONEOF_ROW)); } + @Test + 
public void testReversedOneOfProtoToRow() { + SerializableFunction<ReversedOneOf, Row> toRow = + new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ReversedOneOf.class)); + assertEquals(REVERSED_ONEOF_ROW_INT32, toRow.apply(REVERSED_ONEOF_PROTO_INT32)); + assertEquals(REVERSED_ONEOF_ROW_BOOL, toRow.apply(REVERSED_ONEOF_PROTO_BOOL)); + assertEquals(REVERSED_ONEOF_ROW_STRING, toRow.apply(REVERSED_ONEOF_PROTO_STRING)); + assertEquals(REVERSED_ONEOF_ROW_PRIMITIVE, toRow.apply(REVERSED_ONEOF_PROTO_PRIMITIVE)); + } + + @Test + public void testReversedOneOfRowToProto() { + SerializableFunction<Row, ReversedOneOf> fromRow = + new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(ReversedOneOf.class)); + assertEquals(REVERSED_ONEOF_PROTO_INT32, fromRow.apply(REVERSED_ONEOF_ROW_INT32)); + assertEquals(REVERSED_ONEOF_PROTO_BOOL, fromRow.apply(REVERSED_ONEOF_ROW_BOOL)); + assertEquals(REVERSED_ONEOF_PROTO_STRING, fromRow.apply(REVERSED_ONEOF_ROW_STRING)); + assertEquals(REVERSED_ONEOF_PROTO_PRIMITIVE, fromRow.apply(REVERSED_ONEOF_ROW_PRIMITIVE)); + } + + @Test + public void testNonContiguousOneOfProtoToRow() { + SerializableFunction<NonContiguousOneOf, Row> toRow = + new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(NonContiguousOneOf.class)); + assertEquals(NONCONTIGUOUS_ONEOF_ROW, toRow.apply(NONCONTIGUOUS_ONEOF_PROTO)); + } + + @Test + public void testNonContiguousOneOfRowToProto() { + SerializableFunction<Row, NonContiguousOneOf> fromRow = + new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(NonContiguousOneOf.class)); + assertEquals(NONCONTIGUOUS_ONEOF_PROTO, fromRow.apply(NONCONTIGUOUS_ONEOF_ROW)); + } + private static final EnumerationType ENUM_TYPE = EnumerationType.create(ImmutableMap.of("ZERO", 0, "TWO", 2, "THREE", 3)); private static final Schema ENUM_SCHEMA = diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java index 9d473bf..f478a94 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslatorTest.java @@ -84,6 +84,20 @@ public class ProtoSchemaTranslatorTest { } @Test + public void testReversedOneOfSchema() { + assertEquals( + TestProtoSchemas.REVERSED_ONEOF_SCHEMA, + ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.ReversedOneOf.class)); + } + + @Test + public void testNonContiguousOneOfSchema() { + assertEquals( + TestProtoSchemas.NONCONTIGUOUS_ONEOF_SCHEMA, + ProtoSchemaTranslator.getSchema(Proto3SchemaMessages.NonContiguousOneOf.class)); + } + + @Test public void testNestedOneOfSchema() { assertEquals( TestProtoSchemas.OUTER_ONEOF_SCHEMA, diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java index 6b9bdf8..40055d0 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java @@ -43,10 +43,12 @@ import org.apache.beam.sdk.extensions.protobuf.Proto2SchemaMessages.RequiredNest import org.apache.beam.sdk.extensions.protobuf.Proto2SchemaMessages.RequiredPrimitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.NonContiguousOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf; import 
org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.ReversedOneOf; import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage; import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.Fixed32; import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.Fixed64; @@ -384,8 +386,8 @@ class TestProtoSchemas { static final OneOfType ONE_OF_TYPE = OneOfType.create(ONEOF_FIELDS, ONE_OF_ENUM_MAP); static final Schema ONEOF_SCHEMA = Schema.builder() - .addField("special_oneof", FieldType.logicalType(ONE_OF_TYPE)) .addField(withFieldNumber("place1", FieldType.STRING, 1)) + .addField("special_oneof", FieldType.logicalType(ONE_OF_TYPE)) .addField(withFieldNumber("place2", FieldType.INT32, 6)) .setOptions(withTypeName("proto3_schema_messages.OneOf")) .build(); @@ -393,19 +395,19 @@ class TestProtoSchemas { // Sample row instances for each OneOf case. 
static final Row ONEOF_ROW_INT32 = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_int32", 1), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_int32", 1), 0) .build(); static final Row ONEOF_ROW_BOOL = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_bool", true), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_bool", true), 0) .build(); static final Row ONEOF_ROW_STRING = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_string", "foo"), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_string", "foo"), 0) .build(); static final Row ONEOF_ROW_PRIMITIVE = Row.withSchema(ONEOF_SCHEMA) - .addValues(ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), "foo", 0) + .addValues("foo", ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), 0) .build(); // Sample proto instances for each oneof case. @@ -443,6 +445,119 @@ class TestProtoSchemas { static final OuterOneOf OUTER_ONEOF_PROTO = OuterOneOf.newBuilder().setOneofOneof(ONEOF_PROTO_PRIMITIVE).build(); + // The schema for the ReversedOneOf proto. 
+ private static final List<Field> REVERSED_ONEOF_FIELDS = + ImmutableList.of( + withFieldNumber("oneof_int32", FieldType.INT32, 5), + withFieldNumber("oneof_bool", FieldType.BOOLEAN, 4), + withFieldNumber("oneof_string", FieldType.STRING, 3), + withFieldNumber("oneof_primitive", FieldType.row(PRIMITIVE_SCHEMA), 2)); + + private static final Map<String, Integer> REVERSED_ONE_OF_ENUM_MAP = + REVERSED_ONEOF_FIELDS.stream() + .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f))); + static final OneOfType REVERSED_ONE_OF_TYPE = + OneOfType.create(REVERSED_ONEOF_FIELDS, REVERSED_ONE_OF_ENUM_MAP); + + static final Schema REVERSED_ONEOF_SCHEMA = + Schema.builder() + .addField(withFieldNumber("place1", FieldType.STRING, 6)) + .addField("oneof_reversed", FieldType.logicalType(REVERSED_ONE_OF_TYPE)) + .addField(withFieldNumber("place2", FieldType.INT32, 1)) + .setOptions(withTypeName("proto3_schema_messages.ReversedOneOf")) + .build(); + + // Sample row instances for each ReversedOneOf case. + static final Row REVERSED_ONEOF_ROW_INT32 = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_int32", 1), 0) + .build(); + static final Row REVERSED_ONEOF_ROW_BOOL = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_bool", true), 0) + .build(); + static final Row REVERSED_ONEOF_ROW_STRING = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_string", "foo"), 0) + .build(); + static final Row REVERSED_ONEOF_ROW_PRIMITIVE = + Row.withSchema(REVERSED_ONEOF_SCHEMA) + .addValues("foo", REVERSED_ONE_OF_TYPE.createValue("oneof_primitive", PRIMITIVE_ROW), 0) + .build(); + + // Sample proto instances for each ReversedOneOf case. 
+ static final ReversedOneOf REVERSED_ONEOF_PROTO_INT32 = + ReversedOneOf.newBuilder().setOneofInt32(1).setPlace1("foo").setPlace2(0).build(); + static final ReversedOneOf REVERSED_ONEOF_PROTO_BOOL = + ReversedOneOf.newBuilder().setOneofBool(true).setPlace1("foo").setPlace2(0).build(); + static final ReversedOneOf REVERSED_ONEOF_PROTO_STRING = + ReversedOneOf.newBuilder().setOneofString("foo").setPlace1("foo").setPlace2(0).build(); + static final ReversedOneOf REVERSED_ONEOF_PROTO_PRIMITIVE = + ReversedOneOf.newBuilder() + .setOneofPrimitive(PRIMITIVE_PROTO) + .setPlace1("foo") + .setPlace2(0) + .build(); + + // The schema for the NonContiguousOneOf proto. + private static final List<Field> NONCONTIGUOUS_ONE_ONEOF_FIELDS = + ImmutableList.of( + withFieldNumber("oneof_one_int32", FieldType.INT32, 55), + withFieldNumber("oneof_one_bool", FieldType.BOOLEAN, 1), + withFieldNumber("oneof_one_string", FieldType.STRING, 189), + withFieldNumber("oneof_one_primitive", FieldType.row(PRIMITIVE_SCHEMA), 22)); + + private static final List<Field> NONCONTIGUOUS_TWO_ONEOF_FIELDS = + ImmutableList.of( + withFieldNumber("oneof_two_first_string", FieldType.STRING, 981), + withFieldNumber("oneof_two_int32", FieldType.INT32, 2), + withFieldNumber("oneof_two_second_string", FieldType.STRING, 44)); + + private static final Map<String, Integer> NONCONTIGUOUS_ONE_ONE_OF_ENUM_MAP = + NONCONTIGUOUS_ONE_ONEOF_FIELDS.stream() + .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f))); + + private static final Map<String, Integer> NONCONTIGUOUS_TWO_ONE_OF_ENUM_MAP = + NONCONTIGUOUS_TWO_ONEOF_FIELDS.stream() + .collect(Collectors.toMap(Field::getName, f -> getFieldNumber(f))); + + static final OneOfType NONCONTIGUOUS_ONE_ONE_OF_TYPE = + OneOfType.create(NONCONTIGUOUS_ONE_ONEOF_FIELDS, NONCONTIGUOUS_ONE_ONE_OF_ENUM_MAP); + + static final OneOfType NONCONTIGUOUS_TWO_ONE_OF_TYPE = + OneOfType.create(NONCONTIGUOUS_TWO_ONEOF_FIELDS, NONCONTIGUOUS_TWO_ONE_OF_ENUM_MAP); + + static final 
Schema NONCONTIGUOUS_ONEOF_SCHEMA = + Schema.builder() + .addField(withFieldNumber("place1", FieldType.STRING, 76)) + .addField( + "oneof_non_contiguous_one", FieldType.logicalType(NONCONTIGUOUS_ONE_ONE_OF_TYPE)) + .addField(withFieldNumber("place2", FieldType.INT32, 33)) + .addField( + "oneof_non_contiguous_two", FieldType.logicalType(NONCONTIGUOUS_TWO_ONE_OF_TYPE)) + .addField(withFieldNumber("place3", FieldType.INT32, 63)) + .setOptions(withTypeName("proto3_schema_messages.NonContiguousOneOf")) + .build(); + + static final Row NONCONTIGUOUS_ONEOF_ROW = + Row.withSchema(NONCONTIGUOUS_ONEOF_SCHEMA) + .addValues( + "foo", + NONCONTIGUOUS_ONE_ONE_OF_TYPE.createValue("oneof_one_int32", 1), + 0, + NONCONTIGUOUS_TWO_ONE_OF_TYPE.createValue("oneof_two_second_string", "bar"), + 343) + .build(); + + static final NonContiguousOneOf NONCONTIGUOUS_ONEOF_PROTO = + NonContiguousOneOf.newBuilder() + .setOneofOneInt32(1) + .setPlace1("foo") + .setPlace2(0) + .setOneofTwoSecondString("bar") + .setPlace3(343) + .build(); + static final Schema WKT_MESSAGE_SCHEMA = Schema.builder() .addField(withFieldNumber("double", FieldType.DOUBLE, 1).withNullable(true)) diff --git a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto index a19ea04..0274864 100644 --- a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto +++ b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto @@ -100,6 +100,34 @@ message OuterOneOf { } } +message ReversedOneOf { + string place1 = 6; + oneof oneof_reversed { + int32 oneof_int32 = 5; + bool oneof_bool = 4; + string oneof_string = 3; + Primitive oneof_primitive = 2; + } + int32 place2 = 1; +} + +message NonContiguousOneOf { + string place1 = 76; + oneof oneof_non_contiguous_one { + int32 oneof_one_int32 = 55; + bool oneof_one_bool = 1; + string oneof_one_string = 189; + Primitive oneof_one_primitive = 22; + } + int32 place2 
= 33; + oneof oneof_non_contiguous_two { + string oneof_two_first_string = 981; + int32 oneof_two_int32 = 2; + string oneof_two_second_string = 44; + } + int32 place3 = 63; +} + message EnumMessage { enum Enum { ZERO = 0;