lidavidm commented on a change in pull request #12162:
URL: https://github.com/apache/arrow/pull/12162#discussion_r790811473



##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,169 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Result<int64_t> FindOneMapValueIndex(const Array& keys, const Scalar& 
query_key,
+                                              const int64_t start, const 
int64_t end,
+                                              const bool from_back = false) {
+    if (!from_back) {
+      for (int64_t idx = start; idx < end; ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    } else {
+      for (int64_t idx = end - 1; idx >= start; --idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    }
+    return -1;
+  }
+
+  static Result<std::shared_ptr<Scalar>> GetScalarOutput(KernelContext* ctx,
+                                                         const MapScalar 
map_scalar) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+    const std::shared_ptr<Scalar>& query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    const auto& struct_array = checked_cast<const 
StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    std::shared_ptr<Scalar> output;
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), items->type(), &builder));
+
+      bool found_at_least_one_key = false;
+      for (int64_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          found_at_least_one_key = true;
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), idx, 1));
+        }
+      }
+      if (!found_at_least_one_key) {
+        output = MakeNullScalar(list(items->type()));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+        ARROW_ASSIGN_OR_RAISE(output, MakeScalar(list(items->type()), result));
+      }
+    }
+
+    else { /* occurrence == FIRST || LAST */
+      bool from_back = (occurrence == MapArrayLookupOptions::LAST);
+
+      ARROW_ASSIGN_OR_RAISE(
+          int64_t key_match_idx,
+          FindOneMapValueIndex(*keys, *query_key, 0, struct_array.length(), 
from_back));
+      if (key_match_idx != -1) {
+        ARROW_ASSIGN_OR_RAISE(output, items->GetScalar(key_match_idx));
+      } else {
+        output = MakeNullScalar(items->type());
+      }
+    }
+    return output;
+  }
+
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::unique_ptr<ArrayBuilder> builder;
+    if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), 
&builder));
+    } else {
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+    }
+    for (int64_t map_array_idx = 0; map_array_idx < map_array.length(); 
++map_array_idx) {
+      if (!map_array.IsValid(map_array_idx)) {
+        RETURN_NOT_OK(builder->AppendNull());
+        continue;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> scalar,
+                              map_array.GetScalar(map_array_idx));
+        auto map_scalar = std::static_pointer_cast<MapScalar>(scalar);
+        ARROW_ASSIGN_OR_RAISE(auto scalar_output, GetScalarOutput(ctx, 
*map_scalar));
+        RETURN_NOT_OK(builder->AppendScalar(*scalar_output));

Review comment:
       ```suggestion
           auto items = map_array.value_slice(map_array_idx);
           auto keys = checked_cast<const StructArray&>(*items).field(0);
           ARROW_ASSIGN_OR_RAISE(auto index, FindOneMapValueIndex(*keys, ...));
           // Append the value (adjusting index since index is relative to the
           // current item)
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested_test.cc
##########
@@ -225,6 +225,294 @@ TEST(TestScalarNested, StructField) {
   }
 }
 
+void CheckMapArrayLookupWithDifferentOptions(
+    const std::shared_ptr<Array>& map, const std::shared_ptr<Scalar>& 
query_key,
+    const std::shared_ptr<Array>& expected_all,
+    const std::shared_ptr<Array>& expected_first,
+    const std::shared_ptr<Array>& expected_last) {
+  MapArrayLookupOptions all_matches(query_key, MapArrayLookupOptions::ALL);
+  MapArrayLookupOptions first_matches(query_key, MapArrayLookupOptions::FIRST);
+  MapArrayLookupOptions last_matches(query_key, MapArrayLookupOptions::LAST);
+
+  CheckScalar("map_array_lookup", {map}, expected_all, &all_matches);
+  CheckScalar("map_array_lookup", {map}, expected_first, &first_matches);
+  CheckScalar("map_array_lookup", {map}, expected_last, &last_matches);
+}
+
+class TestMapArrayLookupKernel : public ::testing::Test {};
+
+TEST_F(TestMapArrayLookupKernel, Basic) {
+  auto type = map(utf8(), int32());
+  const char* input = R"(
+    [
+      [["foo", 99], ["bar", 1], ["hello", 2], ["foo", 3], ["lets go", 5], 
["what now?", 8]],
+      null,
+      [["nothing", null], ["hat", null], ["foo", 101], ["sorry", 1], ["dip", 
null],
+      ["foo", 22]],
+      []
+    ])";
+  auto map_array = ArrayFromJSON(type, input);
+  CheckMapArrayLookupWithDifferentOptions(
+      map_array, MakeScalar("foo"),
+      ArrayFromJSON(list(int32()), R"([[99, 3], null, [101, 22], null])"),
+      ArrayFromJSON(int32(), R"([99, null, 101, null])"),
+      ArrayFromJSON(int32(), R"([3, null, 22, null])"));
+}
+
+TEST_F(TestMapArrayLookupKernel, NestedItems) {
+  auto type = map(utf8(), map(int16(), int16()));
+  const char* input = R"(
+    [
+      [
+        [
+          "just",
+          [[0, 0], [1, 1]]
+        ],
+        [
+          "random",
+          [[2, 2], [3, 3]]
+        ],
+        [
+          "foo",
+          [[4, 4], [5, 5]]
+        ],
+        [
+          "values",
+          [[6, 6], [7, 7]]
+        ],
+        [
+          "foo",
+          [[8, 8], [9, 9]]
+        ],
+        [
+          "point",
+          [[10, 10], [11, 11]]
+        ],
+        [
+          "foo",
+          [[12, 12], [13, 13]]
+        ]
+      ],
+      null,
+      [
+        [
+          "yet",
+          [[0, 1], [1, 2]]
+        ],
+        [
+          "more",
+          [[2, 3], [3, 4]]
+        ],
+        [
+          "foo",
+          [[4, 5], [5, 6]]
+        ],
+        [
+          "random",
+          [[6, 7], [7, 8]]
+        ],
+        [
+          "foo",
+          [[8, 9], [9, 10]]
+        ],
+        [
+          "values",
+          [[10, 11], [11, 12]]
+        ],
+        [
+          "foo",
+          [[12, 13], [13, 14]]
+        ]
+      ],
+      []
+    ]
+  )";
+  const auto map_array = ArrayFromJSON(type, input);
+  const auto expected_all = ArrayFromJSON(list(map(int16(), int16())), R"(
+                                [
+                                  [
+                                    [[4, 4], [5, 5]], [[8, 8], [9, 9]],
+                                    [[12, 12], [13, 13]]
+                                  ],
+                                  null,
+                                  [
+                                    [[4, 5], [5, 6]], [[8, 9], [9, 10]],
+                                    [[12, 13], [13, 14]]
+                                  ],
+                                  null
+                                ])");
+  const auto expected_first = ArrayFromJSON(map(int16(), int16()), R"(
+                                [
+                                  [[4, 4], [5, 5]],
+                                  null,
+                                  [[4, 5], [5, 6]],
+                                  null
+                                ])");
+  const auto expected_last = ArrayFromJSON(map(int16(), int16()), R"(
+                                [
+                                  [[12, 12], [13, 13]],
+                                  null,
+                                  [[12, 13], [13, 14]],
+                                  null
+                                ])");
+  CheckMapArrayLookupWithDifferentOptions(map_array, MakeScalar("foo"), 
expected_all,
+                                          expected_first, expected_last);
+}
+
+template <typename KeyType>
+class TestMapArrayLookupIntegralKeys : public ::testing ::Test {};
+
+TYPED_TEST_SUITE(TestMapArrayLookupIntegralKeys, PhysicalIntegralArrowTypes);
+
+TYPED_TEST(TestMapArrayLookupIntegralKeys, StringItems) {
+  std::shared_ptr<DataType> type = default_type_instance<TypeParam>();
+
+  auto map_type = map(type, utf8());
+  const char* input = R"(
+    [
+      [ 
+        [0, "zero"], [1, "first_one"], [2, "two"], [1, null], [3, "three"], 
[1, "second_one"],
+        [1, "last_one"]
+      ],
+      null,
+      [ 
+        [0, "zero_hero"], [9, "almost_six"], [1, "the_dumb_one"], [7, 
"eleven"],
+        [1, "the_chosen_one"], [42, "meaning of life?"], [1, "just_one"],
+        [1, "no more ones!"]
+      ],
+      [
+        [4, "this"], [6, "has"], [8, "no"], [2, "ones"]
+      ],
+      [
+        [1, "this"], [1, "should"], [1, "also"], [1, "be"], [1, "null"]
+      ],
+      []
+    ])";
+  auto map_array = ArrayFromJSON(map_type, input);
+  auto map_array_tweaked = TweakValidityBit(map_array, 4, false);
+
+  auto expected_all = ArrayFromJSON(list(utf8()), R"(
+                          [
+                            ["first_one", null, "second_one", "last_one"],
+                            null,
+                            ["the_dumb_one", "the_chosen_one", "just_one", "no 
more ones!"],
+                            null,
+                            null,
+                            null
+                          ])");
+  auto expected_first =
+      ArrayFromJSON(utf8(), R"(["first_one", null, "the_dumb_one", null, null, 
null])");
+  auto expected_last =
+      ArrayFromJSON(utf8(), R"(["last_one", null, "no more ones!", null, null, 
null])");
+
+  CheckMapArrayLookupWithDifferentOptions(map_array_tweaked,
+                                          MakeScalar(type, 1).ValueOrDie(), 
expected_all,
+                                          expected_first, expected_last);
+}
+template <typename KeyType>
+class TestMapArrayLookupDecimalKeys : public ::testing ::Test {
+ protected:
+  std::shared_ptr<DataType> GetType() {

Review comment:
       Note that most tests name this accessor `type_singleton()`, declared as
`std::shared_ptr<DataType> type_singleton() const`.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,169 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Result<int64_t> FindOneMapValueIndex(const Array& keys, const Scalar& 
query_key,
+                                              const int64_t start, const 
int64_t end,
+                                              const bool from_back = false) {
+    if (!from_back) {
+      for (int64_t idx = start; idx < end; ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    } else {
+      for (int64_t idx = end - 1; idx >= start; --idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    }
+    return -1;
+  }
+
+  static Result<std::shared_ptr<Scalar>> GetScalarOutput(KernelContext* ctx,
+                                                         const MapScalar 
map_scalar) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+    const std::shared_ptr<Scalar>& query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    const auto& struct_array = checked_cast<const 
StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    std::shared_ptr<Scalar> output;
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), items->type(), &builder));
+
+      bool found_at_least_one_key = false;
+      for (int64_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          found_at_least_one_key = true;
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), idx, 1));
+        }
+      }
+      if (!found_at_least_one_key) {
+        output = MakeNullScalar(list(items->type()));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+        ARROW_ASSIGN_OR_RAISE(output, MakeScalar(list(items->type()), result));
+      }
+    }
+
+    else { /* occurrence == FIRST || LAST */
+      bool from_back = (occurrence == MapArrayLookupOptions::LAST);
+
+      ARROW_ASSIGN_OR_RAISE(
+          int64_t key_match_idx,
+          FindOneMapValueIndex(*keys, *query_key, 0, struct_array.length(), 
from_back));
+      if (key_match_idx != -1) {
+        ARROW_ASSIGN_OR_RAISE(output, items->GetScalar(key_match_idx));
+      } else {
+        output = MakeNullScalar(items->type());
+      }
+    }
+    return output;
+  }
+
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::unique_ptr<ArrayBuilder> builder;
+    if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), 
&builder));
+    } else {
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+    }
+    for (int64_t map_array_idx = 0; map_array_idx < map_array.length(); 
++map_array_idx) {
+      if (!map_array.IsValid(map_array_idx)) {
+        RETURN_NOT_OK(builder->AppendNull());
+        continue;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> scalar,
+                              map_array.GetScalar(map_array_idx));
+        auto map_scalar = std::static_pointer_cast<MapScalar>(scalar);
+        ARROW_ASSIGN_OR_RAISE(auto scalar_output, GetScalarOutput(ctx, 
*map_scalar));
+        RETURN_NOT_OK(builder->AppendScalar(*scalar_output));
+      }
+    }
+    ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+    out->value = result->data();
+    return Status::OK();
+  }
+
+  static Status ExecMapScalar(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+    std::shared_ptr<DataType> item_type =
+        checked_cast<const MapType&>(*batch[0].type()).item_type();
+
+    const auto& map_scalar = batch[0].scalar_as<MapScalar>();
+
+    if (ARROW_PREDICT_FALSE(!map_scalar.is_valid)) {
+      if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+        out->value = MakeNullScalar(list(item_type));
+      } else {
+        out->value = MakeNullScalar(item_type);
+      }
+      return Status::OK();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(out->value, GetScalarOutput(ctx, map_scalar));
+    return Status::OK();
+  }
+};
+
+Result<ValueDescr> ResolveMapArrayLookupType(KernelContext* ctx,
+                                             const std::vector<ValueDescr>& 
descrs) {
+  const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+  std::shared_ptr<DataType> type = descrs.front().type;
+  std::shared_ptr<DataType> item_type = checked_cast<const 
MapType&>(*type).item_type();
+  std::shared_ptr<DataType> key_type = checked_cast<const 
MapType&>(*type).key_type();
+
+  if (!options.query_key || !options.query_key->type ||
+      !options.query_key->type->Equals(key_type)) {
+    return Status::TypeError(
+        "map_array_lookup: query_key type and MapArray key_type don't match. 
Expected "
+        "type: ",
+        *item_type, ", but got type: ", *options.query_key->type);
+  }
+
+  if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+    return ValueDescr(list(item_type), descrs.front().shape);
+  } else /* occurrence == FIRST || LAST */ {
+    return ValueDescr(item_type, descrs.front().shape);
+  }
+}
+
+void AddMapArrayLookupKernels(ScalarFunction* func) {
+  for (const auto shape : {ValueDescr::ARRAY, ValueDescr::SCALAR}) {
+    ScalarKernel kernel({InputType(Type::MAP, shape)},
+                        OutputType(ResolveMapArrayLookupType),
+                        shape == ValueDescr::ARRAY ? 
MapArrayLookupFunctor::ExecMapArray
+                                                   : 
MapArrayLookupFunctor::ExecMapScalar,

Review comment:
       In order to use `VisitArrayValuesInline`, this will have to change, since
you need to template `MapArrayLookupFunctor` with a `template <typename Type>`.
Normally we would register one kernel for each supported type here; however,
that breaks down because the input type needs to be parametric and we don't
support that.
   
   We could make a custom type matcher; instead, though, I think we can do the
following and use a visitor:
   
   ```
   ScalarKernel kernel(InputType(Type::MAP), ..., ResolveMapArrayLookup::Exec);
   
   struct ResolveMapArrayLookup {
     KernelContext* ctx;
     const ExecBatch& batch;
     Datum* out;
   
     template <typename Type>
     Status Execute() {
       if (batch[0].kind() == Datum::SCALAR) {
         return MapArrayLookupFunctor::ExecMapScalar(ctx, batch, out);
       }
       return MapArrayLookupFunctor::ExecMapArray(ctx, batch, out);
     }
   
     template <typename Type>
     enable_if_physical_integer<Type, Status> Visit(const Type& type) {
       return Execute<Type>();
     }
   
     // ...
   
     Status Visit(const DataType& type) { return Status::TypeError(...); }
   
     static Status Exec(...) {
       ResolveMapArrayLookup visitor{ctx, batch, out};
       return VisitTypeInline(
           *checked_cast<const MapType&>(*batch[0].type()).key_type(), &visitor);
     }
   };
   ```
   Another visitor example: 
https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc#L332
   
   The custom type matcher would look something like this, I think:
   
https://github.com/apache/arrow/blob/8d0efd98bd4c4f2d181e01c02a66f924cb3e4c94/cpp/src/arrow/compute/kernel.cc#L176
   
   You would define a custom TypeMatcher that checks the map key type, then 
register it as follows
   
   ```
   for (const auto& type : IntTypes()) {
     ScalarKernel kernel({InputType(MapKeyTypeMatcher(type))}, ...,
                         GeneratePhysicalNumeric<MapArrayLookupFunctor>(type));
   }
   ```
   
   (I think I gave examples of this earlier)

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,169 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Result<int64_t> FindOneMapValueIndex(const Array& keys, const Scalar& 
query_key,
+                                              const int64_t start, const 
int64_t end,
+                                              const bool from_back = false) {
+    if (!from_back) {
+      for (int64_t idx = start; idx < end; ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    } else {
+      for (int64_t idx = end - 1; idx >= start; --idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    }
+    return -1;
+  }
+
+  static Result<std::shared_ptr<Scalar>> GetScalarOutput(KernelContext* ctx,
+                                                         const MapScalar 
map_scalar) {

Review comment:
       Here perhaps: 
https://github.com/apache/arrow/blob/91e3ac53e2e21736ce6291d73fc37da6fa21259d/cpp/src/arrow/compute/kernels/scalar_if_else.cc#L2565

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested_test.cc
##########
@@ -225,6 +225,294 @@ TEST(TestScalarNested, StructField) {
   }
 }
 
+void CheckMapArrayLookupWithDifferentOptions(
+    const std::shared_ptr<Array>& map, const std::shared_ptr<Scalar>& 
query_key,
+    const std::shared_ptr<Array>& expected_all,
+    const std::shared_ptr<Array>& expected_first,
+    const std::shared_ptr<Array>& expected_last) {
+  MapArrayLookupOptions all_matches(query_key, MapArrayLookupOptions::ALL);
+  MapArrayLookupOptions first_matches(query_key, MapArrayLookupOptions::FIRST);
+  MapArrayLookupOptions last_matches(query_key, MapArrayLookupOptions::LAST);
+
+  CheckScalar("map_array_lookup", {map}, expected_all, &all_matches);
+  CheckScalar("map_array_lookup", {map}, expected_first, &first_matches);
+  CheckScalar("map_array_lookup", {map}, expected_last, &last_matches);
+}
+
+class TestMapArrayLookupKernel : public ::testing::Test {};
+
+TEST_F(TestMapArrayLookupKernel, Basic) {
+  auto type = map(utf8(), int32());
+  const char* input = R"(
+    [
+      [["foo", 99], ["bar", 1], ["hello", 2], ["foo", 3], ["lets go", 5], 
["what now?", 8]],
+      null,
+      [["nothing", null], ["hat", null], ["foo", 101], ["sorry", 1], ["dip", 
null],
+      ["foo", 22]],
+      []
+    ])";
+  auto map_array = ArrayFromJSON(type, input);
+  CheckMapArrayLookupWithDifferentOptions(
+      map_array, MakeScalar("foo"),
+      ArrayFromJSON(list(int32()), R"([[99, 3], null, [101, 22], null])"),
+      ArrayFromJSON(int32(), R"([99, null, 101, null])"),
+      ArrayFromJSON(int32(), R"([3, null, 22, null])"));
+}
+
+TEST_F(TestMapArrayLookupKernel, NestedItems) {
+  auto type = map(utf8(), map(int16(), int16()));
+  const char* input = R"(
+    [
+      [
+        [
+          "just",
+          [[0, 0], [1, 1]]
+        ],
+        [
+          "random",
+          [[2, 2], [3, 3]]
+        ],
+        [
+          "foo",
+          [[4, 4], [5, 5]]
+        ],
+        [
+          "values",
+          [[6, 6], [7, 7]]
+        ],
+        [
+          "foo",
+          [[8, 8], [9, 9]]
+        ],
+        [
+          "point",
+          [[10, 10], [11, 11]]
+        ],
+        [
+          "foo",
+          [[12, 12], [13, 13]]
+        ]
+      ],
+      null,
+      [
+        [
+          "yet",
+          [[0, 1], [1, 2]]
+        ],
+        [
+          "more",
+          [[2, 3], [3, 4]]
+        ],
+        [
+          "foo",
+          [[4, 5], [5, 6]]
+        ],
+        [
+          "random",
+          [[6, 7], [7, 8]]
+        ],
+        [
+          "foo",
+          [[8, 9], [9, 10]]
+        ],
+        [
+          "values",
+          [[10, 11], [11, 12]]
+        ],
+        [
+          "foo",
+          [[12, 13], [13, 14]]
+        ]
+      ],
+      []
+    ]
+  )";
+  const auto map_array = ArrayFromJSON(type, input);
+  const auto expected_all = ArrayFromJSON(list(map(int16(), int16())), R"(
+                                [
+                                  [
+                                    [[4, 4], [5, 5]], [[8, 8], [9, 9]],
+                                    [[12, 12], [13, 13]]
+                                  ],
+                                  null,
+                                  [
+                                    [[4, 5], [5, 6]], [[8, 9], [9, 10]],
+                                    [[12, 13], [13, 14]]
+                                  ],
+                                  null
+                                ])");
+  const auto expected_first = ArrayFromJSON(map(int16(), int16()), R"(
+                                [
+                                  [[4, 4], [5, 5]],
+                                  null,
+                                  [[4, 5], [5, 6]],
+                                  null
+                                ])");
+  const auto expected_last = ArrayFromJSON(map(int16(), int16()), R"(
+                                [
+                                  [[12, 12], [13, 13]],
+                                  null,
+                                  [[12, 13], [13, 14]],
+                                  null
+                                ])");
+  CheckMapArrayLookupWithDifferentOptions(map_array, MakeScalar("foo"), 
expected_all,
+                                          expected_first, expected_last);
+}
+
+template <typename KeyType>
+class TestMapArrayLookupIntegralKeys : public ::testing ::Test {};
+
+TYPED_TEST_SUITE(TestMapArrayLookupIntegralKeys, PhysicalIntegralArrowTypes);
+
+TYPED_TEST(TestMapArrayLookupIntegralKeys, StringItems) {
+  std::shared_ptr<DataType> type = default_type_instance<TypeParam>();
+
+  auto map_type = map(type, utf8());

Review comment:
       This could be refactored into a `type_singleton()` method for 
consistency with below.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,169 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Result<int64_t> FindOneMapValueIndex(const Array& keys, const Scalar& 
query_key,
+                                              const int64_t start, const 
int64_t end,
+                                              const bool from_back = false) {
+    if (!from_back) {
+      for (int64_t idx = start; idx < end; ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    } else {
+      for (int64_t idx = end - 1; idx >= start; --idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    }
+    return -1;
+  }
+
+  static Result<std::shared_ptr<Scalar>> GetScalarOutput(KernelContext* ctx,
+                                                         const MapScalar 
map_scalar) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+    const std::shared_ptr<Scalar>& query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    const auto& struct_array = checked_cast<const 
StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    std::shared_ptr<Scalar> output;
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), items->type(), &builder));
+
+      bool found_at_least_one_key = false;
+      for (int64_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          found_at_least_one_key = true;
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), idx, 1));
+        }
+      }
+      if (!found_at_least_one_key) {
+        output = MakeNullScalar(list(items->type()));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+        ARROW_ASSIGN_OR_RAISE(output, MakeScalar(list(items->type()), result));
+      }
+    }
+
+    else { /* occurrence == FIRST || LAST */
+      bool from_back = (occurrence == MapArrayLookupOptions::LAST);
+
+      ARROW_ASSIGN_OR_RAISE(
+          int64_t key_match_idx,
+          FindOneMapValueIndex(*keys, *query_key, 0, struct_array.length(), 
from_back));
+      if (key_match_idx != -1) {
+        ARROW_ASSIGN_OR_RAISE(output, items->GetScalar(key_match_idx));
+      } else {
+        output = MakeNullScalar(items->type());
+      }
+    }
+    return output;
+  }
+
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::unique_ptr<ArrayBuilder> builder;
+    if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), 
&builder));
+    } else {
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+    }
+    for (int64_t map_array_idx = 0; map_array_idx < map_array.length(); 
++map_array_idx) {
+      if (!map_array.IsValid(map_array_idx)) {
+        RETURN_NOT_OK(builder->AppendNull());
+        continue;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> scalar,
+                              map_array.GetScalar(map_array_idx));
+        auto map_scalar = std::static_pointer_cast<MapScalar>(scalar);
+        ARROW_ASSIGN_OR_RAISE(auto scalar_output, GetScalarOutput(ctx, 
*map_scalar));
+        RETURN_NOT_OK(builder->AppendScalar(*scalar_output));

Review comment:
       This still does unnecessary boxing/unboxing; you could improve it
further if `FindOneMapValueIndex` took `const Array& keys, int64_t start`.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested_test.cc
##########
@@ -225,6 +225,294 @@ TEST(TestScalarNested, StructField) {
   }
 }
 
+void CheckMapArrayLookupWithDifferentOptions(
+    const std::shared_ptr<Array>& map, const std::shared_ptr<Scalar>& 
query_key,
+    const std::shared_ptr<Array>& expected_all,
+    const std::shared_ptr<Array>& expected_first,
+    const std::shared_ptr<Array>& expected_last) {
+  MapArrayLookupOptions all_matches(query_key, MapArrayLookupOptions::ALL);
+  MapArrayLookupOptions first_matches(query_key, MapArrayLookupOptions::FIRST);
+  MapArrayLookupOptions last_matches(query_key, MapArrayLookupOptions::LAST);
+
+  CheckScalar("map_array_lookup", {map}, expected_all, &all_matches);
+  CheckScalar("map_array_lookup", {map}, expected_first, &first_matches);
+  CheckScalar("map_array_lookup", {map}, expected_last, &last_matches);
+}
+
+class TestMapArrayLookupKernel : public ::testing::Test {};
+
+TEST_F(TestMapArrayLookupKernel, Basic) {
+  auto type = map(utf8(), int32());
+  const char* input = R"(
+    [
+      [["foo", 99], ["bar", 1], ["hello", 2], ["foo", 3], ["lets go", 5], 
["what now?", 8]],
+      null,
+      [["nothing", null], ["hat", null], ["foo", 101], ["sorry", 1], ["dip", 
null],
+      ["foo", 22]],
+      []
+    ])";
+  auto map_array = ArrayFromJSON(type, input);
+  CheckMapArrayLookupWithDifferentOptions(
+      map_array, MakeScalar("foo"),
+      ArrayFromJSON(list(int32()), R"([[99, 3], null, [101, 22], null])"),
+      ArrayFromJSON(int32(), R"([99, null, 101, null])"),
+      ArrayFromJSON(int32(), R"([3, null, 22, null])"));
+}
+
+TEST_F(TestMapArrayLookupKernel, NestedItems) {

Review comment:
       We probably don't need `Basic` or `NestedItems` anymore but we could 
keep `TestMapArrayLookupKernel` for testing boolean, fixed-size string, and 
month-day-nano-interval key types. (Also, you can just use `TEST` instead of 
`TEST_F` and remove the `TestMapArrayLookupKernel` class here.)

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,169 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Result<int64_t> FindOneMapValueIndex(const Array& keys, const Scalar& query_key,
+                                              const int64_t start, const int64_t end,
+                                              const bool from_back = false) {
+    if (!from_back) {
+      for (int64_t idx = start; idx < end; ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    } else {
+      for (int64_t idx = end - 1; idx >= start; --idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, keys.GetScalar(idx));
+
+        if (key->Equals(query_key)) return idx;
+      }
+    }
+    return -1;
+  }
+
+  static Result<std::shared_ptr<Scalar>> GetScalarOutput(KernelContext* ctx,
+                                                         const MapScalar map_scalar) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+    const std::shared_ptr<Scalar>& query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    const auto& struct_array = checked_cast<const StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    std::shared_ptr<Scalar> output;
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), items->type(), &builder));
+
+      bool found_at_least_one_key = false;
+      for (int64_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          found_at_least_one_key = true;
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), idx, 1));
+        }
+      }
+      if (!found_at_least_one_key) {
+        output = MakeNullScalar(list(items->type()));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+        ARROW_ASSIGN_OR_RAISE(output, MakeScalar(list(items->type()), result));
+      }
+    }
+
+    else { /* occurrence == FIRST || LAST */
+      bool from_back = (occurrence == MapArrayLookupOptions::LAST);
+
+      ARROW_ASSIGN_OR_RAISE(
+          int64_t key_match_idx,
+          FindOneMapValueIndex(*keys, *query_key, 0, struct_array.length(), from_back));
+      if (key_match_idx != -1) {
+        ARROW_ASSIGN_OR_RAISE(output, items->GetScalar(key_match_idx));
+      } else {
+        output = MakeNullScalar(items->type());
+      }
+    }
+    return output;
+  }
+
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::unique_ptr<ArrayBuilder> builder;
+    if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), &builder));
+    } else {
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), &builder));
+    }
+    for (int64_t map_array_idx = 0; map_array_idx < map_array.length(); ++map_array_idx) {
+      if (!map_array.IsValid(map_array_idx)) {
+        RETURN_NOT_OK(builder->AppendNull());
+        continue;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> scalar,
+                              map_array.GetScalar(map_array_idx));
+        auto map_scalar = std::static_pointer_cast<MapScalar>(scalar);
+        ARROW_ASSIGN_OR_RAISE(auto scalar_output, GetScalarOutput(ctx, *map_scalar));
+        RETURN_NOT_OK(builder->AppendScalar(*scalar_output));

Review comment:
       And then you can inline `ExecMapScalar` back into the scalar case. The 
point of this is to avoid boxing/unboxing individual values into scalars.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to