This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new d8aaf93b9 PARQUET-2334: Allow the cat subcommand to take multiple files (#1132)
d8aaf93b9 is described below

commit d8aaf93b960693a7dab52b0dc33c786f33938a54
Author: Kengo Seki <sek...@apache.org>
AuthorDate: Thu Aug 10 14:48:00 2023 +0900

    PARQUET-2334: Allow the cat subcommand to take multiple files (#1132)
---
 .../apache/parquet/cli/commands/CatCommand.java    | 55 ++++++++++++----------
 .../parquet/cli/commands/CatCommandTest.java       | 29 ++++++++++++
 2 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
index 7703e88ca..aa0ab5ef7 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
@@ -29,7 +29,9 @@ import org.apache.parquet.cli.util.Expressions;
 import org.slf4j.Logger;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.parquet.cli.util.Expressions.select;
 
@@ -58,35 +60,38 @@ public class CatCommand extends BaseCommand {
     Preconditions.checkArgument(
         sourceFiles != null && !sourceFiles.isEmpty(),
         "Missing file name");
-    Preconditions.checkArgument(sourceFiles.size() == 1,
-        "Only one file can be given");
 
-    final String source = sourceFiles.get(0);
-
-    Schema schema = getAvroSchema(source);
-    Schema projection = Expressions.filterSchema(schema, columns);
+    // Ensure all source files have the columns specified first
+    Map<String, Schema> schemas = new HashMap<>();
+    for (String source : sourceFiles) {
+      Schema schema = getAvroSchema(source);
+      schemas.put(source, Expressions.filterSchema(schema, columns));
+    }
 
-    Iterable<Object> reader = openDataFile(source, projection);
-    boolean threw = true;
-    long count = 0;
-    try {
-      for (Object record : reader) {
-        if (numRecords > 0 && count >= numRecords) {
-          break;
+    for (String source : sourceFiles) {
+      Schema projection = schemas.get(source);
+      Iterable<Object> reader = openDataFile(source, projection);
+      boolean threw = true;
+      long count = 0;
+      try {
+        for (Object record : reader) {
+          if (numRecords > 0 && count >= numRecords) {
+            break;
+          }
+          if (columns == null || columns.size() != 1) {
+            console.info(String.valueOf(record));
+          } else {
+            console.info(String.valueOf(select(projection, record, columns.get(0))));
+          }
+          count += 1;
         }
-        if (columns == null || columns.size() != 1) {
-          console.info(String.valueOf(record));
-        } else {
-          console.info(String.valueOf(select(projection, record, columns.get(0))));
+        threw = false;
+      } catch (RuntimeException e) {
+        throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+      } finally {
+        if (reader instanceof Closeable) {
+          Closeables.close((Closeable) reader, threw);
         }
-        count += 1;
-      }
-      threw = false;
-    } catch (RuntimeException e) {
-      throw new RuntimeException("Failed on record " + count, e);
-    } finally {
-      if (reader instanceof Closeable) {
-        Closeables.close((Closeable) reader, threw);
       }
     }
 
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index 38055e6ac..cfc6b2baa 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -35,4 +35,33 @@ public class CatCommandTest extends ParquetFileTest {
     command.setConf(new Configuration());
     Assert.assertEquals(0, command.run());
   }
+
+  @Test
+  public void testCatCommandWithMultipleInput() throws IOException {
+    File file = parquetFile();
+    CatCommand command = new CatCommand(createLogger(), 0);
+    command.sourceFiles = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath());
+    command.setConf(new Configuration());
+    Assert.assertEquals(0, command.run());
+  }
+
+  @Test
+  public void testCatCommandWithSpecificColumns() throws IOException {
+    File file = parquetFile();
+    CatCommand command = new CatCommand(createLogger(), 0);
+    command.sourceFiles = Arrays.asList(file.getAbsolutePath());
+    command.columns = Arrays.asList(INT32_FIELD, INT64_FIELD);
+    command.setConf(new Configuration());
+    Assert.assertEquals(0, command.run());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCatCommandWithInvalidColumn() throws IOException {
+    File file = parquetFile();
+    CatCommand command = new CatCommand(createLogger(), 0);
+    command.sourceFiles = Arrays.asList(file.getAbsolutePath());
+    command.columns = Arrays.asList("invalid_field");
+    command.setConf(new Configuration());
+    command.run();
+  }
 }

Reply via email to