This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new c28e4a6cc PARQUET-2446:  ProtoParquetWriter support dynamic message (#1295)
c28e4a6cc is described below

commit c28e4a6cc889b8c7a1d9604f4c16566028573a53
Author: fc <[email protected]>
AuthorDate: Fri Mar 15 09:54:48 2024 +0800

    PARQUET-2446:  ProtoParquetWriter support dynamic message (#1295)
    
    Co-authored-by: fcyang <[email protected]>
---
 .../apache/parquet/proto/ProtoParquetWriter.java   | 15 ++++++
 .../parquet/proto/ProtoParquetWriterTest.java      | 61 ++++++++++++++++++++++
 2 files changed, 76 insertions(+)

diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
index 6633212f2..aec7210a2 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.proto;
 
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
 import java.io.IOException;
@@ -115,9 +116,15 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
     return new ProtoWriteSupport<>(protoMessage);
   }
 
+  private static <T extends MessageOrBuilder> WriteSupport<T> writeSupport(Descriptors.Descriptor descriptor) {
+    return new ProtoWriteSupport<>(descriptor);
+  }
+
   public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
     Class<? extends Message> protoMessage = null;
 
+    private Descriptors.Descriptor descriptor = null;
+
     private Builder(Path file) {
       super(file);
     }
@@ -135,6 +142,11 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
       return this;
     }
 
+    public Builder<T> withDescriptor(Descriptors.Descriptor descriptor) {
+      this.descriptor = descriptor;
+      return this;
+    }
+
     @Override
     protected WriteSupport<T> getWriteSupport(Configuration conf) {
       return getWriteSupport((ParquetConfiguration) null);
@@ -143,6 +155,9 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
     @Override
     @SuppressWarnings("unchecked")
     protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+      if (this.descriptor != null) {
+        return (WriteSupport<T>) ProtoParquetWriter.writeSupport(descriptor);
+      }
       return (WriteSupport<T>) ProtoParquetWriter.writeSupport(protoMessage);
     }
   }
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoParquetWriterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoParquetWriterTest.java
new file mode 100644
index 000000000..da84bf004
--- /dev/null
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoParquetWriterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.proto;
+
+import static org.apache.parquet.proto.TestUtils.readMessages;
+import static org.apache.parquet.proto.TestUtils.someTemporaryFilePath;
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.proto.test.TestProto3;
+import org.junit.Test;
+
+public class ProtoParquetWriterTest {
+  @Test
+  public void testProtoParquetWriterWithDynamicMessage() throws Exception {
+    Path file = someTemporaryFilePath();
+    Descriptors.Descriptor descriptor = TestProto3.InnerMessage.getDescriptor();
+    TestProto3.InnerMessage.Builder msg = TestProto3.InnerMessage.newBuilder();
+    msg.setOne("oneValue");
+    DynamicMessage dynamicMessage = DynamicMessage.newBuilder(msg.build()).build();
+
+    Configuration conf = new Configuration();
+    ParquetWriter<DynamicMessage> writer = ProtoParquetWriter.<DynamicMessage>builder(file)
+        .withDescriptor(descriptor)
+        .withConf(conf)
+        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+        .build();
+    writer.write(dynamicMessage);
+    writer.close();
+
+    readMessages(file, TestProto3.InnerMessage.class);
+    List<TestProto3.InnerMessage> gotBack = TestUtils.readMessages(file, TestProto3.InnerMessage.class);
+
+    TestProto3.InnerMessage getFirst = gotBack.get(0);
+    assertEquals(getFirst.getOne(), "oneValue");
+    assertEquals(getFirst.getTwo(), "");
+    assertEquals(getFirst.getThree(), "");
+  }
+}

Reply via email to