This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new c7bff51  PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)
c7bff51 is described below
commit c7bff519094920a8609df6cbd98821a43ed779e3
Author: Prakhar Jain <[email protected]>
AuthorDate: Sat Mar 19 17:00:24 2022 -0700
PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)
* PARQUET-2117: Generate the row index in InternalParquetRecordReader and expose it via ParquetReader and ParquetRecordReader

- Add and populate a rowIndexOffset field in BlockMetaData
- Generate the row index in InternalParquetRecordReader and expose it via ParquetReader and ParquetRecordReader
- Add new unit tests and extend all the ColumnIndexFiltering and BloomFiltering unit tests to also validate row indexes

* address review comments

* add test based on an old parquet file without column indexes

* address review comments - return -1 when row index info is not available and document it; return -1 when rowIndexOffset info is not available in BlockMetaData

* address review comments - fix Javadoc style

* address review comments from ggershinsky - early return and reduce indentation

* fix build
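
A minimal usage sketch of the new ParquetReader API introduced by this change
(for illustration only; the file path below is a hypothetical placeholder):

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public class RowIndexExample {
      public static void main(String[] args) throws Exception {
        // "/tmp/users.parquet" is a placeholder path for this sketch.
        Path file = new Path("/tmp/users.parquet");
        try (ParquetReader<Group> reader =
            ParquetReader.builder(new GroupReadSupport(), file).build()) {
          for (Group g = reader.read(); g != null; g = reader.read()) {
            // Row index of the record just read, counted from the start of
            // the file; -1 if the row index information is unavailable.
            long rowIndex = reader.getCurrentRowIndex();
            System.out.println(rowIndex + ": " + g);
          }
        }
      }
    }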
---
.../apache/parquet/column/page/PageReadStore.java | 8 +
.../format/converter/ParquetMetadataConverter.java | 63 +++++--
.../parquet/hadoop/ColumnChunkPageReadStore.java | 18 +-
.../hadoop/InternalParquetRecordReader.java | 55 ++++++-
.../apache/parquet/hadoop/ParquetFileReader.java | 4 +-
.../org/apache/parquet/hadoop/ParquetReader.java | 10 ++
.../apache/parquet/hadoop/ParquetRecordReader.java | 7 +
.../parquet/hadoop/metadata/BlockMetaData.java | 19 ++-
.../filter2/recordlevel/PhoneBookWriter.java | 19 ++-
.../apache/parquet/hadoop/TestBloomFiltering.java | 2 +-
.../parquet/hadoop/TestColumnIndexFiltering.java | 4 +-
.../apache/parquet/hadoop/TestParquetReader.java | 181 +++++++++++++++++++++
.../test-file-with-no-column-indexes-1.parquet | Bin 0 -> 35855 bytes
13 files changed, 370 insertions(+), 20 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
index 753bda8..796cf17 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -44,6 +44,14 @@ public interface PageReadStore {
long getRowCount();
/**
+   * @return an Optional containing the row index offset of this row group, or an empty Optional if the
+   *         related data is not available
+ */
+ default Optional<Long> getRowIndexOffset() {
+ return Optional.empty();
+ }
+
+ /**
   * Returns the indexes of the rows to be read/built if the related data is available. All the rows which index is not
   * returned shall be skipped.
*
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 96980a4..0ea75f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1400,6 +1400,31 @@ public class ParquetMetadataConverter {
return readParquetMetadata(from, filter, null, false, 0);
}
+ private Map<RowGroup, Long> generateRowGroupOffsets(FileMetaData metaData) {
+ Map<RowGroup, Long> rowGroupOrdinalToRowIdx = new HashMap<>();
+ List<RowGroup> rowGroups = metaData.getRow_groups();
+ if (rowGroups != null) {
+ long rowIdxSum = 0;
+ for (int i = 0; i < rowGroups.size(); i++) {
+ rowGroupOrdinalToRowIdx.put(rowGroups.get(i), rowIdxSum);
+ rowIdxSum += rowGroups.get(i).getNum_rows();
+ }
+ }
+ return rowGroupOrdinalToRowIdx;
+ }
+
+ /**
+   * A container for [[FileMetaData]] and a map from [[RowGroup]] to its ROW_INDEX offset.
+ */
+ private class FileMetaDataAndRowGroupOffsetInfo {
+ final FileMetaData fileMetadata;
+ final Map<RowGroup, Long> rowGroupToRowIndexOffsetMap;
+    public FileMetaDataAndRowGroupOffsetInfo(FileMetaData fileMetadata, Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) {
+ this.fileMetadata = fileMetadata;
+ this.rowGroupToRowIndexOffsetMap = rowGroupToRowIndexOffsetMap;
+ }
+ }
+
  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
      final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
      final int combinedFooterLength) throws IOException {
@@ -1407,27 +1432,35 @@ public class ParquetMetadataConverter {
     final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
     final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
-    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+    FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
       @Override
-      public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+      public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
       }
       @Override
-      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+      public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
       }
       @Override
-      public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+      public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
       }
       @Override
-      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+      public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
       }
     });
+    FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata;
+    Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = fileMetaDataAndRowGroupInfo.rowGroupToRowIndexOffsetMap;
LOG.debug("{}", fileMetaData);
if (!encryptedFooter && null != fileDecryptor) {
@@ -1447,7 +1480,7 @@ public class ParquetMetadataConverter {
}
}
-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter, rowGroupToRowIndexOffsetMap);
     if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
return parquetMetadata;
}
@@ -1476,6 +1509,13 @@ public class ParquetMetadataConverter {
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, InternalFileDecryptor fileDecryptor,
       boolean encryptedFooter) throws IOException {
+    return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, new HashMap<RowGroup, Long>());
+  }
+
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
+                                             InternalFileDecryptor fileDecryptor,
+                                             boolean encryptedFooter,
+                                             Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) throws IOException {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
@@ -1485,6 +1525,9 @@ public class ParquetMetadataConverter {
BlockMetaData blockMetaData = new BlockMetaData();
blockMetaData.setRowCount(rowGroup.getNum_rows());
blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+ if (rowGroupToRowIndexOffsetMap.containsKey(rowGroup)) {
+        blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup));
+ }
// not set in legacy files
if (rowGroup.isSetOrdinal()) {
blockMetaData.setOrdinal(rowGroup.getOrdinal());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 3d1bafe..85ba98c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -248,15 +248,26 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
   private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
private final long rowCount;
+ private final long rowIndexOffset;
private final RowRanges rowRanges;
public ColumnChunkPageReadStore(long rowCount) {
+ this(rowCount, -1);
+ }
+
+ ColumnChunkPageReadStore(RowRanges rowRanges) {
+ this(rowRanges, -1);
+ }
+
+ ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) {
this.rowCount = rowCount;
+ this.rowIndexOffset = rowIndexOffset;
rowRanges = null;
}
- ColumnChunkPageReadStore(RowRanges rowRanges) {
+ ColumnChunkPageReadStore(RowRanges rowRanges, long rowIndexOffset) {
this.rowRanges = rowRanges;
+ this.rowIndexOffset = rowIndexOffset;
rowCount = rowRanges.rowCount();
}
@@ -266,6 +277,11 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
}
@Override
+ public Optional<Long> getRowIndexOffset() {
+ return rowIndexOffset < 0 ? Optional.empty() : Optional.of(rowIndexOffset);
+ }
+
+ @Override
public PageReader getPageReader(ColumnDescriptor path) {
final PageReader pageReader = readers.get(path);
if (pageReader == null) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 8ffe19f..8203e90 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
+import java.util.PrimitiveIterator;
import java.util.Set;
+import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +71,8 @@ class InternalParquetRecordReader<T> {
private long current = 0;
private int currentBlock = -1;
private ParquetFileReader reader;
+ private long currentRowIdx = -1;
+ private PrimitiveIterator.OfLong rowIdxInFileItr;
private org.apache.parquet.io.RecordReader<T> recordReader;
private boolean strictTypeChecking;
@@ -127,6 +131,7 @@ class InternalParquetRecordReader<T> {
if (pages == null) {
          throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
}
+ resetRowIndexIterator(pages);
long timeSpentReading = System.currentTimeMillis() - t0;
totalTimeSpentReadingBytes += timeSpentReading;
BenchmarkCounter.incrementTime(timeSpentReading);
@@ -227,6 +232,11 @@ class InternalParquetRecordReader<T> {
try {
currentValue = recordReader.read();
+ if (rowIdxInFileItr != null && rowIdxInFileItr.hasNext()) {
+ currentRowIdx = rowIdxInFileItr.next();
+ } else {
+ currentRowIdx = -1;
+ }
} catch (RecordMaterializationException e) {
// this might throw, but it's fatal if it does.
unmaterializableRecordCounter.incErrors(e);
@@ -265,4 +275,47 @@ class InternalParquetRecordReader<T> {
return Collections.unmodifiableMap(setMultiMap);
}
+ /**
+   * Returns the row index of the current row. If no row has been processed or if the
+   * row index information is unavailable from the underlying {@link PageReadStore}, returns -1.
+ */
+ public long getCurrentRowIndex() {
+ if (current == 0L || rowIdxInFileItr == null) {
+ return -1;
+ }
+ return currentRowIdx;
+ }
+
+ /**
+ * Resets the row index iterator based on the current processed row group.
+ */
+ private void resetRowIndexIterator(PageReadStore pages) {
+ Optional<Long> rowGroupRowIdxOffset = pages.getRowIndexOffset();
+ if (!rowGroupRowIdxOffset.isPresent()) {
+ this.rowIdxInFileItr = null;
+ return;
+ }
+
+ currentRowIdx = -1;
+ final PrimitiveIterator.OfLong rowIdxInRowGroupItr;
+ if (pages.getRowIndexes().isPresent()) {
+ rowIdxInRowGroupItr = pages.getRowIndexes().get();
+ } else {
+      rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator();
+ }
+    // Add the row group's row index offset to the indexes produced by `rowIdxInRowGroupItr`.
+ this.rowIdxInFileItr = new PrimitiveIterator.OfLong() {
+ public long nextLong() {
+ return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong();
+ }
+
+ public boolean hasNext() {
+ return rowIdxInRowGroupItr.hasNext();
+ }
+
+ public Long next() {
+ return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next();
+ }
+ };
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 63a22d1..97fe86d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -929,7 +929,7 @@ public class ParquetFileReader implements Closeable {
if (block.getRowCount() == 0) {
throw new RuntimeException("Illegal row group of 0 rows");
}
-    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount());
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
ConsecutivePartList currentParts = null;
@@ -1044,7 +1044,7 @@ public class ParquetFileReader implements Closeable {
}
  private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
-    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
List<ConsecutivePartList> allParts = new ArrayList<>();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index c215f5e..6d76723 100644
index c215f5e..6d76723 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -140,6 +140,16 @@ public class ParquetReader<T> implements Closeable {
}
}
+ /**
+   * @return the row index of the last read row. If no row has been processed, returns -1.
+ */
+ public long getCurrentRowIndex() {
+ if (reader == null) {
+ return -1;
+ }
+ return reader.getCurrentRowIndex();
+ }
+
private void initReader() throws IOException {
if (reader != null) {
reader.close();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 4653410..e46ccdd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -207,6 +207,13 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
return internalReader.nextKeyValue();
}
+ /**
+   * @return the row index of the current row. If no row has been processed, returns -1.
+ */
+ public long getCurrentRowIndex() throws IOException {
+ return internalReader.getCurrentRowIndex();
+ }
+
  private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
if (split instanceof ParquetInputSplit) {
return (ParquetInputSplit) split;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
index ce204dc..4f9fd14 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -33,6 +33,7 @@ public class BlockMetaData {
private long totalByteSize;
private String path;
private int ordinal;
+ private long rowIndexOffset = -1;
public BlockMetaData() {
}
@@ -66,6 +67,18 @@ public class BlockMetaData {
}
/**
+   * @return the actual rowIndexOffset of this {@link BlockMetaData}, or -1 if it is unavailable
+ */
+ public long getRowIndexOffset() { return rowIndexOffset; }
+
+ /**
+ * @param rowIndexOffset the rowIndexOffset to set
+ */
+ public void setRowIndexOffset(long rowIndexOffset) {
+ this.rowIndexOffset = rowIndexOffset;
+ }
+
+ /**
* @return the totalByteSize
*/
public long getTotalByteSize() {
@@ -105,7 +118,11 @@ public class BlockMetaData {
@Override
public String toString() {
- return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns
+ "}";
+ String rowIndexOffsetStr = "";
+ if (rowIndexOffset != -1) {
+ rowIndexOffsetStr = ", rowIndexOffset = " + rowIndexOffset;
+ }
+ return "BlockMetaData{" + rowCount + ", " + totalByteSize +
rowIndexOffsetStr + " " + columns + "}";
}
/**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 6355f35..1e74353 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.filter2.recordlevel;
+import static org.junit.Assert.assertEquals;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -315,7 +317,7 @@ public class PhoneBookWriter {
}
}
- private static ParquetReader<Group> createReader(Path file, Filter filter)
throws IOException {
+ public static ParquetReader<Group> createReader(Path file, Filter filter)
throws IOException {
Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);
@@ -341,11 +343,24 @@ public class PhoneBookWriter {
}
public static List<User> readUsers(ParquetReader.Builder<Group> builder)
throws IOException {
+ return readUsers(builder, false);
+ }
+
+ /**
+   * Returns a list of users from the underlying [[ParquetReader]] builder.
+   * If `validateRowIndexes` is true, this method also validates the row index of each row
+   * read from the ParquetReader - the row index of a row should equal the underlying user id.
+   */
+  public static List<User> readUsers(ParquetReader.Builder<Group> builder, boolean validateRowIndexes) throws IOException {
    ParquetReader<Group> reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build();
List<User> users = new ArrayList<>();
for (Group group = reader.read(); group != null; group = reader.read()) {
- users.add(userFromGroup(group));
+ User u = userFromGroup(group);
+ users.add(u);
+ if (validateRowIndexes) {
+        assertEquals(u.id, reader.getCurrentRowIndex());
+ }
}
return users;
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index b07fccd..68a4e34 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -200,7 +200,7 @@ public class TestBloomFiltering {
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
.useBloomFilter(useBloomFilter)
- .useColumnIndexFilter(useOtherFiltering));
+ .useColumnIndexFilter(useOtherFiltering), true);
}
// Assumes that both lists are in the same order
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index 5e18105..0678cbf 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -248,7 +248,7 @@ public class TestColumnIndexFiltering {
.useDictionaryFilter(useOtherFiltering)
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
- .useColumnIndexFilter(useColumnIndexFilter));
+ .useColumnIndexFilter(useColumnIndexFilter), true);
}
private List<User> readUsersWithProjection(Filter filter, MessageType
schema, boolean useOtherFiltering,
@@ -261,7 +261,7 @@ public class TestColumnIndexFiltering {
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
.useColumnIndexFilter(useColumnIndexFilter)
- .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
+ .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()), true);
}
private FileDecryptionProperties getFileDecryptionProperties() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
new file mode 100644
index 0000000..86f14a8
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestParquetReader {
+
+ private static final Path FILE_V1 = createTempFile();
+ private static final Path FILE_V2 = createTempFile();
+  private static final Path STATIC_FILE_WITHOUT_COL_INDEXES = createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
+  private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(1000));
+
+ private final Path file;
+
+ private static Path createPathFromCP(String path) {
+ try {
+ return new Path(TestParquetReader.class.getResource(path).toURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public TestParquetReader(Path file) {
+ this.file = file;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ Object[][] data = new Object[][] {
+ { FILE_V1 },
+ { FILE_V2 },
+ { STATIC_FILE_WITHOUT_COL_INDEXES } };
+ return Arrays.asList(data);
+ }
+
+ @BeforeClass
+ public static void createFiles() throws IOException {
+ writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0);
+ writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
+ @AfterClass
+ public static void deleteFiles() throws IOException {
+ deleteFile(FILE_V1);
+ deleteFile(FILE_V2);
+ }
+
+ private static void deleteFile(Path file) throws IOException {
+ file.getFileSystem(new Configuration()).delete(file, false);
+ }
+
+ public static List<PhoneBookWriter.User> makeUsers(int rowCount) {
+ List<PhoneBookWriter.User> users = new ArrayList<>();
+ for (int i = 0; i < rowCount; i++) {
+ PhoneBookWriter.Location location = null;
+ if (i % 3 == 1) {
+ location = new PhoneBookWriter.Location((double)i, (double) i * 2);
+ }
+ if (i % 3 == 2) {
+ location = new PhoneBookWriter.Location((double)i, null);
+ }
+      // The row index of each row in the file is the same as the user id.
+      users.add(new PhoneBookWriter.User(i, "p" + i, Arrays.asList(new PhoneBookWriter.PhoneNumber(i, "cell")), location));
+ }
+ return users;
+ }
+
+ private static Path createTempFile() {
+ try {
+      return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+  private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVersion parquetVersion) throws IOException {
+    int pageSize = DATA.size() / 10;     // Ensure that several pages are created.
+    int rowGroupSize = pageSize * 6 * 5; // Ensure that more than one row group is created.
+
+ PhoneBookWriter.write(ExampleParquetWriter.builder(file)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withWriterVersion(parquetVersion),
+ DATA);
+ }
+
+ private List<PhoneBookWriter.User> readUsers(FilterCompat.Filter filter,
boolean useOtherFiltering, boolean useColumnIndexFilter)
+ throws IOException {
+ return PhoneBookWriter.readUsers(ParquetReader.builder(new
GroupReadSupport(), file)
+ .withFilter(filter)
+ .useDictionaryFilter(useOtherFiltering)
+ .useStatsFilter(useOtherFiltering)
+ .useRecordFilter(useOtherFiltering)
+ .useColumnIndexFilter(useColumnIndexFilter), true);
+ }
+
+ @Test
+ public void testCurrentRowIndex() throws Exception {
+ ParquetReader<Group> reader = PhoneBookWriter.createReader(file,
FilterCompat.NOOP);
+ // Fetch row index without processing any row.
+ assertEquals(reader.getCurrentRowIndex(), -1);
+ reader.read();
+ assertEquals(reader.getCurrentRowIndex(), 0);
+ // calling the same API again and again should return same result.
+ assertEquals(reader.getCurrentRowIndex(), 0);
+
+ reader.read();
+ assertEquals(reader.getCurrentRowIndex(), 1);
+ assertEquals(reader.getCurrentRowIndex(), 1);
+ long expectedCurrentRowIndex = 2L;
+ while(reader.read() != null) {
+ assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
+ expectedCurrentRowIndex++;
+ }
+ // reader.read() returned null and so reader doesn't have any more rows.
+ assertEquals(reader.getCurrentRowIndex(), -1);
+ }
+
+ @Test
+ public void testSimpleFiltering() throws Exception {
+ Set<Long> idSet = new HashSet<>();
+    idSet.add(123L);
+    idSet.add(567L);
+    // readUsers also validates the row index of each returned row.
+    List<PhoneBookWriter.User> filteredUsers1 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true);
+    assertEquals(2, filteredUsers1.size());
+    List<PhoneBookWriter.User> filteredUsers2 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false);
+    assertEquals(2, filteredUsers2.size());
+    List<PhoneBookWriter.User> filteredUsers3 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false);
+    assertEquals(1000, filteredUsers3.size());
+ }
+
+ @Test
+ public void testNoFiltering() throws Exception {
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, false, false));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, true, false));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, false, true));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, true, true));
+ }
+}
diff --git a/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet
new file mode 100644
index 0000000..722e687
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet differ