This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch bloom-filter
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/bloom-filter by this push:
new 96c2fef PARQUET-1516: Store Bloom filters near to footer (#608)
96c2fef is described below
commit 96c2fef80c8d433cf2e247e28f3af07562a8065e
Author: Chen, Junjie <[email protected]>
AuthorDate: Tue Feb 12 17:28:25 2019 +0800
PARQUET-1516: Store Bloom filters near to footer (#608)
* PARQUET-1516: Store Bloom filters near to footer
---
.../format/converter/ParquetMetadataConverter.java | 2 +-
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 10 ++-
.../apache/parquet/hadoop/ParquetFileWriter.java | 59 ++++++++++----
.../hadoop/metadata/ColumnChunkMetaData.java | 89 +++++-----------------
.../hadoop/TestColumnChunkPageWriteStore.java | 1 +
.../parquet/hadoop/TestParquetFileWriter.java | 5 +-
6 files changed, 77 insertions(+), 89 deletions(-)
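
In outline, the patch replaces the old inline ParquetFileWriter.writeBloomFilter (which serialized each filter between the pages of its column chunk and recorded the offset at construction time) with per-row-group buffering: filters are collected while the row group is written and only flushed in end(), immediately before the footer, at which point the offsets are patched into the column metadata. A minimal, self-contained sketch of that buffer-then-flush pattern follows; all names here are illustrative, not the real Parquet API.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: the "collect per block, flush near the footer" flow.
    class BufferedFilterSketch {
      private final ByteArrayOutputStream out = new ByteArrayOutputStream();
      private final List<byte[]> currentFilters = new ArrayList<>(); // one per column chunk
      private final List<Long> filterOffsets = new ArrayList<>();    // filled in end()

      void writeFilter(byte[] serializedFilter) {
        currentFilters.add(serializedFilter); // buffered, no I/O yet
      }

      void end() throws IOException {
        // Filters land here, after all page data but before the footer.
        for (byte[] filter : currentFilters) {
          filterOffsets.add((long) out.size()); // offset recorded for the footer
          out.write(filter);
        }
        // ... the footer, serialized next, carries filterOffsets ...
      }
    }
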
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index fcefe3c..945b83d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1192,12 +1192,12 @@ public class ParquetMetadataConverter {
messageType.getType(path.toArray()).asPrimitiveType()),
metaData.data_page_offset,
metaData.dictionary_page_offset,
- metaData.bloom_filter_offset,
metaData.num_values,
metaData.total_compressed_size,
metaData.total_uncompressed_size);
column.setColumnIndexReference(toColumnIndexReference(columnChunk));
column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
+ column.setBloomFilterOffset(metaData.bloom_filter_offset);
// TODO
// index_page_offset
// key_value_metadata
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index f87630b..1a607d4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -35,6 +35,8 @@ import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
@@ -50,7 +52,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private static final class ColumnChunkPageWriter implements PageWriter {
+ private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {
private final ColumnDescriptor path;
private final BytesCompressor compressor;
@@ -69,6 +71,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
private Set<Encoding> dlEncodings = new HashSet<Encoding>();
private List<Encoding> dataEncodings = new ArrayList<Encoding>();
+ private BloomFilter bloomFilter;
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;
private Statistics totalStatistics;
@@ -227,6 +230,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
totalStatistics,
columnIndexBuilder,
offsetIndexBuilder,
+ bloomFilter,
rlEncodings,
dlEncodings,
dataEncodings);
@@ -267,6 +271,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
return buf.memUsageString(prefix + " ColumnChunkPageWriter");
}
+ @Override
+ public void writeBloomFilter(BloomFilter bloomFilter) {
+ this.bloomFilter = bloomFilter;
+ }
}
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
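
With this hunk, ColumnChunkPageWriter doubles as the sink for Bloom filters: writeBloomFilter only stores the filter, and the stored value is handed to ParquetFileWriter.writeColumnChunk when the chunk is flushed. A hedged, self-contained sketch of that callback shape (BloomFilter is a placeholder here, standing in for org.apache.parquet.column.values.bloomfilter.BloomFilter):

    // Placeholder for the real Parquet type.
    interface BloomFilter {}

    // Simplified mirror of the BloomFilterWriter callback used above.
    interface BloomFilterWriter {
      void writeBloomFilter(BloomFilter bloomFilter);
    }

    class PageWriterSketch implements BloomFilterWriter {
      private BloomFilter bloomFilter; // held until the column chunk is flushed

      @Override
      public void writeBloomFilter(BloomFilter bloomFilter) {
        this.bloomFilter = bloomFilter; // no I/O here; serialization is deferred
      }
    }
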
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 1fc2c13..764f519 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -124,6 +124,9 @@ public class ParquetFileWriter {
private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
+ // The Bloom filters
+ private final List<List<BloomFilter>> bloomFilters = new ArrayList<>();
+
// row group data
private BlockMetaData currentBlock; // appended to by endColumn
@@ -131,6 +134,9 @@ public class ParquetFileWriter {
private List<ColumnIndex> currentColumnIndexes;
private List<OffsetIndex> currentOffsetIndexes;
+ // The Bloom filter for the actual block
+ private List<BloomFilter> currentBloomFilters;
+
// row group data set at the start of a row group
private long currentRecordCount; // set in startBlock
@@ -151,7 +157,6 @@ public class ParquetFileWriter {
private long currentChunkValueCount; // set in startColumn
private long currentChunkFirstDataPage; // set in startColumn (out.pos())
private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage
- private long currentChunkBloomFilterDataOffset; // set in writeBloomData
// set when end is called
private ParquetMetadata footer = null;
@@ -354,6 +359,8 @@ public class ParquetFileWriter {
currentColumnIndexes = new ArrayList<>();
currentOffsetIndexes = new ArrayList<>();
+
+ currentBloomFilters = new ArrayList<>();
}
/**
@@ -410,16 +417,6 @@ public class ParquetFileWriter {
currentEncodings.add(dictionaryPage.getEncoding());
}
- /**
- * Write a Bloom filter
- * @param bloomFilter the bloom filter of column values
- * @throws IOException if there is an error while writing
- */
- public void writeBloomFilter(BloomFilter bloomFilter) throws IOException {
- state = state.write();
- currentChunkBloomFilterDataOffset = out.getPos();
- bloomFilter.writeTo(out);
- }
/**
* writes a single page
@@ -559,6 +556,14 @@ public class ParquetFileWriter {
}
/**
+ * Write a Bloom filter
+ * @param bloomFilter the bloom filter of column values
+ */
+ void writeBloomFilter(BloomFilter bloomFilter) {
+ currentBloomFilters.add(bloomFilter);
+ }
+
+ /**
* Writes a column chunk at once
* @param descriptor the descriptor of the column
* @param valueCount the value count in this column
@@ -570,6 +575,7 @@ public class ParquetFileWriter {
* @param totalStats accumulated statistics for the column chunk
* @param columnIndexBuilder the builder object for the column index
* @param offsetIndexBuilder the builder object for the offset index
+ * @param bloomFilter the bloom filter for this column
* @param rlEncodings the RL encodings used in this column chunk
* @param dlEncodings the DL encodings used in this column chunk
* @param dataEncodings the data encodings used in this column chunk
@@ -585,14 +591,18 @@ public class ParquetFileWriter {
Statistics<?> totalStats,
ColumnIndexBuilder columnIndexBuilder,
OffsetIndexBuilder offsetIndexBuilder,
+ BloomFilter bloomFilter,
Set<Encoding> rlEncodings,
Set<Encoding> dlEncodings,
List<Encoding> dataEncodings) throws IOException {
startColumn(descriptor, valueCount, compressionCodecName);
state = state.write();
+
if (dictionaryPage != null) {
writeDictionaryPage(dictionaryPage);
+ } else if (bloomFilter != null) {
+ currentBloomFilters.add(bloomFilter);
}
LOG.debug("{}: write data pages", out.getPos());
long headersSize = bytes.size() - compressedTotalPageSize;
@@ -638,7 +648,6 @@ public class ParquetFileWriter {
currentStatistics,
currentChunkFirstDataPage,
currentChunkDictionaryPageOffset,
- currentChunkBloomFilterDataOffset,
currentChunkValueCount,
compressedLength,
uncompressedLength));
@@ -660,8 +669,10 @@ public class ParquetFileWriter {
blocks.add(currentBlock);
columnIndexes.add(currentColumnIndexes);
offsetIndexes.add(currentOffsetIndexes);
+ bloomFilters.add(currentBloomFilters);
currentColumnIndexes = null;
currentOffsetIndexes = null;
+ currentBloomFilters = null;
currentBlock = null;
}
@@ -898,7 +909,6 @@ public class ParquetFileWriter {
chunk.getStatistics(),
newChunkStart,
newChunkStart,
- chunk.getBloomFilterOffset(),
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
@@ -958,6 +968,7 @@ public class ParquetFileWriter {
state = state.end();
serializeColumnIndexes(columnIndexes, blocks, out);
serializeOffsetIndexes(offsetIndexes, blocks, out);
+ serializeBloomFilters(bloomFilters, blocks, out);
LOG.debug("{}: end", out.getPos());
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, out);
@@ -1007,6 +1018,28 @@ public class ParquetFileWriter {
}
}
+ private static void serializeBloomFilters(
+ List<List<BloomFilter>> bloomFilters,
+ List<BlockMetaData> blocks,
+ PositionOutputStream out) throws IOException {
+ LOG.debug("{}: bloom filters", out.getPos());
+ for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+ List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+ List<BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
+ if (blockBloomFilters.isEmpty()) continue;
+ for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+ BloomFilter bloomFilter = blockBloomFilters.get(cIndex);
+ if (bloomFilter == null) {
+ continue;
+ }
+ ColumnChunkMetaData column = columns.get(cIndex);
+ long offset = out.getPos();
+ column.setBloomFilterOffset(offset);
+ bloomFilter.writeTo(out);
+ }
+ }
+ }
+
private static void serializeFooter(ParquetMetadata footer,
PositionOutputStream out) throws IOException {
long footerIndex = out.getPos();
org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
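
The ordering in end() is the heart of the change: column indexes, then offset indexes, then Bloom filters, then the footer, so the filters end up adjacent to the footer as the issue title says. Note that serializeBloomFilters mutates the already-collected BlockMetaData via setBloomFilterOffset just before the footer is written, which is how the footer picks up the final positions. A self-contained illustration of that offset-patching step (ColumnMeta is a stand-in for ColumnChunkMetaData):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class SerializeOrderSketch {
      static class ColumnMeta {          // stand-in for ColumnChunkMetaData
        long bloomFilterOffset = -1;     // -1 = no filter written in this sketch
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(new byte[1024]);                        // pretend: page data already written
        List<ColumnMeta> columns = Arrays.asList(new ColumnMeta(), new ColumnMeta());
        byte[][] filters = { {1, 2, 3}, null };           // second column has no filter

        for (int c = 0; c < columns.size(); c++) {
          if (filters[c] == null) continue;               // skip columns without a filter
          columns.get(c).bloomFilterOffset = out.size();  // patch the metadata first
          out.write(filters[c]);                          // then write the filter bytes
        }
        // The footer, serialized after this loop, carries the patched offsets.
        System.out.println("column 0 filter offset = " + columns.get(0).bloomFilterOffset); // 1024
      }
    }
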
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 3156132..2c24356 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -126,7 +126,6 @@ abstract public class ColumnChunkMetaData {
statistics,
firstDataPage,
dictionaryPageOffset,
- 0,
valueCount,
totalSize,
totalUncompressedSize);
@@ -137,56 +136,12 @@ abstract public class ColumnChunkMetaData {
statistics,
firstDataPage,
dictionaryPageOffset,
- 0,
valueCount,
totalSize,
totalUncompressedSize);
}
}
- public static ColumnChunkMetaData get(
- ColumnPath path,
- PrimitiveType type,
- CompressionCodecName codec,
- EncodingStats encodingStats,
- Set<Encoding> encodings,
- Statistics statistics,
- long firstDataPage,
- long dictionaryPageOffset,
- long bloomFilterDataOffset,
- long valueCount,
- long totalSize,
- long totalUncompressedSize) {
- // to save space we store those always positive longs in ints when they fit.
- if (positiveLongFitsInAnInt(firstDataPage)
- && positiveLongFitsInAnInt(dictionaryPageOffset)
- && positiveLongFitsInAnInt(valueCount)
- && positiveLongFitsInAnInt(totalSize)
- && positiveLongFitsInAnInt(totalUncompressedSize)) {
- return new IntColumnChunkMetaData(
- path, type, codec,
- encodingStats, encodings,
- statistics,
- firstDataPage,
- dictionaryPageOffset,
- bloomFilterDataOffset,
- valueCount,
- totalSize,
- totalUncompressedSize);
- } else {
- return new LongColumnChunkMetaData(
- path, type, codec,
- encodingStats, encodings,
- statistics,
- firstDataPage,
- dictionaryPageOffset,
- bloomFilterDataOffset,
- valueCount,
- totalSize,
- totalUncompressedSize);
- }
- }
-
/**
* @return the offset of the first byte in the chunk
*/
@@ -218,6 +173,8 @@ abstract public class ColumnChunkMetaData {
private IndexReference columnIndexReference;
private IndexReference offsetIndexReference;
+ private long bloomFilterOffset;
+
protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
this(null, columnChunkProperties);
}
@@ -266,11 +223,6 @@ abstract public class ColumnChunkMetaData {
abstract public long getDictionaryPageOffset();
/**
- * @return the location of the bloomFilter filter data if any
- */
- abstract public long getBloomFilterOffset();
-
- /**
* @return count of values in this block of the column
*/
abstract public long getValueCount();
@@ -325,6 +277,23 @@ abstract public class ColumnChunkMetaData {
}
/**
+ * @param bloomFilterOffset
+ * the reference to the Bloom filter
+ */
+ @Private
+ public void setBloomFilterOffset(long bloomFilterOffset) {
+ this.bloomFilterOffset = bloomFilterOffset;
+ }
+
+ /**
+ * @return the offset to the Bloom filter
+ */
+ @Private
+ public long getBloomFilterOffset() {
+ return bloomFilterOffset;
+ }
+
+ /**
* @return all the encodings used in this column
*/
public Set<Encoding> getEncodings() {
@@ -345,7 +314,6 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
private final int firstDataPage;
private final int dictionaryPageOffset;
- private final int bloomFilterDataOffset;
private final int valueCount;
private final int totalSize;
private final int totalUncompressedSize;
@@ -372,14 +340,12 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
Statistics statistics,
long firstDataPage,
long dictionaryPageOffset,
- long bloomFilterDataOffset,
long valueCount,
long totalSize,
long totalUncompressedSize) {
super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
this.firstDataPage = positiveLongToInt(firstDataPage);
this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset);
- this.bloomFilterDataOffset = positiveLongToInt(bloomFilterDataOffset);
this.valueCount = positiveLongToInt(valueCount);
this.totalSize = positiveLongToInt(totalSize);
this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize);
@@ -422,13 +388,6 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
}
/**
- * @return the location of bloom filter if any
- */
- public long getBloomFilterOffset() {
- return intToPositiveLong(bloomFilterDataOffset);
- }
-
- /**
* @return count of values in this block of the column
*/
public long getValueCount() {
@@ -460,7 +419,6 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
private final long firstDataPageOffset;
private final long dictionaryPageOffset;
- private final long bloomFilterDataOffset;
private final long valueCount;
private final long totalSize;
private final long totalUncompressedSize;
@@ -487,14 +445,12 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
Statistics statistics,
long firstDataPageOffset,
long dictionaryPageOffset,
- long bloomFilterDataOffset,
long valueCount,
long totalSize,
long totalUncompressedSize) {
super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
this.firstDataPageOffset = firstDataPageOffset;
this.dictionaryPageOffset = dictionaryPageOffset;
- this.bloomFilterDataOffset = bloomFilterDataOffset;
this.valueCount = valueCount;
this.totalSize = totalSize;
this.totalUncompressedSize = totalUncompressedSize;
@@ -516,13 +472,6 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
}
/**
- * @return the location of the bloom filter if any
- */
- public long getBloomFilterOffset() {
- return bloomFilterDataOffset;
- }
-
- /**
* @return count of values in this block of the column
*/
public long getValueCount() {
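
The metadata rework follows directly from the buffering: the filter offset is no longer known when ColumnChunkMetaData is constructed, so it cannot stay a constructor argument stored in the immutable Int/Long subclasses and instead becomes a mutable field on the base class, assigned late by serializeBloomFilters. One consequence worth noting: the field is never given an explicit "absent" sentinel, so a column without a filter reports the Java default of 0. A toy sketch of the late-binding pattern (types illustrative):

    // Toy version of the late-bound offset on the metadata base class.
    abstract class ChunkMetaSketch {
      private long bloomFilterOffset; // Java default 0 until set near the footer

      public void setBloomFilterOffset(long offset) { this.bloomFilterOffset = offset; }
      public long getBloomFilterOffset() { return bloomFilterOffset; }

      // Offsets known at construction stay immutable in the subclasses.
      abstract long getDictionaryPageOffset();
    }
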
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index c353ee3..fc37717 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -259,6 +259,7 @@ public class TestColumnChunkPageWriteStore {
same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index
any(),
any(),
+ any(),
any());
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 0cfb001..71ca5ea 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -239,17 +239,14 @@ public class TestParquetFileWriter {
w.startColumn(col, 5, CODEC);
w.writeDataPage(2, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(3, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+ w.endColumn();
BloomFilter bloomData = new BlockSplitBloomFilter(0);
bloomData.insertHash(bloomData.hash(Binary.fromString("hello")));
bloomData.insertHash(bloomData.hash(Binary.fromString("world")));
- long blStarts = w.getPos();
w.writeBloomFilter(bloomData);
- w.endColumn();
w.endBlock();
w.end(new HashMap<String, String>());
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
- assertEquals("bloomFilter offset",
- blStarts,
readFooter.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
ParquetFileReader r = new ParquetFileReader(configuration,
readFooter.getFileMetaData(), path,
Arrays.asList(readFooter.getBlocks().get(0)),
Arrays.asList(schema.getColumnDescription(colPath)));
BloomFilterReader bloomFilterReader =
r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
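
The test reordering mirrors the new contract: writeBloomFilter no longer writes bytes at the current stream position, so it may be called after endColumn(), and the pre-computed blStarts assertion is dropped because the writer now assigns the offset itself during end(). Condensed from the diff (a fragment, not compilable on its own; w, configuration and path are as in the test, and the BloomFilterReader usage past this excerpt is not reproduced):

    w.endColumn();                   // the chunk is closed before the filter is registered
    BloomFilter bloomData = new BlockSplitBloomFilter(0);
    bloomData.insertHash(bloomData.hash(Binary.fromString("hello")));
    w.writeBloomFilter(bloomData);   // buffered; the bytes are written in w.end()
    w.endBlock();
    w.end(new HashMap<String, String>());

    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
    // The offset now points just before the footer, not between data pages.
    long offset = readFooter.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset();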