amol- commented on a change in pull request #12452: URL: https://github.com/apache/arrow/pull/12452#discussion_r833263515
########## File path: python/pyarrow/_exec_plan.pyx ########## @@ -0,0 +1,297 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# --------------------------------------------------------------------- +# Implement Internal ExecPlan bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from cython.operator cimport dereference as deref, preincrement as inc + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table) +from pyarrow.lib import tobytes, _pc +from pyarrow._compute cimport Expression, _true + + +cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True): + """ + Internal Function to create an ExecPlan and run it. + + Parameters + ---------- + inputs : list of Table or Dataset + The sources from which the ExecPlan should fetch data. + In most cases this is only one, unless the first node of the + plan is able to get data from multiple different sources. + output_type : Table or Dataset + In which format the output should be provided. + plan : vector[CDeclaration] + The nodes of the plan that should be applied to the sources + to produce the output. + use_threads : bool, default True + Whenever to use multithreading or not. + """ + cdef: + CExecutor *c_executor + shared_ptr[CThreadPool] c_executor_sptr + shared_ptr[CExecContext] c_exec_context + shared_ptr[CExecPlan] c_exec_plan + vector[CDeclaration] c_decls + vector[CExecNode*] _empty + vector[CExecNode*] c_final_node_vec + CExecNode *c_node + CTable* c_table + shared_ptr[CTable] c_out_table + shared_ptr[CSourceNodeOptions] c_sourceopts + shared_ptr[CSinkNodeOptions] c_sinkopts + shared_ptr[CAsyncExecBatchGenerator] c_asyncexecbatchgen + shared_ptr[CRecordBatchReader] c_recordbatchreader + vector[CDeclaration].iterator plan_iter + + if use_threads: + c_executor = GetCpuThreadPool() + else: + c_executor_sptr = GetResultValue(CThreadPool.Make(1)) + c_executor = c_executor_sptr.get() + + c_exec_context = make_shared[CExecContext]( + c_default_memory_pool(), c_executor) + c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get())) + + plan_iter = plan.begin() + + # Create source nodes for each input + for ipt in inputs: + if isinstance(ipt, Table): + c_in_table = pyarrow_unwrap_table(ipt).get() + c_sourceopts = GetResultValue( + CSourceNodeOptions.FromTable(deref(c_in_table), deref(c_exec_context).executor())) + else: + raise TypeError("Unsupported type") + + if plan_iter != plan.end(): + # Flag the source as the input of the first plan node. + deref(plan_iter).inputs.push_back(CDeclaration.Input( + CDeclaration(tobytes("source"), deref(c_sourceopts)) + )) + else: + # Empty plan, make the source the first plan node. + c_decls.push_back( + CDeclaration(tobytes("source"), deref(c_sourceopts)) + ) + + # Add Here additional nodes + while plan_iter != plan.end(): + c_decls.push_back(deref(plan_iter)) + inc(plan_iter) + + # Add all CDeclarations to the plan + c_node = GetResultValue( + CDeclaration.Sequence(c_decls).AddToPlan(&deref(c_exec_plan)) + ) + c_final_node_vec.push_back(c_node) + + # Create the output node + c_asyncexecbatchgen = make_shared[CAsyncExecBatchGenerator]() + c_sinkopts = make_shared[CSinkNodeOptions](c_asyncexecbatchgen.get()) + GetResultValue( + MakeExecNode(tobytes("sink"), &deref(c_exec_plan), + c_final_node_vec, deref(c_sinkopts)) + ) + + # Convert the asyncgenerator to a sync batch reader + c_recordbatchreader = MakeGeneratorReader(c_node.output_schema(), + deref(c_asyncexecbatchgen), + deref(c_exec_context).memory_pool()) + + # Start execution of the ExecPlan + deref(c_exec_plan).Validate() + deref(c_exec_plan).StartProducing() + + # Convert output to the expected one. + if output_type == Table: + c_out_table = GetResultValue( + CTable.FromRecordBatchReader(c_recordbatchreader.get())) + output = pyarrow_wrap_table(c_out_table) + else: + raise TypeError("Unsupported output type") + + deref(c_exec_plan).StopProducing() + + return output + + +def tables_join(join_type, left_table not None, left_keys, + right_table not None, right_keys, + left_suffix=None, right_suffix=None, + use_threads=True, deduplicate=False): + """ + Perform join of two tables. + + The result will be an output table with the result of the join operation + + Parameters + ---------- + join_type : str + One of supported join types. + left_table : Table + The left table for the join operation. + left_keys : str or list[str] + The left table key (or keys) on which the join operation should be performed. + right_table : Table + The right table for the join operation. + right_keys : str or list[str] + The right table key (or keys) on which the join operation should be performed. + left_suffix : str, default None + Which suffix to add to right column names. This prevents confusion + when the columns in left and right tables have colliding names. + right_suffix : str, default None + Which suffic to add to the left column names. This prevents confusion + when the columns in left and right tables have colliding names. + use_threads : bool, default True + Whenever to use multithreading or not. + deduplicate : bool, default False + If the duplicated keys should be omitted from one of the sides + in the join result. + + Returns + ------- + result_table : Table + """ + cdef: + vector[CFieldRef] c_left_keys + vector[CFieldRef] c_right_keys + vector[CFieldRef] c_left_columns + vector[CFieldRef] c_right_columns + vector[CDeclaration] c_decl_plan + vector[CExpression] c_projections + vector[c_string] c_projected_col_names + CJoinType c_join_type + + # Prepare left and right tables Keys to send them to the C++ function + if isinstance(left_keys, str): + left_keys = [left_keys] + for key in left_keys: + c_left_keys.push_back(CFieldRef(<c_string>tobytes(key))) + + if isinstance(right_keys, str): + right_keys = [right_keys] + for key in right_keys: + c_right_keys.push_back(CFieldRef(<c_string>tobytes(key))) + + # By default expose all columns on both left and right table + left_columns = left_table.column_names + right_columns = right_table.column_names + + # Pick the join type + if join_type == "left semi": + c_join_type = CJoinType_LEFT_SEMI + right_columns = [] + elif join_type == "right semi": + c_join_type = CJoinType_RIGHT_SEMI + left_columns = [] + elif join_type == "left anti": + c_join_type = CJoinType_LEFT_ANTI + right_columns = [] + elif join_type == "right anti": + c_join_type = CJoinType_RIGHT_ANTI + left_columns = [] + elif join_type == "inner": + c_join_type = CJoinType_INNER + right_columns = set(right_columns) - set(right_keys) + elif join_type == "left outer": + c_join_type = CJoinType_LEFT_OUTER + right_columns = set(right_columns) - set(right_keys) + elif join_type == "right outer": + c_join_type = CJoinType_RIGHT_OUTER + left_columns = set(left_columns) - set(left_keys) + elif join_type == "full outer": + c_join_type = CJoinType_FULL_OUTER + else: + raise ValueError("Unsupported join type") + + # Turn the columns to vectors of FieldRefs + for colname in left_columns: + c_left_columns.push_back(CFieldRef(<c_string>tobytes(colname))) + for colname in right_columns: + c_right_columns.push_back(CFieldRef(<c_string>tobytes(colname))) + + # Add the join node to the execplan + if deduplicate: + c_decl_plan.push_back( + CDeclaration(tobytes("hashjoin"), CHashJoinNodeOptions( + c_join_type, c_left_keys, c_right_keys, + c_left_columns, c_right_columns, + _true, + <c_string>tobytes(left_suffix or ""), + <c_string>tobytes(right_suffix or "") + )) + ) + if join_type == "full outer": + # In case of full outer joins, the join operation will output all columns + # so that we can coalesce the keys and exclude duplicates in a subsequent projection. + left_columns_set = set(left_columns) + right_columns_set = set(right_columns) + # Where the right table columns start. + right_table_index = len(left_columns) + for idx, col in enumerate(left_columns + right_columns): + if idx < len(left_keys): + # Include keys only once and coalesce left+right table keys. + c_projected_col_names.push_back(tobytes(col)) + c_projections.push_back(Expression.unwrap( + Expression._call("coalesce", [ + Expression._field(idx), Expression._field( + right_table_index+idx) + ]) + )) + elif right_table_index <= idx < right_table_index+len(right_keys): + # Do not include right table keys. As they would lead to duplicated keys. + continue + else: + # For all the other columns incude them as they are. + # Just recompute the suffixes that the join produced as the projection + # would lose them otherwise. Review comment: Not when relying on `CDeclaration` as the way to build the `ExecPlan`, I don't think saving those four lines of code values the amount of work that would be required to expose node types directly such that you can extract the `output_schema` of a plan that has not yet executed and rewriting everything to use those. I'm all for creating a follow-up ticket if we care about changing that part, but my opinion, as expressed in https://ursalabs.zulipchat.com/#narrow/stream/180245-dev/topic/Preserve.20projection.20column.20names/near/276175594 is that the `ProjectNodeOptions` themselves should not force you to know the column names when you just want to preserve them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
