This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new ba9cb62c feat(arrow/compute): Take kernel for Map type (#574)
ba9cb62c is described below

commit ba9cb62ce7c18f9cb0728c0d175cc89b6c266b07
Author: Alex <[email protected]>
AuthorDate: Mon Dec 1 12:07:15 2025 -0700

    feat(arrow/compute): Take kernel for Map type (#574)
    
    ### Rationale for this change
    
    Arrow Go lacks a Take kernel for Map types, which means `iceberg-go`
    cannot write to partitioned Iceberg tables that contain Map columns.
    
    ### What changes are included in this PR?
    
    - Adds a new Take kernel for Map types
      - it reuses the allocation strategy from #557 and #573
    
    ### Are these changes tested?
    
    Yes.
    
    
    ### Are there any user-facing changes?
    
    * Take can now be used on Arrow arrays and records that contain Map columns.
    * Partitioned Iceberg tables containing Map columns can now be written using `arrow-go`.
---
 arrow/compute/internal/kernels/vector_selection.go | 77 ++++++++++++++++++
 arrow/compute/selection.go                         | 51 ++++++++++++
 arrow/compute/vector_selection_test.go             | 90 ++++++++++++++++++++++
 3 files changed, 218 insertions(+)

diff --git a/arrow/compute/internal/kernels/vector_selection.go 
b/arrow/compute/internal/kernels/vector_selection.go
index 62923407..0fe84b03 100644
--- a/arrow/compute/internal/kernels/vector_selection.go
+++ b/arrow/compute/internal/kernels/vector_selection.go
@@ -1690,6 +1690,83 @@ func FSLImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, 
outputLength int64, out
        return nil
 }
 
+func MapImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, 
out *exec.ExecResult, fn selectionOutputFn) error {
+       var (
+               values    = &batch.Values[0].Array
+               selection = &batch.Values[1].Array
+
+               rawOffsets      = exec.GetSpanOffsets[int32](values, 1)
+               mem             = exec.GetAllocator(ctx.Ctx)
+               offsetBuilder   = newBufferBuilder[int32](mem)
+               childIdxBuilder = newBufferBuilder[int32](mem)
+       )
+
+       // Maps use the same underlying structure as lists (offsets + child 
indices)
+       // Pre-allocate based on mean map size
+       if values.Len > 0 {
+               dataLength := rawOffsets[values.Len] - rawOffsets[0]
+               meanMapLen := float64(dataLength) / float64(values.Len)
+               estimatedTotal := int(meanMapLen * float64(outputLength))
+
+               // Cap the pre-allocation at a reasonable size
+               const maxPreAlloc = 16777216 // 16M elements
+               estimatedTotal = min(estimatedTotal, maxPreAlloc)
+               childIdxBuilder.reserve(estimatedTotal)
+       }
+
+       offsetBuilder.reserve(int(outputLength) + 1)
+       spaceAvail := childIdxBuilder.cap()
+       var offset int32
+       err := fn(ctx, outputLength, values, selection, out,
+               func(idx int64) error {
+                       offsetBuilder.unsafeAppend(offset)
+                       valueOffset := rawOffsets[idx]
+                       valueLength := rawOffsets[idx+1] - valueOffset
+                       offset += valueLength
+                       if int(valueLength) > spaceAvail {
+                               // Calculate how much total capacity we need
+                               needed := childIdxBuilder.len() + 
int(valueLength)
+                               newCap := childIdxBuilder.cap()
+
+                               // Double capacity until we have enough space
+                               // This gives us O(log n) reallocations instead 
of O(n)
+                               if newCap == 0 {
+                                       newCap = int(valueLength)
+                               }
+                               for newCap < needed {
+                                       newCap = newCap * 2
+                               }
+
+                               // Reserve the additional capacity
+                               additional := newCap - childIdxBuilder.len()
+                               childIdxBuilder.reserve(additional)
+                               spaceAvail = childIdxBuilder.cap() - 
childIdxBuilder.len()
+                       }
+                       for j := valueOffset; j < valueOffset+valueLength; j++ {
+                               childIdxBuilder.unsafeAppend(j)
+                       }
+                       spaceAvail -= int(valueLength)
+                       return nil
+               }, func() error {
+                       offsetBuilder.unsafeAppend(offset)
+                       return nil
+               })
+
+       if err != nil {
+               return err
+       }
+
+       offsetBuilder.unsafeAppend(offset)
+       out.Buffers[1].WrapBuffer(offsetBuilder.finish())
+
+       out.Children = make([]exec.ArraySpan, 1)
+       out.Children[0].Type = arrow.PrimitiveTypes.Int32
+       out.Children[0].Len = int64(childIdxBuilder.len())
+       out.Children[0].Buffers[1].WrapBuffer(childIdxBuilder.finish())
+
+       return nil
+}
+
 func DenseUnionImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength 
int64, out *exec.ExecResult, fn selectionOutputFn) error {
        var (
                values    = &batch.Values[0].Array
diff --git a/arrow/compute/selection.go b/arrow/compute/selection.go
index 8839a481..5c0a9759 100644
--- a/arrow/compute/selection.go
+++ b/arrow/compute/selection.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/compute/exec"
        "github.com/apache/arrow-go/v18/arrow/compute/internal/kernels"
+       "github.com/apache/arrow-go/v18/arrow/memory"
        "golang.org/x/sync/errgroup"
 )
 
@@ -360,6 +361,55 @@ func selectListImpl(fn exec.ArrayKernelExec) 
exec.ArrayKernelExec {
        }
 }
 
+func selectMapImpl(fn exec.ArrayKernelExec) exec.ArrayKernelExec {
+       return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out 
*exec.ExecResult) error {
+               if err := fn(ctx, batch, out); err != nil {
+                       return err
+               }
+
+               // out.Children[0] contains the child indexes of key-value 
pairs that we
+               // want to take after processing. Maps store their data as a 
struct of keys/items.
+               values := batch.Values[0].Array.MakeArray().(*array.Map)
+               defer values.Release()
+
+               childIndices := out.Children[0].MakeArray()
+               defer childIndices.Release()
+
+               // Maps have a single struct child containing the keys and items
+               // We need to take from both the keys and items arrays
+               takenKeys, err := TakeArrayOpts(ctx.Ctx, values.Keys(), 
childIndices, kernels.TakeOptions{BoundsCheck: false})
+               if err != nil {
+                       return err
+               }
+               defer takenKeys.Release()
+
+               takenItems, err := TakeArrayOpts(ctx.Ctx, values.Items(), 
childIndices, kernels.TakeOptions{BoundsCheck: false})
+               if err != nil {
+                       return err
+               }
+               defer takenItems.Release()
+
+               // Build the struct child array with the taken keys and items
+               // Maps have a single struct child with "key" and "value" fields
+               structType := arrow.StructOf(
+                       arrow.Field{Name: "key", Type: 
values.Keys().DataType()},
+                       arrow.Field{Name: "value", Type: 
values.Items().DataType()},
+               )
+
+               // Create struct data with taken keys and items as children
+               structData := array.NewData(
+                       structType,
+                       int(childIndices.Len()),
+                       []*memory.Buffer{nil},
+                       []arrow.ArrayData{takenKeys.Data(), takenItems.Data()},
+                       0, 0)
+               defer structData.Release()
+
+               out.Children[0].TakeOwnership(structData)
+               return nil
+       }
+}
+
 func denseUnionImpl(fn exec.ArrayKernelExec) exec.ArrayKernelExec {
        return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out 
*exec.ExecResult) error {
                if err := fn(ctx, batch, out); err != nil {
@@ -510,6 +560,7 @@ func RegisterVectorSelection(reg FunctionRegistry) {
                {In: exec.NewIDInput(arrow.LIST), Exec: 
selectListImpl(kernels.TakeExec(kernels.ListImpl[int32]))},
                {In: exec.NewIDInput(arrow.LARGE_LIST), Exec: 
selectListImpl(kernels.TakeExec(kernels.ListImpl[int64]))},
                {In: exec.NewIDInput(arrow.FIXED_SIZE_LIST), Exec: 
selectListImpl(kernels.TakeExec(kernels.FSLImpl))},
+               {In: exec.NewIDInput(arrow.MAP), Exec: 
selectMapImpl(kernels.TakeExec(kernels.MapImpl))},
                {In: exec.NewIDInput(arrow.DENSE_UNION), Exec: 
denseUnionImpl(kernels.TakeExec(kernels.DenseUnionImpl))},
                {In: exec.NewIDInput(arrow.EXTENSION), Exec: extensionTakeImpl},
                {In: exec.NewIDInput(arrow.STRUCT), Exec: structTake},
diff --git a/arrow/compute/vector_selection_test.go 
b/arrow/compute/vector_selection_test.go
index a9d86a74..45601f44 100644
--- a/arrow/compute/vector_selection_test.go
+++ b/arrow/compute/vector_selection_test.go
@@ -1255,6 +1255,96 @@ func (tk *TakeKernelLists) TestFixedSizeListInt32() {
        tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[[1, null, 3], 
[4, 5, 6], [7, 8, null]]`, `[0, 1, 0]`)
 }
 
+type TakeKernelMap struct {
+       TakeKernelTestTyped
+}
+
+func (tk *TakeKernelMap) TestMapStringInt32() {
+       tk.dt = arrow.MapOf(arrow.BinaryTypes.String, 
arrow.PrimitiveTypes.Int32)
+
+       mapJSON := `[
+               {},
+               {"a": 1},
+               {"a": 2, "b": 3, "c": 4},
+               {"w": 5, "x": 6, "y": 7, "z": null}
+       ]`
+       tk.checkTake(tk.dt, mapJSON, `[]`, `[]`)
+       tk.checkTake(tk.dt, mapJSON, `[3, 1, 3, 1, 3]`, `[
+               {"w": 5, "x": 6, "y": 7, "z": null}
+               {"a": 1},
+               {"w": 5, "x": 6, "y": 7, "z": null}
+               {"a": 1},
+               {"w": 5, "x": 6, "y": 7, "z": null}
+       ]`)
+       tk.checkTake(tk.dt, mapJSON, `[4, 2, 1, 6]`, `[
+               {"a": 2, "b": 3, "c": 4},
+               {"a": 1},
+               {"a": 2, "b": 3, "c": 4},
+               {"w": 5, "x": 6, "y": 7, "z": null}
+       ]`)
+       tk.checkTake(tk.dt, mapJSON, `[0, 1, 2, 3]`, mapJSON)
+
+       tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[{"a": 1, "b": 
2}, {"a": 2, "b": 3}]`, `[0, 1, 0]`)
+}
+
+func (tk *TakeKernelMap) TestMapStringLargeString() {
+       tk.dt = arrow.MapOf(arrow.BinaryTypes.String, 
arrow.BinaryTypes.LargeString)
+
+       mapJSON := `[
+               {},
+               {"a": "b"},
+               {"a": "c", "d": "e", "f": "g"},
+               {"w": "x", "y": "z", "m": "n", "o": null}
+       ]`
+
+       tk.checkTake(tk.dt, mapJSON, `[]`, `[]`)
+       tk.checkTake(tk.dt, mapJSON, `[3, 1, 3, 1, 3]`, `[
+               {"w": "x", "y": "z", "m": "n", "o": null},
+               {"a": "b"},
+               {"w": "x", "y": "z", "m": "n", "o": null},
+               {"a": "b"},
+               {"w": "x", "y": "z", "m": "n", "o": null},
+       ]`)
+       tk.checkTake(tk.dt, mapJSON, `[4, 2, 1, 6]`, `[
+               {"a": "e", "c": "f"},
+               {"w": "x", "y": "z", "m": "n", "o": null},
+               {"a": "b"},
+               {"w": "x", "y": "z", "m": "n", "o": null}
+       ]`)
+       tk.checkTake(tk.dt, mapJSON, `[0, 1, 2, 3]`, mapJSON)
+
+       tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[{"a": "b", "c": 
"d"}, {"a": "e", "c": "f"}]`, `[0, 1, 0]`)
+}
+
+func (tk *TakeKernelMap) TestMapIntListString() {
+       tk.dt = arrow.MapOf(arrow.PrimitiveTypes.Int32, 
arrow.ListOf(arrow.BinaryTypes.String))
+
+       mapJSON := `[
+               {},
+               {1: ["a"]},
+               {1: ["b", "c"], 2: ["d"], 3: ["e", "f", "g"]},
+               {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]}
+       ]`
+
+       tk.checkTake(tk.dt, mapJSON, `[]`, `[]`)
+       tk.checkTake(tk.dt, mapJSON, `[3, 1, 3, 1, 3]`, `[
+               {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+               {1: ["a", "b"], 2: ["c", "d"]},
+               {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+               {1: ["a", "b"], 2: ["c", "d"]},
+               {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+       ]`)
+       tk.checkTake(tk.dt, mapJSON, `[4, 2, 1, 6]`, `[
+               {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+               {1: ["a", "b"], 2: ["c", "d"]},
+               {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]},
+               {1: ["h"], 2: ["i", "j"], 3: [null], 4: ["k", "l", "m", "n"]}
+       ]`)
+       tk.checkTake(tk.dt, mapJSON, `[0, 1, 2, 3]`, mapJSON)
+
+       tk.assertNoValidityBitmapUnknownNullCountJSON(tk.dt, `[{1: ["a", "b"], 
2: ["c", "d"]}, {1: [3, 4], 2: ["e", "f"]}]`, `[0, 1, 0]`)
+}
+
 // TakeKernelDenseUnion is a test suite built on TakeKernelTestTyped;
 // presumably it holds the DENSE_UNION Take tests (its methods are not visible here).
 type TakeKernelDenseUnion struct {
        TakeKernelTestTyped
 }

Reply via email to