This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 90071ccd67 GH-34171: [Go][Compute] Implement "Unique" kernel (#34172)
90071ccd67 is described below
commit 90071ccd67fb7a81dacbb1966d881b7705d62b97
Author: Matt Topol <[email protected]>
AuthorDate: Tue Feb 14 13:45:19 2023 -0500
GH-34171: [Go][Compute] Implement "Unique" kernel (#34172)
### Rationale for this change
Implementing a kernel for computing the "unique" values in an arrow array,
primarily for use in solving #33466.
### What changes are included in this PR?
Adds a "unique" function to the compute list and helper convenience
functions.
### Are these changes tested?
Yes, unit tests are included.
### Are there any user-facing changes?
Just the new available functions.
* Closes: #34171
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/array/dictionary.go | 6 +-
go/arrow/array/util.go | 13 +-
go/arrow/compute/executor.go | 12 +-
go/arrow/compute/internal/exec/span.go | 42 +-
go/arrow/compute/internal/exec/utils.go | 8 +-
go/arrow/compute/internal/kernels/types.go | 3 +
go/arrow/compute/internal/kernels/vector_hash.go | 560 +++++++++++++++++++++++
go/arrow/compute/registry.go | 1 +
go/arrow/compute/vector_hash.go | 59 +++
go/arrow/compute/vector_hash_test.go | 518 +++++++++++++++++++++
go/internal/hashing/xxh3_memo_table.go | 36 +-
11 files changed, 1227 insertions(+), 31 deletions(-)
diff --git a/go/arrow/array/dictionary.go b/go/arrow/array/dictionary.go
index dd342113d6..9cbcfaccfe 100644
--- a/go/arrow/array/dictionary.go
+++ b/go/arrow/array/dictionary.go
@@ -771,7 +771,7 @@ func (b *dictionaryBuilder) newWithDictOffset(offset int)
(indices, dict *Data,
indices.Retain()
b.deltaOffset = b.memoTable.Size()
- dict, err = getDictArrayData(b.mem, b.dt.ValueType, b.memoTable, offset)
+ dict, err = GetDictArrayData(b.mem, b.dt.ValueType, b.memoTable, offset)
b.reset()
return
}
@@ -1471,7 +1471,7 @@ func (u *unifier) GetResult() (outType arrow.DataType,
outDict arrow.Array, err
}
outType = &arrow.DictionaryType{IndexType: indexType, ValueType:
u.valueType}
- dictData, err := getDictArrayData(u.mem, u.valueType, u.memoTable, 0)
+ dictData, err := GetDictArrayData(u.mem, u.valueType, u.memoTable, 0)
if err != nil {
return nil, nil, err
}
@@ -1509,7 +1509,7 @@ func (u *unifier) GetResultWithIndexType(indexType
arrow.DataType) (arrow.Array,
return nil, errors.New("arrow/array: cannot combine
dictionaries. unified dictionary requires a larger index type")
}
- dictData, err := getDictArrayData(u.mem, u.valueType, u.memoTable, 0)
+ dictData, err := GetDictArrayData(u.mem, u.valueType, u.memoTable, 0)
if err != nil {
return nil, err
}
diff --git a/go/arrow/array/util.go b/go/arrow/array/util.go
index b246b0e9ef..8f7bf40734 100644
--- a/go/arrow/array/util.go
+++ b/go/arrow/array/util.go
@@ -250,7 +250,7 @@ func TableFromJSON(mem memory.Allocator, sc *arrow.Schema,
recJSON []string, opt
return NewTableFromRecords(sc, batches), nil
}
-func getDictArrayData(mem memory.Allocator, valueType arrow.DataType,
memoTable hashing.MemoTable, startOffset int) (*Data, error) {
+func GetDictArrayData(mem memory.Allocator, valueType arrow.DataType,
memoTable hashing.MemoTable, startOffset int) (*Data, error) {
dictLen := memoTable.Size() - startOffset
buffers := []*memory.Buffer{nil, nil}
@@ -272,6 +272,17 @@ func getDictArrayData(mem memory.Allocator, valueType
arrow.DataType, memoTable
offsets :=
arrow.Int32Traits.CastFromBytes(buffers[1].Bytes())
tbl.CopyOffsetsSubset(startOffset, offsets)
+ valuesz := offsets[len(offsets)-1] - offsets[0]
+ buffers[2].Resize(int(valuesz))
+ tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
+ case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+ buffers = append(buffers,
memory.NewResizableBuffer(mem))
+ defer buffers[2].Release()
+
+
buffers[1].Resize(arrow.Int64Traits.BytesRequired(dictLen + 1))
+ offsets :=
arrow.Int64Traits.CastFromBytes(buffers[1].Bytes())
+ tbl.CopyLargeOffsetsSubset(startOffset, offsets)
+
valuesz := offsets[len(offsets)-1] - offsets[0]
buffers[2].Resize(int(valuesz))
tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
diff --git a/go/arrow/compute/executor.go b/go/arrow/compute/executor.go
index 3f385d56e4..96cbdf4055 100644
--- a/go/arrow/compute/executor.go
+++ b/go/arrow/compute/executor.go
@@ -962,10 +962,20 @@ func (v *vectorExecutor) Execute(ctx context.Context,
batch *ExecBatch, data cha
func (v *vectorExecutor) WrapResults(ctx context.Context, out <-chan Datum,
hasChunked bool) Datum {
// if kernel doesn't output chunked, just grab the one output and
return it
if !v.kernel.(*exec.VectorKernel).OutputChunked {
+ var output Datum
select {
case <-ctx.Done():
return nil
- case output := <-out:
+ case output = <-out:
+ }
+
+ // we got an output datum, but let's wait for the channel to
+ // close so we don't have any race conditions
+ select {
+ case <-ctx.Done():
+ output.Release()
+ return nil
+ case <-out:
return output
}
}
diff --git a/go/arrow/compute/internal/exec/span.go
b/go/arrow/compute/internal/exec/span.go
index 646fdcb23d..4221d6c08d 100644
--- a/go/arrow/compute/internal/exec/span.go
+++ b/go/arrow/compute/internal/exec/span.go
@@ -437,6 +437,12 @@ func (a *ArraySpan) FillFromScalar(val scalar.Scalar) {
}
}
+func (a *ArraySpan) SetDictionary(span *ArraySpan) {
+ a.resizeChildren(1)
+ a.Children[0].Release()
+ a.Children[0] = *span
+}
+
// TakeOwnership is like SetMembers only this takes ownership of
// the buffers by calling Retain on them so that the passed in
// ArrayData can be released without negatively affecting this
@@ -479,18 +485,13 @@ func (a *ArraySpan) TakeOwnership(data arrow.ArrayData) {
}
if typeID == arrow.DICTIONARY {
- if cap(a.Children) >= 1 {
- a.Children = a.Children[:1]
- } else {
- a.Children = make([]ArraySpan, 1)
+ a.resizeChildren(1)
+ dict := data.Dictionary()
+ if dict != (*array.Data)(nil) {
+ a.Children[0].TakeOwnership(dict)
}
- a.Children[0].TakeOwnership(data.Dictionary())
} else {
- if cap(a.Children) >= len(data.Children()) {
- a.Children = a.Children[:len(data.Children())]
- } else {
- a.Children = make([]ArraySpan, len(data.Children()))
- }
+ a.resizeChildren(len(data.Children()))
for i, c := range data.Children() {
a.Children[i].TakeOwnership(c)
}
@@ -538,12 +539,11 @@ func (a *ArraySpan) SetMembers(data arrow.ArrayData) {
}
if typeID == arrow.DICTIONARY {
- if cap(a.Children) >= 1 {
- a.Children = a.Children[:1]
- } else {
- a.Children = make([]ArraySpan, 1)
+ a.resizeChildren(1)
+ dict := data.Dictionary()
+ if dict != (*array.Data)(nil) {
+ a.Children[0].SetMembers(dict)
}
- a.Children[0].SetMembers(data.Dictionary())
} else {
if cap(a.Children) >= len(data.Children()) {
a.Children = a.Children[:len(data.Children())]
@@ -619,6 +619,12 @@ func FillZeroLength(dt arrow.DataType, span *ArraySpan) {
span.Buffers[i].Buf, span.Buffers[i].Owner = nil, nil
}
+ if dt.ID() == arrow.DICTIONARY {
+ span.resizeChildren(1)
+ FillZeroLength(dt.(*arrow.DictionaryType).ValueType,
&span.Children[0])
+ return
+ }
+
nt, ok := dt.(arrow.NestedType)
if !ok {
if len(span.Children) > 0 {
@@ -627,11 +633,7 @@ func FillZeroLength(dt arrow.DataType, span *ArraySpan) {
return
}
- if cap(span.Children) >= len(nt.Fields()) {
- span.Children = span.Children[:len(nt.Fields())]
- } else {
- span.Children = make([]ArraySpan, len(nt.Fields()))
- }
+ span.resizeChildren(len(nt.Fields()))
for i, f := range nt.Fields() {
FillZeroLength(f.Type, &span.Children[i])
}
diff --git a/go/arrow/compute/internal/exec/utils.go
b/go/arrow/compute/internal/exec/utils.go
index fea29dd08f..0e4cdf6594 100644
--- a/go/arrow/compute/internal/exec/utils.go
+++ b/go/arrow/compute/internal/exec/utils.go
@@ -102,6 +102,11 @@ func GetValues[T FixedWidthTypes](data arrow.ArrayData, i
int) []T {
return ret[data.Offset():]
}
+func GetOffsets[T int32 | int64](data arrow.ArrayData, i int) []T {
+ ret :=
unsafe.Slice((*T)(unsafe.Pointer(&data.Buffers()[i].Bytes()[0])),
data.Offset()+data.Len()+1)
+ return ret[data.Offset():]
+}
+
// GetSpanValues returns a properly typed slice by reinterpreting
// the buffer at index i using unsafe.Slice. This will take into account
// the offset of the given ArraySpan.
@@ -177,13 +182,14 @@ var typMap = map[reflect.Type]arrow.DataType{
reflect.TypeOf(arrow.Date64(0)): arrow.FixedWidthTypes.Date64,
reflect.TypeOf(true): arrow.FixedWidthTypes.Boolean,
reflect.TypeOf(float16.Num{}): arrow.FixedWidthTypes.Float16,
+ reflect.TypeOf([]byte{}): arrow.BinaryTypes.Binary,
}
// GetDataType returns the appropriate arrow.DataType for the given type T
// only for non-parametric types. This uses a map and reflection internally
// so don't call this in a tight loop, instead call this once and then use
// a closure with the result.
-func GetDataType[T NumericTypes | bool | string | float16.Num]()
arrow.DataType {
+func GetDataType[T NumericTypes | bool | string | []byte | float16.Num]()
arrow.DataType {
var z T
return typMap[reflect.TypeOf(z)]
}
diff --git a/go/arrow/compute/internal/kernels/types.go
b/go/arrow/compute/internal/kernels/types.go
index c9e3e2e070..d1fdfeb6ee 100644
--- a/go/arrow/compute/internal/kernels/types.go
+++ b/go/arrow/compute/internal/kernels/types.go
@@ -52,6 +52,9 @@ var (
arrow.BinaryTypes.LargeBinary,
arrow.BinaryTypes.String,
arrow.BinaryTypes.LargeString}
+ primitiveTypes = append(append([]arrow.DataType{arrow.Null,
+ arrow.FixedWidthTypes.Date32, arrow.FixedWidthTypes.Date64},
+ numericTypes...), baseBinaryTypes...)
)
//go:generate stringer -type=CompareOperator -linecomment
diff --git a/go/arrow/compute/internal/kernels/vector_hash.go
b/go/arrow/compute/internal/kernels/vector_hash.go
new file mode 100644
index 0000000000..7f9c908e6f
--- /dev/null
+++ b/go/arrow/compute/internal/kernels/vector_hash.go
@@ -0,0 +1,560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package kernels
+
+import (
+ "fmt"
+
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/array"
+ "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
+ "github.com/apache/arrow/go/v12/arrow/internal/debug"
+ "github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/bitutils"
+ "github.com/apache/arrow/go/v12/internal/hashing"
+)
+
+type HashState interface {
+ // Reset for another run
+ Reset() error
+ // Flush out accumulated results from last invocation
+ Flush(*exec.ExecResult) error
+ // FlushFinal flushes the accumulated results across all invocations
+ // of calls. The kernel should not be used again until after
+ // Reset() is called.
+ FlushFinal(out *exec.ExecResult) error
+ // GetDictionary returns the values (keys) accumulated in the dictionary
+ // so far.
+ GetDictionary() (arrow.ArrayData, error)
+ ValueType() arrow.DataType
+ // Append prepares the action for the given input (reserving
appropriately
+ // sized data structures, etc.) and visits the input with the Action
+ Append(*exec.KernelCtx, *exec.ArraySpan) error
+ Allocator() memory.Allocator
+}
+
+type Action interface {
+ Reset() error
+ Reserve(int) error
+ Flush(*exec.ExecResult) error
+ FlushFinal(*exec.ExecResult) error
+ ObserveFound(int)
+ ObserveNotFound(int) error
+ ObserveNullFound(int)
+ ObserveNullNotFound(int) error
+ ShouldEncodeNulls() bool
+}
+
+type emptyAction struct {
+ mem memory.Allocator
+ dt arrow.DataType
+}
+
+func (emptyAction) Reset() error { return nil }
+func (emptyAction) Reserve(int) error { return nil }
+func (emptyAction) Flush(*exec.ExecResult) error { return nil }
+func (emptyAction) FlushFinal(*exec.ExecResult) error { return nil }
+func (emptyAction) ObserveFound(int) {}
+func (emptyAction) ObserveNotFound(int) error { return nil }
+func (emptyAction) ObserveNullFound(int) {}
+func (emptyAction) ObserveNullNotFound(int) error { return nil }
+func (emptyAction) ShouldEncodeNulls() bool { return true }
+
+type uniqueAction = emptyAction
+
+type regularHashState struct {
+ mem memory.Allocator
+ typ arrow.DataType
+ memoTable hashing.MemoTable
+ action Action
+
+ doAppend func(Action, hashing.MemoTable, *exec.ArraySpan) error
+}
+
+func (rhs *regularHashState) Allocator() memory.Allocator { return rhs.mem }
+
+func (rhs *regularHashState) ValueType() arrow.DataType { return rhs.typ }
+
+func (rhs *regularHashState) Reset() error {
+ rhs.memoTable.Reset()
+ return rhs.action.Reset()
+}
+
+func (rhs *regularHashState) Append(_ *exec.KernelCtx, arr *exec.ArraySpan)
error {
+ if err := rhs.action.Reserve(int(arr.Len)); err != nil {
+ return err
+ }
+
+ return rhs.doAppend(rhs.action, rhs.memoTable, arr)
+}
+
+func (rhs *regularHashState) Flush(out *exec.ExecResult) error { return
rhs.action.Flush(out) }
+func (rhs *regularHashState) FlushFinal(out *exec.ExecResult) error {
+ return rhs.action.FlushFinal(out)
+}
+
+func (rhs *regularHashState) GetDictionary() (arrow.ArrayData, error) {
+ return array.GetDictArrayData(rhs.mem, rhs.typ, rhs.memoTable, 0)
+}
+
+func doAppendBinary[OffsetT int32 | int64](action Action, memo
hashing.MemoTable, arr *exec.ArraySpan) error {
+ var (
+ bitmap = arr.Buffers[0].Buf
+ offsets = exec.GetSpanOffsets[OffsetT](arr, 1)
+ data = arr.Buffers[2].Buf
+ shouldEncodeNulls = action.ShouldEncodeNulls()
+ )
+
+ return bitutils.VisitBitBlocksShort(bitmap, arr.Offset, arr.Len,
+ func(pos int64) error {
+ v := data[offsets[pos]:offsets[pos+1]]
+ idx, found, err := memo.GetOrInsert(v)
+ if err != nil {
+ return err
+ }
+ if found {
+ action.ObserveFound(idx)
+ return nil
+ }
+ return action.ObserveNotFound(idx)
+ },
+ func() error {
+ if !shouldEncodeNulls {
+ return action.ObserveNullNotFound(-1)
+ }
+
+ idx, found := memo.GetOrInsertNull()
+ if found {
+ action.ObserveNullFound(idx)
+ }
+ return action.ObserveNullNotFound(idx)
+ })
+}
+
+func doAppendFixedSize(action Action, memo hashing.MemoTable, arr
*exec.ArraySpan) error {
+ sz := int64(arr.Type.(arrow.FixedWidthDataType).Bytes())
+ arrData := arr.Buffers[1].Buf[arr.Offset*sz:]
+ shouldEncodeNulls := action.ShouldEncodeNulls()
+
+ return bitutils.VisitBitBlocksShort(arr.Buffers[0].Buf, arr.Offset,
arr.Len,
+ func(pos int64) error {
+ // fixed size type memo table we use a binary memo table
+ // so get the raw bytes
+ idx, found, err := memo.GetOrInsert(arrData[pos*sz :
(pos+1)*sz])
+ if err != nil {
+ return err
+ }
+ if found {
+ action.ObserveFound(idx)
+ return nil
+ }
+ return action.ObserveNotFound(idx)
+ }, func() error {
+ if !shouldEncodeNulls {
+ return action.ObserveNullNotFound(-1)
+ }
+
+ idx, found := memo.GetOrInsertNull()
+ if found {
+ action.ObserveNullFound(idx)
+ }
+ return action.ObserveNullNotFound(idx)
+ })
+}
+
+func doAppendNumeric[T exec.IntTypes | exec.UintTypes |
exec.FloatTypes](action Action, memo hashing.MemoTable, arr *exec.ArraySpan)
error {
+ arrData := exec.GetSpanValues[T](arr, 1)
+ shouldEncodeNulls := action.ShouldEncodeNulls()
+ return bitutils.VisitBitBlocksShort(arr.Buffers[0].Buf, arr.Offset,
arr.Len,
+ func(pos int64) error {
+ idx, found, err := memo.GetOrInsert(arrData[pos])
+ if err != nil {
+ return err
+ }
+ if found {
+ action.ObserveFound(idx)
+ return nil
+ }
+ return action.ObserveNotFound(idx)
+ }, func() error {
+ if !shouldEncodeNulls {
+ return action.ObserveNullNotFound(-1)
+ }
+
+ idx, found := memo.GetOrInsertNull()
+ if found {
+ action.ObserveNullFound(idx)
+ }
+ return action.ObserveNullNotFound(idx)
+ })
+}
+
+type nullHashState struct {
+ mem memory.Allocator
+ typ arrow.DataType
+ seenNull bool
+ action Action
+}
+
+func (nhs *nullHashState) Allocator() memory.Allocator { return nhs.mem }
+
+func (nhs *nullHashState) ValueType() arrow.DataType { return nhs.typ }
+
+func (nhs *nullHashState) Reset() error {
+ return nhs.action.Reset()
+}
+
+func (nhs *nullHashState) Append(_ *exec.KernelCtx, arr *exec.ArraySpan) (err
error) {
+ if err := nhs.action.Reserve(int(arr.Len)); err != nil {
+ return err
+ }
+
+ for i := 0; i < int(arr.Len); i++ {
+ if i == 0 {
+ nhs.seenNull = true
+ err = nhs.action.ObserveNullNotFound(0)
+ } else {
+ nhs.action.ObserveNullFound(0)
+ }
+ }
+ return
+}
+
+func (nhs *nullHashState) Flush(out *exec.ExecResult) error { return
nhs.action.Flush(out) }
+func (nhs *nullHashState) FlushFinal(out *exec.ExecResult) error {
+ return nhs.action.FlushFinal(out)
+}
+
+func (nhs *nullHashState) GetDictionary() (arrow.ArrayData, error) {
+ var out arrow.Array
+ if nhs.seenNull {
+ out = array.NewNull(1)
+ } else {
+ out = array.NewNull(0)
+ }
+ data := out.Data()
+ data.Retain()
+ out.Release()
+ return data, nil
+}
+
+type dictionaryHashState struct {
+ indicesKernel HashState
+ dictionary arrow.Array
+ dictValueType arrow.DataType
+}
+
+func (dhs *dictionaryHashState) Allocator() memory.Allocator { return
dhs.indicesKernel.Allocator() }
+func (dhs *dictionaryHashState) Reset() error { return
dhs.indicesKernel.Reset() }
+func (dhs *dictionaryHashState) Flush(out *exec.ExecResult) error {
+ return dhs.indicesKernel.Flush(out)
+}
+func (dhs *dictionaryHashState) FlushFinal(out *exec.ExecResult) error {
+ return dhs.indicesKernel.FlushFinal(out)
+}
+func (dhs *dictionaryHashState) GetDictionary() (arrow.ArrayData, error) {
+ return dhs.indicesKernel.GetDictionary()
+}
+func (dhs *dictionaryHashState) ValueType() arrow.DataType { return
dhs.indicesKernel.ValueType() }
+func (dhs *dictionaryHashState) DictionaryValueType() arrow.DataType { return
dhs.dictValueType }
+func (dhs *dictionaryHashState) Dictionary() arrow.Array { return
dhs.dictionary }
+func (dhs *dictionaryHashState) Append(ctx *exec.KernelCtx, arr
*exec.ArraySpan) error {
+ arrDict := arr.Dictionary().MakeArray()
+ if dhs.dictionary == nil || array.Equal(dhs.dictionary, arrDict) {
+ dhs.dictionary = arrDict
+ return dhs.indicesKernel.Append(ctx, arr)
+ }
+
+ defer arrDict.Release()
+
+ // NOTE: this approach computes a new dictionary unification per chunk
+ // this is in effect O(n*k) where n is the total chunked array length
+ // and k is the number of chunks (therefore O(n**2) if chunks have a
fixed size).
+ //
+ // A better approach may be to run the kernel over each individual
chunk,
+ // and then hash-aggregate all results (for example sum-group-by for
+ // the "value_counts" kernel)
+ unifier, err :=
array.NewDictionaryUnifier(dhs.indicesKernel.Allocator(), dhs.dictValueType)
+ if err != nil {
+ return err
+ }
+ defer unifier.Release()
+
+ if err := unifier.Unify(dhs.dictionary); err != nil {
+ return err
+ }
+ transposeMap, err := unifier.UnifyAndTranspose(arrDict)
+ if err != nil {
+ return err
+ }
+ defer transposeMap.Release()
+ _, outDict, err := unifier.GetResult()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ dhs.dictionary.Release()
+ dhs.dictionary = outDict
+ }()
+
+ inDict := arr.MakeData()
+ defer inDict.Release()
+ tmp, err := array.TransposeDictIndices(dhs.Allocator(), inDict,
arr.Type, arr.Type, outDict.Data(),
arrow.Int32Traits.CastFromBytes(transposeMap.Bytes()))
+ if err != nil {
+ return err
+ }
+ defer tmp.Release()
+
+ var tmpSpan exec.ArraySpan
+ tmpSpan.SetMembers(tmp)
+ return dhs.indicesKernel.Append(ctx, &tmpSpan)
+}
+
+func nullHashInit(actionInit initAction) exec.KernelInitFn {
+ return func(ctx *exec.KernelCtx, args exec.KernelInitArgs)
(exec.KernelState, error) {
+ mem := exec.GetAllocator(ctx.Ctx)
+ ret := &nullHashState{
+ mem: mem,
+ typ: args.Inputs[0],
+ action: actionInit(args.Inputs[0], args.Options, mem),
+ }
+ ret.Reset()
+ return ret, nil
+ }
+}
+
+func newMemoTable(mem memory.Allocator, dt arrow.Type) (hashing.MemoTable,
error) {
+ switch dt {
+ case arrow.INT8, arrow.UINT8:
+ return hashing.NewUint8MemoTable(0), nil
+ case arrow.INT16, arrow.UINT16:
+ return hashing.NewUint16MemoTable(0), nil
+ case arrow.INT32, arrow.UINT32, arrow.FLOAT32,
+ arrow.DATE32, arrow.TIME32, arrow.INTERVAL_MONTHS:
+ return hashing.NewUint32MemoTable(0), nil
+ case arrow.INT64, arrow.UINT64, arrow.FLOAT64,
+ arrow.DATE64, arrow.TIME64, arrow.TIMESTAMP,
+ arrow.DURATION, arrow.INTERVAL_DAY_TIME:
+ return hashing.NewUint64MemoTable(0), nil
+ case arrow.BINARY, arrow.STRING, arrow.FIXED_SIZE_BINARY,
arrow.DECIMAL128,
+ arrow.DECIMAL256, arrow.INTERVAL_MONTH_DAY_NANO:
+ return hashing.NewBinaryMemoTable(0, 0,
+ array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)),
nil
+ case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+ return hashing.NewBinaryMemoTable(0, 0,
+ array.NewBinaryBuilder(mem,
arrow.BinaryTypes.LargeBinary)), nil
+ default:
+ return nil, fmt.Errorf("%w: unsupported type %s",
arrow.ErrNotImplemented, dt)
+ }
+}
+
+func regularHashInit(dt arrow.DataType, actionInit initAction, appendFn
func(Action, hashing.MemoTable, *exec.ArraySpan) error) exec.KernelInitFn {
+ return func(ctx *exec.KernelCtx, args exec.KernelInitArgs)
(exec.KernelState, error) {
+ mem := exec.GetAllocator(ctx.Ctx)
+ memoTable, err := newMemoTable(mem, dt.ID())
+ if err != nil {
+ return nil, err
+ }
+
+            ret := &regularHashState{
+ mem: mem,
+ typ: args.Inputs[0],
+ memoTable: memoTable,
+ action: actionInit(args.Inputs[0], args.Options,
mem),
+ doAppend: appendFn,
+ }
+ ret.Reset()
+ return ret, nil
+ }
+}
+
+func dictionaryHashInit(actionInit initAction) exec.KernelInitFn {
+ return func(ctx *exec.KernelCtx, args exec.KernelInitArgs)
(exec.KernelState, error) {
+ var (
+ dictType = args.Inputs[0].(*arrow.DictionaryType)
+ indicesHasher exec.KernelState
+ err error
+ )
+
+ switch dictType.IndexType.ID() {
+ case arrow.INT8, arrow.UINT8:
+ indicesHasher, err = getHashInit(arrow.UINT8,
actionInit)(ctx, args)
+ case arrow.INT16, arrow.UINT16:
+ indicesHasher, err = getHashInit(arrow.UINT16,
actionInit)(ctx, args)
+ case arrow.INT32, arrow.UINT32:
+ indicesHasher, err = getHashInit(arrow.UINT32,
actionInit)(ctx, args)
+ case arrow.INT64, arrow.UINT64:
+ indicesHasher, err = getHashInit(arrow.UINT64,
actionInit)(ctx, args)
+ default:
+ return nil, fmt.Errorf("%w: unsupported dictionary
index type", arrow.ErrInvalid)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return &dictionaryHashState{
+ indicesKernel: indicesHasher.(HashState),
+ dictValueType: dictType.ValueType,
+ }, nil
+ }
+}
+
+type initAction func(arrow.DataType, any, memory.Allocator) Action
+
+func getHashInit(typeID arrow.Type, actionInit initAction) exec.KernelInitFn {
+ switch typeID {
+ case arrow.NULL:
+ return nullHashInit(actionInit)
+ case arrow.INT8, arrow.UINT8:
+ return regularHashInit(arrow.PrimitiveTypes.Uint8, actionInit,
doAppendNumeric[uint8])
+ case arrow.INT16, arrow.UINT16:
+ return regularHashInit(arrow.PrimitiveTypes.Uint16, actionInit,
doAppendNumeric[uint16])
+ case arrow.INT32, arrow.UINT32, arrow.FLOAT32,
+ arrow.DATE32, arrow.TIME32, arrow.INTERVAL_MONTHS:
+ return regularHashInit(arrow.PrimitiveTypes.Uint32, actionInit,
doAppendNumeric[uint32])
+ case arrow.INT64, arrow.UINT64, arrow.FLOAT64,
+ arrow.DATE64, arrow.TIME64, arrow.TIMESTAMP,
+ arrow.DURATION, arrow.INTERVAL_DAY_TIME:
+ return regularHashInit(arrow.PrimitiveTypes.Uint64, actionInit,
doAppendNumeric[uint64])
+ case arrow.BINARY, arrow.STRING:
+ return regularHashInit(arrow.BinaryTypes.Binary, actionInit,
doAppendBinary[int32])
+ case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+ return regularHashInit(arrow.BinaryTypes.LargeBinary,
actionInit, doAppendBinary[int64])
+ case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL128, arrow.DECIMAL256:
+ return regularHashInit(arrow.BinaryTypes.Binary, actionInit,
doAppendFixedSize)
+ case arrow.INTERVAL_MONTH_DAY_NANO:
+ return
regularHashInit(arrow.FixedWidthTypes.MonthDayNanoInterval, actionInit,
doAppendFixedSize)
+ default:
+ debug.Assert(false, "unsupported hash init type")
+ return nil
+ }
+}
+
+func hashExec(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult)
error {
+ impl, ok := ctx.State.(HashState)
+ if !ok {
+ return fmt.Errorf("%w: bad initialization of hash state",
arrow.ErrInvalid)
+ }
+
+ if err := impl.Append(ctx, &batch.Values[0].Array); err != nil {
+ return err
+ }
+
+ return impl.Flush(out)
+}
+
+func uniqueFinalize(ctx *exec.KernelCtx, _ []*exec.ArraySpan)
([]*exec.ArraySpan, error) {
+ impl, ok := ctx.State.(HashState)
+ if !ok {
+ return nil, fmt.Errorf("%w: HashState in invalid state",
arrow.ErrInvalid)
+ }
+
+ uniques, err := impl.GetDictionary()
+ if err != nil {
+ return nil, err
+ }
+ defer uniques.Release()
+
+ var out exec.ArraySpan
+ out.TakeOwnership(uniques)
+ return []*exec.ArraySpan{&out}, nil
+}
+
+func ensureHashDictionary(ctx *exec.KernelCtx, hash *dictionaryHashState)
(*exec.ArraySpan, error) {
+ out := &exec.ArraySpan{}
+
+ if hash.dictionary != nil {
+ out.TakeOwnership(hash.dictionary.Data())
+ hash.dictionary.Release()
+ return out, nil
+ }
+
+ exec.FillZeroLength(hash.DictionaryValueType(), out)
+ return out, nil
+}
+
+func uniqueFinalizeDictionary(ctx *exec.KernelCtx, result []*exec.ArraySpan)
(out []*exec.ArraySpan, err error) {
+ if out, err = uniqueFinalize(ctx, result); err != nil {
+ return
+ }
+
+ hash, ok := ctx.State.(*dictionaryHashState)
+ if !ok {
+ return nil, fmt.Errorf("%w: state should be
*dictionaryHashState", arrow.ErrInvalid)
+ }
+
+ dict, err := ensureHashDictionary(ctx, hash)
+ if err != nil {
+ return nil, err
+ }
+ out[0].SetDictionary(dict)
+ return
+}
+
+func addHashKernels(base exec.VectorKernel, actionInit initAction, outTy
exec.OutputType) []exec.VectorKernel {
+ kernels := make([]exec.VectorKernel, 0)
+ for _, ty := range primitiveTypes {
+ base.Init = getHashInit(ty.ID(), actionInit)
+ base.Signature = &exec.KernelSignature{
+ InputTypes: []exec.InputType{exec.NewExactInput(ty)},
+ OutType: outTy,
+ }
+ kernels = append(kernels, base)
+ }
+
+ parametricTypes := []arrow.Type{arrow.TIME32, arrow.TIME64,
arrow.TIMESTAMP,
+ arrow.DURATION, arrow.FIXED_SIZE_BINARY, arrow.DECIMAL128,
arrow.DECIMAL256,
+ arrow.INTERVAL_DAY_TIME, arrow.INTERVAL_MONTHS,
arrow.INTERVAL_MONTH_DAY_NANO}
+ for _, ty := range parametricTypes {
+ base.Init = getHashInit(ty, actionInit)
+ base.Signature = &exec.KernelSignature{
+ InputTypes: []exec.InputType{exec.NewIDInput(ty)},
+ OutType: outTy,
+ }
+ kernels = append(kernels, base)
+ }
+
+ return kernels
+}
+
+func initUnique(dt arrow.DataType, _ any, mem memory.Allocator) Action {
+ return uniqueAction{mem: mem, dt: dt}
+}
+
+func GetVectorHashKernels() (unique, valueCounts, dictEncode
[]exec.VectorKernel) {
+ var base exec.VectorKernel
+ base.ExecFn = hashExec
+
+ // unique
+ base.Finalize = uniqueFinalize
+ base.OutputChunked = false
+ base.CanExecuteChunkWise = true
+ unique = addHashKernels(base, initUnique, OutputFirstType)
+
+ // dictionary unique
+ base.Init = dictionaryHashInit(initUnique)
+ base.Finalize = uniqueFinalizeDictionary
+ base.Signature = &exec.KernelSignature{
+ InputTypes: []exec.InputType{exec.NewIDInput(arrow.DICTIONARY)},
+ OutType: OutputFirstType,
+ }
+ unique = append(unique, base)
+
+ return
+}
diff --git a/go/arrow/compute/registry.go b/go/arrow/compute/registry.go
index 5aee18040d..54d39d17e0 100644
--- a/go/arrow/compute/registry.go
+++ b/go/arrow/compute/registry.go
@@ -51,6 +51,7 @@ func GetFunctionRegistry() FunctionRegistry {
RegisterScalarBoolean(registry)
RegisterScalarArithmetic(registry)
RegisterScalarComparisons(registry)
+ RegisterVectorHash(registry)
})
return registry
}
diff --git a/go/arrow/compute/vector_hash.go b/go/arrow/compute/vector_hash.go
new file mode 100644
index 0000000000..42ec942573
--- /dev/null
+++ b/go/arrow/compute/vector_hash.go
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package compute
+
+import (
+ "context"
+
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/compute/internal/kernels"
+)
+
+var (
+ uniqueDoc = FunctionDoc{
+ Summary: "Compute unique elements",
+ Description: "Return an array with distinct values. Nulls in
the input are ignored",
+ ArgNames: []string{"array"},
+ }
+)
+
+func Unique(ctx context.Context, values Datum) (Datum, error) {
+ return CallFunction(ctx, "unique", nil, values)
+}
+
+func UniqueArray(ctx context.Context, values arrow.Array) (arrow.Array, error)
{
+ out, err := Unique(ctx, &ArrayDatum{Value: values.Data()})
+ if err != nil {
+ return nil, err
+ }
+ defer out.Release()
+
+ return out.(*ArrayDatum).MakeArray(), nil
+}
+
+func RegisterVectorHash(reg FunctionRegistry) {
+ unique, _, _ := kernels.GetVectorHashKernels()
+ uniqFn := NewVectorFunction("unique", Unary(), uniqueDoc)
+ for _, vd := range unique {
+ if err := uniqFn.AddKernel(vd); err != nil {
+ panic(err)
+ }
+ }
+ reg.AddFunction(uniqFn, false)
+}
diff --git a/go/arrow/compute/vector_hash_test.go
b/go/arrow/compute/vector_hash_test.go
new file mode 100644
index 0000000000..796472c9bb
--- /dev/null
+++ b/go/arrow/compute/vector_hash_test.go
@@ -0,0 +1,518 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package compute_test
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/apache/arrow/go/v12/arrow"
+ "github.com/apache/arrow/go/v12/arrow/array"
+ "github.com/apache/arrow/go/v12/arrow/compute"
+ "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
+ "github.com/apache/arrow/go/v12/arrow/decimal128"
+ "github.com/apache/arrow/go/v12/arrow/decimal256"
+ "github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/stretchr/testify/suite"
+ "golang.org/x/exp/constraints"
+)
+
// checkUniqueDict runs Unique on a dictionary-typed input and verifies the
// resulting dictionary array: matching data type, equal dictionary values,
// and the same set of index values (order-insensitive). I is the concrete
// Go type of the dictionary's indices.
func checkUniqueDict[I exec.IntTypes | exec.UintTypes](t *testing.T, input compute.ArrayLikeDatum, expected arrow.Array) {
	out, err := compute.Unique(context.TODO(), input)
	require.NoError(t, err)
	defer out.Release()

	result := out.(*compute.ArrayDatum).MakeArray().(*array.Dictionary)
	defer result.Release()

	require.Truef(t, arrow.TypeEqual(result.DataType(), expected.DataType()),
		"wanted: %s\ngot: %s", expected.DataType(), result.DataType())

	exDict := expected.(*array.Dictionary).Dictionary()
	resultDict := result.Dictionary()

	require.Truef(t, array.Equal(exDict, resultDict), "wanted: %s\ngot: %s", exDict, resultDict)

	// compare raw index values without regard to order
	want := exec.GetValues[I](expected.(*array.Dictionary).Indices().Data(), 1)
	got := exec.GetValues[I](result.Indices().Data(), 1)
	assert.ElementsMatchf(t, got, want, "wanted: %s\ngot: %s", want, got)
}
+
// checkDictionaryUnique verifies input and expected share a type, then
// dispatches to checkUniqueDict instantiated with the Go type matching
// the dictionary's index type.
func checkDictionaryUnique(t *testing.T, input compute.ArrayLikeDatum, expected arrow.Array) {
	require.Truef(t, arrow.TypeEqual(input.Type(), expected.DataType()),
		"wanted: %s\ngot: %s", expected.DataType(), input.Type())

	// dispatch on index width so exec.GetValues reads correctly typed values
	switch input.Type().(*arrow.DictionaryType).IndexType.ID() {
	case arrow.INT8:
		checkUniqueDict[int8](t, input, expected)
	case arrow.INT16:
		checkUniqueDict[int16](t, input, expected)
	case arrow.INT32:
		checkUniqueDict[int32](t, input, expected)
	case arrow.INT64:
		checkUniqueDict[int64](t, input, expected)
	case arrow.UINT8:
		checkUniqueDict[uint8](t, input, expected)
	case arrow.UINT16:
		checkUniqueDict[uint16](t, input, expected)
	case arrow.UINT32:
		checkUniqueDict[uint32](t, input, expected)
	case arrow.UINT64:
		checkUniqueDict[uint64](t, input, expected)
	}
}
+
+func checkUniqueFixedWidth[T exec.FixedWidthTypes](t *testing.T, input,
expected arrow.Array) {
+ result, err := compute.UniqueArray(context.TODO(), input)
+ require.NoError(t, err)
+ defer result.Release()
+
+ require.Truef(t, arrow.TypeEqual(result.DataType(),
expected.DataType()),
+ "wanted: %s\ngot: %s", expected.DataType(), result.DataType())
+ want := exec.GetValues[T](expected.Data(), 1)
+ got := exec.GetValues[T](expected.Data(), 1)
+
+ assert.ElementsMatchf(t, got, want, "wanted: %s\ngot: %s", want, got)
+}
+
// checkUniqueVariableWidth runs UniqueArray on a variable-width
// (string/binary-like) input and compares the result's raw values
// against expected, order-insensitively. OffsetType selects between
// 32-bit and 64-bit offset buffers.
func checkUniqueVariableWidth[OffsetType int32 | int64](t *testing.T, input, expected arrow.Array) {
	result, err := compute.UniqueArray(context.TODO(), input)
	require.NoError(t, err)
	defer result.Release()

	require.Truef(t, arrow.TypeEqual(result.DataType(), expected.DataType()),
		"wanted: %s\ngot: %s", expected.DataType(), result.DataType())

	require.EqualValues(t, expected.Len(), result.Len())

	// createSlice materializes each element as its raw byte slice by
	// walking the offsets buffer (buffer 1) into the data buffer (buffer 2)
	createSlice := func(v arrow.Array) [][]byte {
		var (
			offsets = exec.GetOffsets[OffsetType](v.Data(), 1)
			data    = v.Data().Buffers()[2].Bytes()
			out     = make([][]byte, v.Len())
		)

		for i := 0; i < v.Len(); i++ {
			out[i] = data[offsets[i]:offsets[i+1]]
		}
		return out
	}

	want := createSlice(expected)
	got := createSlice(result)

	assert.ElementsMatch(t, want, got)
}
+
// ArrowType is the constraint covering the Go value types these tests
// use to populate arrow array builders.
type ArrowType interface {
	exec.FixedWidthTypes | string | []byte
}
+
// builder abstracts the concrete array builders' typed AppendValues
// method so that makeArray can be generic over the value type.
type builder[T ArrowType] interface {
	AppendValues([]T, []bool)
}
+
// makeArray builds an array of the given data type from values and an
// optional validity slice (nil means all valid). The caller is
// responsible for releasing the returned array.
func makeArray[T ArrowType](mem memory.Allocator, dt arrow.DataType, values []T, isValid []bool) arrow.Array {
	bldr := array.NewBuilder(mem, dt)
	defer bldr.Release()

	// the type assertion panics if dt's builder doesn't append []T values
	bldr.(builder[T]).AppendValues(values, isValid)
	return bldr.NewArray()
}
+
// checkUniqueFixedSizeBinary builds fixed-size-binary input/expected
// arrays from the given values and validity masks, runs UniqueArray,
// and compares the raw ByteWidth-sized value slices order-insensitively.
func checkUniqueFixedSizeBinary(t *testing.T, mem memory.Allocator, dt *arrow.FixedSizeBinaryType, inValues, outValues [][]byte, inValid, outValid []bool) {
	input := makeArray(mem, dt, inValues, inValid)
	defer input.Release()
	expected := makeArray(mem, dt, outValues, outValid)
	defer expected.Release()

	result, err := compute.UniqueArray(context.TODO(), input)
	require.NoError(t, err)
	defer result.Release()

	require.Truef(t, arrow.TypeEqual(result.DataType(), expected.DataType()),
		"wanted: %s\ngot: %s", expected.DataType(), result.DataType())

	// slice cuts the contiguous data buffer into ByteWidth-sized views,
	// one per element (null slots included as raw placeholder bytes)
	slice := func(v arrow.Array) [][]byte {
		data := v.Data().Buffers()[1].Bytes()
		out := make([][]byte, v.Len())
		for i := range out {
			out[i] = data[i*dt.ByteWidth : (i+1)*dt.ByteWidth]
		}
		return out
	}

	want := slice(expected)
	got := slice(result)
	assert.ElementsMatch(t, want, got)
}
+
+func checkUniqueFW[T exec.FixedWidthTypes](t *testing.T, mem memory.Allocator,
dt arrow.DataType, inValues, outValues []T, inValid, outValid []bool) {
+ input := makeArray(mem, dt, inValues, inValid)
+ defer input.Release()
+ expected := makeArray(mem, dt, outValues, outValid)
+ defer expected.Release()
+
+ checkUniqueFixedWidth[T](t, input, expected)
+}
+
// checkUniqueVW builds variable-width (string/binary) input and expected
// arrays and dispatches to checkUniqueVariableWidth based on the type's
// offset width.
func checkUniqueVW[T string | []byte](t *testing.T, mem memory.Allocator, dt arrow.DataType, inValues, outValues []T, inValid, outValid []bool) {
	input := makeArray(mem, dt, inValues, inValid)
	defer input.Release()
	expected := makeArray(mem, dt, outValues, outValid)
	defer expected.Release()

	// offset buffer width distinguishes Binary/String (4 bytes) from
	// LargeBinary/LargeString (8 bytes)
	switch dt.(arrow.BinaryDataType).Layout().Buffers[1].ByteWidth {
	case 4:
		checkUniqueVariableWidth[int32](t, input, expected)
	case 8:
		checkUniqueVariableWidth[int64](t, input, expected)
	}
}
+
// PrimitiveHashKernelSuite exercises the hash kernels (currently the
// "unique" function) over a primitive fixed-width value type T.
type PrimitiveHashKernelSuite[T exec.IntTypes | exec.UintTypes | constraints.Float] struct {
	suite.Suite

	mem *memory.CheckedAllocator // per-test checked allocator to detect leaks
	dt  arrow.DataType           // arrow type corresponding to T, set in SetupSuite
}

// SetupSuite resolves the arrow DataType matching the type parameter T.
func (ps *PrimitiveHashKernelSuite[T]) SetupSuite() {
	ps.dt = exec.GetDataType[T]()
}

// SetupTest installs a fresh checked allocator for each test.
func (ps *PrimitiveHashKernelSuite[T]) SetupTest() {
	ps.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
}

// TearDownTest asserts that every allocation made by the test was released.
func (ps *PrimitiveHashKernelSuite[T]) TearDownTest() {
	ps.mem.AssertSize(ps.T(), 0)
}
+
// TestUnique checks the unique kernel for the suite's primitive type:
// with a validity mask, with leading nulls, and on a sliced (non-zero
// offset) input built from JSON.
func (ps *PrimitiveHashKernelSuite[T]) TestUnique() {
	ps.Run(ps.dt.String(), func() {
		if ps.dt.ID() == arrow.DATE64 {
			// Date64 is special-cased: its values must be whole days
			// expressed in milliseconds, so the small generic []T
			// fixtures below would not be valid Date64 values.
			checkUniqueFW(ps.T(), ps.mem, ps.dt,
				[]arrow.Date64{172800000, 864000000, 172800000, 864000000},
				[]arrow.Date64{172800000, 0, 864000000},
				[]bool{true, false, true, true}, []bool{true, false, true})

			checkUniqueFW(ps.T(), ps.mem, ps.dt,
				[]arrow.Date64{172800000, 864000000, 259200000, 864000000},
				[]arrow.Date64{0, 259200000, 864000000},
				[]bool{false, false, true, true}, []bool{false, true, true})

			// exercise a sliced input (offset 1, length 4)
			arr, _, err := array.FromJSON(ps.mem, ps.dt, strings.NewReader(`[86400000, 172800000, null, 259200000, 172800000, null]`))
			ps.Require().NoError(err)
			defer arr.Release()
			input := array.NewSlice(arr, 1, 5)
			defer input.Release()
			expected, _, err := array.FromJSON(ps.mem, ps.dt, strings.NewReader(`[172800000, null, 259200000]`))
			ps.Require().NoError(err)
			defer expected.Release()
			checkUniqueFixedWidth[arrow.Date64](ps.T(), input, expected)
			return
		}

		// null slots carry placeholder values (0) in the expected output
		checkUniqueFW(ps.T(), ps.mem, ps.dt,
			[]T{2, 1, 2, 1}, []T{2, 0, 1},
			[]bool{true, false, true, true}, []bool{true, false, true})
		checkUniqueFW(ps.T(), ps.mem, ps.dt,
			[]T{2, 1, 3, 1}, []T{0, 3, 1},
			[]bool{false, false, true, true}, []bool{false, true, true})

		// exercise a sliced input (offset 1, length 4)
		arr, _, err := array.FromJSON(ps.mem, ps.dt, strings.NewReader(`[1, 2, null, 3, 2, null]`))
		ps.Require().NoError(err)
		defer arr.Release()
		input := array.NewSlice(arr, 1, 5)
		defer input.Release()

		expected, _, err := array.FromJSON(ps.mem, ps.dt, strings.NewReader(`[2, null, 3]`))
		ps.Require().NoError(err)
		defer expected.Release()

		checkUniqueFixedWidth[T](ps.T(), input, expected)
	})
}
+
// BinaryTypeHashKernelSuite exercises the hash kernels over string and
// binary types; dt is supplied at construction time (see TestHashKernels).
type BinaryTypeHashKernelSuite[T string | []byte] struct {
	suite.Suite

	mem *memory.CheckedAllocator // per-test checked allocator to detect leaks
	dt  arrow.DataType           // the string/binary type under test
}

// SetupTest installs a fresh checked allocator for each test.
func (ps *BinaryTypeHashKernelSuite[T]) SetupTest() {
	ps.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
}

// TearDownTest asserts that every allocation made by the test was released.
func (ps *BinaryTypeHashKernelSuite[T]) TearDownTest() {
	ps.mem.AssertSize(ps.T(), 0)
}
+
// TestUnique checks unique over a small string/binary fixture that
// includes a duplicate value and a masked (null) slot.
func (ps *BinaryTypeHashKernelSuite[T]) TestUnique() {
	ps.Run(ps.dt.String(), func() {
		checkUniqueVW(ps.T(), ps.mem, ps.dt,
			[]T{T("test"), T(""), T("test2"), T("test")}, []T{T("test"), T(""), T("test2")},
			[]bool{true, false, true, true}, []bool{true, false, true})
	})
}
+
// TestHashKernels runs the primitive and binary hash-kernel suites over
// every supported integer, float, date, string, and binary type.
func TestHashKernels(t *testing.T) {
	suite.Run(t, &PrimitiveHashKernelSuite[int8]{})
	suite.Run(t, &PrimitiveHashKernelSuite[uint8]{})
	suite.Run(t, &PrimitiveHashKernelSuite[int16]{})
	suite.Run(t, &PrimitiveHashKernelSuite[uint16]{})
	suite.Run(t, &PrimitiveHashKernelSuite[int32]{})
	suite.Run(t, &PrimitiveHashKernelSuite[uint32]{})
	suite.Run(t, &PrimitiveHashKernelSuite[int64]{})
	suite.Run(t, &PrimitiveHashKernelSuite[uint64]{})
	suite.Run(t, &PrimitiveHashKernelSuite[float32]{})
	suite.Run(t, &PrimitiveHashKernelSuite[float64]{})
	suite.Run(t, &PrimitiveHashKernelSuite[arrow.Date32]{})
	suite.Run(t, &PrimitiveHashKernelSuite[arrow.Date64]{})

	suite.Run(t, &BinaryTypeHashKernelSuite[string]{dt: arrow.BinaryTypes.String})
	suite.Run(t, &BinaryTypeHashKernelSuite[string]{dt: arrow.BinaryTypes.LargeString})
	suite.Run(t, &BinaryTypeHashKernelSuite[[]byte]{dt: arrow.BinaryTypes.Binary})
	suite.Run(t, &BinaryTypeHashKernelSuite[[]byte]{dt: arrow.BinaryTypes.LargeBinary})
}
+
// TestUniqueTimeTimestamp checks the unique kernel over the temporal
// fixed-width types: Time32, Time64, Timestamp, and Duration.
func TestUniqueTimeTimestamp(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	checkUniqueFW(t, mem, arrow.FixedWidthTypes.Time32s,
		[]arrow.Time32{2, 1, 2, 1}, []arrow.Time32{2, 0, 1},
		[]bool{true, false, true, true}, []bool{true, false, true})

	checkUniqueFW(t, mem, arrow.FixedWidthTypes.Time64ns,
		[]arrow.Time64{2, 1, 2, 1}, []arrow.Time64{2, 0, 1},
		[]bool{true, false, true, true}, []bool{true, false, true})

	checkUniqueFW(t, mem, arrow.FixedWidthTypes.Timestamp_ns,
		[]arrow.Timestamp{2, 1, 2, 1}, []arrow.Timestamp{2, 0, 1},
		[]bool{true, false, true, true}, []bool{true, false, true})

	checkUniqueFW(t, mem, arrow.FixedWidthTypes.Duration_ns,
		[]arrow.Duration{2, 1, 2, 1}, []arrow.Duration{2, 0, 1},
		[]bool{true, false, true, true}, []bool{true, false, true})
}
+
// TestUniqueFixedSizeBinary checks the unique kernel over a
// FixedSizeBinary(3) array containing a duplicate and a null slot.
func TestUniqueFixedSizeBinary(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	dt := &arrow.FixedSizeBinaryType{ByteWidth: 3}
	checkUniqueFixedSizeBinary(t, mem, dt,
		[][]byte{[]byte("aaa"), nil, []byte("bbb"), []byte("aaa")},
		[][]byte{[]byte("aaa"), nil, []byte("bbb")},
		[]bool{true, false, true, true}, []bool{true, false, true})
}
+
// TestUniqueDecimal checks the unique kernel over decimal128 and
// decimal256 arrays with duplicates and a masked (null) slot; the null
// slot uses a zero placeholder value in the expected output.
func TestUniqueDecimal(t *testing.T) {
	t.Run("decimal128", func(t *testing.T) {
		mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
		defer mem.AssertSize(t, 0)

		values := []decimal128.Num{
			decimal128.FromI64(12),
			decimal128.FromI64(12),
			decimal128.FromI64(11),
			decimal128.FromI64(12)}
		expected := []decimal128.Num{
			decimal128.FromI64(12),
			decimal128.FromI64(0),
			decimal128.FromI64(11)}

		checkUniqueFW(t, mem, &arrow.Decimal128Type{Precision: 2, Scale: 0},
			values, expected, []bool{true, false, true, true}, []bool{true, false, true})
	})

	t.Run("decimal256", func(t *testing.T) {
		mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
		defer mem.AssertSize(t, 0)

		values := []decimal256.Num{
			decimal256.FromI64(12),
			decimal256.FromI64(12),
			decimal256.FromI64(11),
			decimal256.FromI64(12)}
		expected := []decimal256.Num{
			decimal256.FromI64(12),
			decimal256.FromI64(0),
			decimal256.FromI64(11)}

		checkUniqueFW(t, mem, &arrow.Decimal256Type{Precision: 2, Scale: 0},
			values, expected, []bool{true, false, true, true}, []bool{true, false, true})
	})
}
+
// TestUniqueIntervalMonth checks the unique kernel over the interval
// types: MonthInterval, DayTimeInterval, and MonthDayNanoInterval.
// NOTE(review): the second entry of each expected slice sits under a
// null validity bit, so its concrete value is a placeholder — presumably
// whatever the kernel writes for null slots; confirm against the kernel.
func TestUniqueIntervalMonth(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	checkUniqueFW(t, mem, arrow.FixedWidthTypes.MonthInterval,
		[]arrow.MonthInterval{2, 1, 2, 1}, []arrow.MonthInterval{2, 0, 1},
		[]bool{true, false, true, true}, []bool{true, false, true})

	checkUniqueFW(t, mem, arrow.FixedWidthTypes.DayTimeInterval,
		[]arrow.DayTimeInterval{
			{Days: 2, Milliseconds: 1}, {Days: 3, Milliseconds: 2},
			{Days: 2, Milliseconds: 1}, {Days: 1, Milliseconds: 2}},
		[]arrow.DayTimeInterval{{Days: 2, Milliseconds: 1},
			{Days: 1, Milliseconds: 1}, {Days: 1, Milliseconds: 2}},
		[]bool{true, false, true, true}, []bool{true, false, true})

	checkUniqueFW(t, mem, arrow.FixedWidthTypes.MonthDayNanoInterval,
		[]arrow.MonthDayNanoInterval{
			{Months: 2, Days: 1, Nanoseconds: 1},
			{Months: 3, Days: 2, Nanoseconds: 1},
			{Months: 2, Days: 1, Nanoseconds: 1},
			{Months: 1, Days: 2, Nanoseconds: 1}},
		[]arrow.MonthDayNanoInterval{
			{Months: 2, Days: 1, Nanoseconds: 1},
			{Months: 1, Days: 1, Nanoseconds: 1},
			{Months: 1, Days: 2, Nanoseconds: 1}},
		[]bool{true, false, true, true}, []bool{true, false, true})
}
+
// TestUniqueChunkedArrayInvoke checks that invoking Unique on a
// ChunkedDatum spanning two string chunks yields a single Array datum
// containing the distinct values across all chunks.
func TestUniqueChunkedArrayInvoke(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	var (
		values1    = []string{"foo", "bar", "foo"}
		values2    = []string{"bar", "baz", "quuux", "foo"}
		dictValues = []string{"foo", "bar", "baz", "quuux"}
		typ        = arrow.BinaryTypes.String
		a1         = makeArray(mem, typ, values1, nil)
		a2         = makeArray(mem, typ, values2, nil)
		exDict     = makeArray(mem, typ, dictValues, nil)
	)

	defer a1.Release()
	defer a2.Release()
	defer exDict.Release()

	carr := arrow.NewChunked(typ, []arrow.Array{a1, a2})
	defer carr.Release()

	result, err := compute.Unique(context.TODO(), &compute.ChunkedDatum{Value: carr})
	require.NoError(t, err)
	defer result.Release()

	// chunked input collapses to a single array result
	require.Equal(t, compute.KindArray, result.Kind())
	out := result.(*compute.ArrayDatum).MakeArray()
	defer out.Release()

	assertArraysEqual(t, exDict, out)
}
+
// TestDictionaryUnique checks the unique kernel over dictionary-encoded
// inputs for every supported index type, covering: a plain dictionary
// array, an empty input, chunks whose dictionaries differ (requiring
// unification), nulls encoded in the dictionary values, and nulls
// masked via the indices' validity bitmap.
func TestDictionaryUnique(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	const dictJSON = `[10, 20, 30, 40]`
	dict, _, err := array.FromJSON(mem, arrow.PrimitiveTypes.Int64, strings.NewReader(dictJSON))
	require.NoError(t, err)
	defer dict.Release()

	for _, idxTyp := range integerTypes {
		t.Run("index_type="+idxTyp.Name(), func(t *testing.T) {
			scope := memory.NewCheckedAllocatorScope(mem)
			defer scope.CheckSize(t)

			indices, _, _ := array.FromJSON(mem, idxTyp, strings.NewReader(`[3, 0, 0, 0, 1, 1, 3, 0, 1, 3, 0, 1]`))
			defer indices.Release()
			dictType := &arrow.DictionaryType{
				IndexType: idxTyp, ValueType: arrow.PrimitiveTypes.Int64}
			exIndices, _, _ := array.FromJSON(mem, idxTyp, strings.NewReader(`[3, 0, 1]`))
			defer exIndices.Release()

			input := array.NewDictionaryArray(dictType, indices, dict)
			defer input.Release()
			exUniques := array.NewDictionaryArray(dictType, exIndices, dict)
			defer exUniques.Release()

			checkDictionaryUnique(t, &compute.ArrayDatum{Value: input.Data()}, exUniques)

			t.Run("empty array", func(t *testing.T) {
				scope := memory.NewCheckedAllocatorScope(mem)
				defer scope.CheckSize(t)

				// executor never gives the kernel any batches
				// so result dictionary is empty
				emptyInput, _ := array.DictArrayFromJSON(mem, dictType, `[]`, dictJSON)
				defer emptyInput.Release()
				exEmpty, _ := array.DictArrayFromJSON(mem, dictType, `[]`, `[]`)
				defer exEmpty.Release()
				checkDictionaryUnique(t, &compute.ArrayDatum{Value: emptyInput.Data()}, exEmpty)
			})

			t.Run("different chunk dictionaries", func(t *testing.T) {
				scope := memory.NewCheckedAllocatorScope(mem)
				defer scope.CheckSize(t)

				// second chunk uses an overlapping-but-different
				// dictionary; expected output has the unified dictionary
				input2, _ := array.DictArrayFromJSON(mem, dictType, `[1, null, 2, 3]`, `[30, 40, 50, 60]`)
				defer input2.Release()

				diffCarr := arrow.NewChunked(dictType, []arrow.Array{input, input2})
				defer diffCarr.Release()

				exUnique2, _ := array.DictArrayFromJSON(mem, dictType, `[3, 0, 1, null, 4, 5]`, `[10, 20, 30, 40, 50, 60]`)
				defer exUnique2.Release()

				checkDictionaryUnique(t, &compute.ChunkedDatum{Value: diffCarr}, exUnique2)
			})

			t.Run("encoded nulls", func(t *testing.T) {
				scope := memory.NewCheckedAllocatorScope(mem)
				defer scope.CheckSize(t)

				// null lives inside the dictionary values themselves
				dictWithNull, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int64, strings.NewReader(`[10, null, 30, 40]`))
				defer dictWithNull.Release()
				input := array.NewDictionaryArray(dictType, indices, dictWithNull)
				defer input.Release()
				exUniques := array.NewDictionaryArray(dictType, exIndices, dictWithNull)
				defer exUniques.Release()
				checkDictionaryUnique(t, &compute.ArrayDatum{Value: input.Data()}, exUniques)
			})

			t.Run("masked nulls", func(t *testing.T) {
				scope := memory.NewCheckedAllocatorScope(mem)
				defer scope.CheckSize(t)

				// null lives in the indices' validity bitmap
				indicesWithNull, _, _ := array.FromJSON(mem, idxTyp, strings.NewReader(`[3, 0, 0, 0, null, null, 3, 0, null, 3, 0, null]`))
				defer indicesWithNull.Release()
				exIndicesWithNull, _, _ := array.FromJSON(mem, idxTyp, strings.NewReader(`[3, 0, null]`))
				defer exIndicesWithNull.Release()
				exUniques := array.NewDictionaryArray(dictType, exIndicesWithNull, dict)
				defer exUniques.Release()
				input := array.NewDictionaryArray(dictType, indicesWithNull, dict)
				defer input.Release()

				checkDictionaryUnique(t, &compute.ArrayDatum{Value: input.Data()}, exUniques)
			})
		})
	}
}
diff --git a/go/internal/hashing/xxh3_memo_table.go
b/go/internal/hashing/xxh3_memo_table.go
index e76f1cca3c..787a4df34d 100644
--- a/go/internal/hashing/xxh3_memo_table.go
+++ b/go/internal/hashing/xxh3_memo_table.go
@@ -381,6 +381,31 @@ func (b *BinaryMemoTable) CopyOffsetsSubset(start int, out
[]int32) {
out[sz-start] = int32(b.builder.DataLen() - (int(delta) - int(first)))
}
// CopyLargeOffsets copies the list of offsets into the passed in slice, the offsets
// being the start and end values of the underlying allocated bytes in the builder
// for the individual values of the table. out should be at least sized to Size()+1
// (64-bit variant of CopyOffsets, for LargeBinary/LargeString offset buffers).
func (b *BinaryMemoTable) CopyLargeOffsets(out []int64) {
	b.CopyLargeOffsetsSubset(0, out)
}
+
// CopyLargeOffsetsSubset is like CopyOffsets but instead of copying all of the offsets,
// it gets a subset of the offsets in the table starting at the index provided by "start".
// Offsets are rebased so the first copied offset is 0; the final element written is the
// end offset of the last value. out should be at least sized to Size()-start+1.
func (b *BinaryMemoTable) CopyLargeOffsetsSubset(start int, out []int64) {
	// nothing to copy if the requested start is past the builder's length
	if b.builder.Len() <= start {
		return
	}

	first := b.findOffset(0)
	delta := b.findOffset(start)
	sz := b.Size()
	for i := start; i < sz; i++ {
		// rebase each offset relative to the subset's first value
		offset := int64(b.findOffset(i) - delta)
		out[i-start] = offset
	}

	// trailing sentinel: total data length relative to the subset's start
	out[sz-start] = int64(b.builder.DataLen() - (int(delta) - int(first)))
}
+
// CopyValues copies the raw binary data bytes out, out should be a []byte
// with at least ValuesSize bytes allocated to copy into.
func (b *BinaryMemoTable) CopyValues(out interface{}) {
@@ -428,19 +453,20 @@ func (b *BinaryMemoTable) CopyFixedWidthValues(start,
width int, out []byte) {
}
var (
- leftOffset = b.findOffset(start)
- nullOffset = b.findOffset(null)
- leftSize = nullOffset - leftOffset
+ leftOffset = b.findOffset(start)
+ nullOffset = b.findOffset(null)
+ leftSize = nullOffset - leftOffset
+ rightOffset = leftOffset + uintptr(b.ValuesSize())
)
if leftSize > 0 {
copy(out, b.builder.Value(start)[0:leftSize])
}
- rightSize := b.ValuesSize() - int(nullOffset)
+ rightSize := rightOffset - nullOffset
if rightSize > 0 {
// skip the null fixed size value
- copy(out[int(leftSize)+width:],
b.builder.Value(int(nullOffset))[0:rightSize])
+ copy(out[int(leftSize)+width:], b.builder.Value(null +
1)[0:rightSize])
}
}