This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 514cc6c25 PARQUET-2366: Optimize random seek during rewriting (#1174)
514cc6c25 is described below

commit 514cc6c257fe8e618b100a19d86d304f6442cb94
Author: Xianyang Liu <[email protected]>
AuthorDate: Mon Oct 30 09:54:02 2023 +0800

    PARQUET-2366: Optimize random seek during rewriting (#1174)
---
 .../java/org/apache/parquet/hadoop/IndexCache.java | 100 +++++++++++
 .../org/apache/parquet/hadoop/NoneIndexCache.java  |  63 +++++++
 .../apache/parquet/hadoop/PrefetchIndexCache.java  | 176 ++++++++++++++++++++
 .../parquet/hadoop/rewrite/ParquetRewriter.java    |  57 ++++---
 .../parquet/hadoop/rewrite/RewriteOptions.java     |  50 ++++--
 .../org/apache/parquet/hadoop/TestIndexCache.java  | 183 +++++++++++++++++++++
 .../hadoop/rewrite/ParquetRewriterTest.java        |  36 ++--
 7 files changed, 620 insertions(+), 45 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
new file mode 100644
index 000000000..8002ee385
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * A cache for caching indexes(including: ColumnIndex, OffsetIndex and BloomFilter)
+ */
+public interface IndexCache {
+
+  enum CacheStrategy {
+    NONE, /* No cache */
+    PREFETCH_BLOCK /* Prefetch block indexes */
+  }
+
+  /**
+   * Create an index cache for the given file reader
+   *
+   * @param fileReader the file reader
+   * @param columns the columns that need to do cache
+   * @param cacheStrategy the cache strategy, supports NONE and PREFETCH_BLOCK
+   * @param freeCacheAfterGet whether free the given index cache after calling the given get method
+   * @return the index cache
+   */
+  static IndexCache create(
+      ParquetFileReader fileReader,
+      Set<ColumnPath> columns,
+      CacheStrategy cacheStrategy,
+      boolean freeCacheAfterGet) {
+    if (cacheStrategy == CacheStrategy.NONE) {
+      return new NoneIndexCache(fileReader);
+    } else if (cacheStrategy == CacheStrategy.PREFETCH_BLOCK) {
+      return new PrefetchIndexCache(fileReader, columns, freeCacheAfterGet);
+    } else {
+      throw new UnsupportedOperationException("Unknown cache strategy: " + cacheStrategy);
+    }
+  }
+
+  /**
+   * Set the current BlockMetadata
+   */
+  void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException;
+
+  /**
+   * Get the ColumnIndex for the given column in the set row group.
+   *
+   * @param chunk the given column chunk
+   * @return the ColumnIndex for the given column
+   * @throws IOException if any I/O error occurs during get the ColumnIndex
+   */
+  ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException;
+
+  /**
+   * Get the OffsetIndex for the given column in the set row group.
+   *
+   * @param chunk the given column chunk
+   * @return the OffsetIndex for the given column
+   * @throws IOException if any I/O error occurs during get the OffsetIndex
+   */
+  OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException;
+
+  /**
+   * Get the BloomFilter for the given column in the set row group.
+   *
+   * @param chunk the given column chunk
+   * @return the BloomFilter for the given column
+   * @throws IOException if any I/O error occurs during get the BloomFilter
+   */
+  BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException;
+
+  /**
+   * Clean the cache
+   */
+  void clean();
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java
new file mode 100644
index 000000000..e1aded199
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+
+/**
+ * Cache nothing. All the get methods are pushed to ParquetFileReader to read the given index.
+ */
+class NoneIndexCache implements IndexCache {
+  private final ParquetFileReader fileReader;
+
+  NoneIndexCache(ParquetFileReader fileReader) {
+    this.fileReader = fileReader;
+  }
+
+  @Override
+  public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
+    // Do nothing
+  }
+
+  @Override
+  public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
+    return fileReader.readColumnIndex(chunk);
+  }
+
+  @Override
+  public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
+    return fileReader.readOffsetIndex(chunk);
+  }
+
+  @Override
+  public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
+    return fileReader.readBloomFilter(chunk);
+  }
+
+  @Override
+  public void clean() {
+    // Do nothing
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java
new file mode 100644
index 000000000..9bbf901ff
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This index cache will prefetch indexes of all columns when calling {@link #setBlockMetadata(BlockMetaData)}.
+ * <p>
+ * <b>NOTE:</b> the {@link #setBlockMetadata(BlockMetaData)} will free the previous block cache
+ */
+class PrefetchIndexCache implements IndexCache {
+  private final ParquetFileReader fileReader;
+  private final Set<ColumnPath> columns;
+  private final boolean freeCacheAfterGet;
+
+  private Map<ColumnPath, ColumnIndex> columnIndexCache;
+  private Map<ColumnPath, OffsetIndex> offsetIndexCache;
+  private Map<ColumnPath, BloomFilter> bloomIndexCache;
+
+  /**
+   * @param fileReader the file reader
+   * @param columns the columns that need to cache
+   * @param freeCacheAfterGet whether free the given index cache after calling the given get method
+   */
+  PrefetchIndexCache(
+      ParquetFileReader fileReader,
+      Set<ColumnPath> columns,
+      boolean freeCacheAfterGet) {
+    this.fileReader = fileReader;
+    this.columns = columns;
+    this.freeCacheAfterGet = freeCacheAfterGet;
+  }
+
+  @Override
+  public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
+    clean();
+    this.columnIndexCache = readAllColumnIndexes(currentBlockMetadata);
+    this.offsetIndexCache = readAllOffsetIndexes(currentBlockMetadata);
+    this.bloomIndexCache = readAllBloomFilters(currentBlockMetadata);
+  }
+
+  @Override
+  public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
+    ColumnPath columnPath = chunk.getPath();
+    if (columns.contains(columnPath)) {
+      Preconditions.checkState(
+        columnIndexCache.containsKey(columnPath),
+        "Not found cached ColumnIndex for column: %s with cache strategy: %s",
+        columnPath.toDotString(),
+        CacheStrategy.PREFETCH_BLOCK);
+    }
+
+    if (freeCacheAfterGet) {
+      return columnIndexCache.remove(columnPath);
+    } else {
+      return columnIndexCache.get(columnPath);
+    }
+  }
+
+  @Override
+  public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
+    ColumnPath columnPath = chunk.getPath();
+
+    if (columns.contains(columnPath)) {
+      Preconditions.checkState(
+        offsetIndexCache.containsKey(columnPath),
+        "Not found cached OffsetIndex for column: %s with cache strategy: %s",
+        columnPath.toDotString(),
+        CacheStrategy.PREFETCH_BLOCK);
+    }
+
+    if (freeCacheAfterGet) {
+      return offsetIndexCache.remove(columnPath);
+    } else {
+      return offsetIndexCache.get(columnPath);
+    }
+  }
+
+  @Override
+  public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
+    ColumnPath columnPath = chunk.getPath();
+
+    if (columns.contains(columnPath)) {
+      Preconditions.checkState(
+        bloomIndexCache.containsKey(columnPath),
+        "Not found cached BloomFilter for column: %s with cache strategy: %s",
+        columnPath.toDotString(),
+        CacheStrategy.PREFETCH_BLOCK);
+    }
+
+    if (freeCacheAfterGet) {
+      return bloomIndexCache.remove(columnPath);
+    } else {
+      return bloomIndexCache.get(columnPath);
+    }
+  }
+
+  @Override
+  public void clean() {
+    if (columnIndexCache != null) {
+      columnIndexCache.clear();
+      columnIndexCache = null;
+    }
+
+    if (offsetIndexCache != null) {
+      offsetIndexCache.clear();
+      offsetIndexCache = null;
+    }
+
+    if (bloomIndexCache != null) {
+      bloomIndexCache.clear();
+      bloomIndexCache = null;
+    }
+  }
+
+  private Map<ColumnPath, ColumnIndex> readAllColumnIndexes(BlockMetaData blockMetaData) throws IOException {
+    Map<ColumnPath, ColumnIndex> columnIndexMap = new HashMap<>(columns.size());
+    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+      if (columns.contains(chunk.getPath())) {
+        columnIndexMap.put(chunk.getPath(), fileReader.readColumnIndex(chunk));
+      }
+    }
+
+    return columnIndexMap;
+  }
+
+  private Map<ColumnPath, OffsetIndex> readAllOffsetIndexes(BlockMetaData blockMetaData) throws IOException {
+    Map<ColumnPath, OffsetIndex> offsetIndexMap = new HashMap<>(columns.size());
+    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+      if (columns.contains(chunk.getPath())) {
+        offsetIndexMap.put(chunk.getPath(), fileReader.readOffsetIndex(chunk));
+      }
+    }
+
+    return offsetIndexMap;
+  }
+
+  private Map<ColumnPath, BloomFilter> readAllBloomFilters(BlockMetaData blockMetaData) throws IOException {
+    Map<ColumnPath, BloomFilter> bloomFilterMap = new HashMap<>(columns.size());
+    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+      if (columns.contains(chunk.getPath())) {
+        bloomFilterMap.put(chunk.getPath(), fileReader.readBloomFilter(chunk));
+      }
+    }
+
+    return bloomFilterMap;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 659ac1e5c..2eaaab18c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -45,6 +45,7 @@ import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.IndexCache;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -96,19 +97,19 @@ public class ParquetRewriter implements Closeable {
   private final byte[] pageBuffer = new byte[pageBufferSize];
   // Configurations for the new file
   private CompressionCodecName newCodecName = null;
-  private List<String> pruneColumns = null;
   private Map<ColumnPath, MaskMode> maskColumns = null;
   private Set<ColumnPath> encryptColumns = null;
   private boolean encryptMode = false;
-  private Map<String, String> extraMetaData = new HashMap<>();
+  private final Map<String, String> extraMetaData = new HashMap<>();
   // Writer to rewrite the input files
-  private ParquetFileWriter writer;
+  private final ParquetFileWriter writer;
   // Number of blocks written which is used to keep track of the actual row group ordinal
   private int numBlocksRewritten = 0;
   // Reader and relevant states of the in-processing input file
-  private Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
+  private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
   // Schema of input files (should be the same) and to write to the output file
   private MessageType schema = null;
+  private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
   // The reader for the current input file
   private TransParquetFileReader reader = null;
   // The metadata of current reader being processed
@@ -116,7 +117,9 @@ public class ParquetRewriter implements Closeable {
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
   // Unique created_by information from all input files
-  private Set<String> allOriginalCreatedBys = new HashSet<>();
+  private final Set<String> allOriginalCreatedBys = new HashSet<>();
+  // The index cache strategy
+  private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     Configuration conf = options.getConf();
@@ -129,8 +132,7 @@ public class ParquetRewriter implements Closeable {
     initNextReader();
 
     newCodecName = options.getNewCodecName();
-    pruneColumns = options.getPruneColumns();
-
+    List<String> pruneColumns = options.getPruneColumns();
     // Prune columns if specified
     if (pruneColumns != null && !pruneColumns.isEmpty()) {
       List<String> paths = new ArrayList<>();
@@ -145,6 +147,9 @@ public class ParquetRewriter implements Closeable {
       schema = pruneColumnsInSchema(schema, prunePaths);
     }
 
+    this.descriptorsMap =
+      schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
     if (options.getMaskColumns() != null) {
       this.maskColumns = new HashMap<>();
       for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
@@ -157,6 +162,8 @@ public class ParquetRewriter implements Closeable {
       this.encryptMode = true;
     }
 
+    this.indexCacheStrategy = options.getIndexCacheStrategy();
+
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
     writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
             DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
@@ -178,6 +185,8 @@ public class ParquetRewriter implements Closeable {
     this.writer = writer;
     this.meta = meta;
     this.schema = schema;
+    this.descriptorsMap =
+      schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
     this.newCodecName = codecName;
     originalCreatedBy = originalCreatedBy == null ? meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
     extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
@@ -188,6 +197,7 @@ public class ParquetRewriter implements Closeable {
         this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
       }
     }
+    this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
   }
 
   // Open all input files to validate their schemas are compatible to merge
@@ -247,24 +257,24 @@ public class ParquetRewriter implements Closeable {
 
   public void processBlocks() throws IOException {
     while (reader != null) {
-      processBlocksFromReader();
+      IndexCache indexCache = IndexCache.create(reader, descriptorsMap.keySet(), indexCacheStrategy, true);
+      processBlocksFromReader(indexCache);
+      indexCache.clean();
       initNextReader();
     }
   }
 
-  private void processBlocksFromReader() throws IOException {
+  private void processBlocksFromReader(IndexCache indexCache) throws IOException {
     PageReadStore store = reader.readNextRowGroup();
     ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
-            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
 
     int blockId = 0;
     while (store != null) {
       writer.startBlock(store.getRowCount());
 
       BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      indexCache.setBlockMetadata(blockMetaData);
       List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-
       for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
         ColumnChunkMetaData chunk = columnsInOrder.get(i);
         ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
@@ -314,13 +324,20 @@ public class ParquetRewriter implements Closeable {
 
           // Translate compression and/or encryption
           writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
-          processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
+          processChunk(
+                  chunk,
+                  newCodecName,
+                  columnChunkEncryptorRunTime,
+                  encryptColumn,
+                  indexCache.getBloomFilter(chunk),
+                  indexCache.getColumnIndex(chunk),
+                  indexCache.getOffsetIndex(chunk));
           writer.endColumn();
         } else {
           // Nothing changed, simply copy the binary data.
-          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
-          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
-          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+          BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
+          ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
+          OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
           writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
         }
 
@@ -338,7 +355,10 @@ public class ParquetRewriter implements Closeable {
   private void processChunk(ColumnChunkMetaData chunk,
                             CompressionCodecName newCodecName,
                             ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
-                            boolean encryptColumn) throws IOException {
+                            boolean encryptColumn,
+                            BloomFilter bloomFilter,
+                            ColumnIndex columnIndex,
+                            OffsetIndex offsetIndex) throws IOException {
     CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
     CompressionCodecFactory.BytesInputDecompressor decompressor = null;
     CompressionCodecFactory.BytesInputCompressor compressor = null;
@@ -364,9 +384,6 @@ public class ParquetRewriter implements Closeable {
       dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
     }
 
-    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
-    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
-    BloomFilter bloomFilter = reader.readBloomFilter(chunk);
     if (bloomFilter != null) {
       writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
     }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index cc1280921..5bdc8d590 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.IndexCache;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.util.Arrays;
@@ -33,14 +34,15 @@ import java.util.Map;
  */
 public class RewriteOptions {
 
-  final Configuration conf;
-  final List<Path> inputFiles;
-  final Path outputFile;
-  final List<String> pruneColumns;
-  final CompressionCodecName newCodecName;
-  final Map<String, MaskMode> maskColumns;
-  final List<String> encryptColumns;
-  final FileEncryptionProperties fileEncryptionProperties;
+  private final Configuration conf;
+  private final List<Path> inputFiles;
+  private final Path outputFile;
+  private final List<String> pruneColumns;
+  private final CompressionCodecName newCodecName;
+  private final Map<String, MaskMode> maskColumns;
+  private final List<String> encryptColumns;
+  private final FileEncryptionProperties fileEncryptionProperties;
+  private final IndexCache.CacheStrategy indexCacheStrategy;
 
   private RewriteOptions(Configuration conf,
                          List<Path> inputFiles,
@@ -49,7 +51,8 @@ public class RewriteOptions {
                          CompressionCodecName newCodecName,
                          Map<String, MaskMode> maskColumns,
                          List<String> encryptColumns,
-                         FileEncryptionProperties fileEncryptionProperties) {
+                         FileEncryptionProperties fileEncryptionProperties,
+                         IndexCache.CacheStrategy indexCacheStrategy) {
     this.conf = conf;
     this.inputFiles = inputFiles;
     this.outputFile = outputFile;
@@ -58,6 +61,7 @@ public class RewriteOptions {
     this.maskColumns = maskColumns;
     this.encryptColumns = encryptColumns;
     this.fileEncryptionProperties = fileEncryptionProperties;
+    this.indexCacheStrategy = indexCacheStrategy;
   }
 
   public Configuration getConf() {
@@ -92,16 +96,21 @@ public class RewriteOptions {
     return fileEncryptionProperties;
   }
 
+  public IndexCache.CacheStrategy getIndexCacheStrategy() {
+    return indexCacheStrategy;
+  }
+
   // Builder to create a RewriterOptions.
   public static class Builder {
-    private Configuration conf;
-    private List<Path> inputFiles;
-    private Path outputFile;
+    private final Configuration conf;
+    private final List<Path> inputFiles;
+    private final Path outputFile;
     private List<String> pruneColumns;
     private CompressionCodecName newCodecName;
     private Map<String, MaskMode> maskColumns;
     private List<String> encryptColumns;
     private FileEncryptionProperties fileEncryptionProperties;
+    private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE;
 
     /**
      * Create a builder to create a RewriterOptions.
@@ -213,6 +222,20 @@ public class RewriteOptions {
       return this;
     }
 
+    /**
+     * Set the index(ColumnIndex, Offset and BloomFilter) cache strategy.
+     * <p>
+     * This could reduce the random seek while rewriting with PREFETCH_BLOCK strategy, NONE by default.
+     *
+     * @param cacheStrategy the index cache strategy, supports: {@link IndexCache.CacheStrategy#NONE} or
+     *        {@link IndexCache.CacheStrategy#PREFETCH_BLOCK}
+     * @return self
+     */
+    public Builder indexCacheStrategy(IndexCache.CacheStrategy cacheStrategy) {
+      this.indexCacheStrategy = cacheStrategy;
+      return this;
+    }
+
     /**
      * Build the RewriterOptions.
      *
@@ -255,7 +278,8 @@ public class RewriteOptions {
               newCodecName,
               maskColumns,
               encryptColumns,
-              fileEncryptionProperties);
+              fileEncryptionProperties,
+              indexCacheStrategy);
     }
   }
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
new file mode 100644
index 000000000..32874f795
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+@RunWith(Parameterized.class)
+public class TestIndexCache {
+  private final Configuration conf = new Configuration();
+  private final int numRecords = 100000;
+  private final MessageType schema = new MessageType("schema",
+    new PrimitiveType(OPTIONAL, INT64, "DocId"),
+    new PrimitiveType(REQUIRED, BINARY, "Name"),
+    new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+    new GroupType(OPTIONAL, "Links",
+      new PrimitiveType(REPEATED, BINARY, "Backward"),
+      new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+  private final ParquetProperties.WriterVersion writerVersion;
+
+  @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}")
+  public static Object[] parameters() {
+    return new Object[] {"v1", "v2"};
+  }
+
+  public TestIndexCache(String writerVersion) {
+    this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
+  }
+
+  @Test
+  public void testNoneCacheStrategy() throws IOException {
+    String file = createTestFile("DocID");
+
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+    ParquetFileReader fileReader = new ParquetFileReader(
+      new LocalInputFile(Paths.get(file)), options);
+    IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false);
+    Assert.assertTrue(indexCache instanceof NoneIndexCache);
+    List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+    for (BlockMetaData blockMetaData : blocks) {
+      indexCache.setBlockMetadata(blockMetaData);
+      for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+        validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+        validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+        Assert.assertEquals(
+          "BloomFilter should match",
+          fileReader.readBloomFilter(chunk),
+          indexCache.getBloomFilter(chunk));
+      }
+    }
+  }
+
+  /**
+   * Checks that {@link IndexCache.CacheStrategy#PREFETCH_BLOCK} produces a
+   * {@link PrefetchIndexCache} and validates its contents for all schema columns, once
+   * with {@code freeCacheAfterGet} disabled and once with it enabled.
+   */
+  @Test
+  public void testPrefetchCacheStrategy() throws IOException {
+    // The bloom filter columns have to be spelled exactly like the schema columns.
+    String file = createTestFile("DocId", "Name");
+
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+    // try-with-resources: the reader is closed even when an assertion fails
+    try (ParquetFileReader fileReader = new ParquetFileReader(
+        new LocalInputFile(Paths.get(file)), options)) {
+      Set<ColumnPath> columns = new HashSet<>();
+      columns.add(ColumnPath.fromDotString("DocId"));
+      columns.add(ColumnPath.fromDotString("Name"));
+      columns.add(ColumnPath.fromDotString("Gender"));
+      columns.add(ColumnPath.fromDotString("Links.Backward"));
+      columns.add(ColumnPath.fromDotString("Links.Forward"));
+
+      IndexCache indexCache =
+        IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false);
+      Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+      validPrecacheIndexCache(fileReader, indexCache, columns, false);
+
+      indexCache =
+        IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true);
+      Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+      validPrecacheIndexCache(fileReader, indexCache, columns, true);
+    }
+  }
+
+  /**
+   * Generates a test file for {@code schema} holding {@code numRecords} records, using the
+   * ZSTD codec, a row group size of 8 * 1024 * 1024 and the parameterized writer version.
+   *
+   * @param bloomFilterEnabledColumns columns passed to the builder's bloom filter setting
+   * @return the name of the generated file
+   */
+  private String createTestFile(String... bloomFilterEnabledColumns) throws IOException {
+    TestFileBuilder builder = new TestFileBuilder(conf, schema)
+      .withNumRecord(numRecords)
+      .withCodec("ZSTD")
+      .withRowGroupSize(8L * 1024 * 1024)
+      .withBloomFilterEnabled(bloomFilterEnabledColumns)
+      .withWriterVersion(writerVersion);
+    return builder.build().getFileName();
+  }
+
+  /**
+   * Walks every row group of the file, points the cache at it and compares each cached
+   * column index, offset index and bloom filter with a direct read from {@code fileReader}.
+   * When {@code freeCacheAfterGet} is set, a repeated lookup is expected to fail with
+   * {@link IllegalStateException}; for bloom filters this is only asserted for chunks whose
+   * path is contained in {@code columns}.
+   */
+  private static void validPrecacheIndexCache(
+      ParquetFileReader fileReader,
+      IndexCache indexCache,
+      Set<ColumnPath> columns,
+      boolean freeCacheAfterGet) throws IOException {
+    for (BlockMetaData rowGroup : fileReader.getFooter().getBlocks()) {
+      indexCache.setBlockMetadata(rowGroup);
+      for (ColumnChunkMetaData column : rowGroup.getColumns()) {
+        ColumnIndex expectedColumnIndex = fileReader.readColumnIndex(column);
+        validateColumnIndex(expectedColumnIndex, indexCache.getColumnIndex(column));
+
+        OffsetIndex expectedOffsetIndex = fileReader.readOffsetIndex(column);
+        validateOffsetIndex(expectedOffsetIndex, indexCache.getOffsetIndex(column));
+
+        Assert.assertEquals(
+          "BloomFilter should match",
+          fileReader.readBloomFilter(column),
+          indexCache.getBloomFilter(column));
+
+        if (!freeCacheAfterGet) {
+          continue;
+        }
+
+        // A second lookup must fail when freeCacheAfterGet is enabled.
+        Assert.assertThrows(IllegalStateException.class, () -> indexCache.getColumnIndex(column));
+        Assert.assertThrows(IllegalStateException.class, () -> indexCache.getOffsetIndex(column));
+        if (columns.contains(column.getPath())) {
+          Assert.assertThrows(IllegalStateException.class, () -> indexCache.getBloomFilter(column));
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that {@code target} matches {@code expected}: either both are absent, or they
+   * share the same class, min/max values, boundary order, null counts and null pages.
+   *
+   * @param expected column index read directly from the file, may be {@code null}
+   * @param target   column index obtained from the cache under test
+   */
+  private static void validateColumnIndex(ColumnIndex expected, ColumnIndex target) {
+    if (expected == null) {
+      Assert.assertNull("ColumnIndex should be null", target);
+      return;
+    }
+    Assert.assertNotNull("ColumnIndex should not be null", target);
+    Assert.assertEquals(expected.getClass(), target.getClass());
+    Assert.assertEquals(expected.getMinValues(), target.getMinValues());
+    Assert.assertEquals(expected.getMaxValues(), target.getMaxValues());
+    Assert.assertEquals(expected.getBoundaryOrder(), target.getBoundaryOrder());
+    Assert.assertEquals(expected.getNullCounts(), target.getNullCounts());
+    Assert.assertEquals(expected.getNullPages(), target.getNullPages());
+  }
+
+  /**
+   * Asserts that {@code target} matches {@code expected}: either both are absent, or they
+   * share the same class and the same {@code toString()} rendering.
+   *
+   * @param expected offset index read directly from the file, may be {@code null}
+   * @param target   offset index obtained from the cache under test
+   */
+  private static void validateOffsetIndex(OffsetIndex expected, OffsetIndex target) {
+    if (expected == null) {
+      Assert.assertNull("OffsetIndex should be null", target);
+      return;
+    }
+    Assert.assertNotNull("OffsetIndex should not be null", target);
+    Assert.assertEquals(expected.getClass(), target.getClass());
+    Assert.assertEquals(expected.toString(), target.toString());
+  }
+}
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 1f03deceb..6ce7e2c91 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -37,6 +37,7 @@ import org.apache.parquet.format.DataPageHeader;
 import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.IndexCache;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -97,18 +98,20 @@ public class ParquetRewriterTest {
   private final int numRecord = 100000;
   private final Configuration conf = new Configuration();
   private final ParquetProperties.WriterVersion writerVersion;
+  private final IndexCache.CacheStrategy indexCacheStrategy;
 
   private List<EncryptionTestFile> inputFiles = null;
   private String outputFile = null;
   private ParquetRewriter rewriter = null;
 
-  @Parameterized.Parameters(name = "WriterVersion = {0}")
-  public static Object[] parameters() {
-    return new Object[] {"v1", "v2"};
+  @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = 
{1}")
+  public static Object[][] parameters() {
+    return new Object[][] {{"v1", "NONE"}, {"v1", "PREFETCH_BLOCK"}, {"v2", 
"NONE"}, {"v2", "PREFETCH_BLOCK"}};
   }
 
-  public ParquetRewriterTest(String writerVersion) {
+  public ParquetRewriterTest(String writerVersion, String indexCacheStrategy) {
     this.writerVersion = 
ParquetProperties.WriterVersion.fromString(writerVersion);
+    this.indexCacheStrategy = 
IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
   }
 
   private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) 
throws Exception {
@@ -116,7 +119,8 @@ public class ParquetRewriterTest {
     List<String> pruneColumns = Arrays.asList("Gender");
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
-    RewriteOptions options = 
builder.prune(pruneColumns).transform(newCodec).build();
+    RewriteOptions options =
+      
builder.prune(pruneColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -187,7 +191,8 @@ public class ParquetRewriterTest {
     maskColumns.put("Links.Forward", MaskMode.NULLIFY);
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
-    RewriteOptions options = 
builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build();
+    RewriteOptions options =
+      
builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -263,6 +268,8 @@ public class ParquetRewriterTest {
             EncDecProperties.getFileEncryptionProperties(encryptColumns, 
ParquetCipher.AES_GCM_CTR_V1, false);
     
builder.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties);
 
+    builder.indexCacheStrategy(indexCacheStrategy);
+
     RewriteOptions options = builder.build();
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -345,7 +352,8 @@ public class ParquetRewriterTest {
 
     List<String> pruneCols = Lists.newArrayList("phoneNumbers");
 
-    RewriteOptions options = builder.mask(maskCols).prune(pruneCols).build();
+    RewriteOptions options =
+      
builder.mask(maskCols).prune(pruneCols).indexCacheStrategy(indexCacheStrategy).build();
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
     rewriter.close();
@@ -401,9 +409,13 @@ public class ParquetRewriterTest {
             encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
 
     Path outputPath = new Path(outputFile);
-    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, 
outputPath).mask(maskColumns)
-            .transform(CompressionCodecName.ZSTD)
-            
.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();
+    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, 
outputPath)
+      .mask(maskColumns)
+      .transform(CompressionCodecName.ZSTD)
+      .encrypt(Arrays.asList(encryptColumns))
+      .encryptionProperties(fileEncryptionProperties)
+      .indexCacheStrategy(indexCacheStrategy)
+      .build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -474,7 +486,7 @@ public class ParquetRewriterTest {
     }
     Path outputPath = new Path(outputFile);
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
-    RewriteOptions options = builder.build();
+    RewriteOptions options = 
builder.indexCacheStrategy(indexCacheStrategy).build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
@@ -542,7 +554,7 @@ public class ParquetRewriterTest {
     }
     Path outputPath = new Path(outputFile);
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
-    RewriteOptions options = builder.build();
+    RewriteOptions options = 
builder.indexCacheStrategy(indexCacheStrategy).build();
 
     // This should throw an exception because the schemas are different
     rewriter = new ParquetRewriter(options);


Reply via email to