This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new ba9cb62c feat(arrow/compute): Take kernel for Map type (#574)
ba9cb62c is described below
commit ba9cb62ce7c18f9cb0728c0d175cc89b6c266b07
Author: Alex <[email protected]>
AuthorDate: Mon Dec 1 12:07:15 2025 -0700
feat(arrow/compute): Take kernel for Map type (#574)
### Rationale for this change
Arrow Go lacks a Take kernel for Map types, which means that `iceberg-go`
cannot write to partitioned Iceberg tables that contain columns with Map
types.
### What changes are included in this PR?
- Adds a new Take kernel for Map types
- Uses the same allocation behavior as #557 and #573
### Are these changes tested?
Yes.
### Are there any user-facing changes?
* Can now use Take on Arrow schemas with Map columns.
* Can now write to partitioned Iceberg tables containing Map columns using `arrow-go`
---
arrow/compute/internal/kernels/vector_selection.go | 77 ++++++++++++++++++
arrow/compute/selection.go | 51 ++++++++++++
arrow/compute/vector_selection_test.go | 90 ++++++++++++++++++++++
3 files changed, 218 insertions(+)
diff --git a/arrow/compute/internal/kernels/vector_selection.go
b/arrow/compute/internal/kernels/vector_selection.go
index 62923407..0fe84b03 100644
--- a/arrow/compute/internal/kernels/vector_selection.go
+++ b/arrow/compute/internal/kernels/vector_selection.go
@@ -1690,6 +1690,83 @@ func FSLImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan,
outputLength int64, out
return nil
}
+func MapImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64,
out *exec.ExecResult, fn selectionOutputFn) error {
+ var (
+ values = &batch.Values[0].Array
+ selection = &batch.Values[1].Array
+
+ rawOffsets = exec.GetSpanOffsets[int32](values, 1)
+ mem = exec.GetAllocator(ctx.Ctx)
+ offsetBuilder = newBufferBuilder[int32](mem)
+ childIdxBuilder = newBufferBuilder[int32](mem)
+ )
+
+ // Maps use the same underlying structure as lists (offsets + child
indices)
+ // Pre-allocate based on mean map size
+ if values.Len > 0 {
+ dataLength := rawOffsets[values.Len] - rawOffsets[0]
+ meanMapLen := float64(dataLength) / float64(values.Len)
+ estimatedTotal := int(meanMapLen * float64(outputLength))
+
+ // Cap the pre-allocation at a reasonable size
+ const maxPreAlloc = 16777216 // 16M elements
+ estimatedTotal = min(estimatedTotal, maxPreAlloc)
+ childIdxBuilder.reserve(estimatedTotal)
+ }
+
+ offsetBuilder.reserve(int(outputLength) + 1)
+ spaceAvail := childIdxBuilder.cap()
+ var offset int32
+ err := fn(ctx, outputLength, values, selection, out,
+ func(idx int64) error {
+ offsetBuilder.unsafeAppend(offset)
+ valueOffset := rawOffsets[idx]
+ valueLength := rawOffsets[idx+1] - valueOffset
+ offset += valueLength
+ if int(valueLength) > spaceAvail {
+ // Calculate how much total capacity we need
+ needed := childIdxBuilder.len() +
int(valueLength)
+ newCap := childIdxBuilder.cap()
+
+ // Double capacity until we have enough space
+ // This gives us O(log n) reallocations instead
of O(n)
+ if newCap == 0 {
+ newCap = int(valueLength)
+ }
+ for newCap < needed {
+ newCap = newCap * 2
+ }
+
+ // Reserve the additional capacity
+ additional := newCap - childIdxBuilder.len()
+ childIdxBuilder.reserve(additional)
+ spaceAvail = childIdxBuilder.cap() -
childIdxBuilder.len()
+ }
+ for j := valueOffset; j < valueOffset+valueLength; j++ {
+ childIdxBuilder.unsafeAppend(j)
+ }
+ spaceAvail -= int(valueLength)
+ return nil
+ }, func() error {
+ offsetBuilder.unsafeAppend(offset)
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ offsetBuilder.unsafeAppend(offset)
+ out.Buffers[1].WrapBuffer(offsetBuilder.finish())
+
+ out.Children = make([]exec.ArraySpan, 1)
+ out.Children[0].Type = arrow.PrimitiveTypes.Int32
+ out.Children[0].Len = int64(childIdxBuilder.len())
+ out.Children[0].Buffers[1].WrapBuffer(childIdxBuilder.finish())
+
+ return nil
+}
+
func DenseUnionImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength
int64, out *exec.ExecResult, fn selectionOutputFn) error {
var (
values = &batch.Values[0].Array
diff --git a/arrow/compute/selection.go b/arrow/compute/selection.go
index 8839a481..5c0a9759 100644
--- a/arrow/compute/selection.go
+++ b/arrow/compute/selection.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute/exec"
"github.com/apache/arrow-go/v18/arrow/compute/internal/kernels"
+ "github.com/apache/arrow-go/v18/arrow/memory"
"golang.org/x/sync/errgroup"
)
@@ -360,6 +361,55 @@ func selectListImpl(fn exec.ArrayKernelExec)
exec.ArrayKernelExec {
}
}
+func selectMapImpl(fn exec.ArrayKernelExec) exec.ArrayKernelExec {
+ return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out
*exec.ExecResult) error {
+ if err := fn(ctx, batch, out); err != nil {
+ return err
+ }
+
+ // out.Children[0] contains the child indexes of key-value
pairs that we
+ // want to take after processing. Maps store their data as a
struct of keys/items.
+ values := batch.Values[0].Array.MakeArray().(*array.Map)
+ defer values.Release()
+
+ childIndices := out.Children[0].MakeArray()
+ defer childIndices.Release()
+
+ // Maps have a single struct child containing the keys and items
+ // We need to take from both the keys and items arrays
+ takenKeys, err := TakeArrayOpts(ctx.Ctx, values.Keys(),
childIndices, kernels.TakeOptions{BoundsCheck: false})
+ if err != nil {
+ return err
+ }
+ defer takenKeys.Release()
+
+ takenItems, err := TakeArrayOpts(ctx.Ctx, values.Items(),
childIndices, kernels.TakeOptions{BoundsCheck: false})
+ if err != nil {
+ return err
+ }
+ defer takenItems.Release()
+
+ // Build the struct child array with the taken keys and items
+ // Maps have a single struct child with "key" and "value" fields
+ structType := arrow.StructOf(
+ arrow.Field{Name: "key", Type:
values.Keys().DataType()},
+ arrow.Field{Name: "value", Type:
values.Items().DataType()},
+ )
+
+ // Create struct data with taken keys and items as children
+ structData := array.NewData(
+ structType,
+ int(childIndices.Len()),
+ []*memory.Buffer{nil},
+ []arrow.ArrayData{takenKeys.Data(), takenItems.Data()},
+ 0, 0)
+ defer structData.Release()
+
+ out.Children[0].TakeOwnership(structData)
+ return nil
+ }
+}
+
func denseUnionImpl(fn exec.ArrayKernelExec) exec.ArrayKernelExec {
return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out
*exec.ExecResult) error {
if err := fn(ctx, batch, out); err != nil {
@@ -510,6 +560,7 @@ func RegisterVectorSelection(reg FunctionRegistry) {
{In: exec.NewIDInput(arrow.LIST), Exec:
selectListImpl(kernels.TakeExec(kernels.ListImpl[int32]))},
{In: exec.NewIDInput(arrow.LARGE_LIST), Exec:
selectListImpl(kernels.TakeExec(kernels.ListImpl[int64]))},
{In: exec.NewIDInput(arrow.FIXED_SIZE_LIST), Exec:
selectListImpl(kernels.TakeExec(kernels.FSLImpl))},
+ {In: exec.NewIDInput(arrow.MAP), Exec:
selectMapImpl(kernels.TakeExec(kernels.MapImpl))},
{In: exec.NewIDInput(arrow.DENSE_UNION), Exec:
denseUnionImpl(kernels.TakeExec(kernels.DenseUnionImpl))},
{In: exec.NewIDInput(arrow.EXTENSION), Exec: extensionTakeImpl},
{In: exec.NewIDInput(arrow.STRUCT), Exec: structTake},
diff --git a/arrow/compute/vector_selection_test.go
b/arrow/compute/vector_selection_test.go
index a9d86a74..45601f44 100644
--- a/arrow/compute/vector_selection_test.go
+++ b/arrow/compute/vector_selection_test.go
@@ -1255,6 +1255,96 @@ func (tk *TakeKernelLists) TestFixedSizeListInt32() {
tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[[1, null, 3],
[4, 5, 6], [7, 8, null]]`, `[0, 1, 0]`)
}
+type TakeKernelMap struct {
+ TakeKernelTestTyped
+}
+
+func (tk *TakeKernelMap) TestMapStringInt32() {
+ tk.dt = arrow.MapOf(arrow.BinaryTypes.String,
arrow.PrimitiveTypes.Int32)
+
+ mapJSON := `[
+ {},
+ {"a": 1},
+ {"a": 2, "b": 3, "c": 4},
+ {"w": 5, "x": 6, "y": 7, "z": null}
+ ]`
+ tk.checkTake(tk.dt, mapJSON, `[]`, `[]`)
+ tk.checkTake(tk.dt, mapJSON, `[3, 1, 3, 1, 3]`, `[
+ {"w": 5, "x": 6, "y": 7, "z": null}
+ {"a": 1},
+ {"w": 5, "x": 6, "y": 7, "z": null}
+ {"a": 1},
+ {"w": 5, "x": 6, "y": 7, "z": null}
+ ]`)
+ tk.checkTake(tk.dt, mapJSON, `[4, 2, 1, 6]`, `[
+ {"a": 2, "b": 3, "c": 4},
+ {"a": 1},
+ {"a": 2, "b": 3, "c": 4},
+ {"w": 5, "x": 6, "y": 7, "z": null}
+ ]`)
+ tk.checkTake(tk.dt, mapJSON, `[0, 1, 2, 3]`, mapJSON)
+
+ tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[{"a": 1, "b":
2}, {"a": 2, "b": 3}]`, `[0, 1, 0]`)
+}
+
+func (tk *TakeKernelMap) TestMapStringLargeString() {
+ tk.dt = arrow.MapOf(arrow.BinaryTypes.String,
arrow.BinaryTypes.LargeString)
+
+ mapJSON := `[
+ {},
+ {"a": "b"},
+ {"a": "c", "d": "e", "f": "g"},
+ {"w": "x", "y": "z", "m": "n", "o": null}
+ ]`
+
+ tk.checkTake(tk.dt, mapJSON, `[]`, `[]`)
+ tk.checkTake(tk.dt, mapJSON, `[3, 1, 3, 1, 3]`, `[
+ {"w": "x", "y": "z", "m": "n", "o": null},
+ {"a": "b"},
+ {"w": "x", "y": "z", "m": "n", "o": null},
+ {"a": "b"},
+ {"w": "x", "y": "z", "m": "n", "o": null},
+ ]`)
+ tk.checkTake(tk.dt, mapJSON, `[4, 2, 1, 6]`, `[
+ {"a": "e", "c": "f"},
+ {"w": "x", "y": "z", "m": "n", "o": null},
+ {"a": "b"},
+ {"w": "x", "y": "z", "m": "n", "o": null}
+ ]`)
+ tk.checkTake(tk.dt, mapJSON, `[0, 1, 2, 3]`, mapJSON)
+
+ tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[{"a": "b", "c":
"d"}, {"a": "e", "c": "f"}]`, `[0, 1, 0]`)
+}
+
+func (tk *TakeKernelMap) TestMapIntListString() {
+ tk.dt = arrow.MapOf(arrow.PrimitiveTypes.Int32,
arrow.ListOf(arrow.BinaryTypes.String))
+
+ mapJSON := `[
+ {},
+ {1: ["a"]},
+ {1: ["b", "c"], 2: ["d"], 3: ["e", "f", "g"]},
+ {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]}
+ ]`
+
+ tk.checkTake(tk.dt, mapJSON, `[]`, `[]`)
+ tk.checkTake(tk.dt, mapJSON, `[3, 1, 3, 1, 3]`, `[
+ {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+ {1: ["a", "b"], 2: ["c", "d"]},
+ {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+ {1: ["a", "b"], 2: ["c", "d"]},
+ {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+ ]`)
+ tk.checkTake(tk.dt, mapJSON, `[4, 2, 1, 6]`, `[
+ {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+ {1: ["a", "b"], 2: ["c", "d"]},
+ {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+ {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]}
+ ]`)
+ tk.checkTake(tk.dt, mapJSON, `[0, 1, 2, 3]`, mapJSON)
+
+ tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[{1: ["a", "b"],
2: ["c", "d"]}, {1: [3, 4], 2: ["e", "f"]}]`, `[0, 1, 0]`)
+}
+
type TakeKernelDenseUnion struct {
TakeKernelTestTyped
}