lidavidm commented on a change in pull request #12162:
URL: https://github.com/apache/arrow/pull/12162#discussion_r786775060



##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+            break;
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else if (occurence == MapArrayLookupOptions::Occurrence::LAST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      int32_t last_key_idx_match;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        last_key_idx_match = -1;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            last_key_idx_match = key_idx_to_check;
+          }
+        }
+        if (last_key_idx_match == -1) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
last_key_idx_match, 1));
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else /* occurrence == MapArrayLookupOptions::Occurrence::All) */ {
+      std::unique_ptr<ArrayBuilder> builder;
+      std::unique_ptr<ArrayBuilder> list_builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        if (list_struct_len > 0) {
+          RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), 
map_array.map_type()->item_type(),
+                                    &list_builder));
+        }
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(
+                list_builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          ARROW_ASSIGN_OR_RAISE(auto list_result, list_builder->Finish());
+          RETURN_NOT_OK(builder->AppendScalar(ListScalar(list_result)));
+        }
+        list_builder->Reset();
+
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+    }
+    return Status::OK();
+  }
+
+  static Status ExecMapScalar(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const auto& map_scalar = batch[0].scalar_as<MapScalar>();
+    const auto& struct_array = checked_cast<const 
StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    if (ARROW_PREDICT_FALSE(!map_scalar.is_valid)) {
+      if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+        out->value = MakeNullScalar(list(items->type()));
+      } else {
+        out->value = MakeNullScalar(items->type());
+      }
+      return Status::OK();
+    }
+
+    const std::shared_ptr<Scalar> query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    // for debugging
+    ARROW_LOG(WARNING) << keys->ToString();
+    ARROW_LOG(WARNING) << items->ToString();
+    ARROW_LOG(WARNING) << "Input: " << struct_array.ToString();
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::FIRST) {
+      for (int32_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          ARROW_ASSIGN_OR_RAISE(auto result, items->GetScalar(idx));
+          out->value = result;

Review comment:
       ```suggestion
             ARROW_ASSIGN_OR_RAISE(out->value, items->GetScalar(idx));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;

Review comment:
       Instead of declaring these up here, why not just declare them inside the 
loop?

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+            break;
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else if (occurence == MapArrayLookupOptions::Occurrence::LAST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      int32_t last_key_idx_match;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        last_key_idx_match = -1;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            last_key_idx_match = key_idx_to_check;
+          }
+        }
+        if (last_key_idx_match == -1) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
last_key_idx_match, 1));
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else /* occurrence == MapArrayLookupOptions::Occurrence::All) */ {
+      std::unique_ptr<ArrayBuilder> builder;
+      std::unique_ptr<ArrayBuilder> list_builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        if (list_struct_len > 0) {
+          RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), 
map_array.map_type()->item_type(),
+                                    &list_builder));
+        }
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(
+                list_builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          ARROW_ASSIGN_OR_RAISE(auto list_result, list_builder->Finish());
+          RETURN_NOT_OK(builder->AppendScalar(ListScalar(list_result)));
+        }
+        list_builder->Reset();
+
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+    }
+    return Status::OK();
+  }
+
+  static Status ExecMapScalar(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const auto& map_scalar = batch[0].scalar_as<MapScalar>();
+    const auto& struct_array = checked_cast<const 
StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    if (ARROW_PREDICT_FALSE(!map_scalar.is_valid)) {
+      if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+        out->value = MakeNullScalar(list(items->type()));
+      } else {
+        out->value = MakeNullScalar(items->type());
+      }
+      return Status::OK();
+    }
+
+    const std::shared_ptr<Scalar> query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    // for debugging
+    ARROW_LOG(WARNING) << keys->ToString();
+    ARROW_LOG(WARNING) << items->ToString();
+    ARROW_LOG(WARNING) << "Input: " << struct_array.ToString();
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::FIRST) {
+      for (int32_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          ARROW_ASSIGN_OR_RAISE(auto result, items->GetScalar(idx));
+          out->value = result;
+          return Status::OK();
+        }
+      }
+      out->value = MakeNullScalar(items->type());
+    }
+
+    else if (occurrence == MapArrayLookupOptions::Occurrence::LAST) {
+      int32_t last_key_idx_match = -1;
+
+      for (int32_t idx = 0; idx < struct_array.length(); ++idx) {

Review comment:
       Why not just iterate backwards?

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+            break;
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        }

Review comment:
       The inner loop logic is very similar to the scalar logic; you could 
probably pull out a helper function that works in terms of indices and array 
offsets, and that way you could share most of the logic between the two cases.
   
   `Result<int64_t> FindOneMapValue(const Array& keys, const Array& values, 
const Scalar& key, int64_t start_offset, int64_t slot_length)` or so

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested_test.cc
##########
@@ -225,6 +225,56 @@ TEST(TestScalarNested, StructField) {
   }
 }
 
+TEST(TestScalarNested, MapArrayLookupNonRecursive) {

Review comment:
       Note that CheckScalar calls CheckScalarNonRecursive for you so this test 
isn't necessary once the one below is fixed.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested_test.cc
##########
@@ -225,6 +225,56 @@ TEST(TestScalarNested, StructField) {
   }
 }
 
+TEST(TestScalarNested, MapArrayLookupNonRecursive) {
+  MapArrayLookupOptions foo_all(MakeScalar("foo"), MapArrayLookupOptions::ALL);
+  MapArrayLookupOptions foo_first(MakeScalar("foo"), 
MapArrayLookupOptions::FIRST);
+  MapArrayLookupOptions foo_last(MakeScalar("foo"), 
MapArrayLookupOptions::LAST);
+
+  auto type = map(utf8(), int32());
+  const char* input = R"(
+[
+    [["foo", 99], ["bar", 1], ["hello", 2], ["foo", 3], ["lesgo", 5], 
["whatnow", 8]],
+    null,
+    [["nothing", null], ["hat", null], ["foo", 101], ["sorry", 1], ["dip", 
null], ["foo", 22]],
+    []
+  ]

Review comment:
       nit, but try to keep the indent lined up here?

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+            break;
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else if (occurence == MapArrayLookupOptions::Occurrence::LAST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      int32_t last_key_idx_match;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        last_key_idx_match = -1;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            last_key_idx_match = key_idx_to_check;
+          }
+        }
+        if (last_key_idx_match == -1) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
last_key_idx_match, 1));
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else /* occurrence == MapArrayLookupOptions::Occurrence::All) */ {
+      std::unique_ptr<ArrayBuilder> builder;
+      std::unique_ptr<ArrayBuilder> list_builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        if (list_struct_len > 0) {
+          RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), 
map_array.map_type()->item_type(),
+                                    &list_builder));
+        }
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(
+                list_builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          ARROW_ASSIGN_OR_RAISE(auto list_result, list_builder->Finish());
+          RETURN_NOT_OK(builder->AppendScalar(ListScalar(list_result)));
+        }
+        list_builder->Reset();
+
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+    }
+    return Status::OK();
+  }
+
+  static Status ExecMapScalar(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const auto& map_scalar = batch[0].scalar_as<MapScalar>();
+    const auto& struct_array = checked_cast<const 
StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    if (ARROW_PREDICT_FALSE(!map_scalar.is_valid)) {
+      if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+        out->value = MakeNullScalar(list(items->type()));
+      } else {
+        out->value = MakeNullScalar(items->type());
+      }
+      return Status::OK();
+    }
+
+    const std::shared_ptr<Scalar> query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    // for debugging
+    ARROW_LOG(WARNING) << keys->ToString();
+    ARROW_LOG(WARNING) << items->ToString();
+    ARROW_LOG(WARNING) << "Input: " << struct_array.ToString();
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::FIRST) {
+      for (int32_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          ARROW_ASSIGN_OR_RAISE(auto result, items->GetScalar(idx));
+          out->value = result;
+          return Status::OK();
+        }
+      }
+      out->value = MakeNullScalar(items->type());
+    }
+
+    else if (occurrence == MapArrayLookupOptions::Occurrence::LAST) {
+      int32_t last_key_idx_match = -1;
+
+      for (int32_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          last_key_idx_match = idx;
+        }
+      }
+      if (last_key_idx_match == -1) {
+        out->value = MakeNullScalar(items->type());
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto result, 
items->GetScalar(last_key_idx_match));
+
+        ARROW_LOG(WARNING) << result->ToString() << " Type: " << *result->type;
+
+        out->value = result;
+      }
+
+    } else /* occurrence == MapArrayLookupOptions::Occurrence::ALL) */ {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), items->type(), &builder));
+
+      bool found_one_key = false;
+      for (int32_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          found_one_key = true;
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), idx, 1));
+        }
+      }
+      if (!found_one_key) {
+        out->value = MakeNullScalar(list(items->type()));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+
+        ARROW_ASSIGN_OR_RAISE(auto scalar_list, 
MakeScalar(list(items->type()), result));
+        ARROW_LOG(WARNING) << "MapArrayLookup Scalar result: " << 
result->ToString()
+                           << " Type: " << *scalar_list->type;
+        out->value = scalar_list;
+      }
+    }
+
+    return Status::OK();
+  }
+};
+
+Result<ValueDescr> ResolveMapArrayLookupType(KernelContext* ctx,
+                                             const std::vector<ValueDescr>& 
descrs) {
+  const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+  std::shared_ptr<DataType> type = descrs.front().type;
+  std::shared_ptr<DataType> item_type = checked_cast<const 
MapType&>(*type).item_type();
+  std::shared_ptr<DataType> key_type = checked_cast<const 
MapType&>(*type).key_type();
+
+  if (!options.query_key->type->Equals(key_type)) {

Review comment:
       nit, but I would also check for `!options.query_key || 
!options.query_key->type` in case someone didn't initialize things properly

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested_test.cc
##########
@@ -225,6 +225,56 @@ TEST(TestScalarNested, StructField) {
   }
 }
 
+TEST(TestScalarNested, MapArrayLookupNonRecursive) {
+  MapArrayLookupOptions foo_all(MakeScalar("foo"), MapArrayLookupOptions::ALL);
+  MapArrayLookupOptions foo_first(MakeScalar("foo"), 
MapArrayLookupOptions::FIRST);
+  MapArrayLookupOptions foo_last(MakeScalar("foo"), 
MapArrayLookupOptions::LAST);
+
+  auto type = map(utf8(), int32());
+  const char* input = R"(
+[
+    [["foo", 99], ["bar", 1], ["hello", 2], ["foo", 3], ["lesgo", 5], 
["whatnow", 8]],
+    null,
+    [["nothing", null], ["hat", null], ["foo", 101], ["sorry", 1], ["dip", 
null], ["foo", 22]],
+    []
+  ]
+)";
+  auto map_array = ArrayFromJSON(type, input);
+
+  CheckScalarNonRecursive(
+      "map_array_lookup", {map_array},
+      ArrayFromJSON(list(int32()), "[[99, 3], null, [101, 22], null]"), 
&foo_all);
+  CheckScalarNonRecursive("map_array_lookup", {map_array},
+                          ArrayFromJSON(int32(), "[99, null, 101, null]"), 
&foo_first);
+  CheckScalarNonRecursive("map_array_lookup", {map_array},
+                          ArrayFromJSON(int32(), "[3, null, 22, null]"), 
&foo_last);
+}
+
+TEST(TestScalarNested, MapArrayLookup) {

Review comment:
       Have you tried using a debugger? It can show you which cast failed.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested_test.cc
##########
@@ -225,6 +225,56 @@ TEST(TestScalarNested, StructField) {
   }
 }
 
+TEST(TestScalarNested, MapArrayLookupNonRecursive) {
+  MapArrayLookupOptions foo_all(MakeScalar("foo"), MapArrayLookupOptions::ALL);
+  MapArrayLookupOptions foo_first(MakeScalar("foo"), 
MapArrayLookupOptions::FIRST);
+  MapArrayLookupOptions foo_last(MakeScalar("foo"), 
MapArrayLookupOptions::LAST);
+
+  auto type = map(utf8(), int32());
+  const char* input = R"(
+[
+    [["foo", 99], ["bar", 1], ["hello", 2], ["foo", 3], ["lesgo", 5], 
["whatnow", 8]],
+    null,
+    [["nothing", null], ["hat", null], ["foo", 101], ["sorry", 1], ["dip", 
null], ["foo", 22]],
+    []
+  ]
+)";
+  auto map_array = ArrayFromJSON(type, input);
+
+  CheckScalarNonRecursive(
+      "map_array_lookup", {map_array},
+      ArrayFromJSON(list(int32()), "[[99, 3], null, [101, 22], null]"), 
&foo_all);
+  CheckScalarNonRecursive("map_array_lookup", {map_array},
+                          ArrayFromJSON(int32(), "[99, null, 101, null]"), 
&foo_first);
+  CheckScalarNonRecursive("map_array_lookup", {map_array},
+                          ArrayFromJSON(int32(), "[3, null, 22, null]"), 
&foo_last);
+}
+
+TEST(TestScalarNested, MapArrayLookup) {

Review comment:
       In this case it seems you need to check `map_scalar.is_valid` _before_ 
casting anything since in that case `map_scalar.value == nullptr`.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;

Review comment:
       You could check `map_array.IsValid` here and skip the inner loop 
entirely if the slot is null.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+            break;
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else if (occurence == MapArrayLookupOptions::Occurrence::LAST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      int32_t last_key_idx_match;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        last_key_idx_match = -1;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            last_key_idx_match = key_idx_to_check;
+          }
+        }
+        if (last_key_idx_match == -1) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
last_key_idx_match, 1));
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else /* occurrence == MapArrayLookupOptions::Occurrence::All) */ {
+      std::unique_ptr<ArrayBuilder> builder;
+      std::unique_ptr<ArrayBuilder> list_builder;
+      RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(),
+                                list(map_array.map_type()->item_type()), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        if (list_struct_len > 0) {
+          RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), 
map_array.map_type()->item_type(),
+                                    &list_builder));
+        }
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(
+                list_builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        } else {
+          ARROW_ASSIGN_OR_RAISE(auto list_result, list_builder->Finish());
+          RETURN_NOT_OK(builder->AppendScalar(ListScalar(list_result)));
+        }
+        list_builder->Reset();
+
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+    }
+    return Status::OK();
+  }
+
+  static Status ExecMapScalar(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const auto& map_scalar = batch[0].scalar_as<MapScalar>();
+    const auto& struct_array = checked_cast<const 
StructArray&>(*map_scalar.value);
+    const std::shared_ptr<Array> keys = struct_array.field(0);
+    const std::shared_ptr<Array> items = struct_array.field(1);
+
+    if (ARROW_PREDICT_FALSE(!map_scalar.is_valid)) {
+      if (options.occurrence == MapArrayLookupOptions::Occurrence::ALL) {
+        out->value = MakeNullScalar(list(items->type()));
+      } else {
+        out->value = MakeNullScalar(items->type());
+      }
+      return Status::OK();
+    }
+
+    const std::shared_ptr<Scalar> query_key = options.query_key;
+    const auto& occurrence = options.occurrence;
+
+    // for debugging
+    ARROW_LOG(WARNING) << keys->ToString();
+    ARROW_LOG(WARNING) << items->ToString();
+    ARROW_LOG(WARNING) << "Input: " << struct_array.ToString();
+
+    if (occurrence == MapArrayLookupOptions::Occurrence::FIRST) {
+      for (int32_t idx = 0; idx < struct_array.length(); ++idx) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key, 
keys->GetScalar(idx));
+
+        if (key->Equals(*query_key)) {
+          ARROW_ASSIGN_OR_RAISE(auto result, items->GetScalar(idx));
+          out->value = result;

Review comment:
       And ditto below.

##########
File path: cpp/src/arrow/compute/kernels/scalar_nested.cc
##########
@@ -429,6 +429,274 @@ const FunctionDoc make_struct_doc{"Wrap Arrays into a 
StructArray",
                                   {"*args"},
                                   "MakeStructOptions"};
 
+struct MapArrayLookupFunctor {
+  static Status ExecMapArray(KernelContext* ctx, const ExecBatch& batch, 
Datum* out) {
+    const auto& options = OptionsWrapper<MapArrayLookupOptions>::Get(ctx);
+
+    const MapArray map_array(batch[0].array());
+
+    std::shared_ptr<arrow::Array> keys = map_array.keys();
+    std::shared_ptr<arrow::Array> items = map_array.items();
+
+    const auto& query_key = options.query_key;
+    const auto& occurence = options.occurrence;
+
+    if (occurence == MapArrayLookupOptions::Occurrence::FIRST) {
+      std::unique_ptr<ArrayBuilder> builder;
+      RETURN_NOT_OK(
+          MakeBuilder(ctx->memory_pool(), map_array.map_type()->item_type(), 
&builder));
+
+      int32_t last_key_idx_checked = 0;
+
+      // aka, number of {key, value} pairs in the current map
+      int32_t list_struct_len;
+      bool found_one_key;
+
+      for (int32_t map_array_idx = 0; map_array_idx < map_array.length();
+           ++map_array_idx) {
+        // Number of Struct('s) = {key, value} in the list at the current index
+        list_struct_len = map_array.value_length(map_array_idx);
+        found_one_key = false;
+
+        for (int32_t key_idx_to_check = last_key_idx_checked;
+             key_idx_to_check < last_key_idx_checked + list_struct_len;
+             ++key_idx_to_check) {
+          ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> key,
+                                keys->GetScalar(key_idx_to_check));
+
+          if (key->Equals(*query_key)) {
+            found_one_key = true;
+            RETURN_NOT_OK(builder->AppendArraySlice(*items->data(), 
key_idx_to_check, 1));
+            break;
+          }
+        }
+        if (!found_one_key) {
+          RETURN_NOT_OK(builder->AppendNull());
+        }
+        // new index from where to start checking
+        last_key_idx_checked += list_struct_len;
+      }
+      ARROW_ASSIGN_OR_RAISE(auto result, builder->Finish());
+      out->value = result->data();
+
+    } else if (occurence == MapArrayLookupOptions::Occurrence::LAST) {

Review comment:
       It seems you could combine FIRST and LAST, you would just have to branch 
to determine the direction of iteration in the inner loop.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to