This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 61420289f PARQUET-2331: Allow convert-csv to take multiple input files (#1129)
61420289f is described below
commit 61420289f7d300e9dc661819b112a22abeef37b0
Author: Kengo Seki <[email protected]>
AuthorDate: Thu Aug 10 00:03:31 2023 +0900
PARQUET-2331: Allow convert-csv to take multiple input files (#1129)
---
.../parquet/cli/commands/ConvertCSVCommand.java | 70 ++++++++++++----------
.../apache/parquet/cli/commands/CSVFileTest.java | 18 ++++++
.../cli/commands/ConvertCSVCommandTest.java | 24 ++++++++
3 files changed, 82 insertions(+), 30 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
index f989daa9a..460b6c6f7 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.Parameters;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import org.apache.avro.SchemaNormalization;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.csv.AvroCSVReader;
import org.apache.parquet.cli.csv.CSVProperties;
@@ -120,7 +121,7 @@ public class ConvertCSVCommand extends BaseCommand {
@Override
@SuppressWarnings("unchecked")
public int run() throws IOException {
- Preconditions.checkArgument(targets != null && targets.size() == 1,
+ Preconditions.checkArgument(targets != null && !targets.isEmpty(),
"CSV path is required.");
if (header != null) {
@@ -138,9 +139,7 @@ public class ConvertCSVCommand extends BaseCommand {
.charset(charsetName)
.build();
- String source = targets.get(0);
-
- Schema csvSchema;
+ Schema csvSchema = null;
if (avroSchemaFile != null) {
csvSchema = Schemas.fromAvsc(open(avroSchemaFile));
} else {
@@ -149,7 +148,7 @@ public class ConvertCSVCommand extends BaseCommand {
required = ImmutableSet.copyOf(requiredFields);
}
- String filename = new File(source).getName();
+ String filename = new File(targets.get(0)).getName();
String recordName;
if (filename.contains(".")) {
recordName = filename.substring(0, filename.indexOf("."));
@@ -157,34 +156,45 @@ public class ConvertCSVCommand extends BaseCommand {
recordName = filename;
}
- csvSchema = AvroCSV.inferNullableSchema(
- recordName, open(source), props, required);
+ // If the schema is not explicitly provided,
+ // ensure that all input files share the same one.
+ for (String target : targets) {
+ Schema schema = AvroCSV.inferNullableSchema(
+ recordName, open(target), props, required);
+ if (csvSchema == null) {
+ csvSchema = schema;
+ } else if
(!SchemaNormalization.toParsingForm(csvSchema).equals(SchemaNormalization.toParsingForm(schema)))
{
+ throw new IllegalArgumentException(target + " seems to have a
different schema from others. " +
+ "Please specify the correct schema explicitly with the `--schema`
option.");
+ }
+ }
}
- long count = 0;
- try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
- open(source), props, csvSchema, Record.class, true)) {
- CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
- try (ParquetWriter<Record> writer = AvroParquetWriter
- .<Record>builder(qualifiedPath(outputPath))
- .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
- .withWriteMode(overwrite ?
- ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
- .withCompressionCodec(codec)
- .withDictionaryEncoding(true)
- .withDictionaryPageSize(dictionaryPageSize)
- .withPageSize(pageSize)
- .withRowGroupSize(rowGroupSize)
- .withDataModel(GenericData.get())
- .withConf(getConf())
- .withSchema(csvSchema)
- .build()) {
- for (Record record : reader) {
- writer.write(record);
- count++;
+ try (ParquetWriter<Record> writer = AvroParquetWriter
+ .<Record>builder(qualifiedPath(outputPath))
+ .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
+ .withWriteMode(overwrite ?
+ ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
+ .withCompressionCodec(Codecs.parquetCodec(compressionCodecName))
+ .withDictionaryEncoding(true)
+ .withDictionaryPageSize(dictionaryPageSize)
+ .withPageSize(pageSize)
+ .withRowGroupSize(rowGroupSize)
+ .withDataModel(GenericData.get())
+ .withConf(getConf())
+ .withSchema(csvSchema)
+ .build()) {
+ for (String target : targets) {
+ long count = 0;
+ try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
+ open(target), props, csvSchema, Record.class, true)) {
+ for (Record record : reader) {
+ writer.write(record);
+ count++;
+ }
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Failed on record " + count + " in file "
+ target, e);
}
- } catch (RuntimeException e) {
- throw new RuntimeException("Failed on record " + count, e);
}
}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java
index 1a179bd3b..2a3e66e0a 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CSVFileTest.java
@@ -30,6 +30,7 @@ public abstract class CSVFileTest extends FileTest {
@Before
public void setUp() throws IOException {
createTestCSVFile();
+ createTestCSVFileWithDifferentSchema();
}
protected File csvFile() {
@@ -37,6 +38,11 @@ public abstract class CSVFileTest extends FileTest {
return new File(tmpDir, getClass().getSimpleName() + ".csv");
}
+ protected File csvFileWithDifferentSchema() {
+ File tmpDir = getTempFolder();
+ return new File(tmpDir, getClass().getSimpleName() + "2.csv");
+ }
+
private void createTestCSVFile() throws IOException {
File file = csvFile();
try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
@@ -48,4 +54,16 @@ public abstract class CSVFileTest extends FileTest {
Integer.MAX_VALUE, Long.MAX_VALUE, COLORS[1]));
}
}
+
+ private void createTestCSVFileWithDifferentSchema() throws IOException {
+ File file = csvFileWithDifferentSchema();
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+ writer.write(String.format("%s,%s,%s\n",
+ FLOAT_FIELD, DOUBLE_FIELD, BINARY_FIELD));
+ writer.write(String.format("%f,%f,\"%s\"\n",
+ Float.MIN_VALUE, Double.MIN_VALUE, COLORS[0]));
+ writer.write(String.format("%f,%f,\"%s\"\n",
+ Float.MAX_VALUE, Double.MAX_VALUE, COLORS[1]));
+ }
+ }
}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java
index 42f938b5b..e02b7048a 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCSVCommandTest.java
@@ -38,4 +38,28 @@ public class ConvertCSVCommandTest extends CSVFileTest {
Assert.assertEquals(0, command.run());
Assert.assertTrue(output.exists());
}
+
+ @Test
+ public void testConvertCSVCommandWithMultipleInput() throws IOException {
+ File file = csvFile();
+ ConvertCSVCommand command = new ConvertCSVCommand(createLogger());
+ command.targets = Arrays.asList(file.getAbsolutePath(),
file.getAbsolutePath());
+ File output = new File(getTempFolder(), getClass().getSimpleName() +
".parquet");
+ command.outputPath = output.getAbsolutePath();
+ command.setConf(new Configuration());
+ Assert.assertEquals(0, command.run());
+ Assert.assertTrue(output.exists());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConvertCSVCommandWithDifferentSchemas() throws IOException {
+ File file = csvFile();
+ File fileWithDifferentSchema = csvFileWithDifferentSchema();
+ ConvertCSVCommand command = new ConvertCSVCommand(createLogger());
+ command.targets = Arrays.asList(file.getAbsolutePath(),
fileWithDifferentSchema.getAbsolutePath());
+ File output = new File(getTempFolder(), getClass().getSimpleName() +
".parquet");
+ command.outputPath = output.getAbsolutePath();
+ command.setConf(new Configuration());
+ command.run();
+ }
}