Patzifist commented on PR #864:
URL: https://github.com/apache/arrow-go/pull/864#issuecomment-4848546250
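Benchmark of the original `sync.Pool`-based implementation:
```sh
go test -bench=BenchmarkGetColumnBloomFilter_OriginalSyncPool -run='^$' -memprofile=mem.out
```
```text
goos: linux
goarch: amd64
pkg: github.com/apache/arrow-go/v18/parquet/metadata
cpu: 13th Gen Intel(R) Core(TM) i7-1355U
BenchmarkGetColumnBloomFilter_OriginalSyncPool-12    1    1108550680 ns/op    5550647112 B/op    225956 allocs/op
PASS
ok  github.com/apache/arrow-go/v18/parquet/metadata  1.262s
```
Note that only one benchmark iteration ran (b.N = 1). For a comparison between pool strategies, repeat with `-count=10` and compare the runs with `benchstat`.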
```go
package metadata
import (
"bytes"
"context"
"math/rand"
"sync"
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/thrift"
"github.com/apache/arrow-go/v18/parquet/schema"
)
// fakeReader is the in-memory input handed to RowGroupBloomFilterReader in the
// benchmark below; all reads are served by the embedded *bytes.Reader.
type fakeReader struct{ *bytes.Reader }
// ReadAt forwards to the embedded *bytes.Reader, filling up to len(p) bytes
// starting at byte offset off and returning its count and error unchanged.
func (fr *fakeReader) ReadAt(p []byte, off int64) (int, error) {
	underlying := fr.Reader
	return underlying.ReadAt(p, off)
}
func BenchmarkGetColumnBloomFilter_OriginalSyncPool(b *testing.B) {
ctx := context.Background()
const (
workersCount = 64
iterationsPerWorker = 100
maxSize = 4 * 1024 * 1024
)
originalArrowPool := &sync.Pool{
New: func() any {
return
memory.NewResizableBuffer(memory.NewGoAllocator())
},
}
node, _ := schema.NewPrimitiveNode("test_col",
parquet.Repetition(format.FieldRepetitionType_REQUIRED),
parquet.Type(format.Type_BYTE_ARRAY), -1, -1)
rootGroup, _ := schema.NewGroupNode("schema",
parquet.Repetition(format.FieldRepetitionType_REPEATED),
schema.FieldList{node}, -1)
sc := schema.NewSchema(rootGroup)
payload := make([]byte, maxSize)
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
startSignal := make(chan struct{})
for w := 0; w < workersCount; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
<-startSignal
r :=
rand.New(rand.NewSource(time.Now().UnixNano() + int64(workerID)))
threadRdr := &RowGroupBloomFilterReader{
input: &fakeReader{Reader:
bytes.NewReader(payload)},
sourceFileSize: int64(len(payload)),
bufferPool: originalArrowPool,
}
for i := 0; i < iterationsPerWorker; i++ {
bloomFilterDataSize :=
int32(1*1024*1024 + r.Intn(200*1024))
bloomFilterReadSize :=
bloomFilterDataSize + 4096
header := format.BloomFilterHeader{
NumBytes:
bloomFilterDataSize,
Algorithm: &defaultAlgorithm,
Hash:
&defaultHashStrategy,
Compression:
&defaultCompression,
}
serializer :=
thrift.NewThriftSerializer()
headerBytes, _ := serializer.Write(ctx,
&header)
var offset int64 = 8
copy(payload[offset:], headerBytes)
columnMetaData := format.ColumnMetaData{
Type:
format.Type_BYTE_ARRAY,
PathInSchema:
[]string{"test_col"},
Codec:
format.CompressionCodec_UNCOMPRESSED,
BloomFilterOffset: &offset,
BloomFilterLength:
&bloomFilterReadSize,
}
thriftColumnChunk :=
format.ColumnChunk{MetaData: &columnMetaData}
thriftRowGroup :=
format.RowGroup{Columns: []*format.ColumnChunk{&thriftColumnChunk}}
meta :=
NewRowGroupMetaData(&thriftRowGroup, sc, nil, nil)
threadRdr.rgMeta = meta
bf, err :=
threadRdr.GetColumnBloomFilter(0)
if err != nil {
return
}
_ = bf
}
}(w)
}
close(startSignal)
wg.Wait()
}
}
```
```sh
go tool pprof -http=:8080 mem.out
```
In the `GetColumnBloomFilter` listing below, nearly all of the 5.16 GB of cumulative allocation comes from two `ResizeNoShrink` calls: 4.06 GB at line 495 and 1.09 GB at line 524.
```text
github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupBloomFilterReader).GetColumnBloomFilter
/home/alekssmirnov/GolandProjects/arrow-go/parquet/metadata/bloom_filter.go
Total: 1.50MB 5.16GB (flat, cum) 99.56%
490 . . compression: *header.Compression,
491 . . }, nil
492 . . }
493 . .
494 . . headerBuf := r.bufferPool.Get().(*memory.Buffer)
495 . 4.06GB headerBuf.ResizeNoShrink(int(bloomFilterReadSize))
496 . . defer func() {
497 . . if headerBuf != nil {
498 . . headerBuf.ResizeNoShrink(0)
499 . . r.bufferPool.Put(headerBuf)
500 . . }
501 . . }()
502 . .
503 . . if _, err = sectionRdr.Read(headerBuf.Bytes()); err != nil {
504 . . return nil, err
505 . . }
506 . .
507 . 2MB remaining, err := thrift.DeserializeThrift(&header, headerBuf.Bytes())
508 . . if err != nil {
509 . . return nil, err
510 . . }
511 . . headerSize := len(headerBuf.Bytes()) - int(remaining)
512 . .
513 . . if err = validateBloomFilterHeader(&header); err != nil {
514 . . return nil, err
515 . . }
516 . .
517 . . bloomFilterSz := header.NumBytes
518 . . var bitset []byte
519 . . if int(bloomFilterSz)+headerSize <= len(headerBuf.Bytes()) {
520 . . // bloom filter data is entirely contained in the buffer we just read
521 . . bitset = headerBuf.Bytes()[headerSize : headerSize+int(bloomFilterSz)]
522 . . } else {
523 . . buf := r.bufferPool.Get().(*memory.Buffer)
524 . 1.09GB buf.ResizeNoShrink(int(bloomFilterSz))
525 . . filterBytesInHeader := headerBuf.Len() - headerSize
526 . . if filterBytesInHeader > 0 {
527 . . copy(buf.Bytes(), headerBuf.Bytes()[headerSize:])
528 . . }
529 . .
530 . . if _, err = sectionRdr.Read(buf.Bytes()[filterBytesInHeader:]); err != nil {
531 . . return nil, err
532 . . }
533 . . bitset = buf.Bytes()
534 . . headerBuf.ResizeNoShrink(0)
535 . . r.bufferPool.Put(headerBuf)
536 . . headerBuf = buf
537 . . }
538 . .
539 1.50MB 1.50MB bf := &blockSplitBloomFilter{
540 . . data: headerBuf,
541 . . bitset32: arrow.GetData[uint32](bitset),
542 . . hasher: xxhasher{},
543 . . algorithm: *header.Algorithm,
544 . . hashStrategy: *header.Hash,
545 . . compression: *header.Compression,
546 . . }
547 . . headerBuf = nil
548 . 1.50MB addCleanup(bf, r.bufferPool)
549 . . return bf, nil
550 . . }
551 . .
552 . . type FileBloomFilterBuilder struct {
553 . . Schema *schema.Schema
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]