westonpace commented on a change in pull request #11579:
URL: https://github.com/apache/arrow/pull/11579#discussion_r749697848



##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -244,6 +248,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
   std::string output_prefix_for_left;
   // prefix added to names of output fields coming from right input
   std::string output_prefix_for_right;
+  // residual filter

Review comment:
       ```suggestion
     // residual filter which is applied to matching rows.  Rows that do not match
     // the filter are not included.  The filter is applied against the
     // concatenated input schema (left fields then right fields) and can reference
     // fields that are not included in the output.
   ```

##########
File path: cpp/src/arrow/compute/exec/hash_join.cc
##########
@@ -282,6 +279,126 @@ class HashJoinBasicImpl : public HashJoinImpl {
     num_batches_produced_++;
   }
 
+  Status ProbeBatch_ResidualFilter(ThreadLocalState& local_state,
+                                   std::vector<int32_t>& match,
+                                   std::vector<int32_t>& no_match,
+                                   std::vector<int32_t>& match_left,
+                                   std::vector<int32_t>& match_right) {
+    if (filter_ == literal(true)) {
+      return Status::OK();
+    }
+    ARROW_DCHECK_EQ(match_left.size(), match_right.size());
+
+    ExecBatch concatenated({}, match_left.size());
+
+    ARROW_ASSIGN_OR_RAISE(ExecBatch left_key, 
local_state.exec_batch_keys.Decode(
+                                                  match_left.size(), 
match_left.data()));
+    ARROW_ASSIGN_OR_RAISE(
+        ExecBatch right_key,
+        hash_table_keys_.Decode(match_right.size(), match_right.data()));
+
+    ExecBatch left_payload;
+    if (!schema_mgr_->LeftPayloadIsEmpty()) {
+      ARROW_ASSIGN_OR_RAISE(left_payload, 
local_state.exec_batch_payloads.Decode(
+                                              match_left.size(), 
match_left.data()));
+    }
+
+    ExecBatch right_payload;
+    if (!schema_mgr_->RightPayloadIsEmpty()) {
+      ARROW_ASSIGN_OR_RAISE(right_payload, hash_table_payloads_.Decode(
+                                               match_right.size(), 
match_right.data()));
+    }
+
+    auto AppendFields = [&concatenated](const SchemaProjectionMap& to_key,
+                                        const SchemaProjectionMap& to_pay,
+                                        const ExecBatch& key, const ExecBatch& 
payload) {
+      ARROW_DCHECK(to_key.num_cols == to_pay.num_cols);
+      for (int i = 0; i < to_key.num_cols; i++) {
+        if (to_key.get(i) != SchemaProjectionMap::kMissingField) {
+          int key_idx = to_key.get(i);
+          concatenated.values.push_back(key.values[key_idx]);
+        } else if (to_pay.get(i) != SchemaProjectionMap::kMissingField) {
+          int pay_idx = to_pay.get(i);
+          concatenated.values.push_back(payload.values[pay_idx]);
+        }
+      }
+    };
+
+    SchemaProjectionMap left_to_key = schema_mgr_->proj_maps[0].map(
+        HashJoinProjection::FILTER, HashJoinProjection::KEY);
+    SchemaProjectionMap left_to_pay = schema_mgr_->proj_maps[0].map(
+        HashJoinProjection::FILTER, HashJoinProjection::PAYLOAD);
+    SchemaProjectionMap right_to_key = schema_mgr_->proj_maps[1].map(
+        HashJoinProjection::FILTER, HashJoinProjection::KEY);
+    SchemaProjectionMap right_to_pay = schema_mgr_->proj_maps[1].map(
+        HashJoinProjection::FILTER, HashJoinProjection::PAYLOAD);
+
+    AppendFields(left_to_key, left_to_pay, left_key, left_payload);
+    AppendFields(right_to_key, right_to_pay, right_key, right_payload);
+
+    ARROW_ASSIGN_OR_RAISE(Datum mask,
+                          ExecuteScalarExpression(filter_, concatenated, 
ctx_));
+
+    size_t num_probed_rows = match.size() + no_match.size();
+    if (mask.is_scalar()) {
+      const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+      if (mask_scalar.is_valid && mask_scalar.value) {
+        // All rows passed, nothing left to do
+        return Status::OK();
+      } else {
+        // Nothing passed, no_match becomes everything
+        no_match.resize(num_probed_rows);
+        std::iota(no_match.begin(), no_match.end(), 0);
+        match_left.clear();
+        match_right.clear();
+        match.clear();
+        return Status::OK();
+      }
+    }
+    ARROW_DCHECK(mask.array()->offset == 0);

Review comment:
       ```suggestion
       ARROW_DCHECK_EQ(mask.array()->offset, 0);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to