bkietz commented on a change in pull request #11579:
URL: https://github.com/apache/arrow/pull/11579#discussion_r743913857



##########
File path: cpp/src/arrow/compute/exec/expression.h
##########
@@ -91,6 +91,9 @@ class ARROW_EXPORT Expression {
   /// Return true if this expression could evaluate to true.
   bool IsSatisfiable() const;
 
+  /// Return true if this expression has no clauses.
+  bool IsEmpty() const;

Review comment:
       Please use `literal(true)` as an expression which is always satisfied; 
this is the convention throughout the rest of the project.

##########
File path: cpp/src/arrow/compute/exec/hash_join_node.cc
##########
@@ -255,17 +284,83 @@ std::shared_ptr<Schema> HashJoinSchema::MakeOutputSchema(
   return std::make_shared<Schema>(std::move(fields));
 }
 
+Result<Expression> HashJoinSchema::BindFilter(Expression filter,
+                                              const Schema& left_schema,
+                                              const Schema& right_schema) {
+  if (filter.IsBound()) {
+    return std::move(filter);
+  }
+  if (!filter.IsEmpty()) {
+    FieldVector fields;
+    auto left = proj_maps[0].map(HashJoinProjection::FILTER, HashJoinProjection::INPUT);
+    auto right = proj_maps[1].map(HashJoinProjection::FILTER, HashJoinProjection::INPUT);
+
+    auto AppendFieldsInMap = [&fields](const SchemaProjectionMap& map,
+                                       const Schema& schema) {
+      for (int i = 0; i < map.num_cols; i++) {
+        int input_idx = map.get(i);
+        fields.push_back(schema.fields()[input_idx]);
+      }
+    };
+    AppendFieldsInMap(left, left_schema);
+    AppendFieldsInMap(right, right_schema);
+    Schema filter_schema(fields);
+    ARROW_ASSIGN_OR_RAISE(filter, filter.Bind(filter_schema));
+    if (filter.type()->id() != Type::BOOL) {
+      return Status::TypeError("Filter expression must evaluate to bool, but ",
+                               filter.ToString(), " evaluates to ",
+                               filter.type()->ToString());
+    }
+    return std::move(filter);
+  }
+  return Expression();
+}
+
+Result<std::vector<FieldRef>> HashJoinSchema::CollectFilterColumns(
+    const Expression& filter, const Schema& schema) {
+  std::vector<FieldRef> nonunique_refs;
+  RETURN_NOT_OK(TraverseExpression(nonunique_refs, filter, schema));
+
+  std::vector<FieldRef> result;
+  std::unordered_set<int> seen_paths;
+  for (auto ref : nonunique_refs) {
+    ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(schema));
+    if (seen_paths.find(match[0]) == seen_paths.end()) {
+      seen_paths.insert(match[0]);
+      result.push_back(ref);
+    }
+  }
+  return result;

Review comment:
       I think this can be implemented using the existing `FieldsInExpression`
helper, without requiring that the field references be by name (this is relevant
since compute IR only supports referencing fields by index):
   ```suggestion
     std::vector<FieldRef> unique_refs, nonunique_refs = FieldsInExpression(filter);
     std::unordered_set<FieldPath, FieldPath::Hash> seen_paths;
     
     for (auto& ref : nonunique_refs) {
       ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema));
       if (match.empty()) continue;
   
       if (seen_paths.insert(std::move(match)).second) {
         // insertion succeeded
         unique_refs.push_back(std::move(ref));
       }
     }
     return unique_refs;
   ```

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -173,15 +173,16 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
   static constexpr const char* default_output_prefix_for_right = "";
   HashJoinNodeOptions(
       JoinType in_join_type, std::vector<FieldRef> in_left_keys,
-      std::vector<FieldRef> in_right_keys,
+      std::vector<FieldRef> in_right_keys, Expression filter = Expression(),

Review comment:
       ```suggestion
         std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
   ```

##########
File path: cpp/src/arrow/compute/exec/hash_join.h
##########
@@ -56,19 +56,36 @@ class ARROW_EXPORT HashJoinSchema {
                                 const std::string& left_field_name_prefix,
                                 const std::string& right_field_name_prefix);
 
+  Result<Expression> BindFilter(Expression filter, const Schema& left_schema,
+                                const Schema& right_schema);
   std::shared_ptr<Schema> MakeOutputSchema(const std::string& left_field_name_prefix,
                                            const std::string& right_field_name_prefix);
 
+  bool HasLeftPayload() { return HasPayload(0); }

Review comment:
       Nit: could you replace this with `LeftPayloadIsEmpty` for greater 
specificity?

##########
File path: cpp/src/arrow/compute/exec/hash_join.cc
##########
@@ -273,6 +270,125 @@ class HashJoinBasicImpl : public HashJoinImpl {
     num_batches_produced_++;
   }
 
+  Status ProbeBatch_ResidualFilter(ThreadLocalState& local_state,
+                                   std::vector<int32_t>& match,
+                                   std::vector<int32_t>& no_match,
+                                   std::vector<int32_t>& match_left,
+                                   std::vector<int32_t>& match_right) {
+    if (filter_.IsEmpty()) {

Review comment:
       ```suggestion
       if (filter_ == literal(true)) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to