This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 61420289f PARQUET-2331: Allow convert-csv to take multiple input files (#1129)
61420289f is described below

commit 61420289f7d300e9dc661819b112a22abeef37b0
Author: Kengo Seki <[email protected]>
AuthorDate: Thu Aug 10 00:03:31 2023 +0900

    PARQUET-2331: Allow convert-csv to take multiple input files (#1129)
---
 .../parquet/cli/commands/ConvertCSVCommand.java    | 70 ++++++++++++----------
 .../apache/parquet/cli/commands/CSVFileTest.java   | 18 ++++++
 .../cli/commands/ConvertCSVCommandTest.java        | 24 ++++++++
 3 files changed, 82 insertions(+), 30 deletions(-)

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
index f989daa9a..460b6c6f7 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.Parameters;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import org.apache.avro.SchemaNormalization;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.csv.AvroCSVReader;
 import org.apache.parquet.cli.csv.CSVProperties;
@@ -120,7 +121,7 @@ public class ConvertCSVCommand extends BaseCommand {
   @Override
   @SuppressWarnings("unchecked")
   public int run() throws IOException {
-    Preconditions.checkArgument(targets != null && targets.size() == 1,
+    Preconditions.checkArgument(targets != null && !targets.isEmpty(),
         "CSV path is required.");
 
     if (header != null) {
@@ -138,9 +139,7 @@ public class ConvertCSVCommand extends BaseCommand {
         .charset(charsetName)
         .build();
 
-    String source = targets.get(0);
-
-    Schema csvSchema;
+    Schema csvSchema = null;
     if (avroSchemaFile != null) {
       csvSchema = Schemas.fromAvsc(open(avroSchemaFile));
     } else {
@@ -149,7 +148,7 @@ public class ConvertCSVCommand extends BaseCommand {
         required = ImmutableSet.copyOf(requiredFields);
       }
 
-      String filename = new File(source).getName();
+      String filename = new File(targets.get(0)).getName();
       String recordName;
       if (filename.contains(".")) {
         recordName = filename.substring(0, filename.indexOf("."));
@@ -157,34 +156,45 @@ public class ConvertCSVCommand extends BaseCommand {
         recordName = filename;
       }
 
-      csvSchema = AvroCSV.inferNullableSchema(
-          recordName, open(source), props, required);
+      // If the schema is not explicitly provided,
+      // ensure that all input files share the same one.
+      for (String target : targets) {
+        Schema schema = AvroCSV.inferNullableSchema(
+          recordName, open(target), props, required);
+        if (csvSchema == null) {
+          csvSchema = schema;
+        } else if (!SchemaNormalization.toParsingForm(csvSchema).equals(SchemaNormalization.toParsingForm(schema))) {
+          throw new IllegalArgumentException(target + " seems to have a different schema from others. " +
+            "Please specify the correct schema explicitly with the `--schema` option.");
+        }
+      }
     }
 
-    long count = 0;
-    try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
-        open(source), props, csvSchema, Record.class, true)) {
-        CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
-      try (ParquetWriter<Record> writer = AvroParquetWriter
-          .<Record>builder(qualifiedPath(outputPath))
-          .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
-          .withWriteMode(overwrite ?
-              ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
-          .withCompressionCodec(codec)
-          .withDictionaryEncoding(true)
-          .withDictionaryPageSize(dictionaryPageSize)
-          .withPageSize(pageSize)
-          .withRowGroupSize(rowGroupSize)
-          .withDataModel(GenericData.get())
-          .withConf(getConf())
-          .withSchema(csvSchema)
-          .build()) {
-        for (Record record : reader) {
-          writer.write(record);
-          count++;
+    try (ParquetWriter<Record> writer = AvroParquetWriter
+      .<Record>builder(qualifiedPath(outputPath))
+      .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
+      .withWriteMode(overwrite ?
+        ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
+      .withCompressionCodec(Codecs.parquetCodec(compressionCodecName))
+      .withDictionaryEncoding(true)
+      .withDictionaryPageSize(dictionaryPageSize)
+      .withPageSize(pageSize)
+      .withRowGroupSize(rowGroupSize)
+      .withDataModel(GenericData.get())
+      .withConf(getConf())
+      .withSchema(csvSchema)
+      .build()) {
+      for (String target : targets) {
+        long count = 0;
+        try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
+          open(target), props, csvSchema, Record.class, true)) {
+          for (Record record : reader) {
+            writer.write(record);
+            count++;
+          }
+        } catch (RuntimeException e) {
+          throw new RuntimeException("Failed on record " + count + " in file " + target, e);
         }
-      } catch (RuntimeException e) {
-        throw new RuntimeException("Failed on record " + count, e);
       }
     }
 
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java
index 1a179bd3b..2a3e66e0a 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java
@@ -30,6 +30,7 @@ public abstract class CSVFileTest extends FileTest {
   @Before
   public void setUp() throws IOException {
     createTestCSVFile();
+    createTestCSVFileWithDifferentSchema();
   }
 
   protected File csvFile() {
@@ -37,6 +38,11 @@ public abstract class CSVFileTest extends FileTest {
     return new File(tmpDir, getClass().getSimpleName() + ".csv");
   }
 
+  protected File csvFileWithDifferentSchema() {
+    File tmpDir = getTempFolder();
+    return new File(tmpDir, getClass().getSimpleName() + "2.csv");
+  }
+
   private void createTestCSVFile() throws IOException {
     File file = csvFile();
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
@@ -48,4 +54,16 @@ public abstract class CSVFileTest extends FileTest {
         Integer.MAX_VALUE, Long.MAX_VALUE, COLORS[1]));
     }
   }
+
+  private void createTestCSVFileWithDifferentSchema() throws IOException {
+    File file = csvFileWithDifferentSchema();
+    try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+      writer.write(String.format("%s,%s,%s\n",
+        FLOAT_FIELD, DOUBLE_FIELD, BINARY_FIELD));
+      writer.write(String.format("%f,%f,\"%s\"\n",
+        Float.MIN_VALUE, Double.MIN_VALUE, COLORS[0]));
+      writer.write(String.format("%f,%f,\"%s\"\n",
+        Float.MAX_VALUE, Double.MAX_VALUE, COLORS[1]));
+    }
+  }
 }
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java
index 42f938b5b..e02b7048a 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java
@@ -38,4 +38,28 @@ public class ConvertCSVCommandTest extends CSVFileTest {
     Assert.assertEquals(0, command.run());
     Assert.assertTrue(output.exists());
   }
+
+  @Test
+  public void testConvertCSVCommandWithMultipleInput() throws IOException {
+    File file = csvFile();
+    ConvertCSVCommand command = new ConvertCSVCommand(createLogger());
+    command.targets = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath());
+    File output = new File(getTempFolder(), getClass().getSimpleName() + ".parquet");
+    command.outputPath = output.getAbsolutePath();
+    command.setConf(new Configuration());
+    Assert.assertEquals(0, command.run());
+    Assert.assertTrue(output.exists());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testConvertCSVCommandWithDifferentSchemas() throws IOException {
+    File file = csvFile();
+    File fileWithDifferentSchema = csvFileWithDifferentSchema();
+    ConvertCSVCommand command = new ConvertCSVCommand(createLogger());
+    command.targets = Arrays.asList(file.getAbsolutePath(), fileWithDifferentSchema.getAbsolutePath());
+    File output = new File(getTempFolder(), getClass().getSimpleName() + ".parquet");
+    command.outputPath = output.getAbsolutePath();
+    command.setConf(new Configuration());
+    command.run();
+  }
 }

Reply via email to