[
https://issues.apache.org/jira/browse/PARQUET-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688778#comment-17688778
]
ASF GitHub Bot commented on PARQUET-2228:
-----------------------------------------
wgtmac commented on code in PR #1026:
URL: https://github.com/apache/parquet-mr/pull/1026#discussion_r1106565210
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -183,12 +186,61 @@ public ParquetRewriter(TransParquetFileReader reader,
}
}
+ // Open all input files to validate their schemas are compatible to merge
+ private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+ Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(),
"No input files");
+
+ for (Path inputFile : inputFiles) {
+ try {
+ TransParquetFileReader reader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(inputFile, conf),
HadoopReadOptions.builder(conf).build());
+ MessageType inputFileSchema =
reader.getFooter().getFileMetaData().getSchema();
+ if (this.schema == null) {
+ this.schema = inputFileSchema;
+ } else {
+ // Now we enforce equality of schemas from input files for
simplicity.
+ if (!this.schema.equals(inputFileSchema)) {
+ throw new InvalidSchemaException("Input files have different
schemas");
Review Comment:
Fixed
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -183,12 +186,61 @@ public ParquetRewriter(TransParquetFileReader reader,
}
}
+ // Open all input files to validate their schemas are compatible to merge
+ private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+ Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(),
"No input files");
+
+ for (Path inputFile : inputFiles) {
+ try {
+ TransParquetFileReader reader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(inputFile, conf),
HadoopReadOptions.builder(conf).build());
+ MessageType inputFileSchema =
reader.getFooter().getFileMetaData().getSchema();
+ if (this.schema == null) {
+ this.schema = inputFileSchema;
+ } else {
+ // Now we enforce equality of schemas from input files for
simplicity.
+ if (!this.schema.equals(inputFileSchema)) {
+ throw new InvalidSchemaException("Input files have different
schemas");
+ }
+ }
+
this.allOriginalCreatedBys.add(reader.getFooter().getFileMetaData().getCreatedBy());
+ this.inputFiles.add(reader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to open input file: " +
inputFile, e);
+ }
+ }
+
+ extraMetaData.put(ORIGINAL_CREATED_BY_KEY, String.join("\n",
allOriginalCreatedBys));
+ }
+
+ // Routines to get reader of next input file.
+ // Returns true if there is a next file to read, false otherwise.
Review Comment:
Fixed
##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java:
##########
@@ -484,15 +673,22 @@ private List<Long> getOffsets(TransParquetFileReader
reader, ColumnChunkMetaData
}
private void validateCreatedBy() throws Exception {
- FileMetaData inFMD = getFileMetaData(inputFile.getFileName(),
null).getFileMetaData();
- FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+ Set<String> createdBySet = new HashSet<>();
+ for (EncryptionTestFile inputFile : inputFiles) {
+ ParquetMetadata pmd = getFileMetaData(inputFile.getFileName(), null);
+ createdBySet.add(pmd.getFileMetaData().getCreatedBy());
+
assertNull(pmd.getFileMetaData().getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+ }
+ Object[] inputCreatedBys = createdBySet.toArray();
+ assertEquals(1, inputCreatedBys.length);
+ String inputCreatedBy = (String) inputCreatedBys[0];
- assertEquals(inFMD.getCreatedBy(), outFMD.getCreatedBy());
-
assertNull(inFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+ FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+ assertEquals(inputCreatedBy, outFMD.getCreatedBy());
String originalCreatedBy =
outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
assertNotNull(originalCreatedBy);
- assertEquals(inFMD.getCreatedBy(), originalCreatedBy);
+ assertEquals(inputCreatedBy, originalCreatedBy);
}
Review Comment:
Fixed
> ParquetRewriter supports more than one input file
> -------------------------------------------------
>
> Key: PARQUET-2228
> URL: https://issues.apache.org/jira/browse/PARQUET-2228
> Project: Parquet
> Issue Type: Sub-task
> Components: parquet-mr
> Reporter: Gang Wu
> Assignee: Gang Wu
> Priority: Major
>
> ParquetRewriter currently supports only one input file. The scope of this
> task is to support multiple input files and the rewriter merges them into a
> single one w/o some rewrite options specified.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)