This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a16ffa7324 ARROW-17095: [Go] Allow Concatenating Dictionary Arrays (#13624)
a16ffa7324 is described below
commit a16ffa7324b0bbc8014d941d2a5eb8a9b3a91bb7
Author: Matt Topol <[email protected]>
AuthorDate: Tue Jul 19 10:35:42 2022 -0400
ARROW-17095: [Go] Allow Concatenating Dictionary Arrays (#13624)
Authored-by: Matthew Topol <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
go/arrow/array/binarybuilder.go | 2 +-
go/arrow/array/concat.go | 117 ++++++++++++++++++++
go/arrow/array/concat_test.go | 240 ++++++++++++++++++++++++++++++++++++++++
go/arrow/array/string.go | 6 +
go/arrow/array/util.go | 17 +++
5 files changed, 381 insertions(+), 1 deletion(-)
diff --git a/go/arrow/array/binarybuilder.go b/go/arrow/array/binarybuilder.go
index 5905380787..2b45e5104a 100644
--- a/go/arrow/array/binarybuilder.go
+++ b/go/arrow/array/binarybuilder.go
@@ -180,7 +180,7 @@ func (b *BinaryBuilder) Resize(n int) {
}
func (b *BinaryBuilder) ResizeData(n int) {
- b.values.length = 0
+ b.values.length = n
}
// NewArray creates a Binary array from the memory buffers used by the builder and resets the BinaryBuilder
diff --git a/go/arrow/array/concat.go b/go/arrow/array/concat.go
index 2842757859..22228dcd39 100644
--- a/go/arrow/array/concat.go
+++ b/go/arrow/array/concat.go
@@ -26,6 +26,8 @@ import (
"github.com/apache/arrow/go/v9/arrow/bitutil"
"github.com/apache/arrow/go/v9/arrow/internal/debug"
"github.com/apache/arrow/go/v9/arrow/memory"
+ "github.com/apache/arrow/go/v9/internal/bitutils"
+ "github.com/apache/arrow/go/v9/internal/utils"
)
// Concatenate creates a new arrow.Array which is the concatenation of the
@@ -228,6 +230,85 @@ func concatOffsets(buffers []*memory.Buffer, mem
memory.Allocator) (*memory.Buff
return out, valuesRanges, nil
}
+func unifyDictionaries(mem memory.Allocator, data []arrow.ArrayData, dt
*arrow.DictionaryType) ([]*memory.Buffer, arrow.Array, error) {
+ unifier, err := NewDictionaryUnifier(mem, dt.ValueType)
+ if err != nil {
+ return nil, nil, err
+ }
+ defer unifier.Release()
+
+ newLookup := make([]*memory.Buffer, len(data))
+ for i, d := range data {
+ dictArr := MakeFromData(d.Dictionary())
+ defer dictArr.Release()
+ newLookup[i], err = unifier.UnifyAndTranspose(dictArr)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ unified, err := unifier.GetResultWithIndexType(dt.IndexType)
+ if err != nil {
+ for _, b := range newLookup {
+ b.Release()
+ }
+ return nil, nil, err
+ }
+ return newLookup, unified, nil
+}
+
+func concatDictIndices(mem memory.Allocator, data []arrow.ArrayData, idxType
arrow.FixedWidthDataType, transpositions []*memory.Buffer) (out *memory.Buffer,
err error) {
+ defer func() {
+ if err != nil && out != nil {
+ out.Release()
+ out = nil
+ }
+ }()
+
+ idxWidth := idxType.BitWidth() / 8
+ outLen := 0
+ for i, d := range data {
+ outLen += d.Len()
+ defer transpositions[i].Release()
+ }
+
+ out = memory.NewResizableBuffer(mem)
+ out.Resize(outLen * idxWidth)
+
+ outData := out.Bytes()
+ for i, d := range data {
+ transposeMap :=
arrow.Int32Traits.CastFromBytes(transpositions[i].Bytes())
+ src := d.Buffers()[1].Bytes()
+ if d.Buffers()[0] == nil {
+ if err = utils.TransposeIntsBuffers(idxType, idxType,
src, outData, d.Offset(), 0, d.Len(), transposeMap); err != nil {
+ return
+ }
+ } else {
+ rdr := bitutils.NewBitRunReader(d.Buffers()[0].Bytes(),
int64(d.Offset()), int64(d.Len()))
+ pos := 0
+ for {
+ run := rdr.NextRun()
+ if run.Len == 0 {
+ break
+ }
+
+ if run.Set {
+ err =
utils.TransposeIntsBuffers(idxType, idxType, src, outData, d.Offset()+pos, pos,
int(run.Len), transposeMap)
+ if err != nil {
+ return
+ }
+ } else {
+
memory.Set(outData[pos:pos+(int(run.Len)*idxWidth)], 0x00)
+ }
+
+ pos += int(run.Len)
+ }
+ }
+ outData = outData[d.Len()*idxWidth:]
+ }
+ return
+}
+
// concat is the implementation for actually performing the concatenation of the arrow.ArrayData
// objects that we can call internally for nested types.
func concat(data []arrow.ArrayData, mem memory.Allocator) (arrow.ArrayData,
error) {
@@ -258,6 +339,42 @@ func concat(data []arrow.ArrayData, mem memory.Allocator)
(arrow.ArrayData, erro
return nil, err
}
out.buffers[1] = bm
+ case *arrow.DictionaryType:
+ idxType := dt.IndexType.(arrow.FixedWidthDataType)
+ // two cases: all dictionaries are the same or we need to unify them
+ dictsSame := true
+ dict0 := MakeFromData(data[0].Dictionary())
+ defer dict0.Release()
+ for _, d := range data {
+ dict := MakeFromData(d.Dictionary())
+ if !Equal(dict0, dict) {
+ dict.Release()
+ dictsSame = false
+ break
+ }
+ dict.Release()
+ }
+
+ indexBuffers := gatherBuffersFixedWidthType(data, 1, idxType)
+ if dictsSame {
+ out.dictionary = dict0.Data().(*Data)
+ out.dictionary.Retain()
+ out.buffers[1] = concatBuffers(indexBuffers, mem)
+ break
+ }
+
+ indexLookup, unifiedDict, err := unifyDictionaries(mem, data,
dt)
+ if err != nil {
+ return nil, err
+ }
+ defer unifiedDict.Release()
+ out.dictionary = unifiedDict.Data().(*Data)
+ out.dictionary.Retain()
+
+ out.buffers[1], err = concatDictIndices(mem, data, idxType,
indexLookup)
+ if err != nil {
+ return nil, err
+ }
case arrow.FixedWidthDataType:
out.buffers[1] =
concatBuffers(gatherBuffersFixedWidthType(data, 1, dt), mem)
case arrow.BinaryDataType:
diff --git a/go/arrow/array/concat_test.go b/go/arrow/array/concat_test.go
index 8beee43306..5362fd33f5 100644
--- a/go/arrow/array/concat_test.go
+++ b/go/arrow/array/concat_test.go
@@ -18,6 +18,7 @@ package array_test
import (
"fmt"
+ "math"
"sort"
"testing"
@@ -27,6 +28,7 @@ import (
"github.com/apache/arrow/go/v9/arrow/internal/testing/gen"
"github.com/apache/arrow/go/v9/arrow/memory"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/rand"
)
@@ -76,6 +78,7 @@ func TestConcatenate(t *testing.T) {
{arrow.FixedSizeListOf(3, arrow.PrimitiveTypes.Int8)},
{arrow.StructOf()},
{arrow.MapOf(arrow.PrimitiveTypes.Uint16,
arrow.PrimitiveTypes.Int8)},
+ {&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32,
ValueType: arrow.PrimitiveTypes.Float64}},
}
for _, tt := range tests {
@@ -221,6 +224,12 @@ func (cts *ConcatTestSuite) generateArr(size int64,
nullprob float64) arrow.Arra
}
}
return bldr.NewArray()
+ case arrow.DICTIONARY:
+ indices := cts.rng.Int32(size, 0, 127, nullprob)
+ defer indices.Release()
+ dict := cts.rng.Float64(128, 0.0, 127.0, nullprob)
+ defer dict.Release()
+ return array.NewDictionaryArray(cts.dt, indices, dict)
default:
return nil
}
@@ -288,3 +297,234 @@ func (cts *ConcatTestSuite) TestCheckConcat() {
})
}
}
+
+func TestConcatDifferentDicts(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ t.Run("simple dicts", func(t *testing.T) {
+ scopedMem := memory.NewCheckedAllocatorScope(mem)
+ defer scopedMem.CheckSize(t)
+
+ dictType := &arrow.DictionaryType{IndexType:
arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String}
+ dict1, err := array.DictArrayFromJSON(mem, dictType, `[1, 2,
null, 3, 0]`, `["A0", "A1", "A2", "A3"]`)
+ require.NoError(t, err)
+ defer dict1.Release()
+ dict2, err := array.DictArrayFromJSON(mem, dictType, `[null, 4,
2, 1]`, `["B0", "B1", "B2", "B3", "B4"]`)
+ require.NoError(t, err)
+ defer dict2.Release()
+
+ expected, err := array.DictArrayFromJSON(mem, dictType, `[1, 2,
null, 3, 0, null, 8, 6, 5]`, `["A0", "A1", "A2", "A3", "B0", "B1", "B2", "B3",
"B4"]`)
+ require.NoError(t, err)
+ defer expected.Release()
+
+ concat, err := array.Concatenate([]arrow.Array{dict1, dict2},
mem)
+ assert.NoError(t, err)
+ defer concat.Release()
+ assert.Truef(t, array.Equal(concat, expected), "got: %s,
expected: %s", concat, expected)
+ })
+
+ t.Run("larger", func(t *testing.T) {
+ scopedMem := memory.NewCheckedAllocatorScope(mem)
+ defer scopedMem.CheckSize(t)
+
+ const size = 500
+ dictType := &arrow.DictionaryType{IndexType:
arrow.PrimitiveTypes.Uint16, ValueType: arrow.BinaryTypes.String}
+
+ idxBuilder, exIdxBldr := array.NewUint16Builder(mem),
array.NewUint16Builder(mem)
+ defer idxBuilder.Release()
+ defer exIdxBldr.Release()
+ idxBuilder.Reserve(size)
+ exIdxBldr.Reserve(size * 2)
+
+ for i := uint16(0); i < size; i++ {
+ idxBuilder.UnsafeAppend(i)
+ exIdxBldr.UnsafeAppend(i)
+ }
+ for i := uint16(size); i < 2*size; i++ {
+ exIdxBldr.UnsafeAppend(i)
+ }
+
+ indices, expIndices := idxBuilder.NewArray(),
exIdxBldr.NewArray()
+ defer indices.Release()
+ defer expIndices.Release()
+
+ // create three dictionaries. First maps i -> "{i}", second maps i->"{500+i}",
+ // each for 500 values and the third maps i -> "{i}" but for 1000 values.
+ // first and second concatenated should end up equaling the third. All strings
+ // padded to length 8 so we can know the size ahead of time.
+ valuesOneBldr, valuesTwoBldr := array.NewStringBuilder(mem),
array.NewStringBuilder(mem)
+ defer valuesOneBldr.Release()
+ defer valuesTwoBldr.Release()
+
+ valuesOneBldr.Reserve(size)
+ valuesTwoBldr.Reserve(size)
+ valuesOneBldr.ReserveData(size * 8)
+ valuesTwoBldr.ReserveData(size * 8)
+
+ for i := 0; i < size; i++ {
+ valuesOneBldr.Append(fmt.Sprintf("%-8d", i))
+ valuesTwoBldr.Append(fmt.Sprintf("%-8d", i+size))
+ }
+
+ dict1, dict2 := valuesOneBldr.NewArray(),
valuesTwoBldr.NewArray()
+ defer dict1.Release()
+ defer dict2.Release()
+ expectedDict, err := array.Concatenate([]arrow.Array{dict1,
dict2}, mem)
+ require.NoError(t, err)
+ defer expectedDict.Release()
+
+ one, two := array.NewDictionaryArray(dictType, indices, dict1),
array.NewDictionaryArray(dictType, indices, dict2)
+ defer one.Release()
+ defer two.Release()
+ expected := array.NewDictionaryArray(dictType, expIndices,
expectedDict)
+ defer expected.Release()
+
+ combined, err := array.Concatenate([]arrow.Array{one, two}, mem)
+ assert.NoError(t, err)
+ defer combined.Release()
+ assert.Truef(t, array.Equal(combined, expected), "got: %s,
expected: %s", combined, expected)
+ })
+}
+
+func TestConcatDictionaryPartialOverlap(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8,
ValueType: arrow.BinaryTypes.String}
+ dictOne, err := array.DictArrayFromJSON(mem, dt, `[1, 2, null, 3, 0]`,
`["A0", "A1", "C2", "C3"]`)
+ require.NoError(t, err)
+ defer dictOne.Release()
+
+ dictTwo, err := array.DictArrayFromJSON(mem, dt, `[null, 4, 2, 1]`,
`["B0", "B1", "C2", "C3", "B4"]`)
+ require.NoError(t, err)
+ defer dictTwo.Release()
+
+ expected, err := array.DictArrayFromJSON(mem, dt, `[1, 2, null, 3, 0,
null, 6, 2, 5]`, `["A0", "A1", "C2", "C3", "B0", "B1", "B4"]`)
+ require.NoError(t, err)
+ defer expected.Release()
+
+ actual, err := array.Concatenate([]arrow.Array{dictOne, dictTwo}, mem)
+ assert.NoError(t, err)
+ defer actual.Release()
+
+ assert.Truef(t, array.Equal(actual, expected), "got: %s, expected: %s",
actual, expected)
+}
+
+func TestConcatDictionaryDifferentSizeIndex(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8,
ValueType: arrow.BinaryTypes.String}
+ biggerDt := &arrow.DictionaryType{IndexType:
arrow.PrimitiveTypes.Uint16, ValueType: arrow.BinaryTypes.String}
+ dictOne, err := array.DictArrayFromJSON(mem, dt, `[0]`, `["A0"]`)
+ require.NoError(t, err)
+ defer dictOne.Release()
+
+ dictTwo, err := array.DictArrayFromJSON(mem, biggerDt, `[0]`, `["B0"]`)
+ require.NoError(t, err)
+ defer dictTwo.Release()
+
+ arr, err := array.Concatenate([]arrow.Array{dictOne, dictTwo}, mem)
+ assert.Nil(t, arr)
+ assert.Error(t, err)
+}
+
+func TestConcatDictionaryUnifyNullInDict(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8,
ValueType: arrow.BinaryTypes.String}
+ dictOne, err := array.DictArrayFromJSON(mem, dt, `[0, 1]`, `[null,
"A"]`)
+ require.NoError(t, err)
+ defer dictOne.Release()
+
+ dictTwo, err := array.DictArrayFromJSON(mem, dt, `[0, 1]`, `[null,
"B"]`)
+ require.NoError(t, err)
+ defer dictTwo.Release()
+
+ expected, err := array.DictArrayFromJSON(mem, dt, `[0, 1, 0, 2]`,
`[null, "A", "B"]`)
+ require.NoError(t, err)
+ defer expected.Release()
+
+ actual, err := array.Concatenate([]arrow.Array{dictOne, dictTwo}, mem)
+ assert.NoError(t, err)
+ defer actual.Release()
+
+ assert.Truef(t, array.Equal(actual, expected), "got: %s, expected: %s",
actual, expected)
+}
+
+func TestConcatDictionaryEnlargedIndices(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ const size = math.MaxUint8 + 1
+ dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8,
ValueType: arrow.PrimitiveTypes.Uint16}
+
+ idxBuilder := array.NewUint8Builder(mem)
+ defer idxBuilder.Release()
+ idxBuilder.Reserve(size)
+ for i := 0; i < size; i++ {
+ idxBuilder.UnsafeAppend(uint8(i))
+ }
+ indices := idxBuilder.NewUint8Array()
+ defer indices.Release()
+
+ valuesBuilder := array.NewUint16Builder(mem)
+ defer valuesBuilder.Release()
+ valuesBuilder.Reserve(size)
+ valuesBuilderTwo := array.NewUint16Builder(mem)
+ defer valuesBuilderTwo.Release()
+ valuesBuilderTwo.Reserve(size)
+
+ for i := uint16(0); i < size; i++ {
+ valuesBuilder.UnsafeAppend(i)
+ valuesBuilderTwo.UnsafeAppend(i + size)
+ }
+
+ dict1, dict2 := valuesBuilder.NewUint16Array(),
valuesBuilderTwo.NewUint16Array()
+ defer dict1.Release()
+ defer dict2.Release()
+
+ d1, d2 := array.NewDictionaryArray(dt, indices, dict1),
array.NewDictionaryArray(dt, indices, dict2)
+ defer d1.Release()
+ defer d2.Release()
+
+ _, err := array.Concatenate([]arrow.Array{d1, d2}, mem)
+ assert.Error(t, err)
+
+ biggerDt := &arrow.DictionaryType{IndexType:
arrow.PrimitiveTypes.Uint16, ValueType: arrow.PrimitiveTypes.Uint16}
+ bigger1, bigger2 := array.NewDictionaryArray(biggerDt, dict1, dict1),
array.NewDictionaryArray(biggerDt, dict1, dict2)
+ defer bigger1.Release()
+ defer bigger2.Release()
+
+ combined, err := array.Concatenate([]arrow.Array{bigger1, bigger2}, mem)
+ assert.NoError(t, err)
+ defer combined.Release()
+
+ assert.EqualValues(t, size*2, combined.Len())
+}
+
+func TestConcatDictionaryNullSlots(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint32,
ValueType: arrow.BinaryTypes.String}
+ dict1, err := array.DictArrayFromJSON(mem, dt, `[null, null, null,
null]`, `[]`)
+ require.NoError(t, err)
+ defer dict1.Release()
+
+ dict2, err := array.DictArrayFromJSON(mem, dt, `[null, null, null,
null, 0, 1]`, `["a", "b"]`)
+ require.NoError(t, err)
+ defer dict2.Release()
+
+ expected, err := array.DictArrayFromJSON(mem, dt, `[null, null, null,
null, null, null, null, null, 0, 1]`, `["a", "b"]`)
+ require.NoError(t, err)
+ defer expected.Release()
+
+ actual, err := array.Concatenate([]arrow.Array{dict1, dict2}, mem)
+ assert.NoError(t, err)
+ defer actual.Release()
+
+ assert.Truef(t, array.Equal(actual, expected), "got: %s, expected: %s",
actual, expected)
+}
diff --git a/go/arrow/array/string.go b/go/arrow/array/string.go
index 4c033118a6..237ea0166b 100644
--- a/go/arrow/array/string.go
+++ b/go/arrow/array/string.go
@@ -231,6 +231,12 @@ func (b *StringBuilder) Reserve(n int) {
b.builder.Reserve(n)
}
+// ReserveData ensures there is enough space for appending n bytes
+// by checking the capacity and resizing the data buffer if necessary.
+func (b *StringBuilder) ReserveData(n int) {
+ b.builder.ReserveData(n)
+}
+
// Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(),
// additional memory will be allocated. If n is smaller, the allocated memory may be reduced.
func (b *StringBuilder) Resize(n int) {
diff --git a/go/arrow/array/util.go b/go/arrow/array/util.go
index 5510b8df17..3945376312 100644
--- a/go/arrow/array/util.go
+++ b/go/arrow/array/util.go
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
+ "strings"
"github.com/apache/arrow/go/v9/arrow"
"github.com/apache/arrow/go/v9/arrow/bitutil"
@@ -282,3 +283,19 @@ func getDictArrayData(mem memory.Allocator, valueType
arrow.DataType, memoTable
return NewData(valueType, dictLen, buffers, nil, nullcount, 0), nil
}
+
+func DictArrayFromJSON(mem memory.Allocator, dt *arrow.DictionaryType,
indicesJSON, dictJSON string) (arrow.Array, error) {
+ indices, _, err := FromJSON(mem, dt.IndexType,
strings.NewReader(indicesJSON))
+ if err != nil {
+ return nil, err
+ }
+ defer indices.Release()
+
+ dict, _, err := FromJSON(mem, dt.ValueType, strings.NewReader(dictJSON))
+ if err != nil {
+ return nil, err
+ }
+ defer dict.Release()
+
+ return NewDictionaryArray(dt, indices, dict), nil
+}