This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 90071ccd67 GH-34171: [Go][Compute] Implement "Unique" kernel (#34172)
90071ccd67 is described below

commit 90071ccd67fb7a81dacbb1966d881b7705d62b97
Author: Matt Topol <[email protected]>
AuthorDate: Tue Feb 14 13:45:19 2023 -0500

    GH-34171: [Go][Compute] Implement "Unique" kernel (#34172)
    
    
    
    ### Rationale for this change
    
    Implementing a kernel for computing the "unique" values in an arrow array, 
primarily for use in solving #33466.
    
    ### What changes are included in this PR?
    Adds a "unique" function to the compute list and helper convenience 
functions.
    
    ### Are these changes tested?
    Yes, unit tests are included.
    
    ### Are there any user-facing changes?
    Just the new available functions.
    
    * Closes: #34171
    
    Authored-by: Matt Topol <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/arrow/array/dictionary.go                     |   6 +-
 go/arrow/array/util.go                           |  13 +-
 go/arrow/compute/executor.go                     |  12 +-
 go/arrow/compute/internal/exec/span.go           |  42 +-
 go/arrow/compute/internal/exec/utils.go          |   8 +-
 go/arrow/compute/internal/kernels/types.go       |   3 +
 go/arrow/compute/internal/kernels/vector_hash.go | 560 +++++++++++++++++++++++
 go/arrow/compute/registry.go                     |   1 +
 go/arrow/compute/vector_hash.go                  |  59 +++
 go/arrow/compute/vector_hash_test.go             | 518 +++++++++++++++++++++
 go/internal/hashing/xxh3_memo_table.go           |  36 +-
 11 files changed, 1227 insertions(+), 31 deletions(-)

diff --git a/go/arrow/array/dictionary.go b/go/arrow/array/dictionary.go
index dd342113d6..9cbcfaccfe 100644
--- a/go/arrow/array/dictionary.go
+++ b/go/arrow/array/dictionary.go
@@ -771,7 +771,7 @@ func (b *dictionaryBuilder) newWithDictOffset(offset int) 
(indices, dict *Data,
        indices.Retain()
 
        b.deltaOffset = b.memoTable.Size()
-       dict, err = getDictArrayData(b.mem, b.dt.ValueType, b.memoTable, offset)
+       dict, err = GetDictArrayData(b.mem, b.dt.ValueType, b.memoTable, offset)
        b.reset()
        return
 }
@@ -1471,7 +1471,7 @@ func (u *unifier) GetResult() (outType arrow.DataType, 
outDict arrow.Array, err
        }
        outType = &arrow.DictionaryType{IndexType: indexType, ValueType: 
u.valueType}
 
-       dictData, err := getDictArrayData(u.mem, u.valueType, u.memoTable, 0)
+       dictData, err := GetDictArrayData(u.mem, u.valueType, u.memoTable, 0)
        if err != nil {
                return nil, nil, err
        }
@@ -1509,7 +1509,7 @@ func (u *unifier) GetResultWithIndexType(indexType 
arrow.DataType) (arrow.Array,
                return nil, errors.New("arrow/array: cannot combine 
dictionaries. unified dictionary requires a larger index type")
        }
 
-       dictData, err := getDictArrayData(u.mem, u.valueType, u.memoTable, 0)
+       dictData, err := GetDictArrayData(u.mem, u.valueType, u.memoTable, 0)
        if err != nil {
                return nil, err
        }
diff --git a/go/arrow/array/util.go b/go/arrow/array/util.go
index b246b0e9ef..8f7bf40734 100644
--- a/go/arrow/array/util.go
+++ b/go/arrow/array/util.go
@@ -250,7 +250,7 @@ func TableFromJSON(mem memory.Allocator, sc *arrow.Schema, 
recJSON []string, opt
        return NewTableFromRecords(sc, batches), nil
 }
 
-func getDictArrayData(mem memory.Allocator, valueType arrow.DataType, 
memoTable hashing.MemoTable, startOffset int) (*Data, error) {
+func GetDictArrayData(mem memory.Allocator, valueType arrow.DataType, 
memoTable hashing.MemoTable, startOffset int) (*Data, error) {
        dictLen := memoTable.Size() - startOffset
        buffers := []*memory.Buffer{nil, nil}
 
@@ -272,6 +272,17 @@ func getDictArrayData(mem memory.Allocator, valueType 
arrow.DataType, memoTable
                        offsets := 
arrow.Int32Traits.CastFromBytes(buffers[1].Bytes())
                        tbl.CopyOffsetsSubset(startOffset, offsets)
 
+                       valuesz := offsets[len(offsets)-1] - offsets[0]
+                       buffers[2].Resize(int(valuesz))
+                       tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
+               case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+                       buffers = append(buffers, 
memory.NewResizableBuffer(mem))
+                       defer buffers[2].Release()
+
+                       
buffers[1].Resize(arrow.Int64Traits.BytesRequired(dictLen + 1))
+                       offsets := 
arrow.Int64Traits.CastFromBytes(buffers[1].Bytes())
+                       tbl.CopyLargeOffsetsSubset(startOffset, offsets)
+
                        valuesz := offsets[len(offsets)-1] - offsets[0]
                        buffers[2].Resize(int(valuesz))
                        tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
diff --git a/go/arrow/compute/executor.go b/go/arrow/compute/executor.go
index 3f385d56e4..96cbdf4055 100644
--- a/go/arrow/compute/executor.go
+++ b/go/arrow/compute/executor.go
@@ -962,10 +962,20 @@ func (v *vectorExecutor) Execute(ctx context.Context, 
batch *ExecBatch, data cha
 func (v *vectorExecutor) WrapResults(ctx context.Context, out <-chan Datum, 
hasChunked bool) Datum {
        // if kernel doesn't output chunked, just grab the one output and 
return it
        if !v.kernel.(*exec.VectorKernel).OutputChunked {
+               var output Datum
                select {
                case <-ctx.Done():
                        return nil
-               case output := <-out:
+               case output = <-out:
+               }
+
+               // we got an output datum, but let's wait for the channel to
+               // close so we don't have any race conditions
+               select {
+               case <-ctx.Done():
+                       output.Release()
+                       return nil
+               case <-out:
                        return output
                }
        }
diff --git a/go/arrow/compute/internal/exec/span.go 
b/go/arrow/compute/internal/exec/span.go
index 646fdcb23d..4221d6c08d 100644
--- a/go/arrow/compute/internal/exec/span.go
+++ b/go/arrow/compute/internal/exec/span.go
@@ -437,6 +437,12 @@ func (a *ArraySpan) FillFromScalar(val scalar.Scalar) {
        }
 }
 
+func (a *ArraySpan) SetDictionary(span *ArraySpan) {
+       a.resizeChildren(1)
+       a.Children[0].Release()
+       a.Children[0] = *span
+}
+
 // TakeOwnership is like SetMembers only this takes ownership of
 // the buffers by calling Retain on them so that the passed in
 // ArrayData can be released without negatively affecting this
@@ -479,18 +485,13 @@ func (a *ArraySpan) TakeOwnership(data arrow.ArrayData) {
        }
 
        if typeID == arrow.DICTIONARY {
-               if cap(a.Children) >= 1 {
-                       a.Children = a.Children[:1]
-               } else {
-                       a.Children = make([]ArraySpan, 1)
+               a.resizeChildren(1)
+               dict := data.Dictionary()
+               if dict != (*array.Data)(nil) {
+                       a.Children[0].TakeOwnership(dict)
                }
-               a.Children[0].TakeOwnership(data.Dictionary())
        } else {
-               if cap(a.Children) >= len(data.Children()) {
-                       a.Children = a.Children[:len(data.Children())]
-               } else {
-                       a.Children = make([]ArraySpan, len(data.Children()))
-               }
+               a.resizeChildren(len(data.Children()))
                for i, c := range data.Children() {
                        a.Children[i].TakeOwnership(c)
                }
@@ -538,12 +539,11 @@ func (a *ArraySpan) SetMembers(data arrow.ArrayData) {
        }
 
        if typeID == arrow.DICTIONARY {
-               if cap(a.Children) >= 1 {
-                       a.Children = a.Children[:1]
-               } else {
-                       a.Children = make([]ArraySpan, 1)
+               a.resizeChildren(1)
+               dict := data.Dictionary()
+               if dict != (*array.Data)(nil) {
+                       a.Children[0].SetMembers(dict)
                }
-               a.Children[0].SetMembers(data.Dictionary())
        } else {
                if cap(a.Children) >= len(data.Children()) {
                        a.Children = a.Children[:len(data.Children())]
@@ -619,6 +619,12 @@ func FillZeroLength(dt arrow.DataType, span *ArraySpan) {
                span.Buffers[i].Buf, span.Buffers[i].Owner = nil, nil
        }
 
+       if dt.ID() == arrow.DICTIONARY {
+               span.resizeChildren(1)
+               FillZeroLength(dt.(*arrow.DictionaryType).ValueType, 
&span.Children[0])
+               return
+       }
+
        nt, ok := dt.(arrow.NestedType)
        if !ok {
                if len(span.Children) > 0 {
@@ -627,11 +633,7 @@ func FillZeroLength(dt arrow.DataType, span *ArraySpan) {
                return
        }
 
-       if cap(span.Children) >= len(nt.Fields()) {
-               span.Children = span.Children[:len(nt.Fields())]
-       } else {
-               span.Children = make([]ArraySpan, len(nt.Fields()))
-       }
+       span.resizeChildren(len(nt.Fields()))
        for i, f := range nt.Fields() {
                FillZeroLength(f.Type, &span.Children[i])
        }
diff --git a/go/arrow/compute/internal/exec/utils.go 
b/go/arrow/compute/internal/exec/utils.go
index fea29dd08f..0e4cdf6594 100644
--- a/go/arrow/compute/internal/exec/utils.go
+++ b/go/arrow/compute/internal/exec/utils.go
@@ -102,6 +102,11 @@ func GetValues[T FixedWidthTypes](data arrow.ArrayData, i 
int) []T {
        return ret[data.Offset():]
 }
 
+func GetOffsets[T int32 | int64](data arrow.ArrayData, i int) []T {
+       ret := 
unsafe.Slice((*T)(unsafe.Pointer(&data.Buffers()[i].Bytes()[0])), 
data.Offset()+data.Len()+1)
+       return ret[data.Offset():]
+}
+
 // GetSpanValues returns a properly typed slice by reinterpreting
 // the buffer at index i using unsafe.Slice. This will take into account
 // the offset of the given ArraySpan.
@@ -177,13 +182,14 @@ var typMap = map[reflect.Type]arrow.DataType{
        reflect.TypeOf(arrow.Date64(0)): arrow.FixedWidthTypes.Date64,
        reflect.TypeOf(true):            arrow.FixedWidthTypes.Boolean,
        reflect.TypeOf(float16.Num{}):   arrow.FixedWidthTypes.Float16,
+       reflect.TypeOf([]byte{}):        arrow.BinaryTypes.Binary,
 }
 
 // GetDataType returns the appropriate arrow.DataType for the given type T
 // only for non-parametric types. This uses a map and reflection internally
 // so don't call this in a tight loop, instead call this once and then use
 // a closure with the result.
-func GetDataType[T NumericTypes | bool | string | float16.Num]() 
arrow.DataType {
+func GetDataType[T NumericTypes | bool | string | []byte | float16.Num]() 
arrow.DataType {
        var z T
        return typMap[reflect.TypeOf(z)]
 }
diff --git a/go/arrow/compute/internal/kernels/types.go 
b/go/arrow/compute/internal/kernels/types.go
index c9e3e2e070..d1fdfeb6ee 100644
--- a/go/arrow/compute/internal/kernels/types.go
+++ b/go/arrow/compute/internal/kernels/types.go
@@ -52,6 +52,9 @@ var (
                arrow.BinaryTypes.LargeBinary,
                arrow.BinaryTypes.String,
                arrow.BinaryTypes.LargeString}
+       primitiveTypes = append(append([]arrow.DataType{arrow.Null,
+               arrow.FixedWidthTypes.Date32, arrow.FixedWidthTypes.Date64},
+               numericTypes...), baseBinaryTypes...)
 )
 
 //go:generate stringer -type=CompareOperator -linecomment
diff --git a/go/arrow/compute/internal/kernels/vector_hash.go 
b/go/arrow/compute/internal/kernels/vector_hash.go
new file mode 100644
index 0000000000..7f9c908e6f
--- /dev/null
+++ b/go/arrow/compute/internal/kernels/vector_hash.go
@@ -0,0 +1,560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package kernels
+
+import (
+       "fmt"
+
+       "github.com/apache/arrow/go/v12/arrow"
+       "github.com/apache/arrow/go/v12/arrow/array"
+       "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
+       "github.com/apache/arrow/go/v12/arrow/internal/debug"
+       "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/bitutils"
+       "github.com/apache/arrow/go/v12/internal/hashing"
+)
+
+type HashState interface {
+       // Reset for another run
+       Reset() error
+       // Flush out accumulated results from last invocation
+       Flush(*exec.ExecResult) error
+       // FlushFinal flushes the accumulated results across all invocations
+       // of calls. The kernel should not be used again until after
+       // Reset() is called.
+       FlushFinal(out *exec.ExecResult) error
+       // GetDictionary returns the values (keys) accumulated in the dictionary
+       // so far.
+       GetDictionary() (arrow.ArrayData, error)
+       ValueType() arrow.DataType
+       // Append prepares the action for the given input (reserving 
appropriately
+       // sized data structures, etc.) and visits the input with the Action
+       Append(*exec.KernelCtx, *exec.ArraySpan) error
+       Allocator() memory.Allocator
+}
+
+type Action interface {
+       Reset() error
+       Reserve(int) error
+       Flush(*exec.ExecResult) error
+       FlushFinal(*exec.ExecResult) error
+       ObserveFound(int)
+       ObserveNotFound(int) error
+       ObserveNullFound(int)
+       ObserveNullNotFound(int) error
+       ShouldEncodeNulls() bool
+}
+
+type emptyAction struct {
+       mem memory.Allocator
+       dt  arrow.DataType
+}
+
+func (emptyAction) Reset() error                      { return nil }
+func (emptyAction) Reserve(int) error                 { return nil }
+func (emptyAction) Flush(*exec.ExecResult) error      { return nil }
+func (emptyAction) FlushFinal(*exec.ExecResult) error { return nil }
+func (emptyAction) ObserveFound(int)                  {}
+func (emptyAction) ObserveNotFound(int) error         { return nil }
+func (emptyAction) ObserveNullFound(int)              {}
+func (emptyAction) ObserveNullNotFound(int) error     { return nil }
+func (emptyAction) ShouldEncodeNulls() bool           { return true }
+
+type uniqueAction = emptyAction
+
+type regularHashState struct {
+       mem       memory.Allocator
+       typ       arrow.DataType
+       memoTable hashing.MemoTable
+       action    Action
+
+       doAppend func(Action, hashing.MemoTable, *exec.ArraySpan) error
+}
+
+func (rhs *regularHashState) Allocator() memory.Allocator { return rhs.mem }
+
+func (rhs *regularHashState) ValueType() arrow.DataType { return rhs.typ }
+
+func (rhs *regularHashState) Reset() error {
+       rhs.memoTable.Reset()
+       return rhs.action.Reset()
+}
+
+func (rhs *regularHashState) Append(_ *exec.KernelCtx, arr *exec.ArraySpan) 
error {
+       if err := rhs.action.Reserve(int(arr.Len)); err != nil {
+               return err
+       }
+
+       return rhs.doAppend(rhs.action, rhs.memoTable, arr)
+}
+
+func (rhs *regularHashState) Flush(out *exec.ExecResult) error { return 
rhs.action.Flush(out) }
+func (rhs *regularHashState) FlushFinal(out *exec.ExecResult) error {
+       return rhs.action.FlushFinal(out)
+}
+
+func (rhs *regularHashState) GetDictionary() (arrow.ArrayData, error) {
+       return array.GetDictArrayData(rhs.mem, rhs.typ, rhs.memoTable, 0)
+}
+
+func doAppendBinary[OffsetT int32 | int64](action Action, memo 
hashing.MemoTable, arr *exec.ArraySpan) error {
+       var (
+               bitmap            = arr.Buffers[0].Buf
+               offsets           = exec.GetSpanOffsets[OffsetT](arr, 1)
+               data              = arr.Buffers[2].Buf
+               shouldEncodeNulls = action.ShouldEncodeNulls()
+       )
+
+       return bitutils.VisitBitBlocksShort(bitmap, arr.Offset, arr.Len,
+               func(pos int64) error {
+                       v := data[offsets[pos]:offsets[pos+1]]
+                       idx, found, err := memo.GetOrInsert(v)
+                       if err != nil {
+                               return err
+                       }
+                       if found {
+                               action.ObserveFound(idx)
+                               return nil
+                       }
+                       return action.ObserveNotFound(idx)
+               },
+               func() error {
+                       if !shouldEncodeNulls {
+                               return action.ObserveNullNotFound(-1)
+                       }
+
+                       idx, found := memo.GetOrInsertNull()
+                       if found {
+                               action.ObserveNullFound(idx)
+                       }
+                       return action.ObserveNullNotFound(idx)
+               })
+}
+
+func doAppendFixedSize(action Action, memo hashing.MemoTable, arr 
*exec.ArraySpan) error {
+       sz := int64(arr.Type.(arrow.FixedWidthDataType).Bytes())
+       arrData := arr.Buffers[1].Buf[arr.Offset*sz:]
+       shouldEncodeNulls := action.ShouldEncodeNulls()
+
+       return bitutils.VisitBitBlocksShort(arr.Buffers[0].Buf, arr.Offset, 
arr.Len,
+               func(pos int64) error {
+                       // fixed size type memo table we use a binary memo table
+                       // so get the raw bytes
+                       idx, found, err := memo.GetOrInsert(arrData[pos*sz : 
(pos+1)*sz])
+                       if err != nil {
+                               return err
+                       }
+                       if found {
+                               action.ObserveFound(idx)
+                               return nil
+                       }
+                       return action.ObserveNotFound(idx)
+               }, func() error {
+                       if !shouldEncodeNulls {
+                               return action.ObserveNullNotFound(-1)
+                       }
+
+                       idx, found := memo.GetOrInsertNull()
+                       if found {
+                               action.ObserveNullFound(idx)
+                       }
+                       return action.ObserveNullNotFound(idx)
+               })
+}
+
+func doAppendNumeric[T exec.IntTypes | exec.UintTypes | 
exec.FloatTypes](action Action, memo hashing.MemoTable, arr *exec.ArraySpan) 
error {
+       arrData := exec.GetSpanValues[T](arr, 1)
+       shouldEncodeNulls := action.ShouldEncodeNulls()
+       return bitutils.VisitBitBlocksShort(arr.Buffers[0].Buf, arr.Offset, 
arr.Len,
+               func(pos int64) error {
+                       idx, found, err := memo.GetOrInsert(arrData[pos])
+                       if err != nil {
+                               return err
+                       }
+                       if found {
+                               action.ObserveFound(idx)
+                               return nil
+                       }
+                       return action.ObserveNotFound(idx)
+               }, func() error {
+                       if !shouldEncodeNulls {
+                               return action.ObserveNullNotFound(-1)
+                       }
+
+                       idx, found := memo.GetOrInsertNull()
+                       if found {
+                               action.ObserveNullFound(idx)
+                       }
+                       return action.ObserveNullNotFound(idx)
+               })
+}
+
+type nullHashState struct {
+       mem      memory.Allocator
+       typ      arrow.DataType
+       seenNull bool
+       action   Action
+}
+
+func (nhs *nullHashState) Allocator() memory.Allocator { return nhs.mem }
+
+func (nhs *nullHashState) ValueType() arrow.DataType { return nhs.typ }
+
+func (nhs *nullHashState) Reset() error {
+       return nhs.action.Reset()
+}
+
+func (nhs *nullHashState) Append(_ *exec.KernelCtx, arr *exec.ArraySpan) (err 
error) {
+       if err := nhs.action.Reserve(int(arr.Len)); err != nil {
+               return err
+       }
+
+       for i := 0; i < int(arr.Len); i++ {
+               if i == 0 {
+                       nhs.seenNull = true
+                       err = nhs.action.ObserveNullNotFound(0)
+               } else {
+                       nhs.action.ObserveNullFound(0)
+               }
+       }
+       return
+}
+
+func (nhs *nullHashState) Flush(out *exec.ExecResult) error { return 
nhs.action.Flush(out) }
+func (nhs *nullHashState) FlushFinal(out *exec.ExecResult) error {
+       return nhs.action.FlushFinal(out)
+}
+
+func (nhs *nullHashState) GetDictionary() (arrow.ArrayData, error) {
+       var out arrow.Array
+       if nhs.seenNull {
+               out = array.NewNull(1)
+       } else {
+               out = array.NewNull(0)
+       }
+       data := out.Data()
+       data.Retain()
+       out.Release()
+       return data, nil
+}
+
+type dictionaryHashState struct {
+       indicesKernel HashState
+       dictionary    arrow.Array
+       dictValueType arrow.DataType
+}
+
+func (dhs *dictionaryHashState) Allocator() memory.Allocator { return 
dhs.indicesKernel.Allocator() }
+func (dhs *dictionaryHashState) Reset() error                { return 
dhs.indicesKernel.Reset() }
+func (dhs *dictionaryHashState) Flush(out *exec.ExecResult) error {
+       return dhs.indicesKernel.Flush(out)
+}
+func (dhs *dictionaryHashState) FlushFinal(out *exec.ExecResult) error {
+       return dhs.indicesKernel.FlushFinal(out)
+}
+func (dhs *dictionaryHashState) GetDictionary() (arrow.ArrayData, error) {
+       return dhs.indicesKernel.GetDictionary()
+}
+func (dhs *dictionaryHashState) ValueType() arrow.DataType           { return 
dhs.indicesKernel.ValueType() }
+func (dhs *dictionaryHashState) DictionaryValueType() arrow.DataType { return 
dhs.dictValueType }
+func (dhs *dictionaryHashState) Dictionary() arrow.Array             { return 
dhs.dictionary }
+func (dhs *dictionaryHashState) Append(ctx *exec.KernelCtx, arr 
*exec.ArraySpan) error {
+       arrDict := arr.Dictionary().MakeArray()
+       if dhs.dictionary == nil || array.Equal(dhs.dictionary, arrDict) {
+               dhs.dictionary = arrDict
+               return dhs.indicesKernel.Append(ctx, arr)
+       }
+
+       defer arrDict.Release()
+
+       // NOTE: this approach computes a new dictionary unification per chunk
+       // this is in effect O(n*k) where n is the total chunked array length
+       // and k is the number of chunks (therefore O(n**2) if chunks have a 
fixed size).
+       //
+       // A better approach may be to run the kernel over each individual 
chunk,
+       // and then hash-aggregate all results (for example sum-group-by for
+       // the "value_counts" kernel)
+       unifier, err := 
array.NewDictionaryUnifier(dhs.indicesKernel.Allocator(), dhs.dictValueType)
+       if err != nil {
+               return err
+       }
+       defer unifier.Release()
+
+       if err := unifier.Unify(dhs.dictionary); err != nil {
+               return err
+       }
+       transposeMap, err := unifier.UnifyAndTranspose(arrDict)
+       if err != nil {
+               return err
+       }
+       defer transposeMap.Release()
+       _, outDict, err := unifier.GetResult()
+       if err != nil {
+               return err
+       }
+       defer func() {
+               dhs.dictionary.Release()
+               dhs.dictionary = outDict
+       }()
+
+       inDict := arr.MakeData()
+       defer inDict.Release()
+       tmp, err := array.TransposeDictIndices(dhs.Allocator(), inDict, 
arr.Type, arr.Type, outDict.Data(), 
arrow.Int32Traits.CastFromBytes(transposeMap.Bytes()))
+       if err != nil {
+               return err
+       }
+       defer tmp.Release()
+
+       var tmpSpan exec.ArraySpan
+       tmpSpan.SetMembers(tmp)
+       return dhs.indicesKernel.Append(ctx, &tmpSpan)
+}
+
+func nullHashInit(actionInit initAction) exec.KernelInitFn {
+       return func(ctx *exec.KernelCtx, args exec.KernelInitArgs) 
(exec.KernelState, error) {
+               mem := exec.GetAllocator(ctx.Ctx)
+               ret := &nullHashState{
+                       mem:    mem,
+                       typ:    args.Inputs[0],
+                       action: actionInit(args.Inputs[0], args.Options, mem),
+               }
+               ret.Reset()
+               return ret, nil
+       }
+}
+
+func newMemoTable(mem memory.Allocator, dt arrow.Type) (hashing.MemoTable, 
error) {
+       switch dt {
+       case arrow.INT8, arrow.UINT8:
+               return hashing.NewUint8MemoTable(0), nil
+       case arrow.INT16, arrow.UINT16:
+               return hashing.NewUint16MemoTable(0), nil
+       case arrow.INT32, arrow.UINT32, arrow.FLOAT32,
+               arrow.DATE32, arrow.TIME32, arrow.INTERVAL_MONTHS:
+               return hashing.NewUint32MemoTable(0), nil
+       case arrow.INT64, arrow.UINT64, arrow.FLOAT64,
+               arrow.DATE64, arrow.TIME64, arrow.TIMESTAMP,
+               arrow.DURATION, arrow.INTERVAL_DAY_TIME:
+               return hashing.NewUint64MemoTable(0), nil
+       case arrow.BINARY, arrow.STRING, arrow.FIXED_SIZE_BINARY, 
arrow.DECIMAL128,
+               arrow.DECIMAL256, arrow.INTERVAL_MONTH_DAY_NANO:
+               return hashing.NewBinaryMemoTable(0, 0,
+                       array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)), 
nil
+       case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+               return hashing.NewBinaryMemoTable(0, 0,
+                       array.NewBinaryBuilder(mem, 
arrow.BinaryTypes.LargeBinary)), nil
+       default:
+               return nil, fmt.Errorf("%w: unsupported type %s", 
arrow.ErrNotImplemented, dt)
+       }
+}
+
+func regularHashInit(dt arrow.DataType, actionInit initAction, appendFn 
func(Action, hashing.MemoTable, *exec.ArraySpan) error) exec.KernelInitFn {
+       return func(ctx *exec.KernelCtx, args exec.KernelInitArgs) 
(exec.KernelState, error) {
+               mem := exec.GetAllocator(ctx.Ctx)
+               memoTable, err := newMemoTable(mem, dt.ID())
+               if err != nil {
+                       return nil, err
+               }
+
+               ret := &regularHashState{
+                       mem:       mem,
+                       typ:       args.Inputs[0],
+                       memoTable: memoTable,
+                       action:    actionInit(args.Inputs[0], args.Options, 
mem),
+                       doAppend:  appendFn,
+               }
+               ret.Reset()
+               return ret, nil
+       }
+}
+
+func dictionaryHashInit(actionInit initAction) exec.KernelInitFn {
+       return func(ctx *exec.KernelCtx, args exec.KernelInitArgs) 
(exec.KernelState, error) {
+               var (
+                       dictType      = args.Inputs[0].(*arrow.DictionaryType)
+                       indicesHasher exec.KernelState
+                       err           error
+               )
+
+               switch dictType.IndexType.ID() {
+               case arrow.INT8, arrow.UINT8:
+                       indicesHasher, err = getHashInit(arrow.UINT8, 
actionInit)(ctx, args)
+               case arrow.INT16, arrow.UINT16:
+                       indicesHasher, err = getHashInit(arrow.UINT16, 
actionInit)(ctx, args)
+               case arrow.INT32, arrow.UINT32:
+                       indicesHasher, err = getHashInit(arrow.UINT32, 
actionInit)(ctx, args)
+               case arrow.INT64, arrow.UINT64:
+                       indicesHasher, err = getHashInit(arrow.UINT64, 
actionInit)(ctx, args)
+               default:
+                       return nil, fmt.Errorf("%w: unsupported dictionary 
index type", arrow.ErrInvalid)
+               }
+               if err != nil {
+                       return nil, err
+               }
+
+               return &dictionaryHashState{
+                       indicesKernel: indicesHasher.(HashState),
+                       dictValueType: dictType.ValueType,
+               }, nil
+       }
+}
+
+type initAction func(arrow.DataType, any, memory.Allocator) Action
+
+func getHashInit(typeID arrow.Type, actionInit initAction) exec.KernelInitFn {
+       switch typeID {
+       case arrow.NULL:
+               return nullHashInit(actionInit)
+       case arrow.INT8, arrow.UINT8:
+               return regularHashInit(arrow.PrimitiveTypes.Uint8, actionInit, 
doAppendNumeric[uint8])
+       case arrow.INT16, arrow.UINT16:
+               return regularHashInit(arrow.PrimitiveTypes.Uint16, actionInit, 
doAppendNumeric[uint16])
+       case arrow.INT32, arrow.UINT32, arrow.FLOAT32,
+               arrow.DATE32, arrow.TIME32, arrow.INTERVAL_MONTHS:
+               return regularHashInit(arrow.PrimitiveTypes.Uint32, actionInit, 
doAppendNumeric[uint32])
+       case arrow.INT64, arrow.UINT64, arrow.FLOAT64,
+               arrow.DATE64, arrow.TIME64, arrow.TIMESTAMP,
+               arrow.DURATION, arrow.INTERVAL_DAY_TIME:
+               return regularHashInit(arrow.PrimitiveTypes.Uint64, actionInit, 
doAppendNumeric[uint64])
+       case arrow.BINARY, arrow.STRING:
+               return regularHashInit(arrow.BinaryTypes.Binary, actionInit, 
doAppendBinary[int32])
+       case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+               return regularHashInit(arrow.BinaryTypes.LargeBinary, 
actionInit, doAppendBinary[int64])
+       case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL128, arrow.DECIMAL256:
+               return regularHashInit(arrow.BinaryTypes.Binary, actionInit, 
doAppendFixedSize)
+       case arrow.INTERVAL_MONTH_DAY_NANO:
+               return 
regularHashInit(arrow.FixedWidthTypes.MonthDayNanoInterval, actionInit, 
doAppendFixedSize)
+       default:
+               debug.Assert(false, "unsupported hash init type")
+               return nil
+       }
+}
+
+func hashExec(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) 
error {
+       impl, ok := ctx.State.(HashState)
+       if !ok {
+               return fmt.Errorf("%w: bad initialization of hash state", 
arrow.ErrInvalid)
+       }
+
+       if err := impl.Append(ctx, &batch.Values[0].Array); err != nil {
+               return err
+       }
+
+       return impl.Flush(out)
+}
+
+func uniqueFinalize(ctx *exec.KernelCtx, _ []*exec.ArraySpan) 
([]*exec.ArraySpan, error) {
+       impl, ok := ctx.State.(HashState)
+       if !ok {
+               return nil, fmt.Errorf("%w: HashState in invalid state", 
arrow.ErrInvalid)
+       }
+
+       uniques, err := impl.GetDictionary()
+       if err != nil {
+               return nil, err
+       }
+       defer uniques.Release()
+
+       var out exec.ArraySpan
+       out.TakeOwnership(uniques)
+       return []*exec.ArraySpan{&out}, nil
+}
+
+func ensureHashDictionary(ctx *exec.KernelCtx, hash *dictionaryHashState) 
(*exec.ArraySpan, error) {
+       out := &exec.ArraySpan{}
+
+       if hash.dictionary != nil {
+               out.TakeOwnership(hash.dictionary.Data())
+               hash.dictionary.Release()
+               return out, nil
+       }
+
+       exec.FillZeroLength(hash.DictionaryValueType(), out)
+       return out, nil
+}
+
+func uniqueFinalizeDictionary(ctx *exec.KernelCtx, result []*exec.ArraySpan) 
(out []*exec.ArraySpan, err error) {
+       if out, err = uniqueFinalize(ctx, result); err != nil {
+               return
+       }
+
+       hash, ok := ctx.State.(*dictionaryHashState)
+       if !ok {
+               return nil, fmt.Errorf("%w: state should be 
*dictionaryHashState", arrow.ErrInvalid)
+       }
+
+       dict, err := ensureHashDictionary(ctx, hash)
+       if err != nil {
+               return nil, err
+       }
+       out[0].SetDictionary(dict)
+       return
+}
+
+func addHashKernels(base exec.VectorKernel, actionInit initAction, outTy 
exec.OutputType) []exec.VectorKernel {
+       kernels := make([]exec.VectorKernel, 0)
+       for _, ty := range primitiveTypes {
+               base.Init = getHashInit(ty.ID(), actionInit)
+               base.Signature = &exec.KernelSignature{
+                       InputTypes: []exec.InputType{exec.NewExactInput(ty)},
+                       OutType:    outTy,
+               }
+               kernels = append(kernels, base)
+       }
+
+       parametricTypes := []arrow.Type{arrow.TIME32, arrow.TIME64, 
arrow.TIMESTAMP,
+               arrow.DURATION, arrow.FIXED_SIZE_BINARY, arrow.DECIMAL128, 
arrow.DECIMAL256,
+               arrow.INTERVAL_DAY_TIME, arrow.INTERVAL_MONTHS, 
arrow.INTERVAL_MONTH_DAY_NANO}
+       for _, ty := range parametricTypes {
+               base.Init = getHashInit(ty, actionInit)
+               base.Signature = &exec.KernelSignature{
+                       InputTypes: []exec.InputType{exec.NewIDInput(ty)},
+                       OutType:    outTy,
+               }
+               kernels = append(kernels, base)
+       }
+
+       return kernels
+}
+
+func initUnique(dt arrow.DataType, _ any, mem memory.Allocator) Action {
+       return uniqueAction{mem: mem, dt: dt}
+}
+
+func GetVectorHashKernels() (unique, valueCounts, dictEncode 
[]exec.VectorKernel) {
+       var base exec.VectorKernel
+       base.ExecFn = hashExec
+
+       // unique
+       base.Finalize = uniqueFinalize
+       base.OutputChunked = false
+       base.CanExecuteChunkWise = true
+       unique = addHashKernels(base, initUnique, OutputFirstType)
+
+       // dictionary unique
+       base.Init = dictionaryHashInit(initUnique)
+       base.Finalize = uniqueFinalizeDictionary
+       base.Signature = &exec.KernelSignature{
+               InputTypes: []exec.InputType{exec.NewIDInput(arrow.DICTIONARY)},
+               OutType:    OutputFirstType,
+       }
+       unique = append(unique, base)
+
+       return
+}
diff --git a/go/arrow/compute/registry.go b/go/arrow/compute/registry.go
index 5aee18040d..54d39d17e0 100644
--- a/go/arrow/compute/registry.go
+++ b/go/arrow/compute/registry.go
@@ -51,6 +51,7 @@ func GetFunctionRegistry() FunctionRegistry {
                RegisterScalarBoolean(registry)
                RegisterScalarArithmetic(registry)
                RegisterScalarComparisons(registry)
+               RegisterVectorHash(registry)
        })
        return registry
 }
diff --git a/go/arrow/compute/vector_hash.go b/go/arrow/compute/vector_hash.go
new file mode 100644
index 0000000000..42ec942573
--- /dev/null
+++ b/go/arrow/compute/vector_hash.go
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package compute
+
+import (
+       "context"
+
+       "github.com/apache/arrow/go/v12/arrow"
+       "github.com/apache/arrow/go/v12/arrow/compute/internal/kernels"
+)
+
+var (
+       uniqueDoc = FunctionDoc{
+               Summary:     "Compute unique elements",
+               Description: "Return an array with distinct values. Nulls in 
the input are ignored",
+               ArgNames:    []string{"array"},
+       }
+)
+
+func Unique(ctx context.Context, values Datum) (Datum, error) {
+       return CallFunction(ctx, "unique", nil, values)
+}
+
+func UniqueArray(ctx context.Context, values arrow.Array) (arrow.Array, error) 
{
+       out, err := Unique(ctx, &ArrayDatum{Value: values.Data()})
+       if err != nil {
+               return nil, err
+       }
+       defer out.Release()
+
+       return out.(*ArrayDatum).MakeArray(), nil
+}
+
+func RegisterVectorHash(reg FunctionRegistry) {
+       unique, _, _ := kernels.GetVectorHashKernels()
+       uniqFn := NewVectorFunction("unique", Unary(), uniqueDoc)
+       for _, vd := range unique {
+               if err := uniqFn.AddKernel(vd); err != nil {
+                       panic(err)
+               }
+       }
+       reg.AddFunction(uniqFn, false)
+}
diff --git a/go/arrow/compute/vector_hash_test.go 
b/go/arrow/compute/vector_hash_test.go
new file mode 100644
index 0000000000..796472c9bb
--- /dev/null
+++ b/go/arrow/compute/vector_hash_test.go
@@ -0,0 +1,518 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+
+package compute_test
+
+import (
+       "context"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow/go/v12/arrow"
+       "github.com/apache/arrow/go/v12/arrow/array"
+       "github.com/apache/arrow/go/v12/arrow/compute"
+       "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
+       "github.com/apache/arrow/go/v12/arrow/decimal128"
+       "github.com/apache/arrow/go/v12/arrow/decimal256"
+       "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "github.com/stretchr/testify/suite"
+       "golang.org/x/exp/constraints"
+)
+
+func checkUniqueDict[I exec.IntTypes | exec.UintTypes](t *testing.T, input 
compute.ArrayLikeDatum, expected arrow.Array) {
+       out, err := compute.Unique(context.TODO(), input)
+       require.NoError(t, err)
+       defer out.Release()
+
+       result := out.(*compute.ArrayDatum).MakeArray().(*array.Dictionary)
+       defer result.Release()
+
+       require.Truef(t, arrow.TypeEqual(result.DataType(), 
expected.DataType()),
+               "wanted: %s\ngot: %s", expected.DataType(), result.DataType())
+
+       exDict := expected.(*array.Dictionary).Dictionary()
+       resultDict := result.Dictionary()
+
+       require.Truef(t, array.Equal(exDict, resultDict), "wanted: %s\ngot: 
%s", exDict, resultDict)
+
+       want := 
exec.GetValues[I](expected.(*array.Dictionary).Indices().Data(), 1)
+       got := exec.GetValues[I](result.Indices().Data(), 1)
+       assert.ElementsMatchf(t, got, want, "wanted: %s\ngot: %s", want, got)
+}
+
+func checkDictionaryUnique(t *testing.T, input compute.ArrayLikeDatum, 
expected arrow.Array) {
+       require.Truef(t, arrow.TypeEqual(input.Type(), expected.DataType()),
+               "wanted: %s\ngot: %s", expected.DataType(), input.Type())
+
+       switch input.Type().(*arrow.DictionaryType).IndexType.ID() {
+       case arrow.INT8:
+               checkUniqueDict[int8](t, input, expected)
+       case arrow.INT16:
+               checkUniqueDict[int16](t, input, expected)
+       case arrow.INT32:
+               checkUniqueDict[int32](t, input, expected)
+       case arrow.INT64:
+               checkUniqueDict[int64](t, input, expected)
+       case arrow.UINT8:
+               checkUniqueDict[uint8](t, input, expected)
+       case arrow.UINT16:
+               checkUniqueDict[uint16](t, input, expected)
+       case arrow.UINT32:
+               checkUniqueDict[uint32](t, input, expected)
+       case arrow.UINT64:
+               checkUniqueDict[uint64](t, input, expected)
+       }
+}
+
+func checkUniqueFixedWidth[T exec.FixedWidthTypes](t *testing.T, input, 
expected arrow.Array) {
+       result, err := compute.UniqueArray(context.TODO(), input)
+       require.NoError(t, err)
+       defer result.Release()
+
+       require.Truef(t, arrow.TypeEqual(result.DataType(), 
expected.DataType()),
+               "wanted: %s\ngot: %s", expected.DataType(), result.DataType())
+       want := exec.GetValues[T](expected.Data(), 1)
+       got := exec.GetValues[T](expected.Data(), 1)
+
+       assert.ElementsMatchf(t, got, want, "wanted: %s\ngot: %s", want, got)
+}
+
+func checkUniqueVariableWidth[OffsetType int32 | int64](t *testing.T, input, 
expected arrow.Array) {
+       result, err := compute.UniqueArray(context.TODO(), input)
+       require.NoError(t, err)
+       defer result.Release()
+
+       require.Truef(t, arrow.TypeEqual(result.DataType(), 
expected.DataType()),
+               "wanted: %s\ngot: %s", expected.DataType(), result.DataType())
+
+       require.EqualValues(t, expected.Len(), result.Len())
+
+       createSlice := func(v arrow.Array) [][]byte {
+               var (
+                       offsets = exec.GetOffsets[OffsetType](v.Data(), 1)
+                       data    = v.Data().Buffers()[2].Bytes()
+                       out     = make([][]byte, v.Len())
+               )
+
+               for i := 0; i < v.Len(); i++ {
+                       out[i] = data[offsets[i]:offsets[i+1]]
+               }
+               return out
+       }
+
+       want := createSlice(expected)
+       got := createSlice(result)
+
+       assert.ElementsMatch(t, want, got)
+}
+
+type ArrowType interface {
+       exec.FixedWidthTypes | string | []byte
+}
+
+type builder[T ArrowType] interface {
+       AppendValues([]T, []bool)
+}
+
+func makeArray[T ArrowType](mem memory.Allocator, dt arrow.DataType, values 
[]T, isValid []bool) arrow.Array {
+       bldr := array.NewBuilder(mem, dt)
+       defer bldr.Release()
+
+       bldr.(builder[T]).AppendValues(values, isValid)
+       return bldr.NewArray()
+}
+
+func checkUniqueFixedSizeBinary(t *testing.T, mem memory.Allocator, dt 
*arrow.FixedSizeBinaryType, inValues, outValues [][]byte, inValid, outValid 
[]bool) {
+       input := makeArray(mem, dt, inValues, inValid)
+       defer input.Release()
+       expected := makeArray(mem, dt, outValues, outValid)
+       defer expected.Release()
+
+       result, err := compute.UniqueArray(context.TODO(), input)
+       require.NoError(t, err)
+       defer result.Release()
+
+       require.Truef(t, arrow.TypeEqual(result.DataType(), 
expected.DataType()),
+               "wanted: %s\ngot: %s", expected.DataType(), result.DataType())
+
+       slice := func(v arrow.Array) [][]byte {
+               data := v.Data().Buffers()[1].Bytes()
+               out := make([][]byte, v.Len())
+               for i := range out {
+                       out[i] = data[i*dt.ByteWidth : (i+1)*dt.ByteWidth]
+               }
+               return out
+       }
+
+       want := slice(expected)
+       got := slice(result)
+       assert.ElementsMatch(t, want, got)
+}
+
+func checkUniqueFW[T exec.FixedWidthTypes](t *testing.T, mem memory.Allocator, 
dt arrow.DataType, inValues, outValues []T, inValid, outValid []bool) {
+       input := makeArray(mem, dt, inValues, inValid)
+       defer input.Release()
+       expected := makeArray(mem, dt, outValues, outValid)
+       defer expected.Release()
+
+       checkUniqueFixedWidth[T](t, input, expected)
+}
+
+func checkUniqueVW[T string | []byte](t *testing.T, mem memory.Allocator, dt 
arrow.DataType, inValues, outValues []T, inValid, outValid []bool) {
+       input := makeArray(mem, dt, inValues, inValid)
+       defer input.Release()
+       expected := makeArray(mem, dt, outValues, outValid)
+       defer expected.Release()
+
+       switch dt.(arrow.BinaryDataType).Layout().Buffers[1].ByteWidth {
+       case 4:
+               checkUniqueVariableWidth[int32](t, input, expected)
+       case 8:
+               checkUniqueVariableWidth[int64](t, input, expected)
+       }
+}
+
+type PrimitiveHashKernelSuite[T exec.IntTypes | exec.UintTypes | 
constraints.Float] struct {
+       suite.Suite
+
+       mem *memory.CheckedAllocator
+       dt  arrow.DataType
+}
+
+func (ps *PrimitiveHashKernelSuite[T]) SetupSuite() {
+       ps.dt = exec.GetDataType[T]()
+}
+
+func (ps *PrimitiveHashKernelSuite[T]) SetupTest() {
+       ps.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
+}
+
+func (ps *PrimitiveHashKernelSuite[T]) TearDownTest() {
+       ps.mem.AssertSize(ps.T(), 0)
+}
+
+func (ps *PrimitiveHashKernelSuite[T]) TestUnique() {
+       ps.Run(ps.dt.String(), func() {
+               if ps.dt.ID() == arrow.DATE64 {
+                       checkUniqueFW(ps.T(), ps.mem, ps.dt,
+                               []arrow.Date64{172800000, 864000000, 172800000, 
864000000},
+                               []arrow.Date64{172800000, 0, 864000000},
+                               []bool{true, false, true, true}, []bool{true, 
false, true})
+
+                       checkUniqueFW(ps.T(), ps.mem, ps.dt,
+                               []arrow.Date64{172800000, 864000000, 259200000, 
864000000},
+                               []arrow.Date64{0, 259200000, 864000000},
+                               []bool{false, false, true, true}, []bool{false, 
true, true})
+
+                       arr, _, err := array.FromJSON(ps.mem, ps.dt, 
strings.NewReader(`[86400000, 172800000, null, 259200000, 172800000, null]`))
+                       ps.Require().NoError(err)
+                       defer arr.Release()
+                       input := array.NewSlice(arr, 1, 5)
+                       defer input.Release()
+                       expected, _, err := array.FromJSON(ps.mem, ps.dt, 
strings.NewReader(`[172800000, null, 259200000]`))
+                       ps.Require().NoError(err)
+                       defer expected.Release()
+                       checkUniqueFixedWidth[arrow.Date64](ps.T(), input, 
expected)
+                       return
+               }
+
+               checkUniqueFW(ps.T(), ps.mem, ps.dt,
+                       []T{2, 1, 2, 1}, []T{2, 0, 1},
+                       []bool{true, false, true, true}, []bool{true, false, 
true})
+               checkUniqueFW(ps.T(), ps.mem, ps.dt,
+                       []T{2, 1, 3, 1}, []T{0, 3, 1},
+                       []bool{false, false, true, true}, []bool{false, true, 
true})
+
+               arr, _, err := array.FromJSON(ps.mem, ps.dt, 
strings.NewReader(`[1, 2, null, 3, 2, null]`))
+               ps.Require().NoError(err)
+               defer arr.Release()
+               input := array.NewSlice(arr, 1, 5)
+               defer input.Release()
+
+               expected, _, err := array.FromJSON(ps.mem, ps.dt, 
strings.NewReader(`[2, null, 3]`))
+               ps.Require().NoError(err)
+               defer expected.Release()
+
+               checkUniqueFixedWidth[T](ps.T(), input, expected)
+       })
+}
+
+type BinaryTypeHashKernelSuite[T string | []byte] struct {
+       suite.Suite
+
+       mem *memory.CheckedAllocator
+       dt  arrow.DataType
+}
+
+func (ps *BinaryTypeHashKernelSuite[T]) SetupTest() {
+       ps.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
+}
+
+func (ps *BinaryTypeHashKernelSuite[T]) TearDownTest() {
+       ps.mem.AssertSize(ps.T(), 0)
+}
+
+func (ps *BinaryTypeHashKernelSuite[T]) TestUnique() {
+       ps.Run(ps.dt.String(), func() {
+               checkUniqueVW(ps.T(), ps.mem, ps.dt,
+                       []T{T("test"), T(""), T("test2"), T("test")}, 
[]T{T("test"), T(""), T("test2")},
+                       []bool{true, false, true, true}, []bool{true, false, 
true})
+       })
+}
+
+func TestHashKernels(t *testing.T) {
+       suite.Run(t, &PrimitiveHashKernelSuite[int8]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[uint8]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[int16]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[uint16]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[int32]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[uint32]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[int64]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[uint64]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[float32]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[float64]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[arrow.Date32]{})
+       suite.Run(t, &PrimitiveHashKernelSuite[arrow.Date64]{})
+
+       suite.Run(t, &BinaryTypeHashKernelSuite[string]{dt: 
arrow.BinaryTypes.String})
+       suite.Run(t, &BinaryTypeHashKernelSuite[string]{dt: 
arrow.BinaryTypes.LargeString})
+       suite.Run(t, &BinaryTypeHashKernelSuite[[]byte]{dt: 
arrow.BinaryTypes.Binary})
+       suite.Run(t, &BinaryTypeHashKernelSuite[[]byte]{dt: 
arrow.BinaryTypes.LargeBinary})
+}
+
+func TestUniqueTimeTimestamp(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       checkUniqueFW(t, mem, arrow.FixedWidthTypes.Time32s,
+               []arrow.Time32{2, 1, 2, 1}, []arrow.Time32{2, 0, 1},
+               []bool{true, false, true, true}, []bool{true, false, true})
+
+       checkUniqueFW(t, mem, arrow.FixedWidthTypes.Time64ns,
+               []arrow.Time64{2, 1, 2, 1}, []arrow.Time64{2, 0, 1},
+               []bool{true, false, true, true}, []bool{true, false, true})
+
+       checkUniqueFW(t, mem, arrow.FixedWidthTypes.Timestamp_ns,
+               []arrow.Timestamp{2, 1, 2, 1}, []arrow.Timestamp{2, 0, 1},
+               []bool{true, false, true, true}, []bool{true, false, true})
+
+       checkUniqueFW(t, mem, arrow.FixedWidthTypes.Duration_ns,
+               []arrow.Duration{2, 1, 2, 1}, []arrow.Duration{2, 0, 1},
+               []bool{true, false, true, true}, []bool{true, false, true})
+}
+
+func TestUniqueFixedSizeBinary(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       dt := &arrow.FixedSizeBinaryType{ByteWidth: 3}
+       checkUniqueFixedSizeBinary(t, mem, dt,
+               [][]byte{[]byte("aaa"), nil, []byte("bbb"), []byte("aaa")},
+               [][]byte{[]byte("aaa"), nil, []byte("bbb")},
+               []bool{true, false, true, true}, []bool{true, false, true})
+}
+
+func TestUniqueDecimal(t *testing.T) {
+       t.Run("decimal128", func(t *testing.T) {
+               mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+               defer mem.AssertSize(t, 0)
+
+               values := []decimal128.Num{
+                       decimal128.FromI64(12),
+                       decimal128.FromI64(12),
+                       decimal128.FromI64(11),
+                       decimal128.FromI64(12)}
+               expected := []decimal128.Num{
+                       decimal128.FromI64(12),
+                       decimal128.FromI64(0),
+                       decimal128.FromI64(11)}
+
+               checkUniqueFW(t, mem, &arrow.Decimal128Type{Precision: 2, 
Scale: 0},
+                       values, expected, []bool{true, false, true, true}, 
[]bool{true, false, true})
+       })
+
+       t.Run("decimal256", func(t *testing.T) {
+               mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+               defer mem.AssertSize(t, 0)
+
+               values := []decimal256.Num{
+                       decimal256.FromI64(12),
+                       decimal256.FromI64(12),
+                       decimal256.FromI64(11),
+                       decimal256.FromI64(12)}
+               expected := []decimal256.Num{
+                       decimal256.FromI64(12),
+                       decimal256.FromI64(0),
+                       decimal256.FromI64(11)}
+
+               checkUniqueFW(t, mem, &arrow.Decimal256Type{Precision: 2, 
Scale: 0},
+                       values, expected, []bool{true, false, true, true}, 
[]bool{true, false, true})
+       })
+}
+
+func TestUniqueIntervalMonth(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       checkUniqueFW(t, mem, arrow.FixedWidthTypes.MonthInterval,
+               []arrow.MonthInterval{2, 1, 2, 1}, []arrow.MonthInterval{2, 0, 
1},
+               []bool{true, false, true, true}, []bool{true, false, true})
+
+       checkUniqueFW(t, mem, arrow.FixedWidthTypes.DayTimeInterval,
+               []arrow.DayTimeInterval{
+                       {Days: 2, Milliseconds: 1}, {Days: 3, Milliseconds: 2},
+                       {Days: 2, Milliseconds: 1}, {Days: 1, Milliseconds: 2}},
+               []arrow.DayTimeInterval{{Days: 2, Milliseconds: 1},
+                       {Days: 1, Milliseconds: 1}, {Days: 1, Milliseconds: 2}},
+               []bool{true, false, true, true}, []bool{true, false, true})
+
+       checkUniqueFW(t, mem, arrow.FixedWidthTypes.MonthDayNanoInterval,
+               []arrow.MonthDayNanoInterval{
+                       {Months: 2, Days: 1, Nanoseconds: 1},
+                       {Months: 3, Days: 2, Nanoseconds: 1},
+                       {Months: 2, Days: 1, Nanoseconds: 1},
+                       {Months: 1, Days: 2, Nanoseconds: 1}},
+               []arrow.MonthDayNanoInterval{
+                       {Months: 2, Days: 1, Nanoseconds: 1},
+                       {Months: 1, Days: 1, Nanoseconds: 1},
+                       {Months: 1, Days: 2, Nanoseconds: 1}},
+               []bool{true, false, true, true}, []bool{true, false, true})
+}
+
+func TestUniqueChunkedArrayInvoke(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       var (
+               values1    = []string{"foo", "bar", "foo"}
+               values2    = []string{"bar", "baz", "quuux", "foo"}
+               dictValues = []string{"foo", "bar", "baz", "quuux"}
+               typ        = arrow.BinaryTypes.String
+               a1         = makeArray(mem, typ, values1, nil)
+               a2         = makeArray(mem, typ, values2, nil)
+               exDict     = makeArray(mem, typ, dictValues, nil)
+       )
+
+       defer a1.Release()
+       defer a2.Release()
+       defer exDict.Release()
+
+       carr := arrow.NewChunked(typ, []arrow.Array{a1, a2})
+       defer carr.Release()
+
+       result, err := compute.Unique(context.TODO(), 
&compute.ChunkedDatum{Value: carr})
+       require.NoError(t, err)
+       defer result.Release()
+
+       require.Equal(t, compute.KindArray, result.Kind())
+       out := result.(*compute.ArrayDatum).MakeArray()
+       defer out.Release()
+
+       assertArraysEqual(t, exDict, out)
+}
+
+func TestDictionaryUnique(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       const dictJSON = `[10, 20, 30, 40]`
+       dict, _, err := array.FromJSON(mem, arrow.PrimitiveTypes.Int64, 
strings.NewReader(dictJSON))
+       require.NoError(t, err)
+       defer dict.Release()
+
+       for _, idxTyp := range integerTypes {
+               t.Run("index_type="+idxTyp.Name(), func(t *testing.T) {
+                       scope := memory.NewCheckedAllocatorScope(mem)
+                       defer scope.CheckSize(t)
+
+                       indices, _, _ := array.FromJSON(mem, idxTyp, 
strings.NewReader(`[3, 0, 0, 0, 1, 1, 3, 0, 1, 3, 0, 1]`))
+                       defer indices.Release()
+                       dictType := &arrow.DictionaryType{
+                               IndexType: idxTyp, ValueType: 
arrow.PrimitiveTypes.Int64}
+                       exIndices, _, _ := array.FromJSON(mem, idxTyp, 
strings.NewReader(`[3, 0, 1]`))
+                       defer exIndices.Release()
+
+                       input := array.NewDictionaryArray(dictType, indices, 
dict)
+                       defer input.Release()
+                       exUniques := array.NewDictionaryArray(dictType, 
exIndices, dict)
+                       defer exUniques.Release()
+
+                       checkDictionaryUnique(t, &compute.ArrayDatum{Value: 
input.Data()}, exUniques)
+
+                       t.Run("empty array", func(t *testing.T) {
+                               scope := memory.NewCheckedAllocatorScope(mem)
+                               defer scope.CheckSize(t)
+
+                               // executor never gives the kernel any batches
+                               // so result dictionary is empty
+                               emptyInput, _ := array.DictArrayFromJSON(mem, 
dictType, `[]`, dictJSON)
+                               defer emptyInput.Release()
+                               exEmpty, _ := array.DictArrayFromJSON(mem, 
dictType, `[]`, `[]`)
+                               defer exEmpty.Release()
+                               checkDictionaryUnique(t, 
&compute.ArrayDatum{Value: emptyInput.Data()}, exEmpty)
+                       })
+
+                       t.Run("different chunk dictionaries", func(t 
*testing.T) {
+                               scope := memory.NewCheckedAllocatorScope(mem)
+                               defer scope.CheckSize(t)
+
+                               input2, _ := array.DictArrayFromJSON(mem, 
dictType, `[1, null, 2, 3]`, `[30, 40, 50, 60]`)
+                               defer input2.Release()
+
+                               diffCarr := arrow.NewChunked(dictType, 
[]arrow.Array{input, input2})
+                               defer diffCarr.Release()
+
+                               exUnique2, _ := array.DictArrayFromJSON(mem, 
dictType, `[3, 0, 1, null, 4, 5]`, `[10, 20, 30, 40, 50, 60]`)
+                               defer exUnique2.Release()
+
+                               checkDictionaryUnique(t, 
&compute.ChunkedDatum{Value: diffCarr}, exUnique2)
+                       })
+
+                       t.Run("encoded nulls", func(t *testing.T) {
+                               scope := memory.NewCheckedAllocatorScope(mem)
+                               defer scope.CheckSize(t)
+
+                               dictWithNull, _, _ := array.FromJSON(mem, 
arrow.PrimitiveTypes.Int64, strings.NewReader(`[10, null, 30, 40]`))
+                               defer dictWithNull.Release()
+                               input := array.NewDictionaryArray(dictType, 
indices, dictWithNull)
+                               defer input.Release()
+                               exUniques := array.NewDictionaryArray(dictType, 
exIndices, dictWithNull)
+                               defer exUniques.Release()
+                               checkDictionaryUnique(t, 
&compute.ArrayDatum{Value: input.Data()}, exUniques)
+                       })
+
+                       t.Run("masked nulls", func(t *testing.T) {
+                               scope := memory.NewCheckedAllocatorScope(mem)
+                               defer scope.CheckSize(t)
+
+                               indicesWithNull, _, _ := array.FromJSON(mem, 
idxTyp, strings.NewReader(`[3, 0, 0, 0, null, null, 3, 0, null, 3, 0, null]`))
+                               defer indicesWithNull.Release()
+                               exIndicesWithNull, _, _ := array.FromJSON(mem, 
idxTyp, strings.NewReader(`[3, 0, null]`))
+                               defer exIndicesWithNull.Release()
+                               exUniques := array.NewDictionaryArray(dictType, 
exIndicesWithNull, dict)
+                               defer exUniques.Release()
+                               input := array.NewDictionaryArray(dictType, 
indicesWithNull, dict)
+                               defer input.Release()
+
+                               checkDictionaryUnique(t, 
&compute.ArrayDatum{Value: input.Data()}, exUniques)
+                       })
+               })
+       }
+}
diff --git a/go/internal/hashing/xxh3_memo_table.go 
b/go/internal/hashing/xxh3_memo_table.go
index e76f1cca3c..787a4df34d 100644
--- a/go/internal/hashing/xxh3_memo_table.go
+++ b/go/internal/hashing/xxh3_memo_table.go
@@ -381,6 +381,31 @@ func (b *BinaryMemoTable) CopyOffsetsSubset(start int, out 
[]int32) {
        out[sz-start] = int32(b.builder.DataLen() - (int(delta) - int(first)))
 }
 
+// CopyLargeOffsets copies the list of offsets into the passed in slice, the 
offsets
+// being the start and end values of the underlying allocated bytes in the 
builder
+// for the individual values of the table. out should be at least sized to 
Size()+1
+func (b *BinaryMemoTable) CopyLargeOffsets(out []int64) {
+       b.CopyLargeOffsetsSubset(0, out)
+}
+
+// CopyLargeOffsetsSubset is like CopyOffsets but instead of copying all of 
the offsets,
+// it gets a subset of the offsets in the table starting at the index provided 
by "start".
+func (b *BinaryMemoTable) CopyLargeOffsetsSubset(start int, out []int64) {
+       if b.builder.Len() <= start {
+               return
+       }
+
+       first := b.findOffset(0)
+       delta := b.findOffset(start)
+       sz := b.Size()
+       for i := start; i < sz; i++ {
+               offset := int64(b.findOffset(i) - delta)
+               out[i-start] = offset
+       }
+
+       out[sz-start] = int64(b.builder.DataLen() - (int(delta) - int(first)))
+}
+
 // CopyValues copies the raw binary data bytes out, out should be a []byte
 // with at least ValuesSize bytes allocated to copy into.
 func (b *BinaryMemoTable) CopyValues(out interface{}) {
@@ -428,19 +453,20 @@ func (b *BinaryMemoTable) CopyFixedWidthValues(start, 
width int, out []byte) {
        }
 
        var (
-               leftOffset = b.findOffset(start)
-               nullOffset = b.findOffset(null)
-               leftSize   = nullOffset - leftOffset
+               leftOffset  = b.findOffset(start)
+               nullOffset  = b.findOffset(null)
+               leftSize    = nullOffset - leftOffset
+               rightOffset = leftOffset + uintptr(b.ValuesSize())
        )
 
        if leftSize > 0 {
                copy(out, b.builder.Value(start)[0:leftSize])
        }
 
-       rightSize := b.ValuesSize() - int(nullOffset)
+       rightSize := rightOffset - nullOffset
        if rightSize > 0 {
                // skip the null fixed size value
-               copy(out[int(leftSize)+width:], 
b.builder.Value(int(nullOffset))[0:rightSize])
+               copy(out[int(leftSize)+width:], b.builder.Value(null + 
1)[0:rightSize])
        }
 }
 

Reply via email to