This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a16ffa7324 ARROW-17095: [Go] Allow Concatenating Dictionary Arrays (#13624)
a16ffa7324 is described below

commit a16ffa7324b0bbc8014d941d2a5eb8a9b3a91bb7
Author: Matt Topol <[email protected]>
AuthorDate: Tue Jul 19 10:35:42 2022 -0400

    ARROW-17095: [Go] Allow Concatenating Dictionary Arrays (#13624)
    
    Authored-by: Matthew Topol <[email protected]>
    Signed-off-by: Matthew Topol <[email protected]>
---
 go/arrow/array/binarybuilder.go |   2 +-
 go/arrow/array/concat.go        | 117 ++++++++++++++++++++
 go/arrow/array/concat_test.go   | 240 ++++++++++++++++++++++++++++++++++++++++
 go/arrow/array/string.go        |   6 +
 go/arrow/array/util.go          |  17 +++
 5 files changed, 381 insertions(+), 1 deletion(-)

diff --git a/go/arrow/array/binarybuilder.go b/go/arrow/array/binarybuilder.go
index 5905380787..2b45e5104a 100644
--- a/go/arrow/array/binarybuilder.go
+++ b/go/arrow/array/binarybuilder.go
@@ -180,7 +180,7 @@ func (b *BinaryBuilder) Resize(n int) {
 }
 
+// ResizeData sets b.values.length to n (presumably the byte length of the
+// builder's value data; confirm against the builder definition).
+// NOTE(review): nothing here grows the underlying allocation — confirm that
+// callers reserve enough data capacity (e.g. via ReserveData) before setting
+// a larger length.
 func (b *BinaryBuilder) ResizeData(n int) {
-       b.values.length = 0
+       b.values.length = n
 }
 
 // NewArray creates a Binary array from the memory buffers used by the builder 
and resets the BinaryBuilder
diff --git a/go/arrow/array/concat.go b/go/arrow/array/concat.go
index 2842757859..22228dcd39 100644
--- a/go/arrow/array/concat.go
+++ b/go/arrow/array/concat.go
@@ -26,6 +26,8 @@ import (
        "github.com/apache/arrow/go/v9/arrow/bitutil"
        "github.com/apache/arrow/go/v9/arrow/internal/debug"
        "github.com/apache/arrow/go/v9/arrow/memory"
+       "github.com/apache/arrow/go/v9/internal/bitutils"
+       "github.com/apache/arrow/go/v9/internal/utils"
 )
 
 // Concatenate creates a new arrow.Array which is the concatenation of the
@@ -228,6 +230,85 @@ func concatOffsets(buffers []*memory.Buffer, mem 
memory.Allocator) (*memory.Buff
        return out, valuesRanges, nil
 }
 
+// unifyDictionaries merges the dictionaries of every element of data into a
+// single dictionary, using a DictionaryUnifier built for dt.ValueType and a
+// result index type of dt.IndexType. It returns one transposition buffer per
+// element of data, in the same order, together with the unified dictionary.
+// concatDictIndices reads each buffer as an int32 map from that element's old
+// dictionary indices to indices in the unified dictionary.
+// On success the caller is responsible for releasing the buffers and the
+// array. On error nothing is returned, and any buffers created so far are
+// released.
+func unifyDictionaries(mem memory.Allocator, data []arrow.ArrayData, dt *arrow.DictionaryType) ([]*memory.Buffer, arrow.Array, error) {
+       unifier, err := NewDictionaryUnifier(mem, dt.ValueType)
+       if err != nil {
+               return nil, nil, err
+       }
+       defer unifier.Release()
+
+       newLookup := make([]*memory.Buffer, len(data))
+       // releaseLookups frees every transposition buffer produced so far. It is
+       // used on every error path so that a failure part-way through the loop
+       // does not leak the buffers of the elements already processed.
+       releaseLookups := func() {
+               for _, b := range newLookup {
+                       if b != nil {
+                               b.Release()
+                       }
+               }
+       }
+
+       for i, d := range data {
+               dictArr := MakeFromData(d.Dictionary())
+               // Released when this function returns, i.e. after the unified
+               // dictionary has been built.
+               defer dictArr.Release()
+               if newLookup[i], err = unifier.UnifyAndTranspose(dictArr); err != nil {
+                       releaseLookups()
+                       return nil, nil, err
+               }
+       }
+
+       unified, err := unifier.GetResultWithIndexType(dt.IndexType)
+       if err != nil {
+               releaseLookups()
+               return nil, nil, err
+       }
+       return newLookup, unified, nil
+}
+
+// concatDictIndices concatenates the index buffers (buffer 1) of every element
+// of data, in order, into one newly allocated buffer of idxType values.
+// Each non-null index is remapped through transpositions[i], which is read as
+// a slice of int32. Slots that are null according to an element's validity
+// bitmap (buffer 0) are written as zero bytes.
+// Every transposition buffer is released before returning, whether or not an
+// error occurs. On error the partially filled output buffer is released and
+// nil is returned.
+func concatDictIndices(mem memory.Allocator, data []arrow.ArrayData, idxType arrow.FixedWidthDataType, transpositions []*memory.Buffer) (out *memory.Buffer, err error) {
+       defer func() {
+               if err != nil && out != nil {
+                       out.Release()
+                       out = nil
+               }
+       }()
+
+       idxWidth := idxType.BitWidth() / 8
+       outLen := 0
+       for i, d := range data {
+               outLen += d.Len()
+               defer transpositions[i].Release()
+       }
+
+       out = memory.NewResizableBuffer(mem)
+       out.Resize(outLen * idxWidth)
+
+       outData := out.Bytes()
+       for i, d := range data {
+               transposeMap := arrow.Int32Traits.CastFromBytes(transpositions[i].Bytes())
+               src := d.Buffers()[1].Bytes()
+               if d.Buffers()[0] == nil {
+                       // No validity bitmap: transpose the whole index range in one call.
+                       if err = utils.TransposeIntsBuffers(idxType, idxType, src, outData, d.Offset(), 0, d.Len(), transposeMap); err != nil {
+                               return
+                       }
+               } else {
+                       // Walk runs of valid / null slots. pos is an element (not byte)
+                       // offset relative to the start of this element's output.
+                       rdr := bitutils.NewBitRunReader(d.Buffers()[0].Bytes(), int64(d.Offset()), int64(d.Len()))
+                       pos := 0
+                       for {
+                               run := rdr.NextRun()
+                               if run.Len == 0 {
+                                       break
+                               }
+
+                               if run.Set {
+                                       err = utils.TransposeIntsBuffers(idxType, idxType, src, outData, d.Offset()+pos, pos, int(run.Len), transposeMap)
+                                       if err != nil {
+                                               return
+                                       }
+                               } else {
+                                       // outData is a byte slice while pos counts elements, so the
+                                       // range must be scaled by idxWidth. Using pos unscaled would
+                                       // clear bytes of earlier, already-transposed indices whenever
+                                       // idxWidth > 1.
+                                       memory.Set(outData[pos*idxWidth:(pos+int(run.Len))*idxWidth], 0x00)
+                               }
+
+                               pos += int(run.Len)
+                       }
+               }
+               // Advance past the bytes written for this element.
+               outData = outData[d.Len()*idxWidth:]
+       }
+       return
+}
+
 // concat is the implementation for actually performing the concatenation of 
the arrow.ArrayData
 // objects that we can call internally for nested types.
 func concat(data []arrow.ArrayData, mem memory.Allocator) (arrow.ArrayData, 
error) {
@@ -258,6 +339,42 @@ func concat(data []arrow.ArrayData, mem memory.Allocator) 
(arrow.ArrayData, erro
                        return nil, err
                }
                out.buffers[1] = bm
+       case *arrow.DictionaryType:
+               idxType := dt.IndexType.(arrow.FixedWidthDataType)
+               // two cases: all dictionaries are the same or we need to unify 
them
+               dictsSame := true
+               dict0 := MakeFromData(data[0].Dictionary())
+               defer dict0.Release()
+               for _, d := range data {
+                       dict := MakeFromData(d.Dictionary())
+                       if !Equal(dict0, dict) {
+                               dict.Release()
+                               dictsSame = false
+                               break
+                       }
+                       dict.Release()
+               }
+
+               indexBuffers := gatherBuffersFixedWidthType(data, 1, idxType)
+               if dictsSame {
+                       out.dictionary = dict0.Data().(*Data)
+                       out.dictionary.Retain()
+                       out.buffers[1] = concatBuffers(indexBuffers, mem)
+                       break
+               }
+
+               indexLookup, unifiedDict, err := unifyDictionaries(mem, data, 
dt)
+               if err != nil {
+                       return nil, err
+               }
+               defer unifiedDict.Release()
+               out.dictionary = unifiedDict.Data().(*Data)
+               out.dictionary.Retain()
+
+               out.buffers[1], err = concatDictIndices(mem, data, idxType, 
indexLookup)
+               if err != nil {
+                       return nil, err
+               }
        case arrow.FixedWidthDataType:
                out.buffers[1] = 
concatBuffers(gatherBuffersFixedWidthType(data, 1, dt), mem)
        case arrow.BinaryDataType:
diff --git a/go/arrow/array/concat_test.go b/go/arrow/array/concat_test.go
index 8beee43306..5362fd33f5 100644
--- a/go/arrow/array/concat_test.go
+++ b/go/arrow/array/concat_test.go
@@ -18,6 +18,7 @@ package array_test
 
 import (
        "fmt"
+       "math"
        "sort"
        "testing"
 
@@ -27,6 +28,7 @@ import (
        "github.com/apache/arrow/go/v9/arrow/internal/testing/gen"
        "github.com/apache/arrow/go/v9/arrow/memory"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/suite"
        "golang.org/x/exp/rand"
 )
@@ -76,6 +78,7 @@ func TestConcatenate(t *testing.T) {
                {arrow.FixedSizeListOf(3, arrow.PrimitiveTypes.Int8)},
                {arrow.StructOf()},
                {arrow.MapOf(arrow.PrimitiveTypes.Uint16, 
arrow.PrimitiveTypes.Int8)},
+               {&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, 
ValueType: arrow.PrimitiveTypes.Float64}},
        }
 
        for _, tt := range tests {
@@ -221,6 +224,12 @@ func (cts *ConcatTestSuite) generateArr(size int64, 
nullprob float64) arrow.Arra
                        }
                }
                return bldr.NewArray()
+       case arrow.DICTIONARY:
+               indices := cts.rng.Int32(size, 0, 127, nullprob)
+               defer indices.Release()
+               dict := cts.rng.Float64(128, 0.0, 127.0, nullprob)
+               defer dict.Release()
+               return array.NewDictionaryArray(cts.dt, indices, dict)
        default:
                return nil
        }
@@ -288,3 +297,234 @@ func (cts *ConcatTestSuite) TestCheckConcat() {
                })
        }
 }
+
+// TestConcatDifferentDicts checks that concatenating dictionary arrays whose
+// dictionaries differ yields a unified dictionary (the first dictionary's
+// values followed by the later ones) with indices remapped to match.
+func TestConcatDifferentDicts(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       t.Run("simple dicts", func(t *testing.T) {
+               scopedMem := memory.NewCheckedAllocatorScope(mem)
+               defer scopedMem.CheckSize(t)
+
+               dictType := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String}
+               dict1, err := array.DictArrayFromJSON(mem, dictType, `[1, 2, null, 3, 0]`, `["A0", "A1", "A2", "A3"]`)
+               require.NoError(t, err)
+               defer dict1.Release()
+               dict2, err := array.DictArrayFromJSON(mem, dictType, `[null, 4, 2, 1]`, `["B0", "B1", "B2", "B3", "B4"]`)
+               require.NoError(t, err)
+               defer dict2.Release()
+
+               expected, err := array.DictArrayFromJSON(mem, dictType, `[1, 2, null, 3, 0, null, 8, 6, 5]`, `["A0", "A1", "A2", "A3", "B0", "B1", "B2", "B3", "B4"]`)
+               require.NoError(t, err)
+               defer expected.Release()
+
+               concat, err := array.Concatenate([]arrow.Array{dict1, dict2}, mem)
+               // Stop on error: the deferred Release and the comparison below
+               // need a non-nil result.
+               require.NoError(t, err)
+               defer concat.Release()
+               assert.Truef(t, array.Equal(concat, expected), "got: %s, expected: %s", concat, expected)
+       })
+
+       t.Run("larger", func(t *testing.T) {
+               scopedMem := memory.NewCheckedAllocatorScope(mem)
+               defer scopedMem.CheckSize(t)
+
+               const size = 500
+               dictType := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint16, ValueType: arrow.BinaryTypes.String}
+
+               idxBuilder, exIdxBldr := array.NewUint16Builder(mem), array.NewUint16Builder(mem)
+               defer idxBuilder.Release()
+               defer exIdxBldr.Release()
+               idxBuilder.Reserve(size)
+               exIdxBldr.Reserve(size * 2)
+
+               // Each input uses indices 0..size-1; the expected result uses
+               // 0..2*size-1.
+               for i := uint16(0); i < size; i++ {
+                       idxBuilder.UnsafeAppend(i)
+                       exIdxBldr.UnsafeAppend(i)
+               }
+               for i := uint16(size); i < 2*size; i++ {
+                       exIdxBldr.UnsafeAppend(i)
+               }
+
+               indices, expIndices := idxBuilder.NewArray(), exIdxBldr.NewArray()
+               defer indices.Release()
+               defer expIndices.Release()
+
+               // Build two dictionaries of size values each. The first maps
+               // i -> "{i}" and the second maps i -> "{size+i}". Their
+               // concatenation (expectedDict, 2*size values) is the dictionary the
+               // unified result should have. All strings are padded to 8 bytes so
+               // the data size is known ahead of time.
+               valuesOneBldr, valuesTwoBldr := array.NewStringBuilder(mem), array.NewStringBuilder(mem)
+               defer valuesOneBldr.Release()
+               defer valuesTwoBldr.Release()
+
+               valuesOneBldr.Reserve(size)
+               valuesTwoBldr.Reserve(size)
+               valuesOneBldr.ReserveData(size * 8)
+               valuesTwoBldr.ReserveData(size * 8)
+
+               for i := 0; i < size; i++ {
+                       valuesOneBldr.Append(fmt.Sprintf("%-8d", i))
+                       valuesTwoBldr.Append(fmt.Sprintf("%-8d", i+size))
+               }
+
+               dict1, dict2 := valuesOneBldr.NewArray(), valuesTwoBldr.NewArray()
+               defer dict1.Release()
+               defer dict2.Release()
+               expectedDict, err := array.Concatenate([]arrow.Array{dict1, dict2}, mem)
+               require.NoError(t, err)
+               defer expectedDict.Release()
+
+               one, two := array.NewDictionaryArray(dictType, indices, dict1), array.NewDictionaryArray(dictType, indices, dict2)
+               defer one.Release()
+               defer two.Release()
+               expected := array.NewDictionaryArray(dictType, expIndices, expectedDict)
+               defer expected.Release()
+
+               combined, err := array.Concatenate([]arrow.Array{one, two}, mem)
+               // Stop on error: the deferred Release and the comparison below
+               // need a non-nil result.
+               require.NoError(t, err)
+               defer combined.Release()
+               assert.Truef(t, array.Equal(combined, expected), "got: %s, expected: %s", combined, expected)
+       })
+}
+
+// TestConcatDictionaryPartialOverlap checks that values present in both
+// dictionaries ("C2", "C3") appear only once in the unified dictionary.
+// Indices from the second array that refer to them are remapped to the
+// existing entries.
+func TestConcatDictionaryPartialOverlap(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String}
+       dictOne, err := array.DictArrayFromJSON(mem, dt, `[1, 2, null, 3, 0]`, `["A0", "A1", "C2", "C3"]`)
+       require.NoError(t, err)
+       defer dictOne.Release()
+
+       dictTwo, err := array.DictArrayFromJSON(mem, dt, `[null, 4, 2, 1]`, `["B0", "B1", "C2", "C3", "B4"]`)
+       require.NoError(t, err)
+       defer dictTwo.Release()
+
+       expected, err := array.DictArrayFromJSON(mem, dt, `[1, 2, null, 3, 0, null, 6, 2, 5]`, `["A0", "A1", "C2", "C3", "B0", "B1", "B4"]`)
+       require.NoError(t, err)
+       defer expected.Release()
+
+       actual, err := array.Concatenate([]arrow.Array{dictOne, dictTwo}, mem)
+       // Stop on error: the deferred Release and the comparison below need a
+       // non-nil result.
+       require.NoError(t, err)
+       defer actual.Release()
+
+       assert.Truef(t, array.Equal(actual, expected), "got: %s, expected: %s", actual, expected)
+}
+
+// TestConcatDictionaryDifferentSizeIndex checks that dictionary arrays whose
+// types differ only in index width (uint8 vs uint16) cannot be concatenated.
+func TestConcatDictionaryDifferentSizeIndex(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String}
+       biggerDt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint16, ValueType: arrow.BinaryTypes.String}
+       dictOne, err := array.DictArrayFromJSON(mem, dt, `[0]`, `["A0"]`)
+       require.NoError(t, err)
+       defer dictOne.Release()
+
+       dictTwo, err := array.DictArrayFromJSON(mem, biggerDt, `[0]`, `["B0"]`)
+       require.NoError(t, err)
+       defer dictTwo.Release()
+
+       arr, err := array.Concatenate([]arrow.Array{dictOne, dictTwo}, mem)
+       if arr != nil {
+               // Only reachable if the call unexpectedly succeeded. Release the
+               // result so the allocator check does not also report a leak.
+               defer arr.Release()
+       }
+       assert.Nil(t, arr)
+       assert.Error(t, err)
+}
+
+// TestConcatDictionaryUnifyNullInDict checks unification when both
+// dictionaries contain a null value. The null entry is shared in the unified
+// dictionary, so both arrays' index 0 maps to unified index 0.
+func TestConcatDictionaryUnifyNullInDict(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String}
+       dictOne, err := array.DictArrayFromJSON(mem, dt, `[0, 1]`, `[null, "A"]`)
+       require.NoError(t, err)
+       defer dictOne.Release()
+
+       dictTwo, err := array.DictArrayFromJSON(mem, dt, `[0, 1]`, `[null, "B"]`)
+       require.NoError(t, err)
+       defer dictTwo.Release()
+
+       expected, err := array.DictArrayFromJSON(mem, dt, `[0, 1, 0, 2]`, `[null, "A", "B"]`)
+       require.NoError(t, err)
+       defer expected.Release()
+
+       actual, err := array.Concatenate([]arrow.Array{dictOne, dictTwo}, mem)
+       // Stop on error: the deferred Release and the comparison below need a
+       // non-nil result.
+       require.NoError(t, err)
+       defer actual.Release()
+
+       assert.Truef(t, array.Equal(actual, expected), "got: %s, expected: %s", actual, expected)
+}
+
+// TestConcatDictionaryEnlargedIndices checks two cases:
+//   - Concatenation fails when the unified dictionary no longer fits the index
+//     type (2*256 distinct values with uint8 indices).
+//   - It succeeds with uint16 indices.
+func TestConcatDictionaryEnlargedIndices(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       const size = math.MaxUint8 + 1
+       dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.PrimitiveTypes.Uint16}
+
+       idxBuilder := array.NewUint8Builder(mem)
+       defer idxBuilder.Release()
+       idxBuilder.Reserve(size)
+       for i := 0; i < size; i++ {
+               idxBuilder.UnsafeAppend(uint8(i))
+       }
+       indices := idxBuilder.NewUint8Array()
+       defer indices.Release()
+
+       valuesBuilder := array.NewUint16Builder(mem)
+       defer valuesBuilder.Release()
+       valuesBuilder.Reserve(size)
+       valuesBuilderTwo := array.NewUint16Builder(mem)
+       defer valuesBuilderTwo.Release()
+       valuesBuilderTwo.Reserve(size)
+
+       // Disjoint values: 0..size-1 and size..2*size-1.
+       for i := uint16(0); i < size; i++ {
+               valuesBuilder.UnsafeAppend(i)
+               valuesBuilderTwo.UnsafeAppend(i + size)
+       }
+
+       dict1, dict2 := valuesBuilder.NewUint16Array(), valuesBuilderTwo.NewUint16Array()
+       defer dict1.Release()
+       defer dict2.Release()
+
+       d1, d2 := array.NewDictionaryArray(dt, indices, dict1), array.NewDictionaryArray(dt, indices, dict2)
+       defer d1.Release()
+       defer d2.Release()
+
+       // uint8 indices cannot address the 2*size values of the unified
+       // dictionary, so this concatenation must fail.
+       bad, err := array.Concatenate([]arrow.Array{d1, d2}, mem)
+       if bad != nil {
+               // Only reachable if the call unexpectedly succeeded. Release the
+               // result so the allocator check does not also report a leak.
+               bad.Release()
+       }
+       assert.Error(t, err)
+
+       biggerDt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint16, ValueType: arrow.PrimitiveTypes.Uint16}
+       // dict1 (values 0..size-1) doubles as a uint16 index array here.
+       bigger1, bigger2 := array.NewDictionaryArray(biggerDt, dict1, dict1), array.NewDictionaryArray(biggerDt, dict1, dict2)
+       defer bigger1.Release()
+       defer bigger2.Release()
+
+       combined, err := array.Concatenate([]arrow.Array{bigger1, bigger2}, mem)
+       // Stop on error: the deferred Release and the length check below need a
+       // non-nil result.
+       require.NoError(t, err)
+       defer combined.Release()
+
+       assert.EqualValues(t, size*2, combined.Len())
+}
+
+// TestConcatDictionaryNullSlots checks concatenation when index slots are
+// null, including one input whose dictionary is empty and every slot is null.
+func TestConcatDictionaryNullSlots(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint32, ValueType: arrow.BinaryTypes.String}
+       dict1, err := array.DictArrayFromJSON(mem, dt, `[null, null, null, null]`, `[]`)
+       require.NoError(t, err)
+       defer dict1.Release()
+
+       dict2, err := array.DictArrayFromJSON(mem, dt, `[null, null, null, null, 0, 1]`, `["a", "b"]`)
+       require.NoError(t, err)
+       defer dict2.Release()
+
+       expected, err := array.DictArrayFromJSON(mem, dt, `[null, null, null, null, null, null, null, null, 0, 1]`, `["a", "b"]`)
+       require.NoError(t, err)
+       defer expected.Release()
+
+       actual, err := array.Concatenate([]arrow.Array{dict1, dict2}, mem)
+       // Stop on error: the deferred Release and the comparison below need a
+       // non-nil result.
+       require.NoError(t, err)
+       defer actual.Release()
+
+       assert.Truef(t, array.Equal(actual, expected), "got: %s, expected: %s", actual, expected)
+}
diff --git a/go/arrow/array/string.go b/go/arrow/array/string.go
index 4c033118a6..237ea0166b 100644
--- a/go/arrow/array/string.go
+++ b/go/arrow/array/string.go
@@ -231,6 +231,12 @@ func (b *StringBuilder) Reserve(n int) {
        b.builder.Reserve(n)
 }
 
+// ReserveData prepares the builder for n bytes of appended string data,
+// growing the data buffer only when its current capacity is insufficient.
+// The work is done by forwarding to b.builder.ReserveData.
+func (b *StringBuilder) ReserveData(n int) {
+       b.builder.ReserveData(n)
+}
+
 // Resize adjusts the space allocated by b to n elements. If n is greater than 
b.Cap(),
 // additional memory will be allocated. If n is smaller, the allocated memory 
may reduced.
 func (b *StringBuilder) Resize(n int) {
diff --git a/go/arrow/array/util.go b/go/arrow/array/util.go
index 5510b8df17..3945376312 100644
--- a/go/arrow/array/util.go
+++ b/go/arrow/array/util.go
@@ -20,6 +20,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "strings"
 
        "github.com/apache/arrow/go/v9/arrow"
        "github.com/apache/arrow/go/v9/arrow/bitutil"
@@ -282,3 +283,19 @@ func getDictArrayData(mem memory.Allocator, valueType 
arrow.DataType, memoTable
 
        return NewData(valueType, dictLen, buffers, nil, nullcount, 0), nil
 }
+
+// DictArrayFromJSON builds a dictionary array of type dt from two JSON
+// documents, both decoded with FromJSON:
+//   - indicesJSON is decoded as an array of dt.IndexType.
+//   - dictJSON is decoded as an array of dt.ValueType.
+// The two results are combined with NewDictionaryArray. A decoding error is
+// returned wrapped with a note saying which input failed; the original error
+// remains available via errors.Is / errors.As. The caller should Release the
+// returned array.
+func DictArrayFromJSON(mem memory.Allocator, dt *arrow.DictionaryType, indicesJSON, dictJSON string) (arrow.Array, error) {
+       indices, _, err := FromJSON(mem, dt.IndexType, strings.NewReader(indicesJSON))
+       if err != nil {
+               return nil, fmt.Errorf("decoding dictionary indices: %w", err)
+       }
+       defer indices.Release()
+
+       dict, _, err := FromJSON(mem, dt.ValueType, strings.NewReader(dictJSON))
+       if err != nil {
+               return nil, fmt.Errorf("decoding dictionary values: %w", err)
+       }
+       defer dict.Release()
+
+       return NewDictionaryArray(dt, indices, dict), nil
+}

Reply via email to