This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 67bfa9188 PARQUET-2305: Allow Parquet to Proto conversion even though 
target schema has less fields (#1102)
67bfa9188 is described below

commit 67bfa9188264edd530cf45ebf8e880056125364e
Author: Sanjay Sharma <[email protected]>
AuthorDate: Wed Jun 14 04:59:22 2023 +0100

    PARQUET-2305: Allow Parquet to Proto conversion even though target schema 
has less fields (#1102)
    
    If a Parquet file contains a field that has been removed from the target 
schema, the Parquet to Proto conversion errors out due to the unknown field. 
There are scenarios where we still want to convert the Parquet data into the 
target proto schema object even though it has fewer fields. With this change, 
using the ProtoParquetReader and specifying "ignoreUnknownFields" as an 
argument, the conversion can still be performed: fields that cannot be 
converted are ignored instead of causing an error.
---
 .../org/apache/parquet/proto/ProtoConstants.java   |  6 ++
 .../parquet/proto/ProtoMessageConverter.java       | 87 +++++++++++++++++-----
 .../apache/parquet/proto/ProtoParquetReader.java   | 17 +++++
 .../parquet/proto/ProtoSchemaEvolutionTest.java    | 65 ++++++++++++++++
 .../java/org/apache/parquet/proto/TestUtils.java   | 18 ++++-
 .../src/test/resources/TestProto3SchemaV3.proto    | 56 ++++++++++++++
 6 files changed, 229 insertions(+), 20 deletions(-)

diff --git 
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
index 7458f8913..ebe52e7b1 100644
--- 
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
+++ 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
@@ -26,6 +26,12 @@ public final class ProtoConstants {
   public static final String METADATA_ENUM_PREFIX = "parquet.proto.enum.";
   public static final String METADATA_ENUM_KEY_VALUE_SEPARATOR = ":";
   public static final String METADATA_ENUM_ITEM_SEPARATOR = ",";
+  /**
+   * Configuration flag to ignore unknown fields during conversion from Parquet to Proto:
+   * if fields are missing in the target schema, they are skipped instead of throwing an error.
+   * Enabling it will avoid a job failure, but you should perhaps use an up-to-date schema instead.
+   */
+  public static final String CONFIG_IGNORE_UNKNOWN_FIELDS = 
"parquet.proto.ignore.unknown.fields";
   /**
    * Configuration flag to enable reader to accept enum label that's neither 
defined in its own proto schema nor conform
    * to the "UNKNOWN_ENUM_*" pattern with which we can get the enum number. 
The enum value will be treated as an unknown
diff --git 
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 77f5d529e..8383fbc75 100644
--- 
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++ 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.proto;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
@@ -35,6 +36,7 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.IncompatibleSchemaModificationException;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +45,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.IntStream;
 
 import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import static java.util.Optional.of;
 import static 
org.apache.parquet.proto.ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM;
+import static 
org.apache.parquet.proto.ProtoConstants.CONFIG_IGNORE_UNKNOWN_FIELDS;
 import static 
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR;
 import static 
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR;
 import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
@@ -58,6 +62,12 @@ import static 
org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
 class ProtoMessageConverter extends GroupConverter {
   private static final Logger LOG = 
LoggerFactory.getLogger(ProtoMessageConverter.class);
 
+  private static final ParentValueContainer DUMMY_PVC = new 
ParentValueContainer() {
+    @Override
+    public void add(Object value) {
+    }
+  };
+
   protected final Configuration conf;
   protected final Converter[] converters;
   protected final ParentValueContainer parent;
@@ -80,35 +90,76 @@ class ProtoMessageConverter extends GroupConverter {
 
   // For usage in message arrays
   ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, 
Message.Builder builder, GroupType parquetSchema, Map<String, String> 
extraMetadata) {
+    if (pvc == null) {
+      throw new IllegalStateException("Missing parent value container");
+    }
 
     int schemaSize = parquetSchema.getFieldCount();
     converters = new Converter[schemaSize];
     this.conf = conf;
     this.parent = pvc;
     this.extraMetadata = extraMetadata;
-    int parquetFieldIndex = 1;
-
-    if (pvc == null) {
-      throw new IllegalStateException("Missing parent value container");
-    }
+    boolean ignoreUnknownFields = 
conf.getBoolean(CONFIG_IGNORE_UNKNOWN_FIELDS, false);
 
     myBuilder = builder;
 
-    Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType();
+    if (builder == null && ignoreUnknownFields) {
+      IntStream.range(0, parquetSchema.getFieldCount())
+        .forEach(i-> converters[i] = dummyScalarConverter(DUMMY_PVC, 
parquetSchema.getType(i), conf, extraMetadata));
+
+    } else {
 
-    for (Type parquetField : parquetSchema.getFields()) {
-      Descriptors.FieldDescriptor protoField = 
protoDescriptor.findFieldByName(parquetField.getName());
+      int parquetFieldIndex = 0;
+      Descriptors.Descriptor protoDescriptor =  builder.getDescriptorForType();
 
-      if (protoField == null) {
-        String description = "Scheme mismatch \n\"" + parquetField + "\"" +
-                "\n proto descriptor:\n" + protoDescriptor.toProto();
-        throw new IncompatibleSchemaModificationException("Cant find \"" + 
parquetField.getName() + "\" " + description);
+      for (Type parquetField : parquetSchema.getFields()) {
+        Descriptors.FieldDescriptor protoField = 
protoDescriptor.findFieldByName(parquetField.getName());
+
+        validateProtoField(ignoreUnknownFields, protoDescriptor.toProto(), 
parquetField, protoField);
+
+        converters[parquetFieldIndex] = protoField != null ?
+            newMessageConverter(myBuilder, protoField, parquetField) :
+            dummyScalarConverter(DUMMY_PVC, parquetField, conf, extraMetadata);
+
+        parquetFieldIndex++;
       }
+    }
+  }
+
+  private void validateProtoField(boolean ignoreUnknownFields,
+                                  DescriptorProtos.DescriptorProto 
protoDescriptor,
+                                  Type parquetField,
+                                  Descriptors.FieldDescriptor protoField) {
+    if (protoField == null && !ignoreUnknownFields) {
+      String description = "Schema mismatch \n\"" + parquetField + "\"" +
+        "\n proto descriptor:\n" + protoDescriptor;
+      throw new IncompatibleSchemaModificationException("Cant find \"" + 
parquetField.getName() + "\" " + description);
+    }
+  }
 
-      converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, 
protoField, parquetField);
 
-      parquetFieldIndex++;
+  private Converter dummyScalarConverter(ParentValueContainer pvc,
+                                         Type parquetField, Configuration conf,
+                                         Map<String, String> extraMetadata) {
+
+    if (parquetField.isPrimitive()) {
+      PrimitiveType primitiveType = parquetField.asPrimitiveType();
+      PrimitiveType.PrimitiveTypeName primitiveTypeName = 
primitiveType.getPrimitiveTypeName();
+      switch (primitiveTypeName) {
+        case BINARY: return new ProtoBinaryConverter(pvc);
+        case FLOAT: return new ProtoFloatConverter(pvc);
+        case DOUBLE: return new ProtoDoubleConverter(pvc);
+        case BOOLEAN: return new ProtoBooleanConverter(pvc);
+        case INT32: return new ProtoIntConverter(pvc);
+        case INT64: return new ProtoLongConverter(pvc);
+        case INT96: return new ProtoStringConverter(pvc);
+        case FIXED_LEN_BYTE_ARRAY: return new ProtoBinaryConverter(pvc);
+        default: break;
+      }
+
+      throw new UnsupportedOperationException(String.format("Cannot convert 
Parquet type: %s" , parquetField));
     }
+    return new ProtoMessageConverter(conf, pvc, (Message.Builder) null, 
parquetField.asGroupType(), extraMetadata);
   }
 
 
@@ -124,13 +175,15 @@ class ProtoMessageConverter extends GroupConverter {
 
   @Override
   public void end() {
-    parent.add(myBuilder.build());
-    myBuilder.clear();
+    if (myBuilder != null) {
+      parent.add(myBuilder.build());
+      myBuilder.clear();
+    }
   }
 
   protected Converter newMessageConverter(final Message.Builder parentBuilder, 
final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
 
-    boolean isRepeated = fieldDescriptor.isRepeated();
+    boolean isRepeated = fieldDescriptor != null && 
fieldDescriptor.isRepeated();
 
     ParentValueContainer parent;
 
diff --git 
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
index a864474f7..2d9d6150a 100644
--- 
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
+++ 
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
@@ -28,6 +28,8 @@ import org.apache.parquet.io.InputFile;
 
 import com.google.protobuf.MessageOrBuilder;
 
+import static 
org.apache.parquet.proto.ProtoConstants.CONFIG_IGNORE_UNKNOWN_FIELDS;
+
 /**
  * Read Protobuf records from a Parquet file.
  */
@@ -38,10 +40,18 @@ public class ProtoParquetReader<T extends MessageOrBuilder>
     return new ProtoParquetReader.Builder<T>(file);
   }
 
+  public static <T> ParquetReader.Builder<T> builder(Path file, boolean 
ignoreUnknownFields) {
+    return new 
ProtoParquetReader.Builder<T>(file).setIgnoreUnknownFields(ignoreUnknownFields);
+  }
+
   public static <T> ParquetReader.Builder<T> builder(InputFile file) {
     return new ProtoParquetReader.Builder<T>(file);
   }
 
+  public static <T> ParquetReader.Builder<T> builder(InputFile file, boolean 
ignoreUnknownFields) {
+    return new 
ProtoParquetReader.Builder<T>(file).setIgnoreUnknownFields(ignoreUnknownFields);
+  }
+
   /**
    * @param file a file path
    * @throws IOException if there is an error while reading
@@ -71,6 +81,13 @@ public class ProtoParquetReader<T extends MessageOrBuilder>
       super(file);
     }
 
+    protected Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) {
+      if(ignoreUnknownFields) {
+        this.set(CONFIG_IGNORE_UNKNOWN_FIELDS, "TRUE");
+      }
+      return this;
+    }
+
     protected Builder(Path path) {
       super(path);
     }
diff --git 
a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
 
b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
index db7f6ced0..154210a91 100644
--- 
a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
+++ 
b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.parquet.proto;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.proto.test.TestProto3SchemaV1;
 import org.apache.parquet.proto.test.TestProto3SchemaV2;
+import org.apache.parquet.proto.test.TestProto3SchemaV3;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -65,4 +66,68 @@ public class ProtoSchemaEvolutionTest {
     assertEquals(messagesV2.size(), 1);
     assertSame(messagesV2.get(0).getOptionalLabelNumberPair(), 
TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND);
   }
+
+  /**
+   * Test that we can ignore unknown fields during conversion. The V3 schema has evolved,
+   * and reading it into V1, which has fewer fields than V3, should still work
+   * and ignore the fields it can't convert.
+   */
+  @Test
+  public void testEnumSchemaWriteV3ReadV1IgnoreUnknownField() throws 
IOException {
+    TestProto3SchemaV3.MessageSchema dataV3 = 
TestProto3SchemaV3.MessageSchema.newBuilder()
+      
.setOptionalLabelNumberPair(TestProto3SchemaV3.MessageSchema.LabelNumberPair.SECOND)
+      .setOptionalString("string value")
+      .setOptionalInt32New(123)
+      
.addRepeatedDubMessageSchema(TestProto3SchemaV3.SubMessageSchema.newBuilder()
+        .addOptionalFirstInt32(1)
+        .setTestEnum(TestProto3SchemaV3.SubMessageSchema.SomeTestEnum.VALUE_X)
+        .setOptionalFirstString("abc")
+        
.setLevel2Schema(TestProto3SchemaV3.Level2SubMessageSchema.newBuilder().addOptionalValues("axc").build())
+        .build())
+      .build();
+    Path file = writeMessages(dataV3);
+    List<TestProto3SchemaV1.MessageSchema> messagesV1 = readMessages(file, 
TestProto3SchemaV1.MessageSchema.class, true);
+    assertEquals(messagesV1.size(), 1);
+    assertEquals(messagesV1.get(0).getOptionalLabelNumberPairValue(), 2);
+    assertEquals(messagesV1.get(0).getOptionalString(), "string value");
+  }
+
+  /**
+   * Test that an enum value (number) written with the old V1 schema, even though the value
+   * is not defined in that old schema, can be read back with the newer V3 schema.
+   */
+  @Test
+  public void testEnumSchemaWriteV1ReadV3IgnoreUnknownField() throws 
IOException {
+    TestProto3SchemaV1.MessageSchema dataV1WithEnumValueFromV1 = 
TestProto3SchemaV1.MessageSchema.newBuilder()
+      .setOptionalLabelNumberPairValue(2) // "2" is not defined in V1 enum, 
but the number is still accepted by protobuf
+      .build();
+    Path file = writeMessages(dataV1WithEnumValueFromV1);
+    List<TestProto3SchemaV3.MessageSchema> messagesV3 = readMessages(file, 
TestProto3SchemaV3.MessageSchema.class);
+    assertEquals(messagesV3.size(), 1);
+    assertSame(messagesV3.get(0).getOptionalLabelNumberPair(), 
TestProto3SchemaV3.MessageSchema.LabelNumberPair.SECOND);
+  }
+
+  /**
+   * Test nested conversion
+   */
+  @Test
+  public void testEnumSchemaWriteV3ReadV3() throws IOException {
+    TestProto3SchemaV3.MessageSchema dataV3 = 
TestProto3SchemaV3.MessageSchema.newBuilder()
+      
.setOptionalLabelNumberPair(TestProto3SchemaV3.MessageSchema.LabelNumberPair.SECOND)
+      .setOptionalString("string value")
+      .setOptionalInt32New(123)
+      
.addRepeatedDubMessageSchema(TestProto3SchemaV3.SubMessageSchema.newBuilder()
+        .addOptionalFirstInt32(1)
+        .setTestEnum(TestProto3SchemaV3.SubMessageSchema.SomeTestEnum.VALUE_X)
+        .setOptionalFirstString("abc")
+        
.setLevel2Schema(TestProto3SchemaV3.Level2SubMessageSchema.newBuilder().addOptionalValues("axc").build())
+        .build())
+      .build();
+    Path file = writeMessages(dataV3);
+    List<TestProto3SchemaV3.MessageSchema> messagesV3 = readMessages(file, 
TestProto3SchemaV3.MessageSchema.class, false);
+    assertEquals(messagesV3.size(), 1);
+    assertEquals(messagesV3.get(0).getOptionalLabelNumberPairValue(), 2);
+    assertEquals(messagesV3.get(0).getOptionalString(), "string value");
+    
assertEquals(messagesV3.get(0).getRepeatedDubMessageSchemaList().get(0).getOptionalFirstString(),
 "abc");
+    
assertEquals(messagesV3.get(0).getRepeatedDubMessageSchemaList().get(0).getOptionalFirstInt32(0),
 1);
+    
assertEquals(messagesV3.get(0).getRepeatedDubMessageSchemaList().get(0).getLevel2Schema().getOptionalValues(0),
 "axc");
+  }
 }
diff --git 
a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java 
b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
index fa2d87fa3..2f6861060 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
@@ -168,15 +168,16 @@ public class TestUtils {
   }
 
   /**
-   * Read messages from given file into the expected proto class.
+   * Read messages from given file into the expected proto class with 
ignoreUnknown fields flag.
    * @param file
    * @param messageClass
    * @param <T>
+   * @param ignoreUnknownFields
    * @return List of protobuf messages for the given type.
    */
-  public static <T extends MessageOrBuilder> List<T> readMessages(Path file, 
Class<T> messageClass) throws IOException {
+  public static <T extends MessageOrBuilder> List<T> readMessages(Path file, 
Class<T> messageClass, boolean ignoreUnknownFields) throws IOException {
     InputFile inputFile = HadoopInputFile.fromPath(file, new Configuration());
-    ParquetReader.Builder readerBuilder = 
ProtoParquetReader.builder(inputFile);
+    ParquetReader.Builder readerBuilder = 
ProtoParquetReader.builder(inputFile, ignoreUnknownFields);
     if (messageClass != null) {
       readerBuilder.set(ProtoReadSupport.PB_CLASS, 
messageClass.getName()).build();
     }
@@ -195,6 +196,17 @@ public class TestUtils {
     }
   }
 
+  /**
+   * Read messages from given file into the expected proto class.
+   * @param file
+   * @param messageClass
+   * @param <T>
+   * @return List of protobuf messages for the given type.
+   */
+  public static <T extends MessageOrBuilder> List<T> readMessages(Path file, 
Class<T> messageClass) throws IOException {
+    return readMessages(file, messageClass, false);
+  }
+
   /**
    * Writes messages to temporary file and returns its path.
    */
diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV3.proto 
b/parquet-protobuf/src/test/resources/TestProto3SchemaV3.proto
new file mode 100644
index 000000000..2269b8043
--- /dev/null
+++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV3.proto
@@ -0,0 +1,56 @@
+syntax = "proto3";
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package TestProto3.Schema;
+
+option java_package = "org.apache.parquet.proto.test";
+
+// For the test of schema evolution
+// This is the "V3" schema, which is supposed to be an evolution from the "V2" 
(TestProto3SchemaV2.proto)
+message MessageSchema {
+
+    enum LabelNumberPair {
+        UNKNOWN_VALUE = 0;
+        FIRST = 1;
+        // We added one more value in V2 compared to V1
+        SECOND = 2;
+    }
+
+    LabelNumberPair optionalLabelNumberPair = 1;
+    string optionalString = 2;
+    int32 optionalInt32 = 3;
+    int32 optionalInt32New = 4; // Added New Field scalar type
+    SubMessageSchema subMessageSchema = 5; // added New Field
+    repeated SubMessageSchema repeatedDubMessageSchema = 6; // added New Field
+
+}
+
+message SubMessageSchema {
+    enum SomeTestEnum {
+        VALUE_X = 0;
+    }
+    SomeTestEnum testEnum = 1;
+    string optionalFirstString = 2;
+    repeated int32 optionalFirstInt32 = 3;
+    Level2SubMessageSchema level2Schema =4;
+}
+
+message Level2SubMessageSchema {
+    repeated string optionalValues = 1;
+}

Reply via email to