This is an automated email from the ASF dual-hosted git repository. raulcd pushed a commit to branch maint-12.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 8ea2495ee0a199eba39007cc4124156d88933b69 Author: Joris Van den Bossche <[email protected]> AuthorDate: Thu May 11 11:00:32 2023 +0200 GH-35389: [Python] Fix coalesce_keys=False option in join operation (#35505) ### Rationale for this change During the refactor of the join cython code to use the new `pyarrow.acero` code, I accidentally ignored the `coalesce_keys=False` option. This PR restores the previous behaviour (by not passing a custom subset of column names to the HashJoinNode, but relying on its default behaviour to include all fields from left and right data (depending on the join type)). ### Are these changes tested? Expanded the existing tests to now properly cover the coalesce_keys=False option for all join types. ### Are there any user-facing changes? Fixes a regression in 12.0, restoring behaviour of 11.0 * Closes: #35389 Authored-by: Joris Van den Bossche <[email protected]> Signed-off-by: Joris Van den Bossche <[email protected]> --- python/pyarrow/acero.py | 30 +++++++++++++++++++++--------- python/pyarrow/tests/test_exec_plan.py | 18 +++++++++++++++--- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py index 6a72ea1996..04c396a26f 100644 --- a/python/pyarrow/acero.py +++ b/python/pyarrow/acero.py @@ -23,7 +23,7 @@ # cython: language_level = 3 from pyarrow.lib import Table -from pyarrow.compute import Expression +from pyarrow.compute import Expression, field from pyarrow._acero import ( # noqa _group_by, @@ -53,6 +53,13 @@ except ImportError: def _dataset_to_decl(dataset, use_threads=True): decl = Declaration("scan", ScanNodeOptions(dataset, use_threads=use_threads)) + # Get rid of special dataset columns + # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename" + projections = [field(f) for f in dataset.schema.names] + decl = Declaration.from_sequence( + [decl, Declaration("project", ProjectNodeOptions(projections))] + ) + filter_expr = 
dataset._scan_options.get("filter") if filter_expr is not None: # Filters applied in CScanNodeOptions are "best effort" for the scan node itself @@ -163,11 +170,18 @@ def _perform_join(join_type, left_operand, left_keys, "table_source", TableSourceNodeOptions(right_operand) ) - join_opts = HashJoinNodeOptions( - join_type, left_keys, right_keys, left_columns, right_columns, - output_suffix_for_left=left_suffix or "", - output_suffix_for_right=right_suffix or "", - ) + if coalesce_keys: + join_opts = HashJoinNodeOptions( + join_type, left_keys, right_keys, left_columns, right_columns, + output_suffix_for_left=left_suffix or "", + output_suffix_for_right=right_suffix or "", + ) + else: + join_opts = HashJoinNodeOptions( + join_type, left_keys, right_keys, + output_suffix_for_left=left_suffix or "", + output_suffix_for_right=right_suffix or "", + ) decl = Declaration( "hashjoin", options=join_opts, inputs=[left_source, right_source] ) @@ -275,8 +289,6 @@ def _sort_source(table_or_dataset, sort_keys, output_type=Table, **kwargs): if output_type == Table: return result_table elif output_type == ds.InMemoryDataset: - # Get rid of special dataset columns - # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename" - return ds.InMemoryDataset(result_table.select(table_or_dataset.schema.names)) + return ds.InMemoryDataset(result_table) else: raise TypeError("Unsupported output type") diff --git a/python/pyarrow/tests/test_exec_plan.py b/python/pyarrow/tests/test_exec_plan.py index 0fc3d4ec75..58c618179b 100644 --- a/python/pyarrow/tests/test_exec_plan.py +++ b/python/pyarrow/tests/test_exec_plan.py @@ -74,28 +74,33 @@ def test_joins_corner_cases(): ("inner", { "colA": [1, 2], "col2": ["a", "b"], + "colB": [1, 2], "col3": ["A", "B"] }), ("left outer", { "colA": [1, 2, 6], "col2": ["a", "b", "f"], + "colB": [1, 2, None], "col3": ["A", "B", None] }), ("right outer", { + "colA": [1, 2, None], "col2": ["a", "b", None], "colB": [1, 2, 99], "col3": ["A", "B", "Z"] 
}), ("full outer", { - "colA": [1, 2, 6, 99], + "colA": [1, 2, 6, None], "col2": ["a", "b", "f", None], + "colB": [1, 2, None, 99], "col3": ["A", "B", None, "Z"] }) ]) @pytest.mark.parametrize("use_threads", [True, False]) +@pytest.mark.parametrize("coalesce_keys", [True, False]) @pytest.mark.parametrize("use_datasets", [False, pytest.param(True, marks=pytest.mark.dataset)]) -def test_joins(jointype, expected, use_threads, use_datasets): +def test_joins(jointype, expected, use_threads, coalesce_keys, use_datasets): # Allocate table here instead of using parametrize # this prevents having arrow allocated memory forever around. expected = pa.table(expected) @@ -115,12 +120,19 @@ def test_joins(jointype, expected, use_threads, use_datasets): t2 = ds.dataset([t2]) r = _perform_join(jointype, t1, "colA", t2, "colB", - use_threads=use_threads, coalesce_keys=True) + use_threads=use_threads, coalesce_keys=coalesce_keys) r = r.combine_chunks() if "right" in jointype: r = r.sort_by("colB") else: r = r.sort_by("colA") + if coalesce_keys: + if jointype in ("inner", "left outer"): + expected = expected.drop(["colB"]) + elif jointype == "right outer": + expected = expected.drop(["colA"]) + elif jointype == "full outer": + expected = expected.drop(["colB"]).set_column(0, "colA", [[1, 2, 6, 99]]) assert r == expected
