This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 514cc6c25 PARQUET-2366: Optimize random seek during rewriting (#1174)
514cc6c25 is described below
commit 514cc6c257fe8e618b100a19d86d304f6442cb94
Author: Xianyang Liu <[email protected]>
AuthorDate: Mon Oct 30 09:54:02 2023 +0800
PARQUET-2366: Optimize random seek during rewriting (#1174)
---
.../java/org/apache/parquet/hadoop/IndexCache.java | 100 +++++++++++
.../org/apache/parquet/hadoop/NoneIndexCache.java | 63 +++++++
.../apache/parquet/hadoop/PrefetchIndexCache.java | 176 ++++++++++++++++++++
.../parquet/hadoop/rewrite/ParquetRewriter.java | 57 ++++---
.../parquet/hadoop/rewrite/RewriteOptions.java | 50 ++++--
.../org/apache/parquet/hadoop/TestIndexCache.java | 183 +++++++++++++++++++++
.../hadoop/rewrite/ParquetRewriterTest.java | 36 ++--
7 files changed, 620 insertions(+), 45 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
new file mode 100644
index 000000000..8002ee385
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * A cache for the indexes (including ColumnIndex, OffsetIndex and BloomFilter).
+ */
+public interface IndexCache {
+
+ enum CacheStrategy {
+ NONE, /* No cache */
+ PREFETCH_BLOCK /* Prefetch block indexes */
+ }
+
+ /**
+ * Create an index cache for the given file reader
+ *
+ * @param fileReader the file reader
+ * @param columns the columns whose indexes need to be cached
+ * @param cacheStrategy the cache strategy, supports NONE and PREFETCH_BLOCK
+ * @param freeCacheAfterGet whether to free a cached index once it has been returned by a get method
+ * @return the index cache
+ */
+ static IndexCache create(
+ ParquetFileReader fileReader,
+ Set<ColumnPath> columns,
+ CacheStrategy cacheStrategy,
+ boolean freeCacheAfterGet) {
+ if (cacheStrategy == CacheStrategy.NONE) {
+ return new NoneIndexCache(fileReader);
+ } else if (cacheStrategy == CacheStrategy.PREFETCH_BLOCK) {
+ return new PrefetchIndexCache(fileReader, columns, freeCacheAfterGet);
+ } else {
+ throw new UnsupportedOperationException("Unknown cache strategy: " + cacheStrategy);
+ }
+ }
+
+ /**
+ * Set the current BlockMetaData
+ */
+ void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException;
+
+ /**
+ * Get the ColumnIndex for the given column in the currently set row group.
+ *
+ * @param chunk the given column chunk
+ * @return the ColumnIndex for the given column
+ * @throws IOException if any I/O error occurs while reading the ColumnIndex
+ */
+ ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException;
+
+ /**
+ * Get the OffsetIndex for the given column in the currently set row group.
+ *
+ * @param chunk the given column chunk
+ * @return the OffsetIndex for the given column
+ * @throws IOException if any I/O error occurs while reading the OffsetIndex
+ */
+ OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException;
+
+ /**
+ * Get the BloomFilter for the given column in the currently set row group.
+ *
+ * @param chunk the given column chunk
+ * @return the BloomFilter for the given column
+ * @throws IOException if any I/O error occurs while reading the BloomFilter
+ */
+ BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException;
+
+ /**
+ * Clean the cache
+ */
+ void clean();
+}
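For orientation, here is a minimal usage sketch of the interface above. The input path and column name are illustrative placeholders; the cache calls themselves match the API introduced in this commit.

    import java.io.IOException;
    import java.nio.file.Paths;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.IndexCache;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.io.LocalInputFile;

    public class IndexCacheExample {
      public static void main(String[] args) throws IOException {
        // "example.parquet" is a placeholder input file.
        try (ParquetFileReader reader = new ParquetFileReader(
            new LocalInputFile(Paths.get("example.parquet")),
            ParquetReadOptions.builder().build())) {
          Set<ColumnPath> columns = new HashSet<>();
          columns.add(ColumnPath.fromDotString("DocId")); // placeholder column

          // Prefetch the indexes of the requested columns, one row group at a time.
          IndexCache cache = IndexCache.create(
              reader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false);
          for (BlockMetaData block : reader.getFooter().getBlocks()) {
            cache.setBlockMetadata(block); // frees the previous block's cache
            for (ColumnChunkMetaData chunk : block.getColumns()) {
              System.out.println(chunk.getPath() + ": " + cache.getOffsetIndex(chunk));
            }
          }
          cache.clean();
        }
      }
    }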
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java
new file mode 100644
index 000000000..e1aded199
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+
+/**
+ * Cache nothing. All the get methods are pushed down to the ParquetFileReader to read the given index.
+ */
+class NoneIndexCache implements IndexCache {
+ private final ParquetFileReader fileReader;
+
+ NoneIndexCache(ParquetFileReader fileReader) {
+ this.fileReader = fileReader;
+ }
+
+ @Override
+ public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
+ // Do nothing
+ }
+
+ @Override
+ public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
+ return fileReader.readColumnIndex(chunk);
+ }
+
+ @Override
+ public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
+ return fileReader.readOffsetIndex(chunk);
+ }
+
+ @Override
+ public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
+ return fileReader.readBloomFilter(chunk);
+ }
+
+ @Override
+ public void clean() {
+ // Do nothing
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java
new file mode 100644
index 000000000..9bbf901ff
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This index cache prefetches the indexes of the requested columns when {@link #setBlockMetadata(BlockMetaData)} is called.
+ * <p>
+ * <b>NOTE:</b> {@link #setBlockMetadata(BlockMetaData)} frees the cache of the previous block.
+ */
+class PrefetchIndexCache implements IndexCache {
+ private final ParquetFileReader fileReader;
+ private final Set<ColumnPath> columns;
+ private final boolean freeCacheAfterGet;
+
+ private Map<ColumnPath, ColumnIndex> columnIndexCache;
+ private Map<ColumnPath, OffsetIndex> offsetIndexCache;
+ private Map<ColumnPath, BloomFilter> bloomIndexCache;
+
+ /**
+ * @param fileReader the file reader
+ * @param columns the columns whose indexes need to be cached
+ * @param freeCacheAfterGet whether to free a cached index once it has been returned by a get method
+ */
+ PrefetchIndexCache(
+ ParquetFileReader fileReader,
+ Set<ColumnPath> columns,
+ boolean freeCacheAfterGet) {
+ this.fileReader = fileReader;
+ this.columns = columns;
+ this.freeCacheAfterGet = freeCacheAfterGet;
+ }
+
+ @Override
+ public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
+ clean();
+ this.columnIndexCache = readAllColumnIndexes(currentBlockMetadata);
+ this.offsetIndexCache = readAllOffsetIndexes(currentBlockMetadata);
+ this.bloomIndexCache = readAllBloomFilters(currentBlockMetadata);
+ }
+
+ @Override
+ public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
+ ColumnPath columnPath = chunk.getPath();
+ if (columns.contains(columnPath)) {
+ Preconditions.checkState(
+ columnIndexCache.containsKey(columnPath),
+ "Not found cached ColumnIndex for column: %s with cache strategy: %s",
+ columnPath.toDotString(),
+ CacheStrategy.PREFETCH_BLOCK);
+ }
+
+ if (freeCacheAfterGet) {
+ return columnIndexCache.remove(columnPath);
+ } else {
+ return columnIndexCache.get(columnPath);
+ }
+ }
+
+ @Override
+ public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
+ ColumnPath columnPath = chunk.getPath();
+
+ if (columns.contains(columnPath)) {
+ Preconditions.checkState(
+ offsetIndexCache.containsKey(columnPath),
+ "Not found cached OffsetIndex for column: %s with cache strategy: %s",
+ columnPath.toDotString(),
+ CacheStrategy.PREFETCH_BLOCK);
+ }
+
+ if (freeCacheAfterGet) {
+ return offsetIndexCache.remove(columnPath);
+ } else {
+ return offsetIndexCache.get(columnPath);
+ }
+ }
+
+ @Override
+ public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
+ ColumnPath columnPath = chunk.getPath();
+
+ if (columns.contains(columnPath)) {
+ Preconditions.checkState(
+ bloomIndexCache.containsKey(columnPath),
+ "Not found cached BloomFilter for column: %s with cache strategy: %s",
+ columnPath.toDotString(),
+ CacheStrategy.PREFETCH_BLOCK);
+ }
+
+ if (freeCacheAfterGet) {
+ return bloomIndexCache.remove(columnPath);
+ } else {
+ return bloomIndexCache.get(columnPath);
+ }
+ }
+
+ @Override
+ public void clean() {
+ if (columnIndexCache != null) {
+ columnIndexCache.clear();
+ columnIndexCache = null;
+ }
+
+ if (offsetIndexCache != null) {
+ offsetIndexCache.clear();
+ offsetIndexCache = null;
+ }
+
+ if (bloomIndexCache != null) {
+ bloomIndexCache.clear();
+ bloomIndexCache = null;
+ }
+ }
+
+ private Map<ColumnPath, ColumnIndex> readAllColumnIndexes(BlockMetaData blockMetaData) throws IOException {
+ Map<ColumnPath, ColumnIndex> columnIndexMap = new HashMap<>(columns.size());
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ if (columns.contains(chunk.getPath())) {
+ columnIndexMap.put(chunk.getPath(), fileReader.readColumnIndex(chunk));
+ }
+ }
+
+ return columnIndexMap;
+ }
+
+ private Map<ColumnPath, OffsetIndex> readAllOffsetIndexes(BlockMetaData blockMetaData) throws IOException {
+ Map<ColumnPath, OffsetIndex> offsetIndexMap = new HashMap<>(columns.size());
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ if (columns.contains(chunk.getPath())) {
+ offsetIndexMap.put(chunk.getPath(), fileReader.readOffsetIndex(chunk));
+ }
+ }
+
+ return offsetIndexMap;
+ }
+
+ private Map<ColumnPath, BloomFilter> readAllBloomFilters(BlockMetaData blockMetaData) throws IOException {
+ Map<ColumnPath, BloomFilter> bloomFilterMap = new HashMap<>(columns.size());
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ if (columns.contains(chunk.getPath())) {
+ bloomFilterMap.put(chunk.getPath(), fileReader.readBloomFilter(chunk));
+ }
+ }
+
+ return bloomFilterMap;
+ }
+}
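One behavior worth calling out: with freeCacheAfterGet set to true (the rewriter below passes true, since it reads each index exactly once per chunk), every cached entry is single-use. A hedged fragment illustrating that contract, assuming reader, columns, block and chunk are set up as in the sketch above and chunk's path is in columns:

    IndexCache cache = IndexCache.create(
        reader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true);
    cache.setBlockMetadata(block);
    OffsetIndex first = cache.getOffsetIndex(chunk);  // returns the entry and removes it from the cache
    cache.getOffsetIndex(chunk);                      // now throws IllegalStateException for a tracked column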
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 659ac1e5c..2eaaab18c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -45,6 +45,7 @@ import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.IndexCache;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -96,19 +97,19 @@ public class ParquetRewriter implements Closeable {
private final byte[] pageBuffer = new byte[pageBufferSize];
// Configurations for the new file
private CompressionCodecName newCodecName = null;
- private List<String> pruneColumns = null;
private Map<ColumnPath, MaskMode> maskColumns = null;
private Set<ColumnPath> encryptColumns = null;
private boolean encryptMode = false;
- private Map<String, String> extraMetaData = new HashMap<>();
+ private final Map<String, String> extraMetaData = new HashMap<>();
// Writer to rewrite the input files
- private ParquetFileWriter writer;
+ private final ParquetFileWriter writer;
// Number of blocks written which is used to keep track of the actual row group ordinal
private int numBlocksRewritten = 0;
// Reader and relevant states of the in-processing input file
- private Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
+ private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
// Schema of input files (should be the same) and to write to the output file
private MessageType schema = null;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
// The reader for the current input file
private TransParquetFileReader reader = null;
// The metadata of current reader being processed
@@ -116,7 +117,9 @@ public class ParquetRewriter implements Closeable {
// created_by information of current reader being processed
private String originalCreatedBy = "";
// Unique created_by information from all input files
- private Set<String> allOriginalCreatedBys = new HashSet<>();
+ private final Set<String> allOriginalCreatedBys = new HashSet<>();
+ // The index cache strategy
+ private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
Configuration conf = options.getConf();
@@ -129,8 +132,7 @@ public class ParquetRewriter implements Closeable {
initNextReader();
newCodecName = options.getNewCodecName();
- pruneColumns = options.getPruneColumns();
-
+ List<String> pruneColumns = options.getPruneColumns();
// Prune columns if specified
if (pruneColumns != null && !pruneColumns.isEmpty()) {
List<String> paths = new ArrayList<>();
@@ -145,6 +147,9 @@ public class ParquetRewriter implements Closeable {
schema = pruneColumnsInSchema(schema, prunePaths);
}
+ this.descriptorsMap =
+ schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
if (options.getMaskColumns() != null) {
this.maskColumns = new HashMap<>();
for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
@@ -157,6 +162,8 @@ public class ParquetRewriter implements Closeable {
this.encryptMode = true;
}
+ this.indexCacheStrategy = options.getIndexCacheStrategy();
+
ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
@@ -178,6 +185,8 @@ public class ParquetRewriter implements Closeable {
this.writer = writer;
this.meta = meta;
this.schema = schema;
+ this.descriptorsMap =
+ schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
this.newCodecName = codecName;
originalCreatedBy = originalCreatedBy == null ? meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
@@ -188,6 +197,7 @@ public class ParquetRewriter implements Closeable {
this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
}
}
+ this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
}
// Open all input files to validate their schemas are compatible to merge
@@ -247,24 +257,24 @@ public class ParquetRewriter implements Closeable {
public void processBlocks() throws IOException {
while (reader != null) {
- processBlocksFromReader();
+ IndexCache indexCache = IndexCache.create(reader, descriptorsMap.keySet(), indexCacheStrategy, true);
+ processBlocksFromReader(indexCache);
+ indexCache.clean();
initNextReader();
}
}
- private void processBlocksFromReader() throws IOException {
+ private void processBlocksFromReader(IndexCache indexCache) throws IOException {
PageReadStore store = reader.readNextRowGroup();
ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
- Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
- Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
int blockId = 0;
while (store != null) {
writer.startBlock(store.getRowCount());
BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+ indexCache.setBlockMetadata(blockMetaData);
List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-
for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
ColumnChunkMetaData chunk = columnsInOrder.get(i);
ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
@@ -314,13 +324,20 @@ public class ParquetRewriter implements Closeable {
// Translate compression and/or encryption
writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
- processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
+ processChunk(
+ chunk,
+ newCodecName,
+ columnChunkEncryptorRunTime,
+ encryptColumn,
+ indexCache.getBloomFilter(chunk),
+ indexCache.getColumnIndex(chunk),
+ indexCache.getOffsetIndex(chunk));
writer.endColumn();
} else {
// Nothing changed, simply copy the binary data.
- BloomFilter bloomFilter = reader.readBloomFilter(chunk);
- ColumnIndex columnIndex = reader.readColumnIndex(chunk);
- OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+ BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
+ ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
+ OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
}
@@ -338,7 +355,10 @@ public class ParquetRewriter implements Closeable {
private void processChunk(ColumnChunkMetaData chunk,
CompressionCodecName newCodecName,
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
- boolean encryptColumn) throws IOException {
+ boolean encryptColumn,
+ BloomFilter bloomFilter,
+ ColumnIndex columnIndex,
+ OffsetIndex offsetIndex) throws IOException {
CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
CompressionCodecFactory.BytesInputDecompressor decompressor = null;
CompressionCodecFactory.BytesInputCompressor compressor = null;
@@ -364,9 +384,6 @@ public class ParquetRewriter implements Closeable {
dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
}
- ColumnIndex columnIndex = reader.readColumnIndex(chunk);
- OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
- BloomFilter bloomFilter = reader.readBloomFilter(chunk);
if (bloomFilter != null) {
writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index cc1280921..5bdc8d590 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.IndexCache;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.util.Arrays;
@@ -33,14 +34,15 @@ import java.util.Map;
*/
public class RewriteOptions {
- final Configuration conf;
- final List<Path> inputFiles;
- final Path outputFile;
- final List<String> pruneColumns;
- final CompressionCodecName newCodecName;
- final Map<String, MaskMode> maskColumns;
- final List<String> encryptColumns;
- final FileEncryptionProperties fileEncryptionProperties;
+ private final Configuration conf;
+ private final List<Path> inputFiles;
+ private final Path outputFile;
+ private final List<String> pruneColumns;
+ private final CompressionCodecName newCodecName;
+ private final Map<String, MaskMode> maskColumns;
+ private final List<String> encryptColumns;
+ private final FileEncryptionProperties fileEncryptionProperties;
+ private final IndexCache.CacheStrategy indexCacheStrategy;
private RewriteOptions(Configuration conf,
List<Path> inputFiles,
@@ -49,7 +51,8 @@ public class RewriteOptions {
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
List<String> encryptColumns,
- FileEncryptionProperties fileEncryptionProperties) {
+ FileEncryptionProperties fileEncryptionProperties,
+ IndexCache.CacheStrategy indexCacheStrategy) {
this.conf = conf;
this.inputFiles = inputFiles;
this.outputFile = outputFile;
@@ -58,6 +61,7 @@ public class RewriteOptions {
this.maskColumns = maskColumns;
this.encryptColumns = encryptColumns;
this.fileEncryptionProperties = fileEncryptionProperties;
+ this.indexCacheStrategy = indexCacheStrategy;
}
public Configuration getConf() {
@@ -92,16 +96,21 @@ public class RewriteOptions {
return fileEncryptionProperties;
}
+ public IndexCache.CacheStrategy getIndexCacheStrategy() {
+ return indexCacheStrategy;
+ }
+
// Builder to create a RewriteOptions.
public static class Builder {
- private Configuration conf;
- private List<Path> inputFiles;
- private Path outputFile;
+ private final Configuration conf;
+ private final List<Path> inputFiles;
+ private final Path outputFile;
private List<String> pruneColumns;
private CompressionCodecName newCodecName;
private Map<String, MaskMode> maskColumns;
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
+ private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE;
/**
* Create a builder to create a RewriteOptions.
@@ -213,6 +222,20 @@ public class RewriteOptions {
return this;
}
+ /**
+ * Set the index (ColumnIndex, OffsetIndex and BloomFilter) cache strategy.
+ * <p>
+ * The PREFETCH_BLOCK strategy can reduce random seeks while rewriting; the default is NONE.
+ *
+ * @param cacheStrategy the index cache strategy, supports: {@link IndexCache.CacheStrategy#NONE} or
+ * {@link IndexCache.CacheStrategy#PREFETCH_BLOCK}
+ * @return self
+ */
+ public Builder indexCacheStrategy(IndexCache.CacheStrategy cacheStrategy) {
+ this.indexCacheStrategy = cacheStrategy;
+ return this;
+ }
+
/**
* Build the RewriteOptions.
*
@@ -255,7 +278,8 @@ public class RewriteOptions {
newCodecName,
maskColumns,
encryptColumns,
- fileEncryptionProperties);
+ fileEncryptionProperties,
+ indexCacheStrategy);
}
}
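End to end, enabling the new option from user code looks like the following sketch. The configuration and file paths are placeholders; the builder and rewriter calls mirror the tests below.

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.IndexCache;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RewriteWithPrefetchExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Input and output paths are placeholders.
        RewriteOptions options = new RewriteOptions.Builder(
                conf,
                Arrays.asList(new Path("/tmp/input.parquet")),
                new Path("/tmp/output.parquet"))
            .indexCacheStrategy(IndexCache.CacheStrategy.PREFETCH_BLOCK)
            .build();

        // Rewrite all row groups; the rewriter prefetches the indexes of each
        // row group once instead of seeking back and forth per column chunk.
        ParquetRewriter rewriter = new ParquetRewriter(options);
        rewriter.processBlocks();
        rewriter.close();
      }
    }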
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
new file mode 100644
index 000000000..32874f795
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+@RunWith(Parameterized.class)
+public class TestIndexCache {
+ private final Configuration conf = new Configuration();
+ private final int numRecords = 100000;
+ private final MessageType schema = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+ private final ParquetProperties.WriterVersion writerVersion;
+
+ @Parameterized.Parameters(name = "WriterVersion = {0}")
+ public static Object[] parameters() {
+ return new Object[] {"v1", "v2"};
+ }
+
+ public TestIndexCache(String writerVersion) {
+ this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
+ }
+
+ @Test
+ public void testNoneCacheStrategy() throws IOException {
+ String file = createTestFile("DocID");
+
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ ParquetFileReader fileReader = new ParquetFileReader(
+ new LocalInputFile(Paths.get(file)), options);
+ IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false);
+ Assert.assertTrue(indexCache instanceof NoneIndexCache);
+ List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+ for (BlockMetaData blockMetaData : blocks) {
+ indexCache.setBlockMetadata(blockMetaData);
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+ validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+ Assert.assertEquals(
+ "BloomFilter should match",
+ fileReader.readBloomFilter(chunk),
+ indexCache.getBloomFilter(chunk));
+ }
+ }
+ }
+
+ @Test
+ public void testPrefetchCacheStrategy() throws IOException {
+ String file = createTestFile("DocID", "Name");
+
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ ParquetFileReader fileReader = new ParquetFileReader(
+ new LocalInputFile(Paths.get(file)), options);
+ Set<ColumnPath> columns = new HashSet<>();
+ columns.add(ColumnPath.fromDotString("DocId"));
+ columns.add(ColumnPath.fromDotString("Name"));
+ columns.add(ColumnPath.fromDotString("Gender"));
+ columns.add(ColumnPath.fromDotString("Links.Backward"));
+ columns.add(ColumnPath.fromDotString("Links.Forward"));
+
+ IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false);
+ Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+ validatePrefetchIndexCache(fileReader, indexCache, columns, false);
+
+ indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true);
+ Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+ validatePrefetchIndexCache(fileReader, indexCache, columns, true);
+ }
+
+ private String createTestFile(String... bloomFilterEnabledColumns) throws IOException {
+ return new TestFileBuilder(conf, schema)
+ .withNumRecord(numRecords)
+ .withCodec("ZSTD")
+ .withRowGroupSize(8L * 1024 * 1024)
+ .withBloomFilterEnabled(bloomFilterEnabledColumns)
+ .withWriterVersion(writerVersion)
+ .build()
+ .getFileName();
+ }
+
+ private static void validatePrefetchIndexCache(
+ ParquetFileReader fileReader,
+ IndexCache indexCache,
+ Set<ColumnPath> columns,
+ boolean freeCacheAfterGet) throws IOException {
+ List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+ for (BlockMetaData blockMetaData : blocks) {
+ indexCache.setBlockMetadata(blockMetaData);
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+ validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+ Assert.assertEquals(
+ "BloomFilter should match",
+ fileReader.readBloomFilter(chunk),
+ indexCache.getBloomFilter(chunk));
+
+ if (freeCacheAfterGet) {
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getColumnIndex(chunk));
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getOffsetIndex(chunk));
+ if (columns.contains(chunk.getPath())) {
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getBloomFilter(chunk));
+ }
+ }
+ }
+ }
+ }
+
+ private static void validateColumnIndex(ColumnIndex expected, ColumnIndex target) {
+ if (expected == null) {
+ Assert.assertEquals("ColumnIndex should be equal", expected, target);
+ } else {
+ Assert.assertNotNull("ColumnIndex should not be null", target);
+ Assert.assertEquals(expected.getClass(), target.getClass());
+ Assert.assertEquals(expected.getMinValues(), target.getMinValues());
+ Assert.assertEquals(expected.getMaxValues(), target.getMaxValues());
+ Assert.assertEquals(expected.getBoundaryOrder(), target.getBoundaryOrder());
+ Assert.assertEquals(expected.getNullCounts(), target.getNullCounts());
+ Assert.assertEquals(expected.getNullPages(), target.getNullPages());
+ }
+ }
+
+ private static void validateOffsetIndex(OffsetIndex expected, OffsetIndex target) {
+ if (expected == null) {
+ Assert.assertEquals("OffsetIndex should be equal", expected, target);
+ } else {
+ Assert.assertNotNull("OffsetIndex should not be null", target);
+ Assert.assertEquals(expected.getClass(), target.getClass());
+ Assert.assertEquals(expected.toString(), target.toString());
+ }
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 1f03deceb..6ce7e2c91 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -37,6 +37,7 @@ import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.IndexCache;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -97,18 +98,20 @@ public class ParquetRewriterTest {
private final int numRecord = 100000;
private final Configuration conf = new Configuration();
private final ParquetProperties.WriterVersion writerVersion;
+ private final IndexCache.CacheStrategy indexCacheStrategy;
private List<EncryptionTestFile> inputFiles = null;
private String outputFile = null;
private ParquetRewriter rewriter = null;
- @Parameterized.Parameters(name = "WriterVersion = {0}")
- public static Object[] parameters() {
- return new Object[] {"v1", "v2"};
+ @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}")
+ public static Object[][] parameters() {
+ return new Object[][] {{"v1", "NONE"}, {"v1", "PREFETCH_BLOCK"}, {"v2", "NONE"}, {"v2", "PREFETCH_BLOCK"}};
}
- public ParquetRewriterTest(String writerVersion) {
+ public ParquetRewriterTest(String writerVersion, String indexCacheStrategy) {
+ this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
+ this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy);
}
private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
@@ -116,7 +119,8 @@ public class ParquetRewriterTest {
List<String> pruneColumns = Arrays.asList("Gender");
CompressionCodecName newCodec = CompressionCodecName.ZSTD;
RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
- RewriteOptions options = builder.prune(pruneColumns).transform(newCodec).build();
+ RewriteOptions options =
+ builder.prune(pruneColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
rewriter = new ParquetRewriter(options);
rewriter.processBlocks();
@@ -187,7 +191,8 @@ public class ParquetRewriterTest {
maskColumns.put("Links.Forward", MaskMode.NULLIFY);
CompressionCodecName newCodec = CompressionCodecName.ZSTD;
RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
- RewriteOptions options = builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build();
+ RewriteOptions options =
+ builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build();
rewriter = new ParquetRewriter(options);
rewriter.processBlocks();
@@ -263,6 +268,8 @@ public class ParquetRewriterTest {
EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
builder.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties);
+ builder.indexCacheStrategy(indexCacheStrategy);
+
RewriteOptions options = builder.build();
rewriter = new ParquetRewriter(options);
rewriter.processBlocks();
@@ -345,7 +352,8 @@ public class ParquetRewriterTest {
List<String> pruneCols = Lists.newArrayList("phoneNumbers");
- RewriteOptions options = builder.mask(maskCols).prune(pruneCols).build();
+ RewriteOptions options =
+ builder.mask(maskCols).prune(pruneCols).indexCacheStrategy(indexCacheStrategy).build();
rewriter = new ParquetRewriter(options);
rewriter.processBlocks();
rewriter.close();
@@ -401,9 +409,13 @@ public class ParquetRewriterTest {
encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
Path outputPath = new Path(outputFile);
- RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath).mask(maskColumns)
- .transform(CompressionCodecName.ZSTD)
- .encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();
+ RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
+ .mask(maskColumns)
+ .transform(CompressionCodecName.ZSTD)
+ .encrypt(Arrays.asList(encryptColumns))
+ .encryptionProperties(fileEncryptionProperties)
+ .indexCacheStrategy(indexCacheStrategy)
+ .build();
rewriter = new ParquetRewriter(options);
rewriter.processBlocks();
@@ -474,7 +486,7 @@ public class ParquetRewriterTest {
}
Path outputPath = new Path(outputFile);
RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
- RewriteOptions options = builder.build();
+ RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
rewriter = new ParquetRewriter(options);
rewriter.processBlocks();
@@ -542,7 +554,7 @@ public class ParquetRewriterTest {
}
Path outputPath = new Path(outputFile);
RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
- RewriteOptions options = builder.build();
+ RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
// This should throw an exception because the schemas are different
rewriter = new ParquetRewriter(options);