bkietz commented on a change in pull request #11579: URL: https://github.com/apache/arrow/pull/11579#discussion_r743913857
########## File path: cpp/src/arrow/compute/exec/expression.h ########## @@ -91,6 +91,9 @@ class ARROW_EXPORT Expression { /// Return true if this expression could evaluate to true. bool IsSatisfiable() const; + /// Return true if this expression has no clauses. + bool IsEmpty() const; Review comment: Please use `literal(true)` as an expression which is always satisfied; this is the convention throughout the rest of the project. ########## File path: cpp/src/arrow/compute/exec/hash_join_node.cc ########## @@ -255,17 +284,83 @@ std::shared_ptr<Schema> HashJoinSchema::MakeOutputSchema( return std::make_shared<Schema>(std::move(fields)); } +Result<Expression> HashJoinSchema::BindFilter(Expression filter, + const Schema& left_schema, + const Schema& right_schema) { + if (filter.IsBound()) { + return std::move(filter); + } + if (!filter.IsEmpty()) { + FieldVector fields; + auto left = proj_maps[0].map(HashJoinProjection::FILTER, HashJoinProjection::INPUT); + auto right = proj_maps[1].map(HashJoinProjection::FILTER, HashJoinProjection::INPUT); + + auto AppendFieldsInMap = [&fields](const SchemaProjectionMap& map, + const Schema& schema) { + for (int i = 0; i < map.num_cols; i++) { + int input_idx = map.get(i); + fields.push_back(schema.fields()[input_idx]); + } + }; + AppendFieldsInMap(left, left_schema); + AppendFieldsInMap(right, right_schema); + Schema filter_schema(fields); + ARROW_ASSIGN_OR_RAISE(filter, filter.Bind(filter_schema)); + if (filter.type()->id() != Type::BOOL) { + return Status::TypeError("Filter expression must evaluate to bool, but ", + filter.ToString(), " evaluates to ", + filter.type()->ToString()); + } + return std::move(filter); + } + return Expression(); +} + +Result<std::vector<FieldRef>> HashJoinSchema::CollectFilterColumns( + const Expression& filter, const Schema& schema) { + std::vector<FieldRef> nonunique_refs; + RETURN_NOT_OK(TraverseExpression(nonunique_refs, filter, schema)); + + std::vector<FieldRef> result; + std::unordered_set<int> seen_paths; + for (auto ref : nonunique_refs) { + ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(schema)); + if (seen_paths.find(match[0]) == seen_paths.end()) { + seen_paths.insert(match[0]); + result.push_back(ref); + } + } + return result; Review comment: I think this can be implemented using the existing `FieldsInExpression` helper and without requiring the field references be by-name (this is relevant since compute IR only supports referencing fields by index) ```suggestion std::vector<FieldRef> unique_refs, nonunique_refs = FieldsInExpression(filter); std::unordered_set<FieldPath, FieldPath::Hash> seen_paths; for (auto& ref : nonunique_refs) { ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema)); if (match.empty()) continue; if (seen_paths.insert(std::move(match)).second) { // insertion succeeded unique_refs.push_back(std::move(ref)); } } return unique_refs; ``` ########## File path: cpp/src/arrow/compute/exec/options.h ########## @@ -173,15 +173,16 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions { static constexpr const char* default_output_prefix_for_right = ""; HashJoinNodeOptions( JoinType in_join_type, std::vector<FieldRef> in_left_keys, - std::vector<FieldRef> in_right_keys, + std::vector<FieldRef> in_right_keys, Expression filter = Expression(), Review comment: ```suggestion std::vector<FieldRef> in_right_keys, Expression filter = literal(true), ``` ########## File path: cpp/src/arrow/compute/exec/hash_join.h ########## @@ -56,19 +56,36 @@ class ARROW_EXPORT HashJoinSchema { const std::string& left_field_name_prefix, const std::string& right_field_name_prefix); + Result<Expression> BindFilter(Expression filter, const Schema& left_schema, + const Schema& right_schema); std::shared_ptr<Schema> MakeOutputSchema(const std::string& left_field_name_prefix, const std::string& right_field_name_prefix); + bool HasLeftPayload() { return HasPayload(0); } Review comment: Nit: could you replace this with `LeftPayloadIsEmpty` for greater specificity? ########## File path: cpp/src/arrow/compute/exec/hash_join.cc ########## @@ -273,6 +270,125 @@ class HashJoinBasicImpl : public HashJoinImpl { num_batches_produced_++; } + Status ProbeBatch_ResidualFilter(ThreadLocalState& local_state, + std::vector<int32_t>& match, + std::vector<int32_t>& no_match, + std::vector<int32_t>& match_left, + std::vector<int32_t>& match_right) { + if (filter_.IsEmpty()) { Review comment: ```suggestion if (filter_ == literal(true)) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org