This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new ef9929c13 PARQUET-2335: Allow the scan subcommand to take multiple files (#1133)
ef9929c13 is described below
commit ef9929c130f8f2e24fca1c7b42b0742a4d9d5e61
Author: Kengo Seki <[email protected]>
AuthorDate: Thu Aug 10 14:48:43 2023 +0900
PARQUET-2335: Allow the scan subcommand to take multiple files (#1133)
---
.../apache/parquet/cli/commands/ScanCommand.java | 56 ++++++++++++++--------
.../parquet/cli/commands/ScanCommandTest.java | 21 +++++++-
2 files changed, 55 insertions(+), 22 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java
index 0b226ab3c..9f2b28020 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ScanCommand.java
@@ -30,13 +30,15 @@ import org.slf4j.Logger;
import java.io.Closeable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
@Parameters(commandDescription = "Scan all records from a file")
public class ScanCommand extends BaseCommand {
@Parameter(description = "<file>")
- String sourceFile;
+ List<String> sourceFiles;
@Parameter(
names = {"-c", "--column", "--columns"},
@@ -50,32 +52,44 @@ public class ScanCommand extends BaseCommand {
@Override
public int run() throws IOException {
Preconditions.checkArgument(
- sourceFile != null && !sourceFile.isEmpty(),
+ sourceFiles != null && !sourceFiles.isEmpty(),
"Missing file name");
- Schema schema = getAvroSchema(sourceFile);
- Schema projection = Expressions.filterSchema(schema, columns);
+ // Ensure all source files have the columns specified first
+ Map<String, Schema> schemas = new HashMap<>();
+ for (String sourceFile : sourceFiles) {
+ Schema schema = getAvroSchema(sourceFile);
+ schemas.put(sourceFile, Expressions.filterSchema(schema, columns));
+ }
- long startTime = System.currentTimeMillis();
- Iterable<Object> reader = openDataFile(sourceFile, projection);
- boolean threw = true;
- long count = 0;
- try {
- for (Object record : reader) {
- count += 1;
+ long totalStartTime = System.currentTimeMillis();
+ long totalCount = 0;
+ for (String sourceFile : sourceFiles) {
+ long startTime = System.currentTimeMillis();
+ Iterable<Object> reader = openDataFile(sourceFile, schemas.get(sourceFile));
+ boolean threw = true;
+ long count = 0;
+ try {
+ for (Object record : reader) {
+ count += 1;
+ }
+ threw = false;
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Failed on record " + count + " in " + sourceFile, e);
+ } finally {
+ if (reader instanceof Closeable) {
+ Closeables.close((Closeable) reader, threw);
+ }
}
- threw = false;
- } catch (RuntimeException e) {
- throw new RuntimeException("Failed on record " + count, e);
- } finally {
- if (reader instanceof Closeable) {
- Closeables.close((Closeable) reader, threw);
+ totalCount += count;
+ if (1 < sourceFiles.size()) {
+ long endTime = System.currentTimeMillis();
+ console.info("Scanned " + count + " records from " + sourceFile + " in " + (endTime - startTime) / 1000.0 + " s");
}
}
- long endTime = System.currentTimeMillis();
-
- console.info("Scanned " + count + " records from " + sourceFile);
- console.info("Time: " + (endTime - startTime) / 1000.0 + " s");
+ long totalEndTime = System.currentTimeMillis();
+ console.info("Scanned " + totalCount + " records from " + sourceFiles.size() + " file(s)");
+ console.info("Time: " + (totalEndTime - totalStartTime) / 1000.0 + " s");
return 0;
}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java
index dbe1f889e..b8928ab07 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ScanCommandTest.java
@@ -31,8 +31,27 @@ public class ScanCommandTest extends ParquetFileTest {
public void testScanCommand() throws IOException {
File file = parquetFile();
ScanCommand command = new ScanCommand(createLogger());
- command.sourceFile = file.getAbsolutePath();
+ command.sourceFiles = Arrays.asList(file.getAbsolutePath());
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}
+
+ @Test
+ public void testScanCommandWithMultipleSourceFiles() throws IOException {
+ File file = parquetFile();
+ ScanCommand command = new ScanCommand(createLogger());
+ command.sourceFiles = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath());
+ command.setConf(new Configuration());
+ Assert.assertEquals(0, command.run());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testScanCommandWithInvalidColumnName() throws IOException {
+ File file = parquetFile();
+ ScanCommand command = new ScanCommand(createLogger());
+ command.sourceFiles = Arrays.asList(file.getAbsolutePath());
+ command.columns = Arrays.asList("invalid_field");
+ command.setConf(new Configuration());
+ command.run();
+ }
}