[ 
https://issues.apache.org/jira/browse/PARQUET-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688778#comment-17688778
 ] 

ASF GitHub Bot commented on PARQUET-2228:
-----------------------------------------

wgtmac commented on code in PR #1026:
URL: https://github.com/apache/parquet-mr/pull/1026#discussion_r1106565210


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -183,12 +186,61 @@ public ParquetRewriter(TransParquetFileReader reader,
     }
   }
 
+  // Open all input files to validate their schemas are compatible to merge
+  private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+    Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "No input files");
+
+    for (Path inputFile : inputFiles) {
+      try {
+        TransParquetFileReader reader = new TransParquetFileReader(
+                HadoopInputFile.fromPath(inputFile, conf), HadoopReadOptions.builder(conf).build());
+        MessageType inputFileSchema = reader.getFooter().getFileMetaData().getSchema();
+        if (this.schema == null) {
+          this.schema = inputFileSchema;
+        } else {
+          // Now we enforce equality of schemas from input files for simplicity.
+          if (!this.schema.equals(inputFileSchema)) {
+            throw new InvalidSchemaException("Input files have different schemas");
Review Comment:
   Fixed



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -183,12 +186,61 @@ public ParquetRewriter(TransParquetFileReader reader,
     }
   }
 
+  // Open all input files to validate their schemas are compatible to merge
+  private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+    Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "No input files");
+
+    for (Path inputFile : inputFiles) {
+      try {
+        TransParquetFileReader reader = new TransParquetFileReader(
+                HadoopInputFile.fromPath(inputFile, conf), HadoopReadOptions.builder(conf).build());
+        MessageType inputFileSchema = reader.getFooter().getFileMetaData().getSchema();
+        if (this.schema == null) {
+          this.schema = inputFileSchema;
+        } else {
+          // Now we enforce equality of schemas from input files for simplicity.
+          if (!this.schema.equals(inputFileSchema)) {
+            throw new InvalidSchemaException("Input files have different schemas");
+          }
+        }
+        this.allOriginalCreatedBys.add(reader.getFooter().getFileMetaData().getCreatedBy());
+        this.inputFiles.add(reader);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Failed to open input file: " + inputFile, e);
+      }
+    }
+
+    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, String.join("\n", allOriginalCreatedBys));
+  }
+
+  // Routines to get reader of next input file.
+  // Returns true if there is a next file to read, false otherwise.

Review Comment:
   Fixed



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java:
##########
@@ -484,15 +673,22 @@ private List<Long> getOffsets(TransParquetFileReader reader, ColumnChunkMetaData
   }
 
   private void validateCreatedBy() throws Exception {
-    FileMetaData inFMD = getFileMetaData(inputFile.getFileName(), null).getFileMetaData();
-    FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+    Set<String> createdBySet = new HashSet<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      ParquetMetadata pmd = getFileMetaData(inputFile.getFileName(), null);
+      createdBySet.add(pmd.getFileMetaData().getCreatedBy());
+      assertNull(pmd.getFileMetaData().getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+    }
+    Object[] inputCreatedBys = createdBySet.toArray();
+    assertEquals(1, inputCreatedBys.length);
+    String inputCreatedBy = (String) inputCreatedBys[0];
 
-    assertEquals(inFMD.getCreatedBy(), outFMD.getCreatedBy());
-    assertNull(inFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+    FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+    assertEquals(inputCreatedBy, outFMD.getCreatedBy());
 
     String originalCreatedBy = outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
     assertNotNull(originalCreatedBy);
-    assertEquals(inFMD.getCreatedBy(), originalCreatedBy);
+    assertEquals(inputCreatedBy, originalCreatedBy);
   }
 
 

Review Comment:
   Fixed





> ParquetRewriter supports more than one input file
> -------------------------------------------------
>
>                 Key: PARQUET-2228
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2228
>             Project: Parquet
>          Issue Type: Sub-task
>          Components: parquet-mr
>            Reporter: Gang Wu
>            Assignee: Gang Wu
>            Priority: Major
>
> ParquetRewriter currently supports only one input file. The scope of this 
> task is to support multiple input files, with the rewriter merging them into 
> a single output file, with or without rewrite options specified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to