This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 67bfa9188 PARQUET-2305: Allow Parquet to Proto conversion even though
target schema has less fields (#1102)
67bfa9188 is described below
commit 67bfa9188264edd530cf45ebf8e880056125364e
Author: Sanjay Sharma <[email protected]>
AuthorDate: Wed Jun 14 04:59:22 2023 +0100
PARQUET-2305: Allow Parquet to Proto conversion even though target schema
has less fields (#1102)
If the Parquet data contains a field that has been removed from the target
schema, Parquet-to-Proto conversion errors out due to the unknown field. There
are scenarios in which we still want to convert the Parquet data into a target
proto schema object that has fewer fields. With this change, using the
ProtoParquetReader and specifying "ignoreUnknownFields" as an argument, the
conversion can still be done: fields that cannot be converted are ignored
instead of causing an error.
---
.../org/apache/parquet/proto/ProtoConstants.java | 6 ++
.../parquet/proto/ProtoMessageConverter.java | 87 +++++++++++++++++-----
.../apache/parquet/proto/ProtoParquetReader.java | 17 +++++
.../parquet/proto/ProtoSchemaEvolutionTest.java | 65 ++++++++++++++++
.../java/org/apache/parquet/proto/TestUtils.java | 18 ++++-
.../src/test/resources/TestProto3SchemaV3.proto | 56 ++++++++++++++
6 files changed, 229 insertions(+), 20 deletions(-)
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
index 7458f8913..ebe52e7b1 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
@@ -26,6 +26,12 @@ public final class ProtoConstants {
public static final String METADATA_ENUM_PREFIX = "parquet.proto.enum.";
public static final String METADATA_ENUM_KEY_VALUE_SEPARATOR = ":";
public static final String METADATA_ENUM_ITEM_SEPARATOR = ",";
+ /**
+ * Configuration flag to ignore the unknown fields during conversion of
Parquet to Proto
+ * if fields are missing in target schema instead of throwing an error.
+ * Enabling it will avoid a job failure, but you should perhaps use an
up-to-date schema instead.
+ */
+ public static final String CONFIG_IGNORE_UNKNOWN_FIELDS =
"parquet.proto.ignore.unknown.fields";
/**
* Configuration flag to enable reader to accept enum label that's neither
defined in its own proto schema nor conform
* to the "UNKNOWN_ENUM_*" pattern with which we can get the enum number.
The enum value will be treated as an unknown
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 77f5d529e..8383fbc75 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -19,6 +19,7 @@
package org.apache.parquet.proto;
import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
@@ -35,6 +36,7 @@ import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.IncompatibleSchemaModificationException;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,10 +45,12 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.IntStream;
import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import static java.util.Optional.of;
import static
org.apache.parquet.proto.ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM;
+import static
org.apache.parquet.proto.ProtoConstants.CONFIG_IGNORE_UNKNOWN_FIELDS;
import static
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR;
import static
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR;
import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
@@ -58,6 +62,12 @@ import static
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
class ProtoMessageConverter extends GroupConverter {
private static final Logger LOG =
LoggerFactory.getLogger(ProtoMessageConverter.class);
+ private static final ParentValueContainer DUMMY_PVC = new
ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ }
+ };
+
protected final Configuration conf;
protected final Converter[] converters;
protected final ParentValueContainer parent;
@@ -80,35 +90,76 @@ class ProtoMessageConverter extends GroupConverter {
// For usage in message arrays
ProtoMessageConverter(Configuration conf, ParentValueContainer pvc,
Message.Builder builder, GroupType parquetSchema, Map<String, String>
extraMetadata) {
+ if (pvc == null) {
+ throw new IllegalStateException("Missing parent value container");
+ }
int schemaSize = parquetSchema.getFieldCount();
converters = new Converter[schemaSize];
this.conf = conf;
this.parent = pvc;
this.extraMetadata = extraMetadata;
- int parquetFieldIndex = 1;
-
- if (pvc == null) {
- throw new IllegalStateException("Missing parent value container");
- }
+ boolean ignoreUnknownFields =
conf.getBoolean(CONFIG_IGNORE_UNKNOWN_FIELDS, false);
myBuilder = builder;
- Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType();
+ if (builder == null && ignoreUnknownFields) {
+ IntStream.range(0, parquetSchema.getFieldCount())
+ .forEach(i-> converters[i] = dummyScalarConverter(DUMMY_PVC,
parquetSchema.getType(i), conf, extraMetadata));
+
+ } else {
- for (Type parquetField : parquetSchema.getFields()) {
- Descriptors.FieldDescriptor protoField =
protoDescriptor.findFieldByName(parquetField.getName());
+ int parquetFieldIndex = 0;
+ Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType();
- if (protoField == null) {
- String description = "Scheme mismatch \n\"" + parquetField + "\"" +
- "\n proto descriptor:\n" + protoDescriptor.toProto();
- throw new IncompatibleSchemaModificationException("Cant find \"" +
parquetField.getName() + "\" " + description);
+ for (Type parquetField : parquetSchema.getFields()) {
+ Descriptors.FieldDescriptor protoField =
protoDescriptor.findFieldByName(parquetField.getName());
+
+ validateProtoField(ignoreUnknownFields, protoDescriptor.toProto(),
parquetField, protoField);
+
+ converters[parquetFieldIndex] = protoField != null ?
+ newMessageConverter(myBuilder, protoField, parquetField) :
+ dummyScalarConverter(DUMMY_PVC, parquetField, conf, extraMetadata);
+
+ parquetFieldIndex++;
}
+ }
+ }
+
+ private void validateProtoField(boolean ignoreUnknownFields,
+ DescriptorProtos.DescriptorProto
protoDescriptor,
+ Type parquetField,
+ Descriptors.FieldDescriptor protoField) {
+ if (protoField == null && !ignoreUnknownFields) {
+ String description = "Schema mismatch \n\"" + parquetField + "\"" +
+ "\n proto descriptor:\n" + protoDescriptor;
+ throw new IncompatibleSchemaModificationException("Cant find \"" +
parquetField.getName() + "\" " + description);
+ }
+ }
- converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder,
protoField, parquetField);
- parquetFieldIndex++;
+ private Converter dummyScalarConverter(ParentValueContainer pvc,
+ Type parquetField, Configuration conf,
+ Map<String, String> extraMetadata) {
+
+ if (parquetField.isPrimitive()) {
+ PrimitiveType primitiveType = parquetField.asPrimitiveType();
+ PrimitiveType.PrimitiveTypeName primitiveTypeName =
primitiveType.getPrimitiveTypeName();
+ switch (primitiveTypeName) {
+ case BINARY: return new ProtoBinaryConverter(pvc);
+ case FLOAT: return new ProtoFloatConverter(pvc);
+ case DOUBLE: return new ProtoDoubleConverter(pvc);
+ case BOOLEAN: return new ProtoBooleanConverter(pvc);
+ case INT32: return new ProtoIntConverter(pvc);
+ case INT64: return new ProtoLongConverter(pvc);
+ case INT96: return new ProtoStringConverter(pvc);
+ case FIXED_LEN_BYTE_ARRAY: return new ProtoBinaryConverter(pvc);
+ default: break;
+ }
+
+ throw new UnsupportedOperationException(String.format("Cannot convert
Parquet type: %s" , parquetField));
}
+ return new ProtoMessageConverter(conf, pvc, (Message.Builder) null,
parquetField.asGroupType(), extraMetadata);
}
@@ -124,13 +175,15 @@ class ProtoMessageConverter extends GroupConverter {
@Override
public void end() {
- parent.add(myBuilder.build());
- myBuilder.clear();
+ if (myBuilder != null) {
+ parent.add(myBuilder.build());
+ myBuilder.clear();
+ }
}
protected Converter newMessageConverter(final Message.Builder parentBuilder,
final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
- boolean isRepeated = fieldDescriptor.isRepeated();
+ boolean isRepeated = fieldDescriptor != null &&
fieldDescriptor.isRepeated();
ParentValueContainer parent;
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
index a864474f7..2d9d6150a 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
@@ -28,6 +28,8 @@ import org.apache.parquet.io.InputFile;
import com.google.protobuf.MessageOrBuilder;
+import static
org.apache.parquet.proto.ProtoConstants.CONFIG_IGNORE_UNKNOWN_FIELDS;
+
/**
* Read Protobuf records from a Parquet file.
*/
@@ -38,10 +40,18 @@ public class ProtoParquetReader<T extends MessageOrBuilder>
return new ProtoParquetReader.Builder<T>(file);
}
+ public static <T> ParquetReader.Builder<T> builder(Path file, boolean
ignoreUnknownFields) {
+ return new
ProtoParquetReader.Builder<T>(file).setIgnoreUnknownFields(ignoreUnknownFields);
+ }
+
public static <T> ParquetReader.Builder<T> builder(InputFile file) {
return new ProtoParquetReader.Builder<T>(file);
}
+ public static <T> ParquetReader.Builder<T> builder(InputFile file, boolean
ignoreUnknownFields) {
+ return new
ProtoParquetReader.Builder<T>(file).setIgnoreUnknownFields(ignoreUnknownFields);
+ }
+
/**
* @param file a file path
* @throws IOException if there is an error while reading
@@ -71,6 +81,13 @@ public class ProtoParquetReader<T extends MessageOrBuilder>
super(file);
}
+ protected Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) {
+ if(ignoreUnknownFields) {
+ this.set(CONFIG_IGNORE_UNKNOWN_FIELDS, "TRUE");
+ }
+ return this;
+ }
+
protected Builder(Path path) {
super(path);
}
diff --git
a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
index db7f6ced0..154210a91 100644
---
a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
+++
b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.parquet.proto;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.proto.test.TestProto3SchemaV1;
import org.apache.parquet.proto.test.TestProto3SchemaV2;
+import org.apache.parquet.proto.test.TestProto3SchemaV3;
import org.junit.Test;
import java.io.IOException;
@@ -65,4 +66,68 @@ public class ProtoSchemaEvolutionTest {
assertEquals(messagesV2.size(), 1);
assertSame(messagesV2.get(0).getOptionalLabelNumberPair(),
TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND);
}
+
+ /**
+   * Test we can ignore fields which are unknown during conversion. The V3
+   * schema has evolved, so V1 has fewer fields than V3. Reading V3 data into
+   * V1 should still work and ignore the fields it can't convert.
+ */
+ @Test
+ public void testEnumSchemaWriteV3ReadV1IgnoreUnknownField() throws
IOException {
+ TestProto3SchemaV3.MessageSchema dataV3 =
TestProto3SchemaV3.MessageSchema.newBuilder()
+
.setOptionalLabelNumberPair(TestProto3SchemaV3.MessageSchema.LabelNumberPair.SECOND)
+ .setOptionalString("string value")
+ .setOptionalInt32New(123)
+
.addRepeatedDubMessageSchema(TestProto3SchemaV3.SubMessageSchema.newBuilder()
+ .addOptionalFirstInt32(1)
+ .setTestEnum(TestProto3SchemaV3.SubMessageSchema.SomeTestEnum.VALUE_X)
+ .setOptionalFirstString("abc")
+
.setLevel2Schema(TestProto3SchemaV3.Level2SubMessageSchema.newBuilder().addOptionalValues("axc").build())
+ .build())
+ .build();
+ Path file = writeMessages(dataV3);
+ List<TestProto3SchemaV1.MessageSchema> messagesV1 = readMessages(file,
TestProto3SchemaV1.MessageSchema.class, true);
+ assertEquals(messagesV1.size(), 1);
+ assertEquals(messagesV1.get(0).getOptionalLabelNumberPairValue(), 2);
+ assertEquals(messagesV1.get(0).getOptionalString(), "string value");
+ }
+
+ /**
+   * Test we can read an enum value (by number) that was written with an old
+   * schema, even when the value is not defined in that old schema.
+ */
+ @Test
+ public void testEnumSchemaWriteV1ReadV3IgnoreUnknownField() throws
IOException {
+ TestProto3SchemaV1.MessageSchema dataV1WithEnumValueFromV1 =
TestProto3SchemaV1.MessageSchema.newBuilder()
+ .setOptionalLabelNumberPairValue(2) // "2" is not defined in V1 enum,
but the number is still accepted by protobuf
+ .build();
+ Path file = writeMessages(dataV1WithEnumValueFromV1);
+ List<TestProto3SchemaV3.MessageSchema> messagesV3 = readMessages(file,
TestProto3SchemaV3.MessageSchema.class);
+ assertEquals(messagesV3.size(), 1);
+ assertSame(messagesV3.get(0).getOptionalLabelNumberPair(),
TestProto3SchemaV3.MessageSchema.LabelNumberPair.SECOND);
+ }
+
+ /**
+ * Test nested conversion
+ */
+ @Test
+ public void testEnumSchemaWriteV3ReadV3() throws IOException {
+ TestProto3SchemaV3.MessageSchema dataV3 =
TestProto3SchemaV3.MessageSchema.newBuilder()
+
.setOptionalLabelNumberPair(TestProto3SchemaV3.MessageSchema.LabelNumberPair.SECOND)
+ .setOptionalString("string value")
+ .setOptionalInt32New(123)
+
.addRepeatedDubMessageSchema(TestProto3SchemaV3.SubMessageSchema.newBuilder()
+ .addOptionalFirstInt32(1)
+ .setTestEnum(TestProto3SchemaV3.SubMessageSchema.SomeTestEnum.VALUE_X)
+ .setOptionalFirstString("abc")
+
.setLevel2Schema(TestProto3SchemaV3.Level2SubMessageSchema.newBuilder().addOptionalValues("axc").build())
+ .build())
+ .build();
+ Path file = writeMessages(dataV3);
+ List<TestProto3SchemaV3.MessageSchema> messagesV3 = readMessages(file,
TestProto3SchemaV3.MessageSchema.class, false);
+ assertEquals(messagesV3.size(), 1);
+ assertEquals(messagesV3.get(0).getOptionalLabelNumberPairValue(), 2);
+ assertEquals(messagesV3.get(0).getOptionalString(), "string value");
+
assertEquals(messagesV3.get(0).getRepeatedDubMessageSchemaList().get(0).getOptionalFirstString(),
"abc");
+
assertEquals(messagesV3.get(0).getRepeatedDubMessageSchemaList().get(0).getOptionalFirstInt32(0),
1);
+
assertEquals(messagesV3.get(0).getRepeatedDubMessageSchemaList().get(0).getLevel2Schema().getOptionalValues(0),
"axc");
+ }
}
diff --git
a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
index fa2d87fa3..2f6861060 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
@@ -168,15 +168,16 @@ public class TestUtils {
}
/**
- * Read messages from given file into the expected proto class.
+   * Read messages from given file into the expected proto class, honoring the
+   * ignoreUnknownFields flag.
* @param file
* @param messageClass
* @param <T>
+ * @param ignoreUnknownFields
* @return List of protobuf messages for the given type.
*/
- public static <T extends MessageOrBuilder> List<T> readMessages(Path file,
Class<T> messageClass) throws IOException {
+ public static <T extends MessageOrBuilder> List<T> readMessages(Path file,
Class<T> messageClass, boolean ignoreUnknownFields) throws IOException {
InputFile inputFile = HadoopInputFile.fromPath(file, new Configuration());
- ParquetReader.Builder readerBuilder =
ProtoParquetReader.builder(inputFile);
+ ParquetReader.Builder readerBuilder =
ProtoParquetReader.builder(inputFile, ignoreUnknownFields);
if (messageClass != null) {
readerBuilder.set(ProtoReadSupport.PB_CLASS,
messageClass.getName()).build();
}
@@ -195,6 +196,17 @@ public class TestUtils {
}
}
+ /**
+ * Read messages from given file into the expected proto class.
+ * @param file
+ * @param messageClass
+ * @param <T>
+ * @return List of protobuf messages for the given type.
+ */
+ public static <T extends MessageOrBuilder> List<T> readMessages(Path file,
Class<T> messageClass) throws IOException {
+ return readMessages(file, messageClass, false);
+ }
+
/**
* Writes messages to temporary file and returns its path.
*/
diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV3.proto
b/parquet-protobuf/src/test/resources/TestProto3SchemaV3.proto
new file mode 100644
index 000000000..2269b8043
--- /dev/null
+++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV3.proto
@@ -0,0 +1,56 @@
+syntax = "proto3";
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package TestProto3.Schema;
+
+option java_package = "org.apache.parquet.proto.test";
+
+// For the test of schema evolution
+// This is the "V3" schema, which is supposed to be an evolution of the "V1"
+// and "V2" schemas (TestProto3SchemaV1.proto, TestProto3SchemaV2.proto)
+message MessageSchema {
+
+ enum LabelNumberPair {
+ UNKNOWN_VALUE = 0;
+ FIRST = 1;
+ // We added one more value in V2 comparing to V1
+ SECOND = 2;
+ }
+
+ LabelNumberPair optionalLabelNumberPair = 1;
+ string optionalString = 2;
+ int32 optionalInt32 = 3;
+ int32 optionalInt32New = 4; // Added New Field scalar type
+ SubMessageSchema subMessageSchema = 5; // added New Field
+ repeated SubMessageSchema repeatedDubMessageSchema = 6; // added New Field
+
+}
+
+message SubMessageSchema {
+ enum SomeTestEnum {
+ VALUE_X = 0;
+ }
+ SomeTestEnum testEnum = 1;
+ string optionalFirstString = 2;
+ repeated int32 optionalFirstInt32 = 3;
+ Level2SubMessageSchema level2Schema =4;
+}
+
+message Level2SubMessageSchema {
+ repeated string optionalValues = 1;
+}