This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f7bd4c3  ARROW-15288: [GLib] Add garrow_execute_plan_build_hash_join_node()
f7bd4c3 is described below

commit f7bd4c3904e30ec63263eab1cb59876c15f67d5a
Author: Sutou Kouhei <[email protected]>
AuthorDate: Mon Jan 10 05:46:23 2022 +0900

    ARROW-15288: [GLib] Add garrow_execute_plan_build_hash_join_node()
    
    Closes #12107 from kou/glib-join
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 c_glib/arrow-glib/compute.cpp    | 178 ++++++++++++++++++++++++++++++++++++++-
 c_glib/arrow-glib/compute.h      |  69 +++++++++++++++
 c_glib/test/test-execute-plan.rb | 138 ++++++++++++++++++++++++------
 3 files changed, 356 insertions(+), 29 deletions(-)

diff --git a/c_glib/arrow-glib/compute.cpp b/c_glib/arrow-glib/compute.cpp
index 4ab988c..7d84e06 100644
--- a/c_glib/arrow-glib/compute.cpp
+++ b/c_glib/arrow-glib/compute.cpp
@@ -142,6 +142,8 @@ G_BEGIN_DECLS
  *
  * #GArrowSinkNodeOptions is a class to customize a sink node.
  *
+ * #GArrowHashJoinNodeOptions is a class to customize a hash join node.
+ *
  * #GArrowExecuteNode is a class to execute an operation.
  *
  * #GArrowExecutePlan is a class to execute operations.
@@ -1238,7 +1240,8 @@ garrow_aggregate_node_options_class_init(GArrowAggregateNodeOptionsClass *klass)
  * @n_keys: The number of @keys.
  * @error: (nullable): Return location for a #GError or %NULL.
  *
- * Returns: A newly created #GArrowAggregateNodeOptions.
+ * Returns: (nullable): A newly created #GArrowAggregateNodeOptions on success,
+ *   %NULL otherwise.
  *
  * Since: 6.0.0
  */
@@ -1382,6 +1385,143 @@ garrow_sink_node_options_get_reader(GArrowSinkNodeOptions *options,
 }
 
 
+G_DEFINE_TYPE(GArrowHashJoinNodeOptions,
+              garrow_hash_join_node_options,
+              GARROW_TYPE_EXECUTE_NODE_OPTIONS)
+
+static void
+garrow_hash_join_node_options_init(GArrowHashJoinNodeOptions *object)
+{
+}
+
+static void
+garrow_hash_join_node_options_class_init(GArrowHashJoinNodeOptionsClass *klass)
+{
+  /* TODO: Add left_output_prefix and right_output_prefix properties */
+}
+
+/**
+ * garrow_hash_join_node_options_new:
+ * @type: A #GArrowJoinType to be used.
+ * @left_keys: (array length=n_left_keys): Left join keys.
+ * @n_left_keys: The number of @left_keys.
+ * @right_keys: (array length=n_right_keys): Right join keys.
+ * @n_right_keys: The number of @right_keys.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (nullable): A newly created #GArrowHashJoinNodeOptions on success,
+ *   %NULL otherwise.
+ *
+ * Since: 7.0.0
+ */
+GArrowHashJoinNodeOptions *
+garrow_hash_join_node_options_new(GArrowJoinType type,
+                                  const gchar **left_keys,
+                                  gsize n_left_keys,
+                                  const gchar **right_keys,
+                                  gsize n_right_keys,
+                                  GError **error)
+{
+  auto arrow_type = static_cast<arrow::compute::JoinType>(type);
+  std::vector<arrow::FieldRef> arrow_left_keys;
+  for (gsize i = 0; i < n_left_keys; ++i) {
+    if (!garrow_field_refs_add(arrow_left_keys,
+                               left_keys[i],
+                               error,
+                               "[hash-join-node-options][new][left-key]")) {
+      return NULL;
+    }
+  }
+  std::vector<arrow::FieldRef> arrow_right_keys;
+  for (gsize i = 0; i < n_right_keys; ++i) {
+    if (!garrow_field_refs_add(arrow_right_keys,
+                               right_keys[i],
+                               error,
+                               "[hash-join-node-options][new][right-key]")) {
+      return NULL;
+    }
+  }
+  auto arrow_options =
+    new arrow::compute::HashJoinNodeOptions(arrow_type,
+                                            std::move(arrow_left_keys),
+                                            std::move(arrow_right_keys));
+  auto options = g_object_new(GARROW_TYPE_HASH_JOIN_NODE_OPTIONS,
+                              "options", arrow_options,
+                              NULL);
+  return GARROW_HASH_JOIN_NODE_OPTIONS(options);
+}
+
+/**
+ * garrow_hash_join_node_options_set_left_outputs:
+ * @options: A #GArrowHashJoinNodeOptions.
+ * @outputs: (array length=n_outputs): Output fields.
+ * @n_outputs: The number of @outputs.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 7.0.0
+ */
+gboolean
+garrow_hash_join_node_options_set_left_outputs(
+  GArrowHashJoinNodeOptions *options,
+  const gchar **outputs,
+  gsize n_outputs,
+  GError **error)
+{
+  auto arrow_options =
+    static_cast<arrow::compute::HashJoinNodeOptions *>(
+      garrow_execute_node_options_get_raw(
+        GARROW_EXECUTE_NODE_OPTIONS(options)));
+  arrow_options->output_all = false;
+  arrow_options->left_output.clear();
+  for (gsize i = 0; i < n_outputs; ++i) {
+    if (!garrow_field_refs_add(arrow_options->left_output,
+                               outputs[i],
+                               error,
+                               "[hash-join-node-options][set-left-outputs]")) {
+      return FALSE;
+    }
+  }
+  return TRUE;
+}
+
+/**
+ * garrow_hash_join_node_options_set_right_outputs:
+ * @options: A #GArrowHashJoinNodeOptions.
+ * @outputs: (array length=n_outputs): Output fields.
+ * @n_outputs: The number of @outputs.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 7.0.0
+ */
+gboolean
+garrow_hash_join_node_options_set_right_outputs(
+  GArrowHashJoinNodeOptions *options,
+  const gchar **outputs,
+  gsize n_outputs,
+  GError **error)
+{
+  auto arrow_options =
+    static_cast<arrow::compute::HashJoinNodeOptions *>(
+      garrow_execute_node_options_get_raw(
+        GARROW_EXECUTE_NODE_OPTIONS(options)));
+  arrow_options->output_all = false;
+  arrow_options->right_output.clear();
+  for (gsize i = 0; i < n_outputs; ++i) {
+    if (!garrow_field_refs_add(arrow_options->right_output,
+                               outputs[i],
+                               error,
+                               "[hash-join-node-options][set-right-outputs]")) {
+      return FALSE;
+    }
+  }
+  return TRUE;
+}
+
+
 typedef struct GArrowExecuteNodePrivate_ {
   arrow::compute::ExecNode *node;
 } GArrowExecuteNodePrivate;
@@ -1694,6 +1834,42 @@ garrow_execute_plan_build_sink_node(GArrowExecutePlan *plan,
 }
 
 /**
+ * garrow_execute_plan_build_hash_join_node:
+ * @plan: A #GArrowExecutePlan.
+ * @left: A left #GArrowExecuteNode.
+ * @right: A right #GArrowExecuteNode.
+ * @options: A #GArrowHashJoinNodeOptions.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * This is a shortcut of garrow_execute_plan_build_node() for hash
+ * join node.
+ *
+ * Returns: (transfer full): A newly built and added #GArrowExecuteNode
+ *   for hash join on success, %NULL on error.
+ *
+ * Since: 7.0.0
+ */
+GArrowExecuteNode *
+garrow_execute_plan_build_hash_join_node(GArrowExecutePlan *plan,
+                                         GArrowExecuteNode *left,
+                                         GArrowExecuteNode *right,
+                                         GArrowHashJoinNodeOptions *options,
+                                         GError **error)
+{
+  GList *inputs = NULL;
+  inputs = g_list_append(inputs, left);
+  inputs = g_list_append(inputs, right);
+  auto node =
+    garrow_execute_plan_build_node(plan,
+                                   "hashjoin",
+                                   inputs,
+                                   GARROW_EXECUTE_NODE_OPTIONS(options),
+                                   error);
+  g_list_free(inputs);
+  return node;
+}
+
+/**
  * garrow_execute_plan_validate:
  * @plan: A #GArrowExecutePlan.
  * @error: (nullable): Return location for a #GError or %NULL.
diff --git a/c_glib/arrow-glib/compute.h b/c_glib/arrow-glib/compute.h
index 88a7d40..a867f34 100644
--- a/c_glib/arrow-glib/compute.h
+++ b/c_glib/arrow-glib/compute.h
@@ -224,6 +224,68 @@ garrow_sink_node_options_get_reader(GArrowSinkNodeOptions *options,
                                     GArrowSchema *schema);
 
 
+/**
+ * GArrowJoinType:
+ * @GARROW_JOIN_TYPE_LEFT_SEMI:
+ * @GARROW_JOIN_TYPE_RIGHT_SEMI:
+ * @GARROW_JOIN_TYPE_LEFT_ANTI:
+ * @GARROW_JOIN_TYPE_RIGHT_ANTI:
+ * @GARROW_JOIN_TYPE_INNER:
+ * @GARROW_JOIN_TYPE_LEFT_OUTER:
+ * @GARROW_JOIN_TYPE_RIGHT_OUTER:
+ * @GARROW_JOIN_TYPE_FULL_OUTER:
+ *
+ * They correspond to the values of `arrow::compute::JoinType`.
+ *
+ * Since: 7.0.0
+ */
+typedef enum {
+  GARROW_JOIN_TYPE_LEFT_SEMI,
+  GARROW_JOIN_TYPE_RIGHT_SEMI,
+  GARROW_JOIN_TYPE_LEFT_ANTI,
+  GARROW_JOIN_TYPE_RIGHT_ANTI,
+  GARROW_JOIN_TYPE_INNER,
+  GARROW_JOIN_TYPE_LEFT_OUTER,
+  GARROW_JOIN_TYPE_RIGHT_OUTER,
+  GARROW_JOIN_TYPE_FULL_OUTER,
+} GArrowJoinType;
+
+#define GARROW_TYPE_HASH_JOIN_NODE_OPTIONS      \
+  (garrow_hash_join_node_options_get_type())
+G_DECLARE_DERIVABLE_TYPE(GArrowHashJoinNodeOptions,
+                         garrow_hash_join_node_options,
+                         GARROW,
+                         HASH_JOIN_NODE_OPTIONS,
+                         GArrowExecuteNodeOptions)
+struct _GArrowHashJoinNodeOptionsClass
+{
+  GArrowExecuteNodeOptionsClass parent_class;
+};
+
+GARROW_AVAILABLE_IN_7_0
+GArrowHashJoinNodeOptions *
+garrow_hash_join_node_options_new(GArrowJoinType type,
+                                  const gchar **left_keys,
+                                  gsize n_left_keys,
+                                  const gchar **right_keys,
+                                  gsize n_right_keys,
+                                  GError **error);
+GARROW_AVAILABLE_IN_7_0
+gboolean
+garrow_hash_join_node_options_set_left_outputs(
+  GArrowHashJoinNodeOptions *options,
+  const gchar **outputs,
+  gsize n_outputs,
+  GError **error);
+GARROW_AVAILABLE_IN_7_0
+gboolean
+garrow_hash_join_node_options_set_right_outputs(
+  GArrowHashJoinNodeOptions *options,
+  const gchar **outputs,
+  gsize n_outputs,
+  GError **error);
+
+
 #define GARROW_TYPE_EXECUTE_NODE (garrow_execute_node_get_type())
 G_DECLARE_DERIVABLE_TYPE(GArrowExecuteNode,
                          garrow_execute_node,
@@ -281,6 +343,13 @@ garrow_execute_plan_build_sink_node(GArrowExecutePlan *plan,
                                     GArrowExecuteNode *input,
                                     GArrowSinkNodeOptions *options,
                                     GError **error);
+GARROW_AVAILABLE_IN_7_0
+GArrowExecuteNode *
+garrow_execute_plan_build_hash_join_node(GArrowExecutePlan *plan,
+                                         GArrowExecuteNode *left,
+                                         GArrowExecuteNode *right,
+                                         GArrowHashJoinNodeOptions *options,
+                                         GError **error);
 GARROW_AVAILABLE_IN_6_0
 gboolean
 garrow_execute_plan_validate(GArrowExecutePlan *plan,
diff --git a/c_glib/test/test-execute-plan.rb b/c_glib/test/test-execute-plan.rb
index d698e1e..aeb9f90 100644
--- a/c_glib/test/test-execute-plan.rb
+++ b/c_glib/test/test-execute-plan.rb
@@ -19,35 +19,117 @@ class TestExecutePlan < Test::Unit::TestCase
   include Helper::Buildable
   include Helper::Omittable
 
-  def setup
-    @record_batch =
-      build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
-                         string: build_string_array(["a", "b", "a", "b", "a"]))
-    @plan = Arrow::ExecutePlan.new
-    @source_node_options = Arrow::SourceNodeOptions.new(@record_batch)
-    @source_node = @plan.build_source_node(@source_node_options)
-    aggregations = [
-      Arrow::Aggregation.new("hash_sum", nil, "number", "sum(number)"),
-      Arrow::Aggregation.new("hash_count", nil, "number", "count(number)"),
-    ]
-    @aggregate_node_options =
-      Arrow::AggregateNodeOptions.new(aggregations, ["string"])
-    @aggregate_node = @plan.build_aggregate_node(@source_node,
-                                                 @aggregate_node_options)
-    @sink_node_options = Arrow::SinkNodeOptions.new
-    @sink_node = @plan.build_sink_node(@aggregate_node,
-                                       @sink_node_options)
+  def execute(plan)
+    plan.validate
+    plan.start
+    plan.wait
+    yield
+    plan.stop
   end
 
-  def test_start
-    @plan.validate
-    @plan.start
-    @plan.wait
-    reader = @sink_node_options.get_reader(@aggregate_node.output_schema)
-    assert_equal(build_table("sum(number)" => build_int64_array([9, 6]),
-                             "count(number)" => build_int64_array([3, 2]),
-                             "string" => build_string_array(["a", "b"])),
-                 reader.read_all)
-    @plan.stop
+  sub_test_case("aggregate") do
+    def build_plan
+      plan = Arrow::ExecutePlan.new
+
+      record_batch =
+        build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
+                           string: build_string_array(["a", "b", "a", "b", "a"]))
+      source_node_options = Arrow::SourceNodeOptions.new(record_batch)
+      source_node = plan.build_source_node(source_node_options)
+
+      aggregate_node_options = yield
+      aggregate_node = plan.build_aggregate_node(source_node,
+                                                 aggregate_node_options)
+
+      sink_node_options = Arrow::SinkNodeOptions.new
+      sink_node = plan.build_sink_node(aggregate_node,
+                                       sink_node_options)
+
+      [plan, sink_node_options.get_reader(aggregate_node.output_schema)]
+    end
+
+    def test_by_string
+      plan, reader = build_plan do
+        aggregations = [
+          Arrow::Aggregation.new("hash_sum", nil, "number", "sum(number)"),
+          Arrow::Aggregation.new("hash_count", nil, "number", "count(number)"),
+        ]
+        Arrow::AggregateNodeOptions.new(aggregations, ["string"])
+      end
+      execute(plan) do
+        assert_equal(build_table("sum(number)" => build_int64_array([9, 6]),
+                                 "count(number)" => build_int64_array([3, 2]),
+                                 "string" => build_string_array(["a", "b"])),
+                     reader.read_all)
+      end
+    end
+  end
+
+  sub_test_case("hash join") do
+    def build_plan
+      plan = Arrow::ExecutePlan.new
+
+      left_record_batch =
+        build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
+                           string: build_string_array(["a", "b", "a", "b", "a"]))
+      left_node_options = Arrow::SourceNodeOptions.new(left_record_batch)
+      left_node = plan.build_source_node(left_node_options)
+
+      right_record_batch =
+        build_record_batch(right_number: build_int8_array([1, 2]),
+                           right_string: build_string_array(["R-1", "R-2"]))
+      right_node_options = Arrow::SourceNodeOptions.new(right_record_batch)
+      right_node = plan.build_source_node(right_node_options)
+
+      hash_join_node_options = yield
+      hash_join_node = plan.build_hash_join_node(left_node,
+                                                 right_node,
+                                                 hash_join_node_options)
+
+      sink_node_options = Arrow::SinkNodeOptions.new
+      sink_node = plan.build_sink_node(hash_join_node,
+                                       sink_node_options)
+
+      [plan, sink_node_options.get_reader(hash_join_node.output_schema)]
+    end
+
+    def test_output_all
+      plan, reader = build_plan do
+        Arrow::HashJoinNodeOptions.new(:left_outer,
+                                       ["number"],
+                                       ["right_number"])
+      end
+
+      execute(plan) do
+        left_number = build_int8_array([1, 2, 3, 4, 5])
+        left_string = build_string_array(["a", "b", "a", "b", "a"])
+        right_number = build_int8_array([1, 2, nil, nil, nil])
+        right_string = build_string_array(["R-1", "R-2", nil, nil, nil])
+        assert_equal(build_table("number" => left_number,
+                                 "string" => left_string,
+                                 "right_number" => right_number,
+                                 "right_string" => right_string),
+                     reader.read_all)
+      end
+    end
+
+    def test_output_selected
+      plan, reader = build_plan do
+        options = Arrow::HashJoinNodeOptions.new(:left_outer,
+                                                 ["number"],
+                                                 ["right_number"])
+        options.left_outputs = ["number"]
+        options.right_outputs = ["right_number"]
+        options
+      end
+
+      execute(plan) do
+        left_number = build_int8_array([1, 2, 3, 4, 5])
+        right_number = build_int8_array([1, 2, nil, nil, nil])
+        assert_equal(build_table("number" => left_number,
+                                 "right_number" => right_number),
+                     reader.read_all)
+      end
+    end
   end
 end

Reply via email to