This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new c28e4a6cc PARQUET-2446: ProtoParquetWriter support dynamic message (#1295)
c28e4a6cc is described below
commit c28e4a6cc889b8c7a1d9604f4c16566028573a53
Author: fc <[email protected]>
AuthorDate: Fri Mar 15 09:54:48 2024 +0800
PARQUET-2446: ProtoParquetWriter support dynamic message (#1295)
Co-authored-by: fcyang <[email protected]>
---
.../apache/parquet/proto/ProtoParquetWriter.java | 15 ++++++
.../parquet/proto/ProtoParquetWriterTest.java | 61 ++++++++++++++++++++++
2 files changed, 76 insertions(+)
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
index 6633212f2..aec7210a2 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.proto;
+import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import java.io.IOException;
@@ -115,9 +116,15 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
return new ProtoWriteSupport<>(protoMessage);
}
+  /**
+   * Creates the write support used when the record schema is given by a protobuf descriptor
+   * instead of a message class.
+   *
+   * @param messageDescriptor descriptor of the messages that will be written
+   * @return a {@link ProtoWriteSupport} built from {@code messageDescriptor}
+   */
+  private static <T extends MessageOrBuilder> WriteSupport<T> writeSupport(
+      Descriptors.Descriptor messageDescriptor) {
+    WriteSupport<T> support = new ProtoWriteSupport<>(messageDescriptor);
+    return support;
+  }
+
public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
Class<? extends Message> protoMessage = null;
+ // Optional descriptor set by withDescriptor(...); when non-null, getWriteSupport builds the
+ // write support from it instead of from protoMessage.
+ private Descriptors.Descriptor descriptor = null;
+
private Builder(Path file) {
super(file);
}
@@ -135,6 +142,11 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
return this;
}
+    /**
+     * Sets the protobuf descriptor that describes the messages this writer will receive, for
+     * example when writing {@code DynamicMessage} instances that have no generated class.
+     *
+     * <p>When a descriptor is set, the write support is created from it and any message class
+     * stored in {@code protoMessage} is ignored.
+     *
+     * @param descriptor descriptor of the messages to write
+     * @return this builder, for chaining
+     */
+    public Builder<T> withDescriptor(Descriptors.Descriptor descriptor) {
+      this.descriptor = descriptor;
+      return this;
+    }
+
@Override
protected WriteSupport<T> getWriteSupport(Configuration conf) {
return getWriteSupport((ParquetConfiguration) null);
@@ -143,6 +155,9 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
@Override
@SuppressWarnings("unchecked")
protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+ // A descriptor set via withDescriptor(...) takes precedence; protoMessage is then ignored.
+ // NOTE(review): conf is not consulted here, and setting both descriptor and protoMessage
+ // silently drops protoMessage - consider rejecting that combination.
+ if (this.descriptor != null) {
+ return (WriteSupport<T>) ProtoParquetWriter.writeSupport(descriptor);
+ }
return (WriteSupport<T>) ProtoParquetWriter.writeSupport(protoMessage);
}
}
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoParquetWriterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoParquetWriterTest.java
new file mode 100644
index 000000000..da84bf004
--- /dev/null
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoParquetWriterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.proto;
+
+import static org.apache.parquet.proto.TestUtils.readMessages;
+import static org.apache.parquet.proto.TestUtils.someTemporaryFilePath;
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.proto.test.TestProto3;
+import org.junit.Test;
+
+public class ProtoParquetWriterTest {
+ @Test
+ public void testProtoParquetWriterWithDynamicMessage() throws Exception {
+ Path file = someTemporaryFilePath();
+ Descriptors.Descriptor descriptor =
TestProto3.InnerMessage.getDescriptor();
+ TestProto3.InnerMessage.Builder msg = TestProto3.InnerMessage.newBuilder();
+ msg.setOne("oneValue");
+ DynamicMessage dynamicMessage =
DynamicMessage.newBuilder(msg.build()).build();
+
+ Configuration conf = new Configuration();
+ ParquetWriter<DynamicMessage> writer =
ProtoParquetWriter.<DynamicMessage>builder(file)
+ .withDescriptor(descriptor)
+ .withConf(conf)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build();
+ writer.write(dynamicMessage);
+ writer.close();
+
+ readMessages(file, TestProto3.InnerMessage.class);
+ List<TestProto3.InnerMessage> gotBack = TestUtils.readMessages(file,
TestProto3.InnerMessage.class);
+
+ TestProto3.InnerMessage getFirst = gotBack.get(0);
+ assertEquals(getFirst.getOne(), "oneValue");
+ assertEquals(getFirst.getTwo(), "");
+ assertEquals(getFirst.getThree(), "");
+ }
+}