[ https://issues.apache.org/jira/browse/PARQUET-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777155#comment-17777155 ]
ASF GitHub Bot commented on PARQUET-2366:
-----------------------------------------
ConeyLiu commented on code in PR #1174:
URL: https://github.com/apache/parquet-mr/pull/1174#discussion_r1365213147
##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+@RunWith(Parameterized.class)
+public class TestIndexCache {
+ private final Configuration conf = new Configuration();
+ private final int numRecords = 100000;
+ private final MessageType schema = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+ private final ParquetProperties.WriterVersion writerVersion;
+
+ @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}")
+ public static Object[] parameters() {
+ return new Object[] {"v1", "v2"};
+ }
+
+ public TestIndexCache(String writerVersion) {
+ this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
+ }
+
+ @Test
+ public void testNoneCacheStrategy() throws IOException {
+ String file = createTestFile("DocID");
+
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ ParquetFileReader fileReader = new ParquetFileReader(
+ new LocalInputFile(Paths.get(file)), options);
+ IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE);
+ Assert.assertTrue(indexCache instanceof NoneIndexCache);
+ List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+ for (BlockMetaData blockMetaData : blocks) {
+ indexCache.setBlockMetadata(blockMetaData);
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+ validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+ Assert.assertEquals(
+ "BloomFilter should match",
+ fileReader.readBloomFilter(chunk),
+ indexCache.getBloomFilter(chunk));
+ }
+ }
+ }
+
+ @Test
+ public void testPrefetchCacheStrategy() throws IOException {
+ String file = createTestFile("DocID", "Name");
+
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ ParquetFileReader fileReader = new ParquetFileReader(
+ new LocalInputFile(Paths.get(file)), options);
+ Set<ColumnPath> columns = new HashSet<>();
+ columns.add(ColumnPath.fromDotString("DocId"));
+ columns.add(ColumnPath.fromDotString("Name"));
+ columns.add(ColumnPath.fromDotString("Gender"));
+ columns.add(ColumnPath.fromDotString("Links.Backward"));
+ columns.add(ColumnPath.fromDotString("Links.Forward"));
+ IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PRECACHE_BLOCK);
+ Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+ List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+ for (BlockMetaData blockMetaData : blocks) {
+ indexCache.setBlockMetadata(blockMetaData);
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+ validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+ Assert.assertEquals(
+ "BloomFilter should match",
+ fileReader.readBloomFilter(chunk),
+ indexCache.getBloomFilter(chunk));
+
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getColumnIndex(chunk));
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getOffsetIndex(chunk));
+ if (columns.contains(chunk.getPath())) {
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getBloomFilter(chunk));
+ }
+ }
+ }
+ }
+
+ private String createTestFile(String... bloomFilterEnabledColumns) throws IOException {
+ return new TestFileBuilder(conf, schema)
+ .withNumRecord(numRecords)
+ .withCodec("UNCOMPRESSED")
+ .withRowGroupSize(1024L)
+ .withBloomFilterEnabled(bloomFilterEnabledColumns)
+ .withWriterVersion(writerVersion)
+ .build()
+ .getFileName();
+ }
+
+ private void validateColumnIndex(ColumnIndex expected, ColumnIndex target) {
Review Comment:
`ColumnIndex` does not have a customized `equals`; maybe we could implement one to simplify the check here.
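
For illustration, a minimal sketch of what such an equality check could compare, written here as a standalone helper over the interface's public accessors since `ColumnIndex` itself is an interface (the class name and placement are hypothetical; an actual `equals`/`hashCode` would live in the concrete implementation):

```java
// Sketch only: content comparison over the accessors ColumnIndex exposes.
// A real equals/hashCode would belong to the concrete implementation class,
// not to a standalone helper like this one.
import java.util.Objects;

import org.apache.parquet.internal.column.columnindex.ColumnIndex;

final class ColumnIndexEquality {
  private ColumnIndexEquality() {}

  static boolean contentEquals(ColumnIndex a, ColumnIndex b) {
    if (a == b) {
      return true;
    }
    if (a == null || b == null) {
      return false;
    }
    // Min/max values are List<ByteBuffer>, which compare by content,
    // so List.equals is sufficient here.
    return Objects.equals(a.getBoundaryOrder(), b.getBoundaryOrder())
        && Objects.equals(a.getNullCounts(), b.getNullCounts())
        && Objects.equals(a.getNullPages(), b.getNullPages())
        && Objects.equals(a.getMinValues(), b.getMinValues())
        && Objects.equals(a.getMaxValues(), b.getMaxValues());
  }
}
```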
##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+@RunWith(Parameterized.class)
+public class TestIndexCache {
+ private final Configuration conf = new Configuration();
+ private final int numRecords = 100000;
+ private final MessageType schema = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+ private final ParquetProperties.WriterVersion writerVersion;
+
+ @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}")
+ public static Object[] parameters() {
+ return new Object[] {"v1", "v2"};
+ }
+
+ public TestIndexCache(String writerVersion) {
+ this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
+ }
+
+ @Test
+ public void testNoneCacheStrategy() throws IOException {
+ String file = createTestFile("DocID");
+
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ ParquetFileReader fileReader = new ParquetFileReader(
+ new LocalInputFile(Paths.get(file)), options);
+ IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE);
+ Assert.assertTrue(indexCache instanceof NoneIndexCache);
+ List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+ for (BlockMetaData blockMetaData : blocks) {
+ indexCache.setBlockMetadata(blockMetaData);
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+ validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+ Assert.assertEquals(
+ "BloomFilter should match",
+ fileReader.readBloomFilter(chunk),
+ indexCache.getBloomFilter(chunk));
+ }
+ }
+ }
+
+ @Test
+ public void testPrefetchCacheStrategy() throws IOException {
+ String file = createTestFile("DocID", "Name");
+
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ ParquetFileReader fileReader = new ParquetFileReader(
+ new LocalInputFile(Paths.get(file)), options);
+ Set<ColumnPath> columns = new HashSet<>();
+ columns.add(ColumnPath.fromDotString("DocId"));
+ columns.add(ColumnPath.fromDotString("Name"));
+ columns.add(ColumnPath.fromDotString("Gender"));
+ columns.add(ColumnPath.fromDotString("Links.Backward"));
+ columns.add(ColumnPath.fromDotString("Links.Forward"));
+ IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PRECACHE_BLOCK);
+ Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+ List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+ for (BlockMetaData blockMetaData : blocks) {
+ indexCache.setBlockMetadata(blockMetaData);
+ for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+ validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk));
+ validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk));
+
+ Assert.assertEquals(
+ "BloomFilter should match",
+ fileReader.readBloomFilter(chunk),
+ indexCache.getBloomFilter(chunk));
+
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getColumnIndex(chunk));
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getOffsetIndex(chunk));
+ if (columns.contains(chunk.getPath())) {
+ Assert.assertThrows(IllegalStateException.class, () -> indexCache.getBloomFilter(chunk));
+ }
+ }
+ }
+ }
+
+ private String createTestFile(String... bloomFilterEnabledColumns) throws IOException {
+ return new TestFileBuilder(conf, schema)
+ .withNumRecord(numRecords)
+ .withCodec("UNCOMPRESSED")
+ .withRowGroupSize(1024L)
+ .withBloomFilterEnabled(bloomFilterEnabledColumns)
+ .withWriterVersion(writerVersion)
+ .build()
+ .getFileName();
+ }
+
+ private void validateColumnIndex(ColumnIndex expected, ColumnIndex target) {
+ if (expected == null) {
+ Assert.assertEquals("ColumnIndex should should equal", expected, target);
+ } else {
+ Assert.assertNotNull("ColumnIndex should not be null", target);
+ Assert.assertEquals(expected.getClass(), target.getClass());
+ Assert.assertEquals(expected.getMinValues(), target.getMinValues());
+ Assert.assertEquals(expected.getMaxValues(), target.getMaxValues());
+ Assert.assertEquals(expected.getBoundaryOrder(), target.getBoundaryOrder());
+ Assert.assertEquals(expected.getNullCounts(), target.getNullCounts());
+ Assert.assertEquals(expected.getNullPages(), target.getNullPages());
+ }
+ }
+
+ private void validateOffsetIndex(OffsetIndex expected, OffsetIndex target) {
Review Comment:
Same as above for `OffsetIndex`.
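
A matching sketch for `OffsetIndex`, comparing the per-page metadata it exposes (again a hypothetical helper, not the actual implementation):

```java
// Sketch only: content comparison over the per-page accessors of OffsetIndex.
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

final class OffsetIndexEquality {
  private OffsetIndexEquality() {}

  static boolean contentEquals(OffsetIndex a, OffsetIndex b) {
    if (a == b) {
      return true;
    }
    if (a == null || b == null || a.getPageCount() != b.getPageCount()) {
      return false;
    }
    for (int i = 0; i < a.getPageCount(); i++) {
      // Compare the location and row range recorded for each data page.
      if (a.getOffset(i) != b.getOffset(i)
          || a.getCompressedPageSize(i) != b.getCompressedPageSize(i)
          || a.getFirstRowIndex(i) != b.getFirstRowIndex(i)) {
        return false;
      }
    }
    return true;
  }
}
```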
> Optimize random seek during rewriting
> -------------------------------------
>
> Key: PARQUET-2366
> URL: https://issues.apache.org/jira/browse/PARQUET-2366
> Project: Parquet
> Issue Type: Improvement
> Reporter: Xianyang Liu
> Priority: Major
>
> The `ColumnIndex`, `OffsetIndex`, and `BloomFilter` are stored at the end of
> the file, so rewriting a single column chunk requires four random seeks. We
> found this could heavily impact rewrite performance for files with a large
> number of columns (~1000). In this PR, we read the `ColumnIndex`,
> `OffsetIndex`, and `BloomFilter` into a cache to avoid the random seeks. We
> saw roughly a 60x performance improvement in production environments for
> files with about one thousand columns.
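
For context, a minimal sketch of the cached read pattern the description above refers to, following the API exercised in the PR's test (the class name, file path, and column selection are illustrative placeholders; the class is declared in org.apache.parquet.hadoop because, as in the test, the cache lives in that package):

```java
package org.apache.parquet.hadoop;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Set;

import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.io.LocalInputFile;

public class IndexCacheUsageSketch {
  public static void main(String[] args) throws IOException {
    // Columns of interest for the rewrite (placeholder selection).
    Set<ColumnPath> columns = Collections.singleton(ColumnPath.fromDotString("DocId"));
    try (ParquetFileReader reader = new ParquetFileReader(
        new LocalInputFile(Paths.get("/tmp/input.parquet")), // placeholder path
        ParquetReadOptions.builder().build())) {
      // PRECACHE_BLOCK loads ColumnIndex/OffsetIndex/BloomFilter for a whole
      // row group at once instead of random seeks to the file tail per chunk.
      IndexCache cache = IndexCache.create(reader, columns, IndexCache.CacheStrategy.PRECACHE_BLOCK);
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        cache.setBlockMetadata(block);
        for (ColumnChunkMetaData chunk : block.getColumns()) {
          cache.getColumnIndex(chunk); // served from the cache
          cache.getOffsetIndex(chunk);
          cache.getBloomFilter(chunk);
        }
      }
    }
  }
}
```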
--
This message was sent by Atlassian Jira
(v8.20.10#820010)