This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 181838015 GH-3286: Add support for Parquet-Protobuf in Parquet-cli 
(#3287)
181838015 is described below

commit 1818380153fa1c8a354cc71beb08d7ab6cf50e55
Author: Arnav Balyan <[email protected]>
AuthorDate: Sat Sep 6 10:45:37 2025 +0530

    GH-3286: Add support for Parquet-Protobuf in Parquet-cli (#3287)
---
 parquet-cli/pom.xml                                | 22 +++++++++
 .../java/org/apache/parquet/cli/BaseCommand.java   | 53 ++++++++++++++++++++++
 .../java/org/apache/parquet/cli/util/Schemas.java  |  3 +-
 .../parquet/cli/commands/CatCommandTest.java       | 44 ++++++++++++++++++
 4 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
index 7e3d52c8a..17a508e8a 100644
--- a/parquet-cli/pom.xml
+++ b/parquet-cli/pom.xml
@@ -85,6 +85,28 @@
       <artifactId>parquet-avro</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <!-- Protobuf dependencies for CLI Tests -->
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-protobuf</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.25.6</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-protobuf</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-format-structures</artifactId>
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java 
b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index b30c9432d..4ba843c6b 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -35,6 +35,7 @@ import java.nio.charset.StandardCharsets;
 import java.security.AccessController;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
@@ -55,13 +56,56 @@ import org.apache.parquet.cli.util.Formats;
 import org.apache.parquet.cli.util.GetClassLoader;
 import org.apache.parquet.cli.util.Schemas;
 import org.apache.parquet.cli.util.SeekableFSDataInputStream;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.slf4j.Logger;
 
 public abstract class BaseCommand implements Command, Configurable {
 
   private static final String RESOURCE_URI_SCHEME = "resource";
   private static final String STDIN_AS_SOURCE = "stdin";
+  public static final String PARQUET_CLI_ENABLE_GROUP_READER = 
"parquet.enable.simple-reader";
+
+  /**
+   * Note for dev: Due to legacy reasons, parquet-cli used the avro schema 
reader which
+   * breaks for files generated through proto. This logic is in place to 
auto-detect such cases
+   * and route the request to simple reader instead of avro.
+   */
+  private boolean isProtobufStyleSchema(String source) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(getConf(), 
qualifiedPath(source))) {
+      Map<String, String> metadata = 
reader.getFooter().getFileMetaData().getKeyValueMetaData();
+      return metadata != null && metadata.containsKey("parquet.proto.class");
+    }
+  }
+
+  // Util to convert ParquetReader to Iterable
+  private static <T> Iterable<T> asIterable(final ParquetReader<T> reader) {
+    return () -> new Iterator<T>() {
+      private T next = advance();
+
+      private T advance() {
+        try {
+          return reader.read();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next != null;
+      }
+
+      @Override
+      public T next() {
+        T current = next;
+        next = advance();
+        return current;
+      }
+    };
+  }
 
   protected final Logger console;
 
@@ -320,6 +364,15 @@ public abstract class BaseCommand implements Command, 
Configurable {
     Formats.Format format = Formats.detectFormat(open(source));
     switch (format) {
       case PARQUET:
+        boolean isProtobufStyle = isProtobufStyleSchema(source);
+        boolean useGroupReader = 
getConf().getBoolean(PARQUET_CLI_ENABLE_GROUP_READER, false);
+        if (isProtobufStyle || useGroupReader) {
+          final ParquetReader<Group> grp = ParquetReader.<Group>builder(
+                  new GroupReadSupport(), qualifiedPath(source))
+              .withConf(getConf())
+              .build();
+          return (Iterable<D>) asIterable(grp);
+        }
         Configuration conf = new Configuration(getConf());
         // TODO: add these to the reader builder
         AvroReadSupport.setRequestedProjection(conf, projection);
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java 
b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
index ef877d149..bf5baeff9 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
@@ -82,7 +82,8 @@ public class Schemas {
     if (schemaString != null) {
       return new Schema.Parser().parse(schemaString);
     } else {
-      return new 
AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+      return new AvroSchemaConverter(conf)
+          .convert(footer.getFileMetaData().getSchema());
     }
   }
 
diff --git 
a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java 
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index b5d092901..4a781886a 100644
--- 
a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++ 
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -18,10 +18,14 @@
  */
 package org.apache.parquet.cli.commands;
 
+import com.google.protobuf.Message;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.proto.ProtoParquetWriter;
+import org.apache.parquet.proto.test.TestProtobuf;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,4 +67,44 @@ public class CatCommandTest extends ParquetFileTest {
     command.setConf(new Configuration());
     command.run();
   }
+
+  @Test
+  public void testCatCommandProtoParquetAutoDetected() throws Exception {
+    File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
+    writeProtoParquet(protoFile);
+
+    CatCommand cmd = new CatCommand(createLogger(), 0);
+    cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
+    cmd.setConf(new Configuration());
+
+    int result = cmd.run();
+    Assert.assertEquals(0, result);
+  }
+
+  @Test
+  public void testCatCommandWithSimpleReaderConfig() throws Exception {
+    File regularFile = parquetFile();
+
+    Configuration conf = new Configuration();
+    conf.setBoolean("parquet.enable.simple-reader", true);
+
+    CatCommand cmd = new CatCommand(createLogger(), 5);
+    cmd.sourceFiles = Arrays.asList(regularFile.getAbsolutePath());
+    cmd.setConf(conf);
+
+    int result = cmd.run();
+    Assert.assertEquals(0, result);
+  }
+
+  private static void writeProtoParquet(File file) throws Exception {
+    TestProtobuf.RepeatedIntMessage.Builder b = 
TestProtobuf.RepeatedIntMessage.newBuilder()
+        .addRepeatedInt(1)
+        .addRepeatedInt(2)
+        .addRepeatedInt(3);
+
+    try (ProtoParquetWriter<Message> w =
+        new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), 
TestProtobuf.RepeatedIntMessage.class)) {
+      w.write(b.build());
+    }
+  }
 }

Reply via email to