This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f7bd4c3 ARROW-15288: [GLib] Add garrow_execute_plan_build_hash_join_node()
f7bd4c3 is described below
commit f7bd4c3904e30ec63263eab1cb59876c15f67d5a
Author: Sutou Kouhei <[email protected]>
AuthorDate: Mon Jan 10 05:46:23 2022 +0900
ARROW-15288: [GLib] Add garrow_execute_plan_build_hash_join_node()
Closes #12107 from kou/glib-join
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
c_glib/arrow-glib/compute.cpp | 178 ++++++++++++++++++++++++++++++++++++++-
c_glib/arrow-glib/compute.h | 69 +++++++++++++++
c_glib/test/test-execute-plan.rb | 138 ++++++++++++++++++++++++------
3 files changed, 356 insertions(+), 29 deletions(-)
diff --git a/c_glib/arrow-glib/compute.cpp b/c_glib/arrow-glib/compute.cpp
index 4ab988c..7d84e06 100644
--- a/c_glib/arrow-glib/compute.cpp
+++ b/c_glib/arrow-glib/compute.cpp
@@ -142,6 +142,8 @@ G_BEGIN_DECLS
*
* #GArrowSinkNodeOptions is a class to customize a sink node.
*
+ * #GArrowHashJoinNodeOptions is a class to customize a hash join node.
+ *
* #GArrowExecuteNode is a class to execute an operation.
*
* #GArrowExecutePlan is a class to execute operations.
@@ -1238,7 +1240,8 @@ garrow_aggregate_node_options_class_init(GArrowAggregateNodeOptionsClass *klass)
* @n_keys: The number of @keys.
* @error: (nullable): Return location for a #GError or %NULL.
*
- * Returns: A newly created #GArrowAggregateNodeOptions.
+ * Returns: (nullable): A newly created #GArrowAggregateNodeOptions on success,
+ * %NULL otherwise.
*
* Since: 6.0.0
*/
@@ -1382,6 +1385,143 @@ garrow_sink_node_options_get_reader(GArrowSinkNodeOptions *options,
}
+/* GArrowHashJoinNodeOptions adds no instance state of its own. The
+ * wrapped arrow::compute::HashJoinNodeOptions is passed to the
+ * GArrowExecuteNodeOptions parent through its "options" property
+ * (see garrow_hash_join_node_options_new()). */
+G_DEFINE_TYPE(GArrowHashJoinNodeOptions,
+ garrow_hash_join_node_options,
+ GARROW_TYPE_EXECUTE_NODE_OPTIONS)
+
+static void
+garrow_hash_join_node_options_init(GArrowHashJoinNodeOptions *object)
+{
+}
+
+static void
+garrow_hash_join_node_options_class_init(GArrowHashJoinNodeOptionsClass *klass)
+{
+ /* No properties are installed yet; the output selection is exposed
+ * through the set_left_outputs()/set_right_outputs() functions. */
+ /* TODO: Add left_output_prefix and right_output_prefix properties */
+}
+
+/**
+ * garrow_hash_join_node_options_new:
+ * @type: A #GArrowJoinType to be used.
+ * @left_keys: (array length=n_left_keys): Left join keys.
+ * @n_left_keys: The number of @left_keys.
+ * @right_keys: (array length=n_right_keys): Right join keys.
+ * @n_right_keys: The number of @right_keys.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Each entry of @left_keys and @right_keys is parsed as a field
+ * reference. If any entry fails to parse, no options object is
+ * created and %NULL is returned.
+ *
+ * Returns: (nullable): A newly created #GArrowHashJoinNodeOptions on success,
+ * %NULL otherwise.
+ *
+ * Since: 7.0.0
+ */
+GArrowHashJoinNodeOptions *
+garrow_hash_join_node_options_new(GArrowJoinType type,
+ const gchar **left_keys,
+ gsize n_left_keys,
+ const gchar **right_keys,
+ gsize n_right_keys,
+ GError **error)
+{
+ /* GArrowJoinType is documented to correspond to
+ * arrow::compute::JoinType value-for-value, so a plain cast is used.
+ * The two enums must be kept in the same order. */
+ auto arrow_type = static_cast<arrow::compute::JoinType>(type);
+ std::vector<arrow::FieldRef> arrow_left_keys;
+ for (gsize i = 0; i < n_left_keys; ++i) {
+ if (!garrow_field_refs_add(arrow_left_keys,
+ left_keys[i],
+ error,
+ "[hash-join-node-options][new][left-key]")) {
+ /* Presumably garrow_field_refs_add() has already set @error. */
+ return NULL;
+ }
+ }
+ std::vector<arrow::FieldRef> arrow_right_keys;
+ for (gsize i = 0; i < n_right_keys; ++i) {
+ if (!garrow_field_refs_add(arrow_right_keys,
+ right_keys[i],
+ error,
+ "[hash-join-node-options][new][right-key]")) {
+ return NULL;
+ }
+ }
+ auto arrow_options =
+ new arrow::compute::HashJoinNodeOptions(arrow_type,
+ std::move(arrow_left_keys),
+ std::move(arrow_right_keys));
+ /* NOTE(review): the raw arrow_options pointer is handed over through
+ * the "options" property; GArrowExecuteNodeOptions presumably takes
+ * ownership of it -- confirm, otherwise this leaks. */
+ auto options = g_object_new(GARROW_TYPE_HASH_JOIN_NODE_OPTIONS,
+ "options", arrow_options,
+ NULL);
+ return GARROW_HASH_JOIN_NODE_OPTIONS(options);
+}
+
+/**
+ * garrow_hash_join_node_options_set_left_outputs:
+ * @options: A #GArrowHashJoinNodeOptions.
+ * @outputs: (array length=n_outputs): Output fields.
+ * @n_outputs: The number of @outputs.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Replaces the list of left-side output fields with @outputs and
+ * clears the "output all fields" flag.
+ *
+ * @options is modified only when every entry of @outputs is parsed
+ * successfully as a field reference; on error it is left untouched.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 7.0.0
+ */
+gboolean
+garrow_hash_join_node_options_set_left_outputs(
+  GArrowHashJoinNodeOptions *options,
+  const gchar **outputs,
+  gsize n_outputs,
+  GError **error)
+{
+  /* Parse into a temporary first so that a bad field reference
+   * doesn't leave @options half-updated (output_all already cleared,
+   * left_output partially filled). */
+  std::vector<arrow::FieldRef> arrow_outputs;
+  for (gsize i = 0; i < n_outputs; ++i) {
+    if (!garrow_field_refs_add(arrow_outputs,
+                               outputs[i],
+                               error,
+                               "[hash-join-node-options][set-left-outputs]")) {
+      return FALSE;
+    }
+  }
+  auto arrow_options =
+    static_cast<arrow::compute::HashJoinNodeOptions *>(
+      garrow_execute_node_options_get_raw(
+        GARROW_EXECUTE_NODE_OPTIONS(options)));
+  arrow_options->output_all = false;
+  arrow_options->left_output = std::move(arrow_outputs);
+  return TRUE;
+}
+
+/**
+ * garrow_hash_join_node_options_set_right_outputs:
+ * @options: A #GArrowHashJoinNodeOptions.
+ * @outputs: (array length=n_outputs): Output fields.
+ * @n_outputs: The number of @outputs.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Replaces the list of right-side output fields with @outputs and
+ * clears the "output all fields" flag.
+ *
+ * @options is modified only when every entry of @outputs is parsed
+ * successfully as a field reference; on error it is left untouched.
+ *
+ * Returns: %TRUE on success, %FALSE on error.
+ *
+ * Since: 7.0.0
+ */
+gboolean
+garrow_hash_join_node_options_set_right_outputs(
+  GArrowHashJoinNodeOptions *options,
+  const gchar **outputs,
+  gsize n_outputs,
+  GError **error)
+{
+  /* Parse into a temporary first so that a bad field reference
+   * doesn't leave @options half-updated (output_all already cleared,
+   * right_output partially filled). */
+  std::vector<arrow::FieldRef> arrow_outputs;
+  for (gsize i = 0; i < n_outputs; ++i) {
+    if (!garrow_field_refs_add(arrow_outputs,
+                               outputs[i],
+                               error,
+                               "[hash-join-node-options][set-right-outputs]")) {
+      return FALSE;
+    }
+  }
+  auto arrow_options =
+    static_cast<arrow::compute::HashJoinNodeOptions *>(
+      garrow_execute_node_options_get_raw(
+        GARROW_EXECUTE_NODE_OPTIONS(options)));
+  arrow_options->output_all = false;
+  arrow_options->right_output = std::move(arrow_outputs);
+  return TRUE;
+}
+
+
typedef struct GArrowExecuteNodePrivate_ {
arrow::compute::ExecNode *node;
} GArrowExecuteNodePrivate;
@@ -1694,6 +1834,42 @@ garrow_execute_plan_build_sink_node(GArrowExecutePlan *plan,
}
/**
+ * garrow_execute_plan_build_hash_join_node:
+ * @plan: A #GArrowExecutePlan.
+ * @left: A left #GArrowExecuteNode.
+ * @right: A right #GArrowExecuteNode.
+ * @options: A #GArrowHashJoinNodeOptions.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Builds a "hashjoin" node that takes @left and @right as its inputs,
+ * in that order, and adds it to @plan. This is a convenience wrapper
+ * around garrow_execute_plan_build_node().
+ *
+ * Returns: (transfer full): A newly built and added #GArrowExecuteNode
+ * for hash join on success, %NULL on error.
+ *
+ * Since: 7.0.0
+ */
+GArrowExecuteNode *
+garrow_execute_plan_build_hash_join_node(GArrowExecutePlan *plan,
+                                         GArrowExecuteNode *left,
+                                         GArrowExecuteNode *right,
+                                         GArrowHashJoinNodeOptions *options,
+                                         GError **error)
+{
+  /* Prepend in reverse so the resulting list is [left, right]. */
+  GList *inputs = g_list_prepend(NULL, right);
+  inputs = g_list_prepend(inputs, left);
+  auto hash_join_node =
+    garrow_execute_plan_build_node(plan,
+                                   "hashjoin",
+                                   inputs,
+                                   GARROW_EXECUTE_NODE_OPTIONS(options),
+                                   error);
+  /* Only the list cells are freed; the nodes are still owned by the caller. */
+  g_list_free(inputs);
+  return hash_join_node;
+}
+
+/**
* garrow_execute_plan_validate:
* @plan: A #GArrowExecutePlan.
* @error: (nullable): Return location for a #GError or %NULL.
diff --git a/c_glib/arrow-glib/compute.h b/c_glib/arrow-glib/compute.h
index 88a7d40..a867f34 100644
--- a/c_glib/arrow-glib/compute.h
+++ b/c_glib/arrow-glib/compute.h
@@ -224,6 +224,68 @@ garrow_sink_node_options_get_reader(GArrowSinkNodeOptions *options,
GArrowSchema *schema);
+/**
+ * GArrowJoinType:
+ * @GARROW_JOIN_TYPE_LEFT_SEMI: Same as `arrow::compute::JoinType::LEFT_SEMI`.
+ * @GARROW_JOIN_TYPE_RIGHT_SEMI: Same as `arrow::compute::JoinType::RIGHT_SEMI`.
+ * @GARROW_JOIN_TYPE_LEFT_ANTI: Same as `arrow::compute::JoinType::LEFT_ANTI`.
+ * @GARROW_JOIN_TYPE_RIGHT_ANTI: Same as `arrow::compute::JoinType::RIGHT_ANTI`.
+ * @GARROW_JOIN_TYPE_INNER: Same as `arrow::compute::JoinType::INNER`.
+ * @GARROW_JOIN_TYPE_LEFT_OUTER: Same as `arrow::compute::JoinType::LEFT_OUTER`.
+ * @GARROW_JOIN_TYPE_RIGHT_OUTER: Same as `arrow::compute::JoinType::RIGHT_OUTER`.
+ * @GARROW_JOIN_TYPE_FULL_OUTER: Same as `arrow::compute::JoinType::FULL_OUTER`.
+ *
+ * They correspond to the values of `arrow::compute::JoinType`.
+ *
+ * Since: 7.0.0
+ */
+/* NOTE: garrow_hash_join_node_options_new() converts a GArrowJoinType
+ * to arrow::compute::JoinType with a plain static_cast, so the members
+ * below must stay in the same order as the C++ enum. */
+typedef enum {
+ GARROW_JOIN_TYPE_LEFT_SEMI,
+ GARROW_JOIN_TYPE_RIGHT_SEMI,
+ GARROW_JOIN_TYPE_LEFT_ANTI,
+ GARROW_JOIN_TYPE_RIGHT_ANTI,
+ GARROW_JOIN_TYPE_INNER,
+ GARROW_JOIN_TYPE_LEFT_OUTER,
+ GARROW_JOIN_TYPE_RIGHT_OUTER,
+ GARROW_JOIN_TYPE_FULL_OUTER,
+} GArrowJoinType;
+
+/* Options for a hash join execute node (the "hashjoin" factory),
+ * derived from GArrowExecuteNodeOptions. */
+#define GARROW_TYPE_HASH_JOIN_NODE_OPTIONS \
+ (garrow_hash_join_node_options_get_type())
+G_DECLARE_DERIVABLE_TYPE(GArrowHashJoinNodeOptions,
+ garrow_hash_join_node_options,
+ GARROW,
+ HASH_JOIN_NODE_OPTIONS,
+ GArrowExecuteNodeOptions)
+struct _GArrowHashJoinNodeOptionsClass
+{
+ GArrowExecuteNodeOptionsClass parent_class;
+};
+
+/* Construction and output-field selection for GArrowHashJoinNodeOptions.
+ * Both setters clear the "output all fields" flag. */
+GARROW_AVAILABLE_IN_7_0
+GArrowHashJoinNodeOptions *
+garrow_hash_join_node_options_new(GArrowJoinType type,
+ const gchar **left_keys,
+ gsize n_left_keys,
+ const gchar **right_keys,
+ gsize n_right_keys,
+ GError **error);
+GARROW_AVAILABLE_IN_7_0
+gboolean
+garrow_hash_join_node_options_set_left_outputs(
+ GArrowHashJoinNodeOptions *options,
+ const gchar **outputs,
+ gsize n_outputs,
+ GError **error);
+GARROW_AVAILABLE_IN_7_0
+gboolean
+garrow_hash_join_node_options_set_right_outputs(
+ GArrowHashJoinNodeOptions *options,
+ const gchar **outputs,
+ gsize n_outputs,
+ GError **error);
+
+
#define GARROW_TYPE_EXECUTE_NODE (garrow_execute_node_get_type())
G_DECLARE_DERIVABLE_TYPE(GArrowExecuteNode,
garrow_execute_node,
@@ -281,6 +343,13 @@ garrow_execute_plan_build_sink_node(GArrowExecutePlan *plan,
GArrowExecuteNode *input,
GArrowSinkNodeOptions *options,
GError **error);
+/* Convenience wrapper that builds a "hashjoin" node from @left and @right. */
+GARROW_AVAILABLE_IN_7_0
+GArrowExecuteNode *
+garrow_execute_plan_build_hash_join_node(GArrowExecutePlan *plan,
+ GArrowExecuteNode *left,
+ GArrowExecuteNode *right,
+ GArrowHashJoinNodeOptions *options,
+ GError **error);
GARROW_AVAILABLE_IN_6_0
gboolean
garrow_execute_plan_validate(GArrowExecutePlan *plan,
diff --git a/c_glib/test/test-execute-plan.rb b/c_glib/test/test-execute-plan.rb
index d698e1e..aeb9f90 100644
--- a/c_glib/test/test-execute-plan.rb
+++ b/c_glib/test/test-execute-plan.rb
@@ -19,35 +19,117 @@ class TestExecutePlan < Test::Unit::TestCase
include Helper::Buildable
include Helper::Omittable
- def setup
- @record_batch =
- build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
- string: build_string_array(["a", "b", "a", "b", "a"]))
- @plan = Arrow::ExecutePlan.new
- @source_node_options = Arrow::SourceNodeOptions.new(@record_batch)
- @source_node = @plan.build_source_node(@source_node_options)
- aggregations = [
- Arrow::Aggregation.new("hash_sum", nil, "number", "sum(number)"),
- Arrow::Aggregation.new("hash_count", nil, "number", "count(number)"),
- ]
- @aggregate_node_options =
- Arrow::AggregateNodeOptions.new(aggregations, ["string"])
- @aggregate_node = @plan.build_aggregate_node(@source_node,
- @aggregate_node_options)
- @sink_node_options = Arrow::SinkNodeOptions.new
- @sink_node = @plan.build_sink_node(@aggregate_node,
- @sink_node_options)
+ def execute(plan)
+ plan.validate
+ plan.start
+ plan.wait
+ yield
+ plan.stop
end
- def test_start
- @plan.validate
- @plan.start
- @plan.wait
- reader = @sink_node_options.get_reader(@aggregate_node.output_schema)
- assert_equal(build_table("sum(number)" => build_int64_array([9, 6]),
- "count(number)" => build_int64_array([3, 2]),
- "string" => build_string_array(["a", "b"])),
- reader.read_all)
- @plan.stop
+ sub_test_case("aggregate") do
+ def build_plan
+ plan = Arrow::ExecutePlan.new
+
+ record_batch =
+ build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
+ string: build_string_array(["a", "b", "a", "b",
"a"]))
+ source_node_options = Arrow::SourceNodeOptions.new(record_batch)
+ source_node = plan.build_source_node(source_node_options)
+
+ aggregate_node_options = yield
+ aggregate_node = plan.build_aggregate_node(source_node,
+ aggregate_node_options)
+
+ sink_node_options = Arrow::SinkNodeOptions.new
+ sink_node = plan.build_sink_node(aggregate_node,
+ sink_node_options)
+
+ [plan, sink_node_options.get_reader(aggregate_node.output_schema)]
+ end
+
+ def test_by_string
+ plan, reader = build_plan do
+ aggregations = [
+ Arrow::Aggregation.new("hash_sum", nil, "number", "sum(number)"),
+ Arrow::Aggregation.new("hash_count", nil, "number", "count(number)"),
+ ]
+ Arrow::AggregateNodeOptions.new(aggregations, ["string"])
+ end
+ execute(plan) do
+ assert_equal(build_table("sum(number)" => build_int64_array([9, 6]),
+ "count(number)" => build_int64_array([3, 2]),
+ "string" => build_string_array(["a", "b"])),
+ reader.read_all)
+ end
+ end
+ end
+
+ sub_test_case("hash join") do
+ def build_plan
+ plan = Arrow::ExecutePlan.new
+
+ left_record_batch =
+ build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
+ string: build_string_array(["a", "b", "a", "b",
"a"]))
+ left_node_options = Arrow::SourceNodeOptions.new(left_record_batch)
+ left_node = plan.build_source_node(left_node_options)
+
+ right_record_batch =
+ build_record_batch(right_number: build_int8_array([1, 2]),
+ right_string: build_string_array(["R-1", "R-2"]))
+ right_node_options = Arrow::SourceNodeOptions.new(right_record_batch)
+ right_node = plan.build_source_node(right_node_options)
+
+ hash_join_node_options = yield
+ hash_join_node = plan.build_hash_join_node(left_node,
+ right_node,
+ hash_join_node_options)
+
+ sink_node_options = Arrow::SinkNodeOptions.new
+ sink_node = plan.build_sink_node(hash_join_node,
+ sink_node_options)
+
+ [plan, sink_node_options.get_reader(hash_join_node.output_schema)]
+ end
+
+ def test_output_all
+ plan, reader = build_plan do
+ Arrow::HashJoinNodeOptions.new(:left_outer,
+ ["number"],
+ ["right_number"])
+ end
+
+ execute(plan) do
+ left_number = build_int8_array([1, 2, 3, 4, 5])
+ left_string = build_string_array(["a", "b", "a", "b", "a"])
+ right_number = build_int8_array([1, 2, nil, nil, nil])
+ right_string = build_string_array(["R-1", "R-2", nil, nil, nil])
+ assert_equal(build_table("number" => left_number,
+ "string" => left_string,
+ "right_number" => right_number,
+ "right_string" => right_string),
+ reader.read_all)
+ end
+ end
+
+ def test_output_selected
+ plan, reader = build_plan do
+ options = Arrow::HashJoinNodeOptions.new(:left_outer,
+ ["number"],
+ ["right_number"])
+ options.left_outputs = ["number"]
+ options.right_outputs = ["right_number"]
+ options
+ end
+
+ execute(plan) do
+ left_number = build_int8_array([1, 2, 3, 4, 5])
+ right_number = build_int8_array([1, 2, nil, nil, nil])
+ assert_equal(build_table("number" => left_number,
+ "right_number" => right_number),
+ reader.read_all)
+ end
+ end
end
end