vinothchandar commented on code in PR #9564: URL: https://github.com/apache/hudi/pull/9564#discussion_r1322267897
########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetKeyedLookupReader.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.parquet.ParquetFileMetadataLoader; +import org.apache.hudi.metadata.parquet.RowGroup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * Implements an efficient lookup for a key in a Parquet file, by using page level statistics, and bloom filters. + * Parquet file is expected to have two columns : + * 1. `key` binary column, which is the column to be used for lookup + * 2. `value` binary column, which is the column to be returned as a result of lookup + * + * as well as being sorted by `key` column. + * + * Known limitations: + * 1) Does not do bloom filter based skipping of row groups. + */ +public class HoodieParquetKeyedLookupReader { + private static Logger LOG = LoggerFactory.getLogger(HoodieParquetKeyedLookupReader.class); + private static String KEY = "key"; + private static String VALUE = "value"; + private final InputFile parquetFile; + private final Configuration conf; + private final ParquetFileMetadataLoader metadataLoader; + private final CompressionCodecFactory codecFactory; + private final ParquetMetadataConverter converter; + + public HoodieParquetKeyedLookupReader(Configuration conf, InputFile parquetFile) throws Exception { + this.conf = conf; + this.parquetFile = parquetFile; + this.metadataLoader = new ParquetFileMetadataLoader( + parquetFile, ParquetFileMetadataLoader.Options.builder().enableLoadBloomFilters().build()); + this.codecFactory = HadoopCodecs.newFactory(0); + this.converter = new ParquetMetadataConverter(); + + metadataLoader.load(); + } + + public Map<String, Option<String>> lookup(SortedSet<String> keys) throws Exception { + Map<String, Option<String>> keyToValue = new HashMap<>(); + try (CompressionConverter.TransParquetFileReader reader = new CompressionConverter.TransParquetFileReader( + parquetFile, HadoopReadOptions.builder(conf).build())) { + Map<String, String> matchingRecords = getMatchingRecords(reader, new LinkedList<>(keys)); + for (String key: keys) { + if (matchingRecords.containsKey(key)) { + keyToValue.put(key, Option.of(matchingRecords.get(key))); + } else { + keyToValue.put(key, Option.empty()); + } + } + } + return keyToValue; + } + + private Map<String, String> getMatchingRecords(CompressionConverter.TransParquetFileReader reader, + Queue<String> keys) throws Exception { + Map<String, String> keyToValue = new HashMap<>(); + for (RowGroup rowGroup : metadataLoader.getRowGroups()) { + final int keyColNumber = searchColumn(rowGroup, KEY); + if (keyColNumber < 0) { + throw new IllegalArgumentException("Cannot find key column in schema."); + } + + // Skip row group using bloom filter if possible. + if (shouldSkip(rowGroup, keyColNumber, new LinkedList<>(keys))) { + continue; + } + + ArrayList<Pair<String, Long>> keyPositions = lookupKeyColumnChunk(reader, rowGroup, keyColNumber, keys); + // Now fetch the respective values out. + final int valueColNumber = searchColumn(rowGroup, VALUE); + if (valueColNumber < 0) { + throw new IllegalArgumentException("Cannot find value column in schema."); + } + if (!keyPositions.isEmpty()) { + keyToValue.putAll(fetchFromValueColumnChunk(reader, rowGroup, valueColNumber, keyPositions)); + } + LOG.info("Done with rowGroup"); Review Comment: move to lower logging level like Debug. Fix logging across the board. IDK if we need anything beyond 1-2 lines in INFO. ########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetKeyedLookupReader.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.parquet.ParquetFileMetadataLoader; +import org.apache.hudi.metadata.parquet.RowGroup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * Implements an efficient lookup for a key in a Parquet file, by using page level statistics, and bloom filters. + * Parquet file is expected to have two columns : + * 1. `key` binary column, which is the column to be used for lookup + * 2. `value` binary column, which is the column to be returned as a result of lookup + * + * as well as being sorted by `key` column. + * + * Known limitations: + * 1) Does not do bloom filter based skipping of row groups. Review Comment: fix this? given its addressed below? and any other comments ########## hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieParquetKeyedLookupReader.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.metadata.parquet.ParquetFileMetadataLoader; +import org.apache.hudi.metadata.parquet.ParquetMetadataFileReader; +import org.apache.hudi.metadata.parquet.RowGroup; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.text.ParseException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; + +public class TestHoodieParquetKeyedLookupReader extends HoodieCommonTestHarness { + private static final long BLOOM_FILTER_HEADER_SIZE_GUESS = 1024; // 1KB + private static final long BLOOM_FILTER_SIZE_GUESS = 1024 * 1024; // 1 MB + private ParquetMetadataFileReader fileReader; + private final String fileName = genParquetFileName(); + + private static String genParquetFileName() { + return UUID.randomUUID().toString().replace("-", ""); + } + + @BeforeEach + public void generateParquetFile() throws IOException, ParseException { Review Comment: please use existing TestUtils methods to generate parquet. ########## hudi-common/src/main/java/org/apache/hudi/metadata/parquet/ParquetFileMetadataLoader.java: ########## @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +/** + * Loads metadata of a Parquet file into memory, which includes + * (1) the Parquet file metadata from footer, + * (2) the column index metadata for all row groups and column chunks, + * (3) the page index metadata for all row groups and column chunks, + * (4) the bloom headers for all row groups and column chunks, + * (5) Optionally the bloom filters for all row groups and column chunks. + */ +public class ParquetFileMetadataLoader { + private static final Logger LOG = LoggerFactory.getLogger(ParquetFileMetadataLoader.class); + private static final long BLOOM_FILTER_HEADER_SIZE_GUESS = 1024; // 1KB + private static final long BLOOM_FILTER_SIZE_GUESS = 1024 * 1024; // 1 MB + + private final ParquetMetadataFileReader metadataFileReader; + private final Options options; + private final List<RowGroup> rowGroups; + private FileMetaData fileMetaData; + + public ParquetFileMetadataLoader(InputFile file, Options options) throws IOException { + this.metadataFileReader = new ParquetMetadataFileReader(file, ParquetReadOptions.builder().build()); + this.options = options; + this.rowGroups = new ArrayList<>(); + } + + public Options getOptions() { + return options; + } + + public FileMetaData getFileMetaData() { + return fileMetaData; + } + + public List<RowGroup> getRowGroups() { + return rowGroups; + } + + public Iterator<RowGroup> rowGroups() { + return rowGroups.iterator(); + } + + public void load() { + if (fileMetaData == null) { + loadFileMetaData(); + LOG.debug("Loaded file level metadata from footer"); + } + + if (rowGroups.isEmpty()) { + loadRowGroupMetaData(); + LOG.debug("Loaded file level metadata from footer"); + } + + // TODO: see if we can tolerate missing indexing metadata. + try { + loadIndexMetadata(); + LOG.debug( + "Successfully loaded the metadata " + + "(bloom filters, page index and column index)" + + "for all column chunks"); + } catch (IOException e) { + throw new ParquetDecodingException("Unable to read the index metadata", e); + } + } + + private void loadFileMetaData() { + fileMetaData = metadataFileReader.getFileMetaData(); + } + + private void loadRowGroupMetaData() { + metadataFileReader.getRowGroups().forEach(e -> rowGroups.add(new RowGroup(e))); + } + + private void loadIndexMetadata() throws IOException { + List<ContinuousRange> ranges = new ArrayList<>(); + for (RowGroup rowGroup : rowGroups) { + ranges = generateRangesForRowGroup(rowGroup, ranges); + } + + for (ContinuousRange range : ranges) { + addIndexMetadataToRowGroups(range); + } + } + + private List<ContinuousRange> generateRangesForRowGroup(RowGroup rowGroup, List<ContinuousRange> ranges) { + BlockMetaData blockMetaData = rowGroup.getBlockMetaData(); + List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns(); + int rowGroupId = rowGroups.indexOf(rowGroup); + for (int chunkId = 0; chunkId < columnsInOrder.size(); ++chunkId) { + ColumnChunkMetaData chunk = columnsInOrder.get(chunkId); + ranges = generateRangesForColumn(chunk, ranges, chunkId, rowGroupId); + } + return ranges; + } + + private List<ContinuousRange> generateRangesForColumn( + ColumnChunkMetaData chunk, List<ContinuousRange> ranges, int chunkId, int rowGroupId) { + ranges = generateRangeForColumnIndex(chunk, ranges, chunkId, rowGroupId); + ranges = generateRangeForOffSetIndex(chunk, ranges, chunkId, rowGroupId); + ranges = generateRangeForBloomFilterIndex(chunk, ranges, chunkId, rowGroupId); + return ranges; + } + + private List<ContinuousRange> generateRangeForColumnIndex( + ColumnChunkMetaData chunk, List<ContinuousRange> ranges, int chunkId, int rowGroupId) { + IndexReference ref = chunk.getColumnIndexReference(); + if (ref != null) { + ColumnId columnId = new ColumnId(rowGroupId, chunkId, ColumnId.ChunkType.COLUMN_INDEX); + ranges = ContinuousRange.insertAndMergeRanges(ranges, columnId, ref.getOffset(), ref.getOffset() + ref.getLength()); + } + return ranges; + } + + private List<ContinuousRange> generateRangeForOffSetIndex( + ColumnChunkMetaData chunk, List<ContinuousRange> ranges, int chunkId, int rowGroupId) { + IndexReference ref = chunk.getOffsetIndexReference(); + if (ref != null) { + ColumnId columnId = new ColumnId(rowGroupId, chunkId, ColumnId.ChunkType.PAGE_INDEX); + ranges = ContinuousRange.insertAndMergeRanges(ranges, columnId, ref.getOffset(), ref.getOffset() + ref.getLength()); + } + return ranges; + } + + private List<ContinuousRange> generateRangeForBloomFilterIndex( + ColumnChunkMetaData chunk, List<ContinuousRange> ranges, int chunkId, int rowGroupId) { + long bloomFilterOffset = chunk.getBloomFilterOffset(); + if (options.isLoadBloomFiltersEnabled() && bloomFilterOffset >= 0) { + ColumnId columnId = new ColumnId(rowGroupId, chunkId, ColumnId.ChunkType.BLOOM_FILTER); + ranges = ContinuousRange.insertAndMergeRanges( + ranges, columnId, bloomFilterOffset, bloomFilterOffset + BLOOM_FILTER_HEADER_SIZE_GUESS + BLOOM_FILTER_SIZE_GUESS); + } + return ranges; + } + + private void addIndexMetadataToRowGroups(ContinuousRange range) throws IOException { + // Overflow can happen due to overestimated bloom filter size. + range.endOffset = Math.min(range.endOffset, metadataFileReader.getInputFile().getLength()); + if (range.endOffset < range.startOffset) { + throw new IllegalStateException("Metadata range has higher startOffset " + range.startOffset + " than endOffset " + range.endOffset); + } + + ByteBuffer metadataCache; + metadataFileReader.setStreamPosition(range.startOffset); + metadataCache = ByteBuffer.allocate((int) (range.endOffset - range.startOffset)); + metadataFileReader.blockRead(metadataCache); + metadataCache.flip(); + + try (SeekableInputStream indexBytesStream = HeapSeekableInputStream.wrap(metadataCache.array())) { Review Comment: are we reading the entire file by default? If so, is that warranted? ########## hudi-common/src/main/java/org/apache/hudi/metadata/parquet/ByteBufferBackedInputStream.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Instance of {@link InputStream} backed by {@link ByteBuffer}, implementing following + * functionality (on top of what's required by {@link InputStream}) + * + * <ol> + * <li>Seeking: enables random access by allowing to seek to an arbitrary position w/in the stream</li> + * <li>(Thread-safe) Copying: enables to copy from the underlying buffer not modifying the state of the stream</li> + * </ol> + * + * NOTE: Generally methods of this class are NOT thread-safe, unless specified otherwise Review Comment: lets UT this class. ########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetKeyedLookupReader.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.parquet.ParquetFileMetadataLoader; +import org.apache.hudi.metadata.parquet.RowGroup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * Implements an efficient lookup for a key in a Parquet file, by using page level statistics, and bloom filters. + * Parquet file is expected to have two columns : + * 1. `key` binary column, which is the column to be used for lookup + * 2. `value` binary column, which is the column to be returned as a result of lookup + * + * as well as being sorted by `key` column. + * + * Known limitations: + * 1) Does not do bloom filter based skipping of row groups. + */ +public class HoodieParquetKeyedLookupReader { + private static Logger LOG = LoggerFactory.getLogger(HoodieParquetKeyedLookupReader.class); + private static String KEY = "key"; + private static String VALUE = "value"; + private final InputFile parquetFile; Review Comment: nit: new line. ########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetKeyedLookupReader.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.parquet.ParquetFileMetadataLoader; +import org.apache.hudi.metadata.parquet.RowGroup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * Implements an efficient lookup for a key in a Parquet file, by using page level statistics, and bloom filters. + * Parquet file is expected to have two columns : + * 1. `key` binary column, which is the column to be used for lookup + * 2. `value` binary column, which is the column to be returned as a result of lookup + * + * as well as being sorted by `key` column. + * + * Known limitations: + * 1) Does not do bloom filter based skipping of row groups. + */ +public class HoodieParquetKeyedLookupReader { + private static Logger LOG = LoggerFactory.getLogger(HoodieParquetKeyedLookupReader.class); + private static String KEY = "key"; + private static String VALUE = "value"; + private final InputFile parquetFile; + private final Configuration conf; + private final ParquetFileMetadataLoader metadataLoader; + private final CompressionCodecFactory codecFactory; + private final ParquetMetadataConverter converter; + + public HoodieParquetKeyedLookupReader(Configuration conf, InputFile parquetFile) throws Exception { + this.conf = conf; + this.parquetFile = parquetFile; + this.metadataLoader = new ParquetFileMetadataLoader( + parquetFile, ParquetFileMetadataLoader.Options.builder().enableLoadBloomFilters().build()); + this.codecFactory = HadoopCodecs.newFactory(0); + this.converter = new ParquetMetadataConverter(); + + metadataLoader.load(); + } + + public Map<String, Option<String>> lookup(SortedSet<String> keys) throws Exception { + Map<String, Option<String>> keyToValue = new HashMap<>(); + try (CompressionConverter.TransParquetFileReader reader = new CompressionConverter.TransParquetFileReader( + parquetFile, HadoopReadOptions.builder(conf).build())) { + Map<String, String> matchingRecords = getMatchingRecords(reader, new LinkedList<>(keys)); + for (String key: keys) { + if (matchingRecords.containsKey(key)) { + keyToValue.put(key, Option.of(matchingRecords.get(key))); + } else { + keyToValue.put(key, Option.empty()); + } + } + } + return keyToValue; + } + + private Map<String, String> getMatchingRecords(CompressionConverter.TransParquetFileReader reader, + Queue<String> keys) throws Exception { + Map<String, String> keyToValue = new HashMap<>(); + for (RowGroup rowGroup : metadataLoader.getRowGroups()) { + final int keyColNumber = searchColumn(rowGroup, KEY); + if (keyColNumber < 0) { + throw new IllegalArgumentException("Cannot find key column in schema."); + } + + // Skip row group using bloom filter if possible. + if (shouldSkip(rowGroup, keyColNumber, new LinkedList<>(keys))) { + continue; + } + + ArrayList<Pair<String, Long>> keyPositions = lookupKeyColumnChunk(reader, rowGroup, keyColNumber, keys); + // Now fetch the respective values out. + final int valueColNumber = searchColumn(rowGroup, VALUE); + if (valueColNumber < 0) { + throw new IllegalArgumentException("Cannot find value column in schema."); + } + if (!keyPositions.isEmpty()) { + keyToValue.putAll(fetchFromValueColumnChunk(reader, rowGroup, valueColNumber, keyPositions)); + } + LOG.info("Done with rowGroup"); + } + return keyToValue; + } + + public static boolean shouldSkip(RowGroup rowGroup, int colNumber, Queue<String> keys) { + if (rowGroup.getBloomFilters().isEmpty()) { + return false; + } + + BloomFilter bloomFilter = null; + try { + bloomFilter = rowGroup.getBloomFilters().get(colNumber); + } catch (Exception e) { + LOG.warn("Can not load the bloom filter correctly.", e); + return false; + } + + // Bloom filter is not found for this column; can not skip. + if (bloomFilter == null) { + return false; + } + + while (!keys.isEmpty()) { + String key = keys.poll(); + Binary binary = Binary.fromString(key); + long hash = bloomFilter.hash(binary); + // At least one key is found; can not skip. + if (bloomFilter.findHash(hash)) { + return false; + } + } + + // No keys are found in the filter; skip. + return true; + } + + private int searchColumn(RowGroup rowGroup, String columnDotPath) { + final int numColumns = rowGroup.getBlockMetaData().getColumns().size(); + for (int i = 0; i < numColumns; i++) { + if (rowGroup.getBlockMetaData().getColumns().get(i).getPath().toDotString().equals(columnDotPath)) { + return i; + } + } + return -1; + } + + private ArrayList<Pair<String, Long>> lookupKeyColumnChunk(CompressionConverter.TransParquetFileReader reader, + RowGroup rowGroup, + int colNumber, + Queue<String> keys) throws IOException { + ColumnChunkMetaData chunkMetaData = rowGroup.getBlockMetaData().getColumns().get(colNumber); + ColumnIndex columnIndex = rowGroup.getColumnIndices().get(colNumber); + OffsetIndex pageLocation = rowGroup.getPageLocations().get(colNumber); + CompressionCodecFactory.BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunkMetaData.getCodec()); + ColumnPath columnPath = rowGroup.getBlockMetaData().getColumns().get(colNumber).getPath(); + ColumnDescriptor columnDescriptor = metadataLoader.getFileMetaData().getSchema().getColumnDescription(columnPath.toArray()); + PrimitiveType type = metadataLoader.getFileMetaData().getSchema().getType(columnDescriptor.getPath()).asPrimitiveType(); + + AtomicInteger pageCounter = new AtomicInteger(0); + int totalPages = pageLocation.getPageCount(); + ArrayList<Pair<String, Long>> keyAndPositions = new ArrayList<>(); + for (int pageIndex = 0; pageIndex < totalPages; pageIndex++) { + String pageMinKey = new String(columnIndex.getMinValues().get(pageIndex).array(), StandardCharsets.UTF_8); + String pageMaxKey = new String(columnIndex.getMaxValues().get(pageIndex).array(), StandardCharsets.UTF_8); + + while (keys.peek() != null && keys.peek().compareTo(pageMinKey) < 0) { Review Comment: can we bring back some of the comments I originally had. It will help readers down the line, maintain the code. ########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetKeyedLookupReader.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metadata.parquet.ParquetFileMetadataLoader; +import org.apache.hudi.metadata.parquet.RowGroup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +/** + * Implements an efficient lookup for a key in a Parquet file, by using page level statistics, and bloom filters. + * Parquet file is expected to have two columns : + * 1. `key` binary column, which is the column to be used for lookup + * 2. `value` binary column, which is the column to be returned as a result of lookup + * + * as well as being sorted by `key` column. + * + * Known limitations: + * 1) Does not do bloom filter based skipping of row groups. + */ +public class HoodieParquetKeyedLookupReader { + private static Logger LOG = LoggerFactory.getLogger(HoodieParquetKeyedLookupReader.class); + private static String KEY = "key"; + private static String VALUE = "value"; + private final InputFile parquetFile; + private final Configuration conf; + private final ParquetFileMetadataLoader metadataLoader; + private final CompressionCodecFactory codecFactory; + private final ParquetMetadataConverter converter; + + public HoodieParquetKeyedLookupReader(Configuration conf, InputFile parquetFile) throws Exception { + this.conf = conf; + this.parquetFile = parquetFile; + this.metadataLoader = new ParquetFileMetadataLoader( + parquetFile, ParquetFileMetadataLoader.Options.builder().enableLoadBloomFilters().build()); + this.codecFactory = HadoopCodecs.newFactory(0); + this.converter = new ParquetMetadataConverter(); + + metadataLoader.load(); + } + + public Map<String, Option<String>> lookup(SortedSet<String> keys) throws Exception { Review Comment: javadocs ########## hudi-common/src/main/java/org/apache/hudi/metadata/parquet/ParquetFileMetadataLoader.java: ########## @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +/** + * Loads metadata of a Parquet file into memory, which includes + * (1) the Parquet file metadata from footer, + * (2) the column index metadata for all row groups and column chunks, + * (3) the page index metadata for all row groups and column chunks, + * (4) the bloom headers for all row groups and column chunks, + * (5) Optionally the bloom filters for all row groups and column chunks. + */ +public class ParquetFileMetadataLoader { Review Comment: lets UT this class thoroughly. ########## hudi-common/src/main/java/org/apache/hudi/metadata/parquet/ByteBufferBackedInputStream.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; Review Comment: idk if all these belong in the `metadata` package, since these are just io classes. Move both InputStream impls to ...hudi.io. ? ########## hudi-common/src/main/java/org/apache/hudi/metadata/parquet/HeapSeekableInputStream.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; + +import org.apache.parquet.io.DelegatingSeekableInputStream; + +import java.io.IOException; + +public class HeapSeekableInputStream extends DelegatingSeekableInputStream { Review Comment: javadocs. ########## hudi-common/src/test/java/org/apache/hudi/metadata/parquet/TestParquetMetadataFileReader.java: ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; + +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.SeekableInputStream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.ParseException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class TestParquetMetadataFileReader extends HoodieCommonTestHarness { + private static final long BLOOM_FILTER_HEADER_SIZE_GUESS = 1024; // 1KB + private static final long BLOOM_FILTER_SIZE_GUESS = 1024 * 1024; // 1 MB + private ParquetMetadataFileReader fileReader; + private final String fileName = genParquetFileName(); + + private static String genParquetFileName() { + return UUID.randomUUID().toString().replace("-", ""); + } + + @BeforeEach + public void generateParquetFile() throws IOException, ParseException { + initPath(); + initTestDataGenerator(); + final ParquetWriter<GenericRecord> writer = AvroParquetWriter + .<GenericRecord>builder(HadoopOutputFile.fromPath( + new Path(basePath, fileName), + new Configuration())) + .withSchema(AVRO_SCHEMA) + .withCompressionCodec(CompressionCodecName.GZIP) + .withBloomFilterEnabled("key", true) + .withPageSize(1024 * 1024) + .withRowGroupSize(10 * 1024 * 1024) + .enableDictionaryEncoding() + .build(); + + long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20230828000000").getTime() / 1000; + List<GenericRecord> records = new ArrayList<GenericRecord>(); + for (long recordNum = 0; recordNum < 96; recordNum++) { + records.add(new HoodieTestDataGenerator(Instant.now().toEpochMilli()).generateGenericRecord( + Long.toString(recordNum), + "0", + "rider-" + recordNum, + "driver-" + recordNum, + startTime + TimeUnit.HOURS.toSeconds(recordNum))); + } + for (GenericRecord record : records) { + writer.write(record); + } + writer.close(); + } + + @Test + public void testReadColumnIndex() throws IOException { + try (ParquetMetadataFileReader fileReader = new ParquetMetadataFileReader( + HadoopInputFile.fromPath(new Path(basePath, fileName), new Configuration()), + ParquetReadOptions.builder().build())) { + List<BlockMetaData> rowGroups = fileReader.getRowGroups(); + for (BlockMetaData rowGroup : rowGroups) { + List<ColumnChunkMetaData> chunks = rowGroup.getColumns(); + for (ColumnChunkMetaData chunk : chunks) { + IndexReference ref = chunk.getColumnIndexReference(); + if (ref != null) { + ByteBuffer metadataCache = ByteBuffer.allocate((int)(ref.getLength())); + fileReader.setStreamPosition(ref.getOffset()); + fileReader.blockRead(metadataCache); + metadataCache.flip(); + try (SeekableInputStream indexBytesStream = HeapSeekableInputStream.wrap(metadataCache.array())) { + ColumnIndex columnIndex = fileReader.readColumnIndex( + fileReader, + indexBytesStream, + ref.getOffset(), + ref.getOffset() + ref.getLength(), + chunk); + Assertions.assertNotNull((Object)columnIndex); + } + } + } + } + } + } + + @Test + public void testReadOffsetIndex() throws IOException { + try (ParquetMetadataFileReader fileReader = new ParquetMetadataFileReader( + HadoopInputFile.fromPath(new Path(basePath, fileName), new Configuration()), + ParquetReadOptions.builder().build())) { + List<BlockMetaData> rowGroups = fileReader.getRowGroups(); + for (BlockMetaData rowGroup : rowGroups) { + List<ColumnChunkMetaData> chunks = rowGroup.getColumns(); + for (ColumnChunkMetaData chunk : chunks) { + IndexReference ref = chunk.getOffsetIndexReference(); + if (ref != null) { + ByteBuffer metadataCache = ByteBuffer.allocate((int)(ref.getLength())); + fileReader.setStreamPosition(ref.getOffset()); + fileReader.blockRead(metadataCache); + metadataCache.flip(); + try (SeekableInputStream indexBytesStream = HeapSeekableInputStream.wrap(metadataCache.array())) { + OffsetIndex offsetIndex = fileReader.readOffsetIndex( + fileReader, + indexBytesStream, + ref.getOffset(), + ref.getOffset() + ref.getLength(), + chunk); + Assertions.assertNotNull((Object)offsetIndex); + } + } + } + } + } + } + + @Test + public void testReadBloomFilter() throws IOException { Review Comment: can we test the few different cases, where its stored and not stored for e.g ########## hudi-common/src/test/java/org/apache/hudi/metadata/parquet/TestParquetMetadataFileReader.java: ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; + +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.SeekableInputStream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.ParseException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class TestParquetMetadataFileReader extends HoodieCommonTestHarness { + private static final long BLOOM_FILTER_HEADER_SIZE_GUESS = 1024; // 1KB + private static final long BLOOM_FILTER_SIZE_GUESS = 1024 * 1024; // 1 MB + private ParquetMetadataFileReader fileReader; + private final String fileName = genParquetFileName(); + + private static String genParquetFileName() { + return UUID.randomUUID().toString().replace("-", ""); + } + + @BeforeEach + public void generateParquetFile() throws IOException, ParseException { Review Comment: Same comment. leverage existing test utils ########## hudi-common/src/main/java/org/apache/hudi/metadata/parquet/ParquetMetadataFileReader.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; Review Comment: move this to io/storage/parquet? None of this is specific to metadata? ########## hudi-common/src/main/java/org/apache/hudi/metadata/parquet/ParquetFileMetadataLoader.java: ########## @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.parquet; Review Comment: move this to io/storage/parquet? None of this is specific to metadata? ########## hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieParquetKeyedLookupReader.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.metadata.parquet.ParquetFileMetadataLoader; +import org.apache.hudi.metadata.parquet.ParquetMetadataFileReader; +import org.apache.hudi.metadata.parquet.RowGroup; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.text.ParseException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; + +public class TestHoodieParquetKeyedLookupReader extends HoodieCommonTestHarness { + private static final long BLOOM_FILTER_HEADER_SIZE_GUESS = 1024; // 1KB + private static final long BLOOM_FILTER_SIZE_GUESS = 1024 * 1024; // 1 MB + private ParquetMetadataFileReader fileReader; + private final String fileName = genParquetFileName(); + + private static String genParquetFileName() { + return UUID.randomUUID().toString().replace("-", ""); + } + + @BeforeEach + public void generateParquetFile() throws IOException, ParseException { + initPath(); + initTestDataGenerator(); + final ParquetWriter<GenericRecord> writer = AvroParquetWriter + .<GenericRecord>builder(HadoopOutputFile.fromPath( + new Path(basePath, fileName), + new Configuration())) + .withSchema(AVRO_SCHEMA) + .withCompressionCodec(CompressionCodecName.GZIP) + .withBloomFilterEnabled(true) + .withPageSize(1024 * 1024) + .withRowGroupSize(10 * 1024 * 1024) + .enableDictionaryEncoding() + .build(); + + long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20230830000000").getTime() / 1000; + List<GenericRecord> records = new ArrayList<GenericRecord>(); + for (long recordNum = 0; recordNum < 96; recordNum++) { + records.add(new HoodieTestDataGenerator(Instant.now().toEpochMilli()).generateGenericRecord( + Long.toString(recordNum), + "0", + "rider-" + recordNum, + "driver-" + recordNum, + startTime + TimeUnit.HOURS.toSeconds(recordNum))); + } + for (GenericRecord record : records) { + writer.write(record); + } + writer.close(); + } + + @Test + public void testBloomFilter() throws IOException { Review Comment: I feel we need more test cases here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
