zeroshade commented on code in PR #864:
URL: https://github.com/apache/arrow-go/pull/864#discussion_r3507538694


##########
parquet/metadata/bloom_filter.go:
##########
@@ -549,6 +551,34 @@ func (r *RowGroupBloomFilterReader) GetColumnBloomFilter(i 
int) (BloomFilter, er
        return bf, nil
 }
 
+func (r *RowGroupBloomFilterReader) VisitColumnBloomFilter(i int, fn 
func(BloomFilter) error) error {
+       bf, err := r.GetColumnBloomFilter(i)
+       if err != nil || bf == nil {
+               return err
+       }
+
+       defer r.recycle(bf)
+
+       return fn(bf)
+}
+
+func (r *RowGroupBloomFilterReader) recycle(bf BloomFilter) {
+       if b, ok := bf.(*blockSplitBloomFilter); ok && b.data != nil {
+               if b.cancelCleanup != nil {
+                       b.cancelCleanup()
+               }
+
+               b.data.Reset(b.data.Buf()[:cap(b.data.Buf())])
+
+               b.bitset32 = nil
+
+               if r.bufferPool != nil {
+                       r.bufferPool.Put(b.data)

Review Comment:
   **[blocker] This returns the encrypted-path buffer to the pool.**
   
   `recycle` unconditionally `Put`s `b.data` back into `r.bufferPool`, but the 
encrypted branch of `GetColumnBloomFilter` (above) builds the filter with 
`data: memory.NewBufferBytes(bitset)` — a buffer with a **nil allocator** and 
no `cancelCleanup`. So `VisitColumnBloomFilter` on an encrypted column will:
   
   1. push a foreign buffer holding **decrypted plaintext** into the reader's 
general-purpose buffer pool, and
   2. **panic** on the next reuse that grows it (`ResizeNoShrink` → `Reserve` → 
`b.mem.Allocate` on a nil allocator).
   
   Reproducer — drop into `package metadata` and run `go test -run 
TestRecycleEncryptedPathPool -v ./parquet/metadata/`:
   
   ```go
   import (
        "sync"
        "testing"
   
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/memory"
   )
   
   func TestRecycleEncryptedPathPool(t *testing.T) {
        pool := &sync.Pool{New: func() any { return 
memory.NewResizableBuffer(memory.NewGoAllocator()) }}
        r := &RowGroupBloomFilterReader{bufferPool: pool}
   
        // Mirror the encrypted path: data wrapped by NewBufferBytes (nil 
allocator), no cancelCleanup.
        bitset := make([]byte, 1024)
        bf := &blockSplitBloomFilter{
                data:     memory.NewBufferBytes(bitset),
                bitset32: arrow.Uint32Traits.CastFromBytes(bitset),
        }
   
        r.recycle(bf) // pushes the nil-allocator buffer into the pool
   
        got := pool.Get().(*memory.Buffer)
        if got.Mutable() {
                t.Skip("pool returned a fresh buffer instead of the recycled 
one; rerun")
        }
        got.ResizeNoShrink(2 * 1024 * 1024) // grow -> Reserve -> 
b.mem.Allocate on nil -> panic
   }
   ```
   
   Output on this branch:
   ```
   --- FAIL: TestRecycleEncryptedPathPool (panic: runtime error: invalid memory 
address or nil pointer dereference)
   ```
   
   Suggested fix: only recycle a pool-owned buffer — e.g. gate on 
`b.cancelCleanup != nil` (the encrypted path never registers a cleanup), or 
verify the buffer is allocator-backed, or have the encrypted branch `Release()` 
its buffer instead of being eligible for `recycle`.
   



##########
parquet/metadata/bloom_filter_benchmark_test.go:
##########
@@ -0,0 +1,157 @@
+package metadata

Review Comment:
   **[blocker] Missing the ASF license header.** Every source file in the repo 
carries the 15-line Apache header, and CI enforces it via 
`dev/release/run_rat.sh` (Apache RAT) plus the pre-commit hook — this file will 
fail both. Please copy the header from any sibling file (e.g. 
`bloom_filter_reader_test.go`).
   



##########
parquet/metadata/bloom_filter_benchmark_test.go:
##########
@@ -0,0 +1,157 @@
+package metadata
+
+import (
+       "bytes"
+       "context"
+       "math/rand"
+       "sync"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow-go/v18/parquet/internal/thrift"
+       "github.com/apache/arrow-go/v18/parquet/schema"
+)
+
+type fakeReader struct {
+       *bytes.Reader
+}
+
+func (f *fakeReader) ReadAt(p []byte, off int64) (int, error) {
+       return f.Reader.ReadAt(p, off)
+}
+
+type testHarness struct {
+       payload        []byte
+       sourceFileSize int64
+       metaList       []*RowGroupMetaData
+       metaMask       int
+}
+
+func prepareTestHarness(b *testing.B) *testHarness {
+       ctx := context.Background()
+       const maxSize = 4 * 1024 * 1024
+
+       node, _ := schema.NewPrimitiveNode("test_col", 
parquet.Repetition(format.FieldRepetitionType_REQUIRED), 
parquet.Type(format.Type_BYTE_ARRAY), -1, -1)
+       rootGroup, _ := schema.NewGroupNode("schema", 
parquet.Repetition(format.FieldRepetitionType_REPEATED), 
schema.FieldList{node}, -1)
+       sc := schema.NewSchema(rootGroup)
+
+       const metaCount = 128
+       metaList := make([]*RowGroupMetaData, metaCount)
+       payload := make([]byte, maxSize)
+       var offset int64 = 8
+
+       rnd := rand.New(rand.NewSource(42))
+       for i := 0; i < metaCount; i++ {
+               bloomFilterDataSize := int32(1*1024*1024 + rnd.Intn(200*1024))
+               bloomFilterReadSize := bloomFilterDataSize + 4096
+
+               header := format.BloomFilterHeader{
+                       NumBytes:    bloomFilterDataSize,
+                       Algorithm:   &defaultAlgorithm,
+                       Hash:        &defaultHashStrategy,
+                       Compression: &defaultCompression,
+               }
+
+               serializer := thrift.NewThriftSerializer()
+               headerBytes, _ := serializer.Write(ctx, &header)
+               copy(payload[offset:], headerBytes)

Review Comment:
   Minor benchmark-fidelity note: `offset` is a single variable fixed at 8 for 
the whole loop (and every `columnMetaData.BloomFilterOffset` takes `&offset`), 
so each iteration copies its header over the previous one at the same position. 
At run time every `metaList[k]` read deserializes the *last* header written 
here, i.e. one fixed filter size — the 128 "random NDV size" metas don't 
actually vary the bytes read. If the intent is to exercise varying sizes, give 
each filter its own offset in `payload` and point that meta's 
`BloomFilterOffset` at it.
   



##########
parquet/metadata/bloom_filter.go:
##########
@@ -549,6 +551,34 @@ func (r *RowGroupBloomFilterReader) GetColumnBloomFilter(i 
int) (BloomFilter, er
        return bf, nil
 }
 
+func (r *RowGroupBloomFilterReader) VisitColumnBloomFilter(i int, fn 
func(BloomFilter) error) error {

Review Comment:
   Please add a doc comment stating the lifetime contract: the `BloomFilter` 
passed to `fn` (and its backing bitset) is recycled the moment `fn` returns, so 
callers must not retain or use it afterward. It's good that `recycle` nils 
`bitset32`/`data` so misuse fails fast rather than reading a recycled buffer — 
but for a public method that contract should be explicit.
   



##########
parquet/metadata/bloom_filter_benchmark_test.go:
##########
@@ -0,0 +1,157 @@
+package metadata
+
+import (
+       "bytes"
+       "context"
+       "math/rand"
+       "sync"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow-go/v18/parquet/internal/thrift"
+       "github.com/apache/arrow-go/v18/parquet/schema"
+)
+
+type fakeReader struct {
+       *bytes.Reader
+}
+
+func (f *fakeReader) ReadAt(p []byte, off int64) (int, error) {
+       return f.Reader.ReadAt(p, off)
+}
+
+type testHarness struct {
+       payload        []byte
+       sourceFileSize int64
+       metaList       []*RowGroupMetaData
+       metaMask       int
+}
+
+func prepareTestHarness(b *testing.B) *testHarness {
+       ctx := context.Background()
+       const maxSize = 4 * 1024 * 1024
+
+       node, _ := schema.NewPrimitiveNode("test_col", 
parquet.Repetition(format.FieldRepetitionType_REQUIRED), 
parquet.Type(format.Type_BYTE_ARRAY), -1, -1)
+       rootGroup, _ := schema.NewGroupNode("schema", 
parquet.Repetition(format.FieldRepetitionType_REPEATED), 
schema.FieldList{node}, -1)
+       sc := schema.NewSchema(rootGroup)
+
+       const metaCount = 128
+       metaList := make([]*RowGroupMetaData, metaCount)
+       payload := make([]byte, maxSize)
+       var offset int64 = 8
+
+       rnd := rand.New(rand.NewSource(42))
+       for i := 0; i < metaCount; i++ {
+               bloomFilterDataSize := int32(1*1024*1024 + rnd.Intn(200*1024))
+               bloomFilterReadSize := bloomFilterDataSize + 4096
+
+               header := format.BloomFilterHeader{
+                       NumBytes:    bloomFilterDataSize,
+                       Algorithm:   &defaultAlgorithm,
+                       Hash:        &defaultHashStrategy,
+                       Compression: &defaultCompression,
+               }
+
+               serializer := thrift.NewThriftSerializer()
+               headerBytes, _ := serializer.Write(ctx, &header)
+               copy(payload[offset:], headerBytes)
+
+               columnMetaData := format.ColumnMetaData{
+                       Type:              format.Type_BYTE_ARRAY,
+                       PathInSchema:      []string{"test_col"},
+                       Codec:             format.CompressionCodec_UNCOMPRESSED,
+                       BloomFilterOffset: &offset,
+                       BloomFilterLength: &bloomFilterReadSize,
+               }
+               thriftColumnChunk := format.ColumnChunk{MetaData: 
&columnMetaData}
+               thriftRowGroup := format.RowGroup{Columns: 
[]*format.ColumnChunk{&thriftColumnChunk}}
+               metaList[i] = NewRowGroupMetaData(&thriftRowGroup, sc, nil, nil)
+       }
+
+       return &testHarness{
+               payload:        payload,
+               sourceFileSize: int64(len(payload)),
+               metaList:       metaList,
+               metaMask:       metaCount - 1,
+       }
+}
+
+func BenchmarkVisitColumnBloomFilter_SyncPool(b *testing.B) {
+       h := prepareTestHarness(b)
+
+       originalArrowPool := &sync.Pool{
+               New: func() any {
+                       return 
memory.NewResizableBuffer(memory.NewGoAllocator())
+               },
+       }
+
+       b.ResetTimer()
+       b.ReportAllocs()
+
+       b.RunParallel(func(pb *testing.PB) {
+               threadRdr := &RowGroupBloomFilterReader{
+                       input:          &fakeReader{Reader: 
bytes.NewReader(h.payload)},
+                       sourceFileSize: h.sourceFileSize,
+                       bufferPool:     originalArrowPool,
+               }
+
+               seq := rand.Intn(h.metaMask)
+
+               for pb.Next() {
+                       threadRdr.rgMeta = h.metaList[seq&h.metaMask]
+                       seq++
+
+                       err := threadRdr.VisitColumnBloomFilter(0, func(bf 
BloomFilter) error {
+                               if bf == nil || bf.Size() <= 0 {
+                                       b.Fatal("bloom filter read path did not 
run or returned empty bitset")
+                               }
+                               _ = bf.Hasher()
+                               return nil
+                       })
+                       if err != nil {
+                               b.Fatal(err)
+                       }
+               }
+       })
+}
+
+/*

Review Comment:
   Please remove this commented-out `_Channels` benchmark. It references APIs 
that were dropped from this PR (`parquet.NewBloomFilterPools`, 
`WithSharedBloomFilterPools`), so it's dead code that can't compile if 
uncommented.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to