This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8dc19979d5 GH-37306: [Go] Add binary dictionary unifier (#37309)
8dc19979d5 is described below
commit 8dc19979d588bb7408fafb374daf4aada11ce2b3
Author: Frederic Branczyk <[email protected]>
AuthorDate: Mon Aug 28 17:53:24 2023 +0200
GH-37306: [Go] Add binary dictionary unifier (#37309)
The "generic" dictionary unifier is fine for starting out, but it wastes CPU
cycles because its `getvalFn` returns `interface{}`, forcing every dictionary
value to be boxed and type-asserted.
Fixes #37306
### Rationale for this change
See #37306
### What changes are included in this PR?
An optimized version of a unifier for binary dictionaries.
### Are these changes tested?
Tested by using it within [Parca](https://github.com/parca-dev/parca).
### Are there any user-facing changes?
This is a new API.
* Closes: #37306
Authored-by: Frederic Branczyk <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/array/dictionary.go | 129 ++++++++++++++++++++++++++++++++++++++
go/arrow/array/dictionary_test.go | 49 +++++++++++++++
2 files changed, 178 insertions(+)
diff --git a/go/arrow/array/dictionary.go b/go/arrow/array/dictionary.go
index 06978244d8..30bdc23c37 100644
--- a/go/arrow/array/dictionary.go
+++ b/go/arrow/array/dictionary.go
@@ -1674,6 +1674,135 @@ func (u *unifier) GetResultWithIndexType(indexType
arrow.DataType) (arrow.Array,
return MakeFromData(dictData), nil
}
// binaryUnifier is a DictionaryUnifier specialized for dictionaries of
// variable-length binary values. It memoizes raw []byte values directly via
// hashing.BinaryMemoTable, avoiding the interface{}-returning getvalFn used
// by the generic unifier.
type binaryUnifier struct {
	mem       memory.Allocator          // allocator for the unified dictionary and transpose buffers
	memoTable *hashing.BinaryMemoTable  // accumulates distinct values across Unify calls
}
+
+// NewBinaryDictionaryUnifier constructs and returns a new dictionary unifier
for dictionaries
+// of binary values, using the provided allocator for allocating the unified
dictionary
+// and the memotable used for building it.
+func NewBinaryDictionaryUnifier(alloc memory.Allocator) DictionaryUnifier {
+ return &binaryUnifier{
+ mem: alloc,
+ memoTable: hashing.NewBinaryMemoTable(0, 0,
NewBinaryBuilder(alloc, arrow.BinaryTypes.Binary)),
+ }
+}
+
// Release frees the memo table backing the unifier. The unifier must not be
// used after Release is called.
func (u *binaryUnifier) Release() {
	u.memoTable.Release()
}
+
+func (u *binaryUnifier) Unify(dict arrow.Array) (err error) {
+ if !arrow.TypeEqual(arrow.BinaryTypes.Binary, dict.DataType()) {
+ return fmt.Errorf("dictionary type different from unifier: %s,
expected: %s", dict.DataType(), arrow.BinaryTypes.Binary)
+ }
+
+ typedDict := dict.(*Binary)
+ for i := 0; i < dict.Len(); i++ {
+ if dict.IsNull(i) {
+ u.memoTable.GetOrInsertNull()
+ continue
+ }
+
+ if _, _, err =
u.memoTable.GetOrInsertBytes(typedDict.Value(i)); err != nil {
+ return err
+ }
+ }
+ return
+}
+
+func (u *binaryUnifier) UnifyAndTranspose(dict arrow.Array) (transposed
*memory.Buffer, err error) {
+ if !arrow.TypeEqual(arrow.BinaryTypes.Binary, dict.DataType()) {
+ return nil, fmt.Errorf("dictionary type different from unifier:
%s, expected: %s", dict.DataType(), arrow.BinaryTypes.Binary)
+ }
+
+ transposed = memory.NewResizableBuffer(u.mem)
+ transposed.Resize(arrow.Int32Traits.BytesRequired(dict.Len()))
+
+ newIdxes := arrow.Int32Traits.CastFromBytes(transposed.Bytes())
+ typedDict := dict.(*Binary)
+ for i := 0; i < dict.Len(); i++ {
+ if dict.IsNull(i) {
+ idx, _ := u.memoTable.GetOrInsertNull()
+ newIdxes[i] = int32(idx)
+ continue
+ }
+
+ idx, _, err := u.memoTable.GetOrInsertBytes(typedDict.Value(i))
+ if err != nil {
+ transposed.Release()
+ return nil, err
+ }
+ newIdxes[i] = int32(idx)
+ }
+ return
+}
+
+func (u *binaryUnifier) GetResult() (outType arrow.DataType, outDict
arrow.Array, err error) {
+ dictLen := u.memoTable.Size()
+ var indexType arrow.DataType
+ switch {
+ case dictLen <= math.MaxInt8:
+ indexType = arrow.PrimitiveTypes.Int8
+ case dictLen <= math.MaxInt16:
+ indexType = arrow.PrimitiveTypes.Int16
+ case dictLen <= math.MaxInt32:
+ indexType = arrow.PrimitiveTypes.Int32
+ default:
+ indexType = arrow.PrimitiveTypes.Int64
+ }
+ outType = &arrow.DictionaryType{IndexType: indexType, ValueType:
arrow.BinaryTypes.Binary}
+
+ dictData, err := GetDictArrayData(u.mem, arrow.BinaryTypes.Binary,
u.memoTable, 0)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ u.memoTable.Reset()
+
+ defer dictData.Release()
+ outDict = MakeFromData(dictData)
+ return
+}
+
+func (u *binaryUnifier) GetResultWithIndexType(indexType arrow.DataType)
(arrow.Array, error) {
+ dictLen := u.memoTable.Size()
+ var toobig bool
+ switch indexType.ID() {
+ case arrow.UINT8:
+ toobig = dictLen > math.MaxUint8
+ case arrow.INT8:
+ toobig = dictLen > math.MaxInt8
+ case arrow.UINT16:
+ toobig = dictLen > math.MaxUint16
+ case arrow.INT16:
+ toobig = dictLen > math.MaxInt16
+ case arrow.UINT32:
+ toobig = uint(dictLen) > math.MaxUint32
+ case arrow.INT32:
+ toobig = dictLen > math.MaxInt32
+ case arrow.UINT64:
+ toobig = uint64(dictLen) > uint64(math.MaxUint64)
+ case arrow.INT64:
+ default:
+ return nil, fmt.Errorf("arrow/array: invalid dictionary index
type: %s, must be integral", indexType)
+ }
+ if toobig {
+ return nil, errors.New("arrow/array: cannot combine
dictionaries. unified dictionary requires a larger index type")
+ }
+
+ dictData, err := GetDictArrayData(u.mem, arrow.BinaryTypes.Binary,
u.memoTable, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ u.memoTable.Reset()
+
+ defer dictData.Release()
+ return MakeFromData(dictData), nil
+}
+
func unifyRecursive(mem memory.Allocator, typ arrow.DataType, chunks []*Data)
(changed bool, err error) {
debug.Assert(len(chunks) != 0, "must provide non-zero length chunk
slice")
var extType arrow.DataType
diff --git a/go/arrow/array/dictionary_test.go
b/go/arrow/array/dictionary_test.go
index 4fb37127d6..d0878fa3b0 100644
--- a/go/arrow/array/dictionary_test.go
+++ b/go/arrow/array/dictionary_test.go
@@ -1422,6 +1422,55 @@ func TestDictionaryUnifierString(t *testing.T) {
checkTransposeMap(t, b2, []int32{2, 0})
}
+func TestDictionaryUnifierBinary(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ dictType := arrow.BinaryTypes.Binary
+ d1, _, err := array.FromJSON(mem, dictType,
strings.NewReader(`["Zm9vCg==", "YmFyCg=="]`)) // base64("foo\n"),
base64("bar\n")
+ require.NoError(t, err)
+ defer d1.Release()
+
+ d2, _, err := array.FromJSON(mem, dictType,
strings.NewReader(`["cXV1eAo=", "Zm9vCg=="]`)) // base64("quux\n"),
base64("foo\n")
+ require.NoError(t, err)
+ defer d2.Release()
+
+ expected := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int8,
ValueType: dictType}
+ expectedDict, _, _ := array.FromJSON(mem, dictType,
strings.NewReader(`["Zm9vCg==", "YmFyCg==", "cXV1eAo="]`))
+ defer expectedDict.Release()
+
+ unifier := array.NewBinaryDictionaryUnifier(mem)
+ defer unifier.Release()
+
+ assert.NoError(t, unifier.Unify(d1))
+ assert.NoError(t, unifier.Unify(d2))
+ outType, outDict, err := unifier.GetResult()
+ assert.NoError(t, err)
+ defer outDict.Release()
+
+ assert.Truef(t, arrow.TypeEqual(expected, outType), "got: %s, expected:
%s", outType, expected)
+ assert.Truef(t, array.Equal(expectedDict, outDict), "got: %s, expected:
%s", outDict, expectedDict)
+
+ b1, err := unifier.UnifyAndTranspose(d1)
+ assert.NoError(t, err)
+ b2, err := unifier.UnifyAndTranspose(d2)
+ assert.NoError(t, err)
+
+ outType, outDict, err = unifier.GetResult()
+ assert.NoError(t, err)
+ defer func() {
+ outDict.Release()
+ b1.Release()
+ b2.Release()
+ }()
+
+ assert.Truef(t, arrow.TypeEqual(expected, outType), "got: %s, expected:
%s", outType, expected)
+ assert.Truef(t, array.Equal(expectedDict, outDict), "got: %s, expected:
%s", outDict, expectedDict)
+
+ checkTransposeMap(t, b1, []int32{0, 1})
+ checkTransposeMap(t, b2, []int32{2, 0})
+}
+
func TestDictionaryUnifierFixedSizeBinary(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)