This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 181838015 GH-3286: Add support for Parquet-Protobuf in Parquet-cli
(#3287)
181838015 is described below
commit 1818380153fa1c8a354cc71beb08d7ab6cf50e55
Author: Arnav Balyan <[email protected]>
AuthorDate: Sat Sep 6 10:45:37 2025 +0530
GH-3286: Add support for Parquet-Protobuf in Parquet-cli (#3287)
---
parquet-cli/pom.xml | 22 +++++++++
.../java/org/apache/parquet/cli/BaseCommand.java | 53 ++++++++++++++++++++++
.../java/org/apache/parquet/cli/util/Schemas.java | 3 +-
.../parquet/cli/commands/CatCommandTest.java | 44 ++++++++++++++++++
4 files changed, 121 insertions(+), 1 deletion(-)
diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
index 7e3d52c8a..17a508e8a 100644
--- a/parquet-cli/pom.xml
+++ b/parquet-cli/pom.xml
@@ -85,6 +85,28 @@
<artifactId>parquet-avro</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- Protobuf dependencies for CLI Tests -->
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-protobuf</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.25.6</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-protobuf</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format-structures</artifactId>
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index b30c9432d..4ba843c6b 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
@@ -55,13 +56,56 @@ import org.apache.parquet.cli.util.Formats;
import org.apache.parquet.cli.util.GetClassLoader;
import org.apache.parquet.cli.util.Schemas;
import org.apache.parquet.cli.util.SeekableFSDataInputStream;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;
public abstract class BaseCommand implements Command, Configurable {
private static final String RESOURCE_URI_SCHEME = "resource";
private static final String STDIN_AS_SOURCE = "stdin";
+ public static final String PARQUET_CLI_ENABLE_GROUP_READER =
"parquet.enable.simple-reader";
+
+ /**
+ * Note for dev: For legacy reasons, parquet-cli used the Avro schema reader,
+ * which breaks for files generated through Protobuf. This logic auto-detects
+ * such cases and routes the request to the simple (Group) reader instead of Avro.
+ */
+ private boolean isProtobufStyleSchema(String source) throws IOException {
+ try (ParquetFileReader reader = ParquetFileReader.open(getConf(),
qualifiedPath(source))) {
+ Map<String, String> metadata =
reader.getFooter().getFileMetaData().getKeyValueMetaData();
+ return metadata != null && metadata.containsKey("parquet.proto.class");
+ }
+ }
+
+ // Adapts a ParquetReader into a single-use Iterable; IOExceptions during reads are rethrown as RuntimeException.
+ private static <T> Iterable<T> asIterable(final ParquetReader<T> reader) {
+ return () -> new Iterator<T>() {
+ private T next = advance();
+
+ private T advance() {
+ try {
+ return reader.read();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override
+ public T next() {
+ T current = next;
+ next = advance();
+ return current;
+ }
+ };
+ }
protected final Logger console;
@@ -320,6 +364,15 @@ public abstract class BaseCommand implements Command,
Configurable {
Formats.Format format = Formats.detectFormat(open(source));
switch (format) {
case PARQUET:
+ boolean isProtobufStyle = isProtobufStyleSchema(source);
+ boolean useGroupReader =
getConf().getBoolean(PARQUET_CLI_ENABLE_GROUP_READER, false);
+ if (isProtobufStyle || useGroupReader) {
+ final ParquetReader<Group> grp = ParquetReader.<Group>builder(
+ new GroupReadSupport(), qualifiedPath(source))
+ .withConf(getConf())
+ .build();
+ return (Iterable<D>) asIterable(grp);
+ }
Configuration conf = new Configuration(getConf());
// TODO: add these to the reader builder
AvroReadSupport.setRequestedProjection(conf, projection);
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
index ef877d149..bf5baeff9 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
@@ -82,7 +82,8 @@ public class Schemas {
if (schemaString != null) {
return new Schema.Parser().parse(schemaString);
} else {
- return new
AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+ return new AvroSchemaConverter(conf)
+ .convert(footer.getFileMetaData().getSchema());
}
}
diff --git
a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index b5d092901..4a781886a 100644
---
a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -18,10 +18,14 @@
*/
package org.apache.parquet.cli.commands;
+import com.google.protobuf.Message;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.proto.ProtoParquetWriter;
+import org.apache.parquet.proto.test.TestProtobuf;
import org.junit.Assert;
import org.junit.Test;
@@ -63,4 +67,44 @@ public class CatCommandTest extends ParquetFileTest {
command.setConf(new Configuration());
command.run();
}
+
+ @Test
+ public void testCatCommandProtoParquetAutoDetected() throws Exception {
+ File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
+ writeProtoParquet(protoFile);
+
+ CatCommand cmd = new CatCommand(createLogger(), 0);
+ cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
+ cmd.setConf(new Configuration());
+
+ int result = cmd.run();
+ Assert.assertEquals(0, result);
+ }
+
+ @Test
+ public void testCatCommandWithSimpleReaderConfig() throws Exception {
+ File regularFile = parquetFile();
+
+ Configuration conf = new Configuration();
+ conf.setBoolean("parquet.enable.simple-reader", true);
+
+ CatCommand cmd = new CatCommand(createLogger(), 5);
+ cmd.sourceFiles = Arrays.asList(regularFile.getAbsolutePath());
+ cmd.setConf(conf);
+
+ int result = cmd.run();
+ Assert.assertEquals(0, result);
+ }
+
+ private static void writeProtoParquet(File file) throws Exception {
+ TestProtobuf.RepeatedIntMessage.Builder b =
TestProtobuf.RepeatedIntMessage.newBuilder()
+ .addRepeatedInt(1)
+ .addRepeatedInt(2)
+ .addRepeatedInt(3);
+
+ try (ProtoParquetWriter<Message> w =
+ new ProtoParquetWriter<>(new Path(file.getAbsolutePath()),
TestProtobuf.RepeatedIntMessage.class)) {
+ w.write(b.build());
+ }
+ }
}