MaxNevermind commented on code in PR #1335:
URL: https://github.com/apache/parquet-java/pull/1335#discussion_r1707436857


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
   }
 
   public void processBlocks() throws IOException {
-    while (reader != null) {
-      IndexCache indexCache = IndexCache.create(reader, 
descriptorsMap.keySet(), indexCacheStrategy, true);
-      processBlocksFromReader(indexCache);
-      indexCache.clean();
-      initNextReader();
-    }
-  }
-
-  private void processBlocksFromReader(IndexCache indexCache) throws 
IOException {
-    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
-      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
-      writer.startBlock(blockMetaData.getRowCount());
-      indexCache.setBlockMetadata(blockMetaData);
-      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
-        // This column has been pruned.
-        if (descriptor == null) {
-          continue;
-        }
-
-        // If a column is encrypted, we simply throw exception.
-        // Later we can add a feature to trans-encrypt it with different keys
-        if (chunk.isEncrypted()) {
-          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is already encrypted");
+    TransParquetFileReader readerJoin = inputFilesToJoin.peek();
+    IndexCache indexCacheJoin = null;
+    int blockIdxJoin = -1;
+    List<ColumnDescriptor> outColumns = outSchema.getColumns();
+
+    while (!inputFiles.isEmpty()) {
+      TransParquetFileReader reader = inputFiles.poll();
+      LOG.info("Rewriting input file: {}, remaining files: {}", 
reader.getFile(), inputFiles.size());
+      ParquetMetadata meta = reader.getFooter();
+      Set<ColumnPath> columnPaths = 
meta.getFileMetaData().getSchema().getColumns().stream()
+          .map(x -> ColumnPath.get(x.getPath()))
+          .collect(Collectors.toSet());
+      IndexCache indexCache = IndexCache.create(reader, columnPaths, 
indexCacheStrategy, true);
+
+      for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
+        BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
+        writer.startBlock(blockMetaData.getRowCount());
+        indexCache.setBlockMetadata(blockMetaData);
+        Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
+            blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> 
x.getPath(), x -> x));
+
+        if (readerJoin != null
+            && (blockIdxJoin == -1
+                || ++blockIdxJoin
+                    == readerJoin.getFooter().getBlocks().size())) {
+          blockIdxJoin = 0;
+          readerJoin = inputFilesToJoin.poll();
+          Set<ColumnPath> columnPathsJoin = 
readerJoin.getFileMetaData().getSchema().getColumns().stream()
+              .map(x -> ColumnPath.get(x.getPath()))
+              .collect(Collectors.toSet());
+          if (indexCacheJoin != null) {
+            indexCacheJoin.clean();
+          }
+          indexCacheJoin = IndexCache.create(readerJoin, columnPathsJoin, 
indexCacheStrategy, true);
+          indexCacheJoin.setBlockMetadata(
+              readerJoin.getFooter().getBlocks().get(blockIdxJoin));
+        } else {
+          blockIdxJoin++;
         }
 
-        reader.setStreamPosition(chunk.getStartingPos());
-        CompressionCodecName newCodecName = this.newCodecName == null ? 
chunk.getCodec() : this.newCodecName;
-        boolean encryptColumn =
-            encryptMode && encryptColumns != null && 
encryptColumns.contains(chunk.getPath());
-
-        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
-          // Mask column and compress it again.
-          MaskMode maskMode = maskColumns.get(chunk.getPath());
-          if (maskMode.equals(MaskMode.NULLIFY)) {
-            Type.Repetition repetition =
-                descriptor.getPrimitiveType().getRepetition();
-            if (repetition.equals(Type.Repetition.REQUIRED)) {
-              throw new IOException("Required column ["
-                  + descriptor.getPrimitiveType().getName() + "] cannot be 
nullified");
+        for (int outColumnIdx = 0; outColumnIdx < outColumns.size(); 
outColumnIdx++) {
+          ColumnPath colPath =
+              ColumnPath.get(outColumns.get(outColumnIdx).getPath());
+          if (readerJoin != null) {
+            Optional<ColumnChunkMetaData> chunkJoin =

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to