This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push: new d8aaf93b9 PARQUET-2334: Allow the cat subcommand to take multiple files (#1132) d8aaf93b9 is described below commit d8aaf93b960693a7dab52b0dc33c786f33938a54 Author: Kengo Seki <sek...@apache.org> AuthorDate: Thu Aug 10 14:48:00 2023 +0900 PARQUET-2334: Allow the cat subcommand to take multiple files (#1132) --- .../apache/parquet/cli/commands/CatCommand.java | 55 ++++++++++++---------- .../parquet/cli/commands/CatCommandTest.java | 29 ++++++++++++ 2 files changed, 59 insertions(+), 25 deletions(-) diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java index 7703e88ca..aa0ab5ef7 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java @@ -29,7 +29,9 @@ import org.apache.parquet.cli.util.Expressions; import org.slf4j.Logger; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.parquet.cli.util.Expressions.select; @@ -58,35 +60,38 @@ public class CatCommand extends BaseCommand { Preconditions.checkArgument( sourceFiles != null && !sourceFiles.isEmpty(), "Missing file name"); - Preconditions.checkArgument(sourceFiles.size() == 1, - "Only one file can be given"); - final String source = sourceFiles.get(0); - - Schema schema = getAvroSchema(source); - Schema projection = Expressions.filterSchema(schema, columns); + // Ensure all source files have the columns specified first + Map<String, Schema> schemas = new HashMap<>(); + for (String source : sourceFiles) { + Schema schema = getAvroSchema(source); + schemas.put(source, Expressions.filterSchema(schema, columns)); + } - Iterable<Object> reader = openDataFile(source, projection); - boolean threw = true; - long count = 0; - try { - for 
(Object record : reader) { - if (numRecords > 0 && count >= numRecords) { - break; + for (String source : sourceFiles) { + Schema projection = schemas.get(source); + Iterable<Object> reader = openDataFile(source, projection); + boolean threw = true; + long count = 0; + try { + for (Object record : reader) { + if (numRecords > 0 && count >= numRecords) { + break; + } + if (columns == null || columns.size() != 1) { + console.info(String.valueOf(record)); + } else { + console.info(String.valueOf(select(projection, record, columns.get(0)))); + } + count += 1; } - if (columns == null || columns.size() != 1) { - console.info(String.valueOf(record)); - } else { - console.info(String.valueOf(select(projection, record, columns.get(0)))); + threw = false; + } catch (RuntimeException e) { + throw new RuntimeException("Failed on record " + count + " in file " + source, e); + } finally { + if (reader instanceof Closeable) { + Closeables.close((Closeable) reader, threw); } - count += 1; - } - threw = false; - } catch (RuntimeException e) { - throw new RuntimeException("Failed on record " + count, e); - } finally { - if (reader instanceof Closeable) { - Closeables.close((Closeable) reader, threw); } } diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java index 38055e6ac..cfc6b2baa 100644 --- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java @@ -35,4 +35,33 @@ public class CatCommandTest extends ParquetFileTest { command.setConf(new Configuration()); Assert.assertEquals(0, command.run()); } + + @Test + public void testCatCommandWithMultipleInput() throws IOException { + File file = parquetFile(); + CatCommand command = new CatCommand(createLogger(), 0); + command.sourceFiles = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath()); + 
command.setConf(new Configuration()); + Assert.assertEquals(0, command.run()); + } + + @Test + public void testCatCommandWithSpecificColumns() throws IOException { + File file = parquetFile(); + CatCommand command = new CatCommand(createLogger(), 0); + command.sourceFiles = Arrays.asList(file.getAbsolutePath()); + command.columns = Arrays.asList(INT32_FIELD, INT64_FIELD); + command.setConf(new Configuration()); + Assert.assertEquals(0, command.run()); + } + + @Test(expected = IllegalArgumentException.class) + public void testCatCommandWithInvalidColumn() throws IOException { + File file = parquetFile(); + CatCommand command = new CatCommand(createLogger(), 0); + command.sourceFiles = Arrays.asList(file.getAbsolutePath()); + command.columns = Arrays.asList("invalid_field"); + command.setConf(new Configuration()); + command.run(); + } }