This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6637105 feat(parquet): Write/Read bloom filters from files (#341)
6637105 is described below
commit 6637105c3505505cfcd7e94e7e030afad067cf35
Author: Matt Topol <[email protected]>
AuthorDate: Wed Apr 2 11:33:49 2025 -0400
feat(parquet): Write/Read bloom filters from files (#341)
### Rationale for this change
Support for reading and writing bloom filters in parquet files is important
because it allows consumers to skip row groups, columns, etc. when querying.
### What changes are included in this PR?
Adds functionality to the Writer and Reader for writing and reading bloom
filters directly to and from parquet files, along with new writer properties
that control whether and how bloom filters get written.
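
For illustration, here is a minimal sketch of how the new options and the
reader-side accessors fit together. The column name "col1" and the helper
function are hypothetical; the calls mirror the unit test added in this PR.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/metadata"
)

// Opt a single column into bloom filter writing with an expected NDV and FPP.
// These properties would be handed to file.NewParquetWriter via file.WithWriterProps.
var props = parquet.NewWriterProperties(
	parquet.WithBloomFilterEnabledFor("col1", true),
	parquet.WithBloomFilterNDVFor("col1", 1000),
	parquet.WithBloomFilterFPPFor("col1", 0.01),
)

// checkBloom opens a serialized file, fetches the bloom filter for
// row group 0 / column 0, and probes it for a single value.
func checkBloom(serialized []byte) error {
	rdr, err := file.NewParquetReader(bytes.NewReader(serialized))
	if err != nil {
		return err
	}
	defer rdr.Close()

	rg, err := rdr.GetBloomFilterReader().RowGroup(0)
	if err != nil {
		return err
	}

	// GetColumnBloomFilter returns a nil filter when none was written for the column.
	filter, err := rg.GetColumnBloomFilter(0)
	if err != nil || filter == nil {
		return err
	}

	typed := metadata.TypedBloomFilter[parquet.ByteArray]{BloomFilter: filter}
	fmt.Println("maybe contains 'hello':", typed.Check(parquet.ByteArray("hello")))
	return nil
}

func main() {
	_ = props
	_ = checkBloom
}
```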
### Are these changes tested?
Yes. Unit tests are added.
### Are there any user-facing changes?
Only the newly available functions.
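
Among those newly available options, adaptive bloom filters (see the
WithAdaptiveBloomFilter* properties in the diff below) can be used when the
distinct-value count isn't known up front. A sketch, with a made-up column
name:

```go
package example

import "github.com/apache/arrow-go/v18/parquet"

// Sketch: the adaptive builder tries several candidate filters and keeps the
// smallest one that still meets the target false-positive probability.
var adaptiveProps = parquet.NewWriterProperties(
	parquet.WithBloomFilterEnabledFor("user_id", true),
	parquet.WithAdaptiveBloomFilterEnabledFor("user_id", true),
	parquet.WithBloomFilterCandidatesFor("user_id", 5), // candidate filters to try
	parquet.WithBloomFilterFPPFor("user_id", 0.01),
	parquet.WithMaxBloomFilterBytes(1<<20), // abandon any filter that grows past 1 MiB
)
```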
---
parquet/file/column_writer.go | 30 ++
parquet/file/column_writer_types.gen.go | 64 +++++
parquet/file/column_writer_types.gen.go.tmpl | 8 +
parquet/file/file_reader.go | 24 +-
parquet/file/file_writer.go | 11 +
parquet/file/file_writer_test.go | 89 ++++++
parquet/file/row_group_writer.go | 13 +-
parquet/metadata/bloom_filter.go | 2 +-
parquet/metadata/bloom_filter_reader_test.go | 74 +++++
parquet/writer_properties.go | 401 ++++++++++++++++++++++-----
10 files changed, 641 insertions(+), 75 deletions(-)
diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go
index d087533..d48b483 100644
--- a/parquet/file/column_writer.go
+++ b/parquet/file/column_writer.go
@@ -78,6 +78,8 @@ type ColumnChunkWriter interface {
LevelInfo() LevelInfo
SetBitsBuffer(*memory.Buffer)
HasBitsBuffer() bool
+
+ GetBloomFilter() metadata.BloomFilterBuilder
}
func computeLevelInfo(descr *schema.Column) (info LevelInfo) {
@@ -113,6 +115,7 @@ type columnWriter struct {
pageStatistics metadata.TypedStatistics
chunkStatistics metadata.TypedStatistics
+ bloomFilter metadata.BloomFilterBuilder
// total number of values stored in the current data page. this is the maximum
// of the number of encoded def levels or encoded values. for
@@ -174,6 +177,7 @@ func newColumnWriterBase(metaData *metadata.ColumnChunkMetaDataBuilder, pager Pa
ret.reset()
+ ret.initBloomFilter()
return ret
}
@@ -702,3 +706,29 @@ func (w *columnWriter) maybeReplaceValidity(values arrow.Array, newNullCount int
defer data.Release()
return array.MakeFromData(data)
}
+
+func (w *columnWriter) initBloomFilter() {
+ path := w.descr.Path()
+ if !w.props.BloomFilterEnabledFor(path) {
+ return
+ }
+
+ maxFilterBytes := w.props.MaxBloomFilterBytes()
+ ndv := w.props.BloomFilterNDVFor(path)
+ fpp := w.props.BloomFilterFPPFor(path)
+ // if user specified the column NDV, we can construct the bloom filter for it
+ if ndv > 0 {
+ w.bloomFilter = metadata.NewBloomFilterFromNDVAndFPP(uint32(ndv), fpp, maxFilterBytes, w.mem)
+ } else if w.props.AdaptiveBloomFilterEnabledFor(path) {
+ numCandidates := w.props.BloomFilterCandidatesFor(path)
+ // construct adaptive bloom filter writer
+ w.bloomFilter = metadata.NewAdaptiveBlockSplitBloomFilter(uint32(maxFilterBytes), numCandidates, fpp, w.descr, w.mem)
+ } else {
+ // construct a bloom filter using the max size
+ w.bloomFilter = metadata.NewBloomFilter(uint32(maxFilterBytes), uint32(maxFilterBytes), w.mem)
+ }
+}
+
+func (w *columnWriter) GetBloomFilter() metadata.BloomFilterBuilder {
+ return w.bloomFilter
+}
diff --git a/parquet/file/column_writer_types.gen.go b/parquet/file/column_writer_types.gen.go
index cb02233..65bf29a 100644
--- a/parquet/file/column_writer_types.gen.go
+++ b/parquet/file/column_writer_types.gen.go
@@ -184,6 +184,10 @@ func (w *Int32ColumnChunkWriter) writeValues(values []int32, numNulls int64) {
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Int32Statistics).Update(values, numNulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *Int32ColumnChunkWriter) writeValuesSpaced(spacedValues []int32, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -196,6 +200,10 @@ func (w *Int32ColumnChunkWriter) writeValuesSpaced(spacedValues []int32, numRead
nulls := numValues - numRead
w.pageStatistics.(*metadata.Int32Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *Int32ColumnChunkWriter) checkDictionarySizeLimit() {
@@ -374,6 +382,10 @@ func (w *Int64ColumnChunkWriter) writeValues(values []int64, numNulls int64) {
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Int64Statistics).Update(values, numNulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *Int64ColumnChunkWriter) writeValuesSpaced(spacedValues []int64, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -386,6 +398,10 @@ func (w *Int64ColumnChunkWriter) writeValuesSpaced(spacedValues []int64, numRead
nulls := numValues - numRead
w.pageStatistics.(*metadata.Int64Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *Int64ColumnChunkWriter) checkDictionarySizeLimit() {
@@ -564,6 +580,10 @@ func (w *Int96ColumnChunkWriter) writeValues(values []parquet.Int96, numNulls in
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Int96Statistics).Update(values, numNulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *Int96ColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.Int96, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -576,6 +596,10 @@ func (w *Int96ColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.Int96,
nulls := numValues - numRead
w.pageStatistics.(*metadata.Int96Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *Int96ColumnChunkWriter) checkDictionarySizeLimit() {
@@ -754,6 +778,10 @@ func (w *Float32ColumnChunkWriter) writeValues(values []float32, numNulls int64)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Float32Statistics).Update(values, numNulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *Float32ColumnChunkWriter) writeValuesSpaced(spacedValues []float32, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -766,6 +794,10 @@ func (w *Float32ColumnChunkWriter) writeValuesSpaced(spacedValues []float32, num
nulls := numValues - numRead
w.pageStatistics.(*metadata.Float32Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *Float32ColumnChunkWriter) checkDictionarySizeLimit() {
@@ -944,6 +976,10 @@ func (w *Float64ColumnChunkWriter) writeValues(values []float64, numNulls int64)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.Float64Statistics).Update(values, numNulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *Float64ColumnChunkWriter) writeValuesSpaced(spacedValues []float64, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -956,6 +992,10 @@ func (w *Float64ColumnChunkWriter) writeValuesSpaced(spacedValues []float64, num
nulls := numValues - numRead
w.pageStatistics.(*metadata.Float64Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *Float64ColumnChunkWriter) checkDictionarySizeLimit() {
@@ -1137,6 +1177,10 @@ func (w *BooleanColumnChunkWriter) writeValues(values []bool, numNulls int64) {
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.BooleanStatistics).Update(values, numNulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *BooleanColumnChunkWriter) writeValuesSpaced(spacedValues []bool, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -1149,6 +1193,10 @@ func (w *BooleanColumnChunkWriter) writeValuesSpaced(spacedValues []bool, numRea
nulls := numValues - numRead
w.pageStatistics.(*metadata.BooleanStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *BooleanColumnChunkWriter) checkDictionarySizeLimit() {
@@ -1327,6 +1375,10 @@ func (w *ByteArrayColumnChunkWriter) writeValues(values []parquet.ByteArray, num
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.ByteArrayStatistics).Update(values, numNulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *ByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.ByteArray, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -1339,6 +1391,10 @@ func (w *ByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.By
nulls := numValues - numRead
w.pageStatistics.(*metadata.ByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *ByteArrayColumnChunkWriter) checkDictionarySizeLimit() {
@@ -1521,6 +1577,10 @@ func (w *FixedLenByteArrayColumnChunkWriter) writeValues(values []parquet.FixedL
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).Update(values, numNulls)
}
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *FixedLenByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []parquet.FixedLenByteArray, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -1537,6 +1597,10 @@ func (w *FixedLenByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []pa
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls)
}
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *FixedLenByteArrayColumnChunkWriter) checkDictionarySizeLimit() {
diff --git a/parquet/file/column_writer_types.gen.go.tmpl b/parquet/file/column_writer_types.gen.go.tmpl
index 772777b..d0a4da2 100644
--- a/parquet/file/column_writer_types.gen.go.tmpl
+++ b/parquet/file/column_writer_types.gen.go.tmpl
@@ -197,6 +197,10 @@ func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls in
}
{{- end}}
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetHashes(w.bloomFilter.Hasher(), values))
+ }
}
func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues []{{.name}}, numRead, numValues int64, validBits []byte, validBitsOffset int64) {
@@ -217,6 +221,10 @@ func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues []{{.name}},
}
{{- end}}
}
+ if w.bloomFilter != nil {
+ // TODO: optimize for Dictionary Encoding case
+ w.bloomFilter.InsertBulk(metadata.GetSpacedHashes(w.bloomFilter.Hasher(), numRead, spacedValues, validBits, validBitsOffset))
+ }
}
func (w *{{.Name}}ColumnChunkWriter) checkDictionarySizeLimit() {
diff --git a/parquet/file/file_reader.go b/parquet/file/file_reader.go
index ddccd79..c566ec2 100644
--- a/parquet/file/file_reader.go
+++ b/parquet/file/file_reader.go
@@ -44,11 +44,12 @@ var (
// Reader is the main interface for reading a parquet file
type Reader struct {
- r parquet.ReaderAtSeeker
- props *parquet.ReaderProperties
- metadata *metadata.FileMetaData
- fileDecryptor encryption.FileDecryptor
- pageIndexReader *metadata.PageIndexReader
+ r parquet.ReaderAtSeeker
+ props *parquet.ReaderProperties
+ metadata *metadata.FileMetaData
+ fileDecryptor encryption.FileDecryptor
+ pageIndexReader *metadata.PageIndexReader
+ bloomFilterReader *metadata.BloomFilterReader
bufferPool sync.Pool
}
@@ -327,3 +328,16 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
func (f *Reader) GetPageIndexReader() *metadata.PageIndexReader {
return f.pageIndexReader
}
+
+func (f *Reader) GetBloomFilterReader() *metadata.BloomFilterReader {
+ if f.bloomFilterReader == nil {
+ f.bloomFilterReader = &metadata.BloomFilterReader{
+ Input: f.r,
+ FileMetadata: f.metadata,
+ Props: f.props,
+ FileDecryptor: f.fileDecryptor,
+ BufferPool: &f.bufferPool,
+ }
+ }
+ return f.bloomFilterReader
+}
diff --git a/parquet/file/file_writer.go b/parquet/file/file_writer.go
index c9b913b..fa8d5db 100644
--- a/parquet/file/file_writer.go
+++ b/parquet/file/file_writer.go
@@ -40,6 +40,7 @@ type Writer struct {
fileEncryptor encryption.FileEncryptor
rowGroupWriter *rowGroupWriter
pageIndexBuilder *metadata.PageIndexBuilder
+ bloomFilters *metadata.FileBloomFilterBuilder
// The Schema of this writer
Schema *schema.Schema
@@ -135,6 +136,7 @@ func (fw *Writer) appendRowGroup(buffered bool) *rowGroupWriter {
fw.rowGroupWriter = newRowGroupWriter(fw.sink, rgMeta, int16(fw.rowGroups)-1, fw.props, buffered, fw.fileEncryptor, fw.pageIndexBuilder)
+ fw.bloomFilters.AppendRowGroup(rgMeta, fw.rowGroupWriter.bloomFilters)
return fw.rowGroupWriter
}
@@ -163,6 +165,12 @@ func (fw *Writer) startFile() {
magic = magicEBytes
}
}
+
+ fw.bloomFilters = &metadata.FileBloomFilterBuilder{
+ Schema: fw.Schema,
+ Encryptor: fw.fileEncryptor,
+ }
+
n, err := fw.sink.Write(magic)
if n != 4 || err != nil {
panic("failed to write magic number")
@@ -236,6 +244,9 @@ func (fw *Writer) FlushWithFooter() error {
fw.rowGroupWriter.Close()
}
fw.rowGroupWriter = nil
+ if err := fw.bloomFilters.WriteTo(fw.sink); err != nil {
+ return err
+ }
fw.writePageIndex()
diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go
index 7bcbe29..ec1f7eb 100644
--- a/parquet/file/file_writer_test.go
+++ b/parquet/file/file_writer_test.go
@@ -1067,3 +1067,92 @@ func TestPageIndexRoundTripSuite(t *testing.T) {
suite.Run(t, &PageIndexRoundTripSuite{pageVersion: parquet.DataPageV2})
})
}
+
+func TestWriteBloomFilters(t *testing.T) {
+ input1 := []parquet.ByteArray{
+ parquet.ByteArray("hello"),
+ parquet.ByteArray("world"),
+ parquet.ByteArray("hello"),
+ parquet.ByteArray("parquet"),
+ }
+
+ input2 := []parquet.ByteArray{
+ parquet.ByteArray("foo"),
+ parquet.ByteArray("bar"),
+ parquet.ByteArray("baz"),
+ parquet.ByteArray("columns"),
+ }
+
+ size := len(input1)
+ chunk := size / 2
+
+ props := parquet.NewWriterProperties(
+ parquet.WithDictionaryDefault(false),
+ parquet.WithBloomFilterEnabledFor("col1", true),
+ parquet.WithBatchSize(int64(chunk)),
+ )
+
+ field1, err := schema.NewPrimitiveNode("col1", parquet.Repetitions.Required,
+ parquet.Types.ByteArray, -1, -1)
+ require.NoError(t, err)
+ field2, err := schema.NewPrimitiveNode("col2", parquet.Repetitions.Required,
+ parquet.Types.ByteArray, -1, -1)
+ require.NoError(t, err)
+ sc, err := schema.NewGroupNode("test", parquet.Repetitions.Required,
+ schema.FieldList{field1, field2}, -1)
+ require.NoError(t, err)
+
+ sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+ writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props))
+
+ rgw := writer.AppendRowGroup()
+ cwr, err := rgw.NextColumn()
+ require.NoError(t, err)
+
+ cw, ok := cwr.(*file.ByteArrayColumnChunkWriter)
+ require.True(t, ok)
+
+ nVals, err := cw.WriteBatch(input1[:chunk], nil, nil)
+ require.NoError(t, err)
+ require.EqualValues(t, chunk, nVals)
+
+ nVals, err = cw.WriteBatch(input1[chunk:], nil, nil)
+ require.NoError(t, err)
+ require.EqualValues(t, chunk, nVals)
+
+ cwr, err = rgw.NextColumn()
+ require.NoError(t, err)
+ cw, ok = cwr.(*file.ByteArrayColumnChunkWriter)
+ require.True(t, ok)
+
+ nVals, err = cw.WriteBatch(input2, nil, nil)
+ require.NoError(t, err)
+ require.EqualValues(t, size, nVals)
+
+ require.NoError(t, cwr.Close())
+ require.NoError(t, rgw.Close())
+ require.NoError(t, writer.Close())
+
+ rdr, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
+ require.NoError(t, err)
+
+ bloom := rdr.GetBloomFilterReader()
+ bloomRgr, err := bloom.RowGroup(0)
+ require.NoError(t, err)
+
+ filter, err := bloomRgr.GetColumnBloomFilter(1)
+ require.NoError(t, err)
+ require.Nil(t, filter) // no filter written for col2
+
+ filter, err = bloomRgr.GetColumnBloomFilter(0)
+ require.NoError(t, err)
+ require.NotNil(t, filter)
+
+ byteArrayFilter := metadata.TypedBloomFilter[parquet.ByteArray]{BloomFilter: filter}
+ assert.True(t, byteArrayFilter.Check(parquet.ByteArray("hello")))
+ assert.True(t, byteArrayFilter.Check(parquet.ByteArray("world")))
+ assert.True(t, byteArrayFilter.Check(parquet.ByteArray("parquet")))
+ assert.False(t, byteArrayFilter.Check(parquet.ByteArray("foo")))
+ assert.False(t, byteArrayFilter.Check(parquet.ByteArray("bar")))
+ assert.False(t, byteArrayFilter.Check(parquet.ByteArray("baz")))
+}
diff --git a/parquet/file/row_group_writer.go b/parquet/file/row_group_writer.go
index c335a10..f3732c1 100644
--- a/parquet/file/row_group_writer.go
+++ b/parquet/file/row_group_writer.go
@@ -80,18 +80,22 @@ type rowGroupWriter struct {
columnWriters []ColumnChunkWriter
pager PageWriter
+
+ bloomFilters map[string]metadata.BloomFilterBuilder
}
-func newRowGroupWriter(sink utils.WriterTell, metadata *metadata.RowGroupMetaDataBuilder, ordinal int16, props *parquet.WriterProperties, buffered bool, fileEncryptor encryption.FileEncryptor, pageIdxBldr *metadata.PageIndexBuilder) *rowGroupWriter {
+func newRowGroupWriter(sink utils.WriterTell, rgMeta *metadata.RowGroupMetaDataBuilder, ordinal int16, props *parquet.WriterProperties, buffered bool, fileEncryptor encryption.FileEncryptor, pageIdxBldr *metadata.PageIndexBuilder) *rowGroupWriter {
ret := &rowGroupWriter{
sink: sink,
- metadata: metadata,
+ metadata: rgMeta,
props: props,
ordinal: ordinal,
buffered: buffered,
fileEncryptor: fileEncryptor,
pageIndexBuilder: pageIdxBldr,
+ bloomFilters: make(map[string]metadata.BloomFilterBuilder),
}
+
if buffered {
ret.initColumns()
} else {
@@ -187,6 +191,7 @@ func (rg *rowGroupWriter) NextColumn() (ColumnChunkWriter, error) {
}
rg.columnWriters[0] = NewColumnChunkWriter(colMeta, rg.pager, rg.props)
+ rg.bloomFilters[path] = rg.columnWriters[0].GetBloomFilter()
return rg.columnWriters[0], nil
}
@@ -279,7 +284,9 @@ func (rg *rowGroupWriter) initColumns() error {
pager.SetIndexBuilders(colIdxBldr, offsetIdxBldr)
rg.nextColumnIdx++
- rg.columnWriters = append(rg.columnWriters, NewColumnChunkWriter(colMeta, pager, rg.props))
+ cw := NewColumnChunkWriter(colMeta, pager, rg.props)
+ rg.columnWriters = append(rg.columnWriters, cw)
+ rg.bloomFilters[path] = cw.GetBloomFilter()
}
return nil
}
diff --git a/parquet/metadata/bloom_filter.go b/parquet/metadata/bloom_filter.go
index cb21809..f0ec8fa 100644
--- a/parquet/metadata/bloom_filter.go
+++ b/parquet/metadata/bloom_filter.go
@@ -48,7 +48,7 @@ const (
var (
salt = [bitsSetPerBlock]uint32{
- 0x476137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+ 0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}
defaultHashStrategy = format.BloomFilterHash{XXHASH: &format.XxHash{}}
diff --git a/parquet/metadata/bloom_filter_reader_test.go b/parquet/metadata/bloom_filter_reader_test.go
index 25d9505..432b90f 100644
--- a/parquet/metadata/bloom_filter_reader_test.go
+++ b/parquet/metadata/bloom_filter_reader_test.go
@@ -18,16 +18,20 @@ package metadata_test
import (
"bytes"
+ "os"
"runtime"
"sync"
"testing"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/internal/encryption"
"github.com/apache/arrow-go/v18/parquet/internal/utils"
"github.com/apache/arrow-go/v18/parquet/metadata"
"github.com/apache/arrow-go/v18/parquet/schema"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@@ -273,3 +277,73 @@ func TestBloomFilterRoundTrip(t *testing.T) {
suite.Run(t, new(BloomFilterBuilderSuite))
suite.Run(t, new(EncryptedBloomFilterBuilderSuite))
}
+
+func TestReadBloomFilter(t *testing.T) {
+ dir := os.Getenv("PARQUET_TEST_DATA")
+ if dir == "" {
+ t.Skip("PARQUET_TEST_DATA not set")
+ }
+ require.DirExists(t, dir)
+
+ files := []string{"data_index_bloom_encoding_stats.parquet", "data_index_bloom_encoding_with_length.parquet"}
+
+ for _, testfile := range files {
+ t.Run(testfile, func(t *testing.T) {
+ rdr, err := file.OpenParquetFile(dir+"/"+testfile, false)
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ bloomFilterRdr := rdr.GetBloomFilterReader()
+ rg0, err := bloomFilterRdr.RowGroup(0)
+ require.NoError(t, err)
+ require.NotNil(t, rg0)
+
+ rg1, err := bloomFilterRdr.RowGroup(1)
+ assert.Nil(t, rg1)
+ assert.Error(t, err)
+ assert.ErrorContains(t, err, "row group index 1 out of range")
+
+ bf, err := rg0.GetColumnBloomFilter(0)
+ require.NoError(t, err)
+ require.NotNil(t, bf)
+
+ bf1, err := rg0.GetColumnBloomFilter(1)
+ assert.Nil(t, bf1)
+ assert.Error(t, err)
+ assert.ErrorContains(t, err, "column index 1 out of range")
+
+ baBloomFilter := metadata.TypedBloomFilter[parquet.ByteArray]{bf}
+ assert.True(t, baBloomFilter.Check([]byte("Hello")))
+ assert.False(t, baBloomFilter.Check([]byte("NOT_EXISTS")))
+ })
+ }
+}
+
+func TestBloomFilterReaderFileNotHaveFilter(t *testing.T) {
+ // can still get a BloomFilterReader and a RowGroupBloomFilterReader
+ // but cannot get a non-null BloomFilter
+ dir := os.Getenv("PARQUET_TEST_DATA")
+ if dir == "" {
+ t.Skip("PARQUET_TEST_DATA not set")
+ }
+ require.DirExists(t, dir)
+
+ rdr, err := file.OpenParquetFile(dir+"/alltypes_plain.parquet", false)
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ bloomFilterRdr := rdr.GetBloomFilterReader()
+ rg0, err := bloomFilterRdr.RowGroup(0)
+ require.NoError(t, err)
+ require.NotNil(t, rg0)
+
+ rg1, err := bloomFilterRdr.RowGroup(1)
+ assert.Nil(t, rg1)
+ assert.Error(t, err)
+ assert.ErrorContains(t, err, "row group index 1 out of range")
+
+ bf, err := rg0.GetColumnBloomFilter(0)
+ require.NoError(t, err)
+ require.Nil(t, bf)
+}
diff --git a/parquet/writer_properties.go b/parquet/writer_properties.go
index 3ee5f79..3151e2f 100644
--- a/parquet/writer_properties.go
+++ b/parquet/writer_properties.go
@@ -17,6 +17,7 @@
package parquet
import (
+ "github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/compress"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
@@ -49,41 +50,60 @@ const (
DefaultMaxStatsSize int64 = 4096
// Default is to not write page indexes for columns
DefaultPageIndexEnabled = false
- DefaultCreatedBy = "parquet-go version 18.0.0-SNAPSHOT"
+ DefaultCreatedBy = "parquet-go version " + arrow.PkgVersion
DefaultRootName = "schema"
+
+ DefaultMaxBloomFilterBytes = 1024 * 1024
+ DefaultBloomFilterEnabled = false
+ DefaultBloomFilterFPP = 0.01
+ DefaultAdaptiveBloomFilterEnabled = false
+ DefaultBloomFilterCandidates = 5
)
// ColumnProperties defines the encoding, codec, and so on for a given column.
type ColumnProperties struct {
- Encoding Encoding
- Codec compress.Compression
- DictionaryEnabled bool
- StatsEnabled bool
- PageIndexEnabled bool
- MaxStatsSize int64
- CompressionLevel int
+ Encoding Encoding
+ Codec compress.Compression
+ DictionaryEnabled bool
+ StatsEnabled bool
+ PageIndexEnabled bool
+ MaxStatsSize int64
+ CompressionLevel int
+ BloomFilterEnabled bool
+ BloomFilterFPP float64
+ AdaptiveBloomFilterEnabled bool
+ BloomFilterCandidates int
+ BloomFilterNDV int64
}
// DefaultColumnProperties returns the default properties which get utilized for writing.
//
// The default column properties are the following constants:
//
-// Encoding: Encodings.Plain
-// Codec: compress.Codecs.Uncompressed
-// DictionaryEnabled: DefaultDictionaryEnabled
-// StatsEnabled: DefaultStatsEnabled
-// PageIndexEnabled: DefaultPageIndexEnabled
-// MaxStatsSize: DefaultMaxStatsSize
-// CompressionLevel: compress.DefaultCompressionLevel
+// Encoding: Encodings.Plain
+// Codec: compress.Codecs.Uncompressed
+// DictionaryEnabled: DefaultDictionaryEnabled
+// StatsEnabled: DefaultStatsEnabled
+// PageIndexEnabled: DefaultPageIndexEnabled
+// MaxStatsSize: DefaultMaxStatsSize
+// CompressionLevel: compress.DefaultCompressionLevel
+// BloomFilterEnabled: DefaultBloomFilterEnabled
+// BloomFilterFPP: DefaultBloomFilterFPP
+// AdaptiveBloomFilterEnabled: DefaultAdaptiveBloomFilterEnabled
+// BloomFilterCandidates: DefaultBloomFilterCandidates
func DefaultColumnProperties() ColumnProperties {
return ColumnProperties{
- Encoding: Encodings.Plain,
- Codec: compress.Codecs.Uncompressed,
- DictionaryEnabled: DefaultDictionaryEnabled,
- StatsEnabled: DefaultStatsEnabled,
- PageIndexEnabled: DefaultPageIndexEnabled,
- MaxStatsSize: DefaultMaxStatsSize,
- CompressionLevel: compress.DefaultCompressionLevel,
+ Encoding: Encodings.Plain,
+ Codec: compress.Codecs.Uncompressed,
+ DictionaryEnabled: DefaultDictionaryEnabled,
+ StatsEnabled: DefaultStatsEnabled,
+ PageIndexEnabled: DefaultPageIndexEnabled,
+ MaxStatsSize: DefaultMaxStatsSize,
+ CompressionLevel: compress.DefaultCompressionLevel,
+ BloomFilterEnabled: DefaultBloomFilterEnabled,
+ BloomFilterFPP: DefaultBloomFilterFPP,
+ AdaptiveBloomFilterEnabled: DefaultAdaptiveBloomFilterEnabled,
+ BloomFilterCandidates: DefaultBloomFilterCandidates,
}
}
@@ -91,13 +111,18 @@ func DefaultColumnProperties() ColumnProperties {
type SortingColumn = format.SortingColumn
type writerPropConfig struct {
- wr *WriterProperties
- encodings map[string]Encoding
- codecs map[string]compress.Compression
- compressLevel map[string]int
- dictEnabled map[string]bool
- statsEnabled map[string]bool
- indexEnabled map[string]bool
+ wr *WriterProperties
+ encodings map[string]Encoding
+ codecs map[string]compress.Compression
+ compressLevel map[string]int
+ dictEnabled map[string]bool
+ statsEnabled map[string]bool
+ indexEnabled map[string]bool
+ bloomFilterNDVs map[string]int64
+ bloomFilterFPPs map[string]float64
+ bloomFilterEnabled map[string]bool
+ adaptiveBloomFilterEnabled map[string]bool
+ numBloomFilterCandidates map[string]int
}
// WriterProperty is used as the options for building a writer properties instance
@@ -337,20 +362,142 @@ func WithPageIndexEnabledPath(path ColumnPath, enabled bool) WriterProperty {
return WithPageIndexEnabledFor(path.String(), enabled)
}
+// WithMaxBloomFilterBytes sets the maximum size for a bloom filter, after which
+// it is abandoned and not written to the file.
+func WithMaxBloomFilterBytes(nbytes int64) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.wr.maxBloomFilterBytes = nbytes
+ }
+}
+
+// WithBloomFilterEnabled sets the default value for whether to enable writing bloom
+// filters for columns. This is the default value for all columns, but can be overridden
+// by using WithBloomFilterEnabledFor or WithBloomFilterEnabledPath.
+func WithBloomFilterEnabled(enabled bool) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.wr.defColumnProps.BloomFilterEnabled = enabled
+ }
+}
+
+// WithBloomFilterEnabledFor specifies a per column value as to enable or disable writing
+// bloom filters for the column.
+func WithBloomFilterEnabledFor(path string, enabled bool) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.bloomFilterEnabled[path] = enabled
+ }
+}
+
+// WithBloomFilterEnabledPath is like WithBloomFilterEnabledFor, but takes a ColumnPath
+func WithBloomFilterEnabledPath(path ColumnPath, enabled bool) WriterProperty {
+ return WithBloomFilterEnabledFor(path.String(), enabled)
+}
+
+// WithBloomFilterFPP sets the default value for the false positive probability for writing
+// bloom filters.
+func WithBloomFilterFPP(fpp float64) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.wr.defColumnProps.BloomFilterFPP = fpp
+ }
+}
+
+// WithBloomFilterFPPFor specifies a per column value for the false positive probability
+// for writing bloom filters.
+func WithBloomFilterFPPFor(path string, fpp float64) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.bloomFilterFPPs[path] = fpp
+ }
+}
+
+// WithBloomFilterFPPPath is like WithBloomFilterFPPFor, but takes a ColumnPath
+func WithBloomFilterFPPPath(path ColumnPath, fpp float64) WriterProperty {
+ return WithBloomFilterFPPFor(path.String(), fpp)
+}
+
+// WithAdaptiveBloomFilterEnabled sets the default value for whether to enable writing
+// adaptive bloom filters for columns. This is the default value for all columns,
+// but can be overridden by using WithAdaptiveBloomFilterEnabledFor or
+// WithAdaptiveBloomFilterEnabledPath.
+//
+// Using an Adaptive Bloom filter will attempt to use multiple candidate bloom filters
+// when building the column, with different expected distinct values. It will attempt
+// to use the smallest candidate bloom filter that achieves the desired false positive
+// probability, dropping candidate bloom filters that are no longer viable.
+func WithAdaptiveBloomFilterEnabled(enabled bool) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.wr.defColumnProps.AdaptiveBloomFilterEnabled = enabled
+ }
+}
+
+// WithAdaptiveBloomFilterEnabledFor specifies a per column value as to enable or disable writing
+// adaptive bloom filters for the column.
+func WithAdaptiveBloomFilterEnabledFor(path string, enabled bool) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.adaptiveBloomFilterEnabled[path] = enabled
+ }
+}
+
+// WithAdaptiveBloomFilterEnabledPath is like WithAdaptiveBloomFilterEnabledFor, but takes a ColumnPath
+func WithAdaptiveBloomFilterEnabledPath(path ColumnPath, enabled bool) WriterProperty {
+ return WithAdaptiveBloomFilterEnabledFor(path.String(), enabled)
+}
+
+// WithBloomFilterCandidates sets the number of candidate bloom filters to use when building
+// an adaptive bloom filter.
+func WithBloomFilterCandidates(candidates int) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.wr.defColumnProps.BloomFilterCandidates = candidates
+ }
+}
+
+// WithBloomFilterCandidatesFor specifies a per column value for the number of candidate
+// bloom filters to use when building an adaptive bloom filter.
+func WithBloomFilterCandidatesFor(path string, candidates int) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.numBloomFilterCandidates[path] = candidates
+ }
+}
+
+// WithBloomFilterCandidatesPath is like WithBloomFilterCandidatesFor, but takes a ColumnPath
+func WithBloomFilterCandidatesPath(path ColumnPath, candidates int) WriterProperty {
+ return WithBloomFilterCandidatesFor(path.String(), candidates)
+}
+
+// WithBloomFilterNDV sets the default value for the expected number of distinct values
+// to be written for the column. This is ignored when using adaptive bloom filters.
+func WithBloomFilterNDV(ndv int64) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.wr.defColumnProps.BloomFilterNDV = ndv
+ }
+}
+
+// WithBloomFilterNDVFor specifies a per column value for the expected number of distinct values
+// to be written for the column. This is ignored when using adaptive bloom filters.
+func WithBloomFilterNDVFor(path string, ndv int64) WriterProperty {
+ return func(cfg *writerPropConfig) {
+ cfg.bloomFilterNDVs[path] = ndv
+ }
+}
+
+// WithBloomFilterNDVPath is like WithBloomFilterNDVFor, but takes a ColumnPath
+func WithBloomFilterNDVPath(path ColumnPath, ndv int64) WriterProperty {
+ return WithBloomFilterNDVFor(path.String(), ndv)
+}
+
// WriterProperties is the collection of properties to use for writing a parquet file. The values are
// read only once it has been constructed.
type WriterProperties struct {
- mem memory.Allocator
- dictPagesize int64
- batchSize int64
- maxRowGroupLen int64
- pageSize int64
- parquetVersion Version
- createdBy string
- dataPageVersion DataPageVersion
- rootName string
- rootRepetition Repetition
- storeDecimalAsInt bool
+ mem memory.Allocator
+ dictPagesize int64
+ batchSize int64
+ maxRowGroupLen int64
+ pageSize int64
+ parquetVersion Version
+ createdBy string
+ dataPageVersion DataPageVersion
+ rootName string
+ rootRepetition Repetition
+ storeDecimalAsInt bool
+ maxBloomFilterBytes int64
defColumnProps ColumnProperties
columnProps map[string]*ColumnProperties
@@ -360,18 +507,19 @@ type WriterProperties struct {
func defaultWriterProperties() *WriterProperties {
return &WriterProperties{
- mem: memory.DefaultAllocator,
- dictPagesize: DefaultDictionaryPageSizeLimit,
- batchSize: DefaultWriteBatchSize,
- maxRowGroupLen: DefaultMaxRowGroupLen,
- pageSize: DefaultDataPageSize,
- parquetVersion: V2_LATEST,
- dataPageVersion: DataPageV1,
- createdBy: DefaultCreatedBy,
- rootName: DefaultRootName,
- rootRepetition: Repetitions.Repeated,
- defColumnProps: DefaultColumnProperties(),
- sortingCols: []SortingColumn{},
+ mem: memory.DefaultAllocator,
+ dictPagesize: DefaultDictionaryPageSizeLimit,
+ batchSize: DefaultWriteBatchSize,
+ maxRowGroupLen: DefaultMaxRowGroupLen,
+ pageSize: DefaultDataPageSize,
+ parquetVersion: V2_LATEST,
+ dataPageVersion: DataPageV1,
+ createdBy: DefaultCreatedBy,
+ rootName: DefaultRootName,
+ rootRepetition: Repetitions.Repeated,
+ maxBloomFilterBytes: DefaultMaxBloomFilterBytes,
+ defColumnProps: DefaultColumnProperties(),
+ sortingCols: []SortingColumn{},
}
}
@@ -381,23 +529,28 @@ func defaultWriterProperties() *WriterProperties {
//
// The Default properties use the following constants:
//
-// Allocator: memory.DefaultAllocator
+// Allocator: memory.DefaultAllocator
// DictionaryPageSize: DefaultDictionaryPageSizeLimit
-// BatchSize: DefaultWriteBatchSize
-// MaxRowGroupLength: DefaultMaxRowGroupLen
-// PageSize: DefaultDataPageSize
-// ParquetVersion: V1
-// DataPageVersion: DataPageV1
-// CreatedBy: DefaultCreatedBy
+// BatchSize: DefaultWriteBatchSize
+// MaxRowGroupLength: DefaultMaxRowGroupLen
+// PageSize: DefaultDataPageSize
+// ParquetVersion: V2_LATEST
+// DataPageVersion: DataPageV1
+// CreatedBy: DefaultCreatedBy
func NewWriterProperties(opts ...WriterProperty) *WriterProperties {
cfg := writerPropConfig{
- wr: defaultWriterProperties(),
- encodings: make(map[string]Encoding),
- codecs: make(map[string]compress.Compression),
- compressLevel: make(map[string]int),
- dictEnabled: make(map[string]bool),
- statsEnabled: make(map[string]bool),
- indexEnabled: make(map[string]bool),
+ wr: defaultWriterProperties(),
+ encodings: make(map[string]Encoding),
+ codecs: make(map[string]compress.Compression),
+ compressLevel: make(map[string]int),
+ dictEnabled: make(map[string]bool),
+ statsEnabled: make(map[string]bool),
+ indexEnabled: make(map[string]bool),
+ bloomFilterNDVs: make(map[string]int64),
+ bloomFilterFPPs: make(map[string]float64),
+ bloomFilterEnabled: make(map[string]bool),
+ adaptiveBloomFilterEnabled: make(map[string]bool),
+ numBloomFilterCandidates: make(map[string]int),
}
for _, o := range opts {
o(&cfg)
@@ -436,6 +589,27 @@ func NewWriterProperties(opts ...WriterProperty) *WriterProperties {
for key, value := range cfg.indexEnabled {
get(key).PageIndexEnabled = value
}
+
+ for key, value := range cfg.bloomFilterEnabled {
+ get(key).BloomFilterEnabled = value
+ }
+
+ for key, value := range cfg.bloomFilterFPPs {
+ get(key).BloomFilterFPP = value
+ }
+
+ for key, value := range cfg.bloomFilterNDVs {
+ get(key).BloomFilterNDV = value
+ }
+
+ for key, value := range cfg.adaptiveBloomFilterEnabled {
+ get(key).AdaptiveBloomFilterEnabled = value
+ }
+
+ for key, value := range cfg.numBloomFilterCandidates {
+ get(key).BloomFilterCandidates = value
+ }
+
return cfg.wr
}
@@ -613,3 +787,98 @@ func (w *WriterProperties) ColumnEncryptionProperties(path string) *ColumnEncryp
func (w *WriterProperties) StoreDecimalAsInteger() bool {
return w.storeDecimalAsInt
}
+
+// MaxBloomFilterBytes returns the maximum number of bytes that a bloom filter can use
+func (w *WriterProperties) MaxBloomFilterBytes() int64 {
+ return w.maxBloomFilterBytes
+}
+
+// BloomFilterEnabled returns the default value for whether or not bloom filters are enabled
+func (w *WriterProperties) BloomFilterEnabled() bool {
+ return w.defColumnProps.BloomFilterEnabled
+}
+
+// BloomFilterEnabledFor returns whether or not bloom filters are enabled for the given column path
+func (w *WriterProperties) BloomFilterEnabledFor(path string) bool {
+ if p, ok := w.columnProps[path]; ok {
+ return p.BloomFilterEnabled
+ }
+ return w.defColumnProps.BloomFilterEnabled
+}
+
+// BloomFilterEnabledPath is the same as BloomFilterEnabledFor but takes a ColumnPath
+func (w *WriterProperties) BloomFilterEnabledPath(path ColumnPath) bool {
+ return w.BloomFilterEnabledFor(path.String())
+}
+
+// BloomFilterFPP returns the default false positive probability for bloom filters
+func (w *WriterProperties) BloomFilterFPP() float64 {
+ return w.defColumnProps.BloomFilterFPP
+}
+
+// BloomFilterFPPFor returns the false positive probability for the given column path
+func (w *WriterProperties) BloomFilterFPPFor(path string) float64 {
+ if p, ok := w.columnProps[path]; ok {
+ return p.BloomFilterFPP
+ }
+ return w.defColumnProps.BloomFilterFPP
+}
+
+// BloomFilterFPPPath is the same as BloomFilterFPPFor but takes a ColumnPath
+func (w *WriterProperties) BloomFilterFPPPath(path ColumnPath) float64 {
+ return w.BloomFilterFPPFor(path.String())
+}
+
+// AdaptiveBloomFilterEnabled returns the default value for whether or not adaptive bloom filters are enabled
+func (w *WriterProperties) AdaptiveBloomFilterEnabled() bool {
+ return w.defColumnProps.AdaptiveBloomFilterEnabled
+}
+
+// AdaptiveBloomFilterEnabledFor returns whether or not adaptive bloom filters are enabled for the given column path
+func (w *WriterProperties) AdaptiveBloomFilterEnabledFor(path string) bool {
+ if p, ok := w.columnProps[path]; ok {
+ return p.AdaptiveBloomFilterEnabled
+ }
+ return w.defColumnProps.AdaptiveBloomFilterEnabled
+}
+
+// AdaptiveBloomFilterEnabledPath is the same as AdaptiveBloomFilterEnabledFor but takes a ColumnPath
+func (w *WriterProperties) AdaptiveBloomFilterEnabledPath(path ColumnPath) bool {
+ return w.AdaptiveBloomFilterEnabledFor(path.String())
+}
+
+// BloomFilterCandidates returns the default number of candidates to use for bloom filters
+func (w *WriterProperties) BloomFilterCandidates() int {
+ return w.defColumnProps.BloomFilterCandidates
+}
+
+// BloomFilterCandidatesFor returns the number of candidates to use for the given column path
+func (w *WriterProperties) BloomFilterCandidatesFor(path string) int {
+ if p, ok := w.columnProps[path]; ok {
+ return p.BloomFilterCandidates
+ }
+ return w.defColumnProps.BloomFilterCandidates
+}
+
+// BloomFilterCandidatesPath is the same as BloomFilterCandidatesFor but takes a ColumnPath
+func (w *WriterProperties) BloomFilterCandidatesPath(path ColumnPath) int {
+ return w.BloomFilterCandidatesFor(path.String())
+}
+
+// BloomFilterNDV returns the default number of distinct values to use for bloom filters
+func (w *WriterProperties) BloomFilterNDV() int64 {
+ return w.defColumnProps.BloomFilterNDV
+}
+
+// BloomFilterNDVFor returns the number of distinct values to use for the given column path
+func (w *WriterProperties) BloomFilterNDVFor(path string) int64 {
+ if p, ok := w.columnProps[path]; ok {
+ return p.BloomFilterNDV
+ }
+ return w.defColumnProps.BloomFilterNDV
+}
+
+// BloomFilterNDVPath is the same as BloomFilterNDVFor but takes a ColumnPath
+func (w *WriterProperties) BloomFilterNDVPath(path ColumnPath) int64 {
+ return w.BloomFilterNDVFor(path.String())
+}