[ 
https://issues.apache.org/jira/browse/PARQUET-2366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777156#comment-17777156
 ] 

ASF GitHub Bot commented on PARQUET-2366:
-----------------------------------------

ConeyLiu commented on code in PR #1174:
URL: https://github.com/apache/parquet-mr/pull/1174#discussion_r1365213364


##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+@RunWith(Parameterized.class)
+public class TestIndexCache {
+  private final Configuration conf = new Configuration();
+  private final int numRecords = 100000;
+  private final MessageType schema = new MessageType("schema",
+    new PrimitiveType(OPTIONAL, INT64, "DocId"),
+    new PrimitiveType(REQUIRED, BINARY, "Name"),
+    new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+    new GroupType(OPTIONAL, "Links",
+      new PrimitiveType(REPEATED, BINARY, "Backward"),
+      new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+  private final ParquetProperties.WriterVersion writerVersion;
+
+  @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = 
{1}")
+  public static Object[] parameters() {
+    return new Object[] {"v1", "v2"};
+  }
+
+  public TestIndexCache(String writerVersion) {
+    this.writerVersion = 
ParquetProperties.WriterVersion.fromString(writerVersion);
+  }
+
+  @Test
+  public void testNoneCacheStrategy() throws IOException {
+    String file = createTestFile("DocID");
+
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+    ParquetFileReader fileReader = new ParquetFileReader(
+      new LocalInputFile(Paths.get(file)), options);
+    IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), 
IndexCache.CacheStrategy.NONE);
+    Assert.assertTrue(indexCache instanceof NoneIndexCache);
+    List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+    for (BlockMetaData blockMetaData : blocks) {
+      indexCache.setBlockMetadata(blockMetaData);
+      for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+        validateColumnIndex(fileReader.readColumnIndex(chunk), 
indexCache.getColumnIndex(chunk));
+        validateOffsetIndex(fileReader.readOffsetIndex(chunk), 
indexCache.getOffsetIndex(chunk));
+
+        Assert.assertEquals(
+          "BloomFilter should match",
+          fileReader.readBloomFilter(chunk),
+          indexCache.getBloomFilter(chunk));
+      }
+    }
+  }
+
+  @Test
+  public void testPrefetchCacheStrategy() throws IOException {
+    String file = createTestFile("DocID", "Name");
+
+    ParquetReadOptions options = ParquetReadOptions.builder().build();
+    ParquetFileReader fileReader = new ParquetFileReader(
+      new LocalInputFile(Paths.get(file)), options);
+    Set<ColumnPath> columns = new HashSet<>();
+    columns.add(ColumnPath.fromDotString("DocId"));
+    columns.add(ColumnPath.fromDotString("Name"));
+    columns.add(ColumnPath.fromDotString("Gender"));
+    columns.add(ColumnPath.fromDotString("Links.Backward"));
+    columns.add(ColumnPath.fromDotString("Links.Forward"));
+    IndexCache indexCache = IndexCache.create(fileReader, columns, 
IndexCache.CacheStrategy.PRECACHE_BLOCK);
+    Assert.assertTrue(indexCache instanceof PrefetchIndexCache);
+    List<BlockMetaData> blocks = fileReader.getFooter().getBlocks();
+    for (BlockMetaData blockMetaData : blocks) {
+      indexCache.setBlockMetadata(blockMetaData);
+      for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
+        validateColumnIndex(fileReader.readColumnIndex(chunk), 
indexCache.getColumnIndex(chunk));
+        validateOffsetIndex(fileReader.readOffsetIndex(chunk), 
indexCache.getOffsetIndex(chunk));
+
+        Assert.assertEquals(
+          "BloomFilter should match",
+          fileReader.readBloomFilter(chunk),
+          indexCache.getBloomFilter(chunk));
+
+        Assert.assertThrows(IllegalStateException.class, () -> 
indexCache.getColumnIndex(chunk));
+        Assert.assertThrows(IllegalStateException.class, () -> 
indexCache.getOffsetIndex(chunk));
+        if (columns.contains(chunk.getPath())) {
+          Assert.assertThrows(IllegalStateException.class, () -> 
indexCache.getBloomFilter(chunk));
+        }
+      }
+    }
+  }
+
+  private String createTestFile(String... bloomFilterEnabledColumns) throws 
IOException {
+    return new TestFileBuilder(conf, schema)
+      .withNumRecord(numRecords)
+      .withCodec("UNCOMPRESSED")
+      .withRowGroupSize(1024L)
+      .withBloomFilterEnabled(bloomFilterEnabledColumns)
+      .withWriterVersion(writerVersion)
+      .build()
+      .getFileName();
+  }
+
+  private void validateColumnIndex(ColumnIndex expected, ColumnIndex target) {
+    if (expected == null) {
+      Assert.assertEquals("ColumnIndex should should equal", expected, target);
+    } else {
+      Assert.assertNotNull("ColumnIndex should not be null", target);
+      Assert.assertEquals(expected.getClass(), target.getClass());
+      Assert.assertEquals(expected.getMinValues(), target.getMinValues());
+      Assert.assertEquals(expected.getMaxValues(), target.getMaxValues());
+      Assert.assertEquals(expected.getBoundaryOrder(), 
target.getBoundaryOrder());
+      Assert.assertEquals(expected.getNullCounts(), target.getNullCounts());
+      Assert.assertEquals(expected.getNullPages(), target.getNullPages());
+    }
+  }
+
+  private void validateOffsetIndex(OffsetIndex expected, OffsetIndex target) {

Review Comment:
   `OffsetIndex` has not customized `equals` too.





> Optimize random seek during rewriting
> -------------------------------------
>
>                 Key: PARQUET-2366
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2366
>             Project: Parquet
>          Issue Type: Improvement
>            Reporter: Xianyang Liu
>            Priority: Major
>
> The `ColunIndex`, `OffsetIndex`, and `BloomFilter` are stored at the end of 
> the file. We need to randomly seek 4 times when rewriting a column chunk. We 
> found this could impact the rewrite performance heavily for files with a 
> number of columns(~1000). In this PR, we read the `ColumnIndex`, 
> `OffsetIndex`, and `BloomFilter` into a cache to avoid the random seek. We 
> got about 60 times performance improvement in production environments for the 
> files with about one thousand columns.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to