This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new ef9929c13 PARQUET-2335: Allow the scan subcommand to take multiple files (#1133)
ef9929c13 is described below

commit ef9929c130f8f2e24fca1c7b42b0742a4d9d5e61
Author: Kengo Seki <[email protected]>
AuthorDate: Thu Aug 10 14:48:43 2023 +0900

    PARQUET-2335: Allow the scan subcommand to take multiple files (#1133)
---
 .../apache/parquet/cli/commands/ScanCommand.java   | 56 ++++++++++++++--------
 .../parquet/cli/commands/ScanCommandTest.java      | 21 +++++++-
 2 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java
index 0b226ab3c..9f2b28020 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java
@@ -30,13 +30,15 @@ import org.slf4j.Logger;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 @Parameters(commandDescription = "Scan all records from a file")
 public class ScanCommand extends BaseCommand {
 
   @Parameter(description = "<file>")
-  String sourceFile;
+  List<String> sourceFiles;
 
   @Parameter(
     names = {"-c", "--column", "--columns"},
@@ -50,32 +52,44 @@ public class ScanCommand extends BaseCommand {
   @Override
   public int run() throws IOException {
     Preconditions.checkArgument(
-      sourceFile != null && !sourceFile.isEmpty(),
+      sourceFiles != null && !sourceFiles.isEmpty(),
       "Missing file name");
 
-    Schema schema = getAvroSchema(sourceFile);
-    Schema projection = Expressions.filterSchema(schema, columns);
+    // Ensure all source files have the columns specified first
+    Map<String, Schema> schemas = new HashMap<>();
+    for (String sourceFile : sourceFiles) {
+      Schema schema = getAvroSchema(sourceFile);
+      schemas.put(sourceFile, Expressions.filterSchema(schema, columns));
+    }
 
-    long startTime = System.currentTimeMillis();
-    Iterable<Object> reader = openDataFile(sourceFile, projection);
-    boolean threw = true;
-    long count = 0;
-    try {
-      for (Object record : reader) {
-        count += 1;
+    long totalStartTime = System.currentTimeMillis();
+    long totalCount = 0;
+    for (String sourceFile : sourceFiles) {
+      long startTime = System.currentTimeMillis();
+      Iterable<Object> reader = openDataFile(sourceFile, schemas.get(sourceFile));
+      boolean threw = true;
+      long count = 0;
+      try {
+        for (Object record : reader) {
+          count += 1;
+        }
+        threw = false;
+      } catch (RuntimeException e) {
+        throw new RuntimeException("Failed on record " + count + " in " + sourceFile, e);
+      } finally {
+        if (reader instanceof Closeable) {
+          Closeables.close((Closeable) reader, threw);
+        }
       }
-      threw = false;
-    } catch (RuntimeException e) {
-      throw new RuntimeException("Failed on record " + count, e);
-    } finally {
-      if (reader instanceof Closeable) {
-        Closeables.close((Closeable) reader, threw);
+      totalCount += count;
+      if (1 < sourceFiles.size()) {
+        long endTime = System.currentTimeMillis();
+        console.info("Scanned " + count + " records from " + sourceFile + " in " + (endTime - startTime) / 1000.0 + " s");
       }
     }
-    long endTime = System.currentTimeMillis();
-
-    console.info("Scanned " + count + " records from " + sourceFile);
-    console.info("Time: " + (endTime - startTime) / 1000.0 + " s");
+    long totalEndTime = System.currentTimeMillis();
+    console.info("Scanned " + totalCount + " records from " + sourceFiles.size() + " file(s)");
+    console.info("Time: " + (totalEndTime - totalStartTime) / 1000.0 + " s");
     return 0;
   }
 
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java
index dbe1f889e..b8928ab07 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java
@@ -31,8 +31,27 @@ public class ScanCommandTest extends ParquetFileTest {
   public void testScanCommand() throws IOException {
     File file = parquetFile();
     ScanCommand command = new ScanCommand(createLogger());
-    command.sourceFile = file.getAbsolutePath();
+    command.sourceFiles = Arrays.asList(file.getAbsolutePath());
     command.setConf(new Configuration());
     Assert.assertEquals(0, command.run());
   }
+
+  @Test
+  public void testScanCommandWithMultipleSourceFiles() throws IOException {
+    File file = parquetFile();
+    ScanCommand command = new ScanCommand(createLogger());
+    command.sourceFiles = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath());
+    command.setConf(new Configuration());
+    Assert.assertEquals(0, command.run());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testScanCommandWithInvalidColumnName() throws IOException {
+    File file = parquetFile();
+    ScanCommand command = new ScanCommand(createLogger());
+    command.sourceFiles = Arrays.asList(file.getAbsolutePath());
+    command.columns = Arrays.asList("invalid_field");
+    command.setConf(new Configuration());
+    command.run();
+  }
 }

Reply via email to