save-buffer commented on a change in pull request #11579:
URL: https://github.com/apache/arrow/pull/11579#discussion_r748785868



##########
File path: cpp/src/arrow/compute/exec/hash_join.cc
##########
@@ -282,6 +279,125 @@ class HashJoinBasicImpl : public HashJoinImpl {
     num_batches_produced_++;
   }
 
+  Status ProbeBatch_ResidualFilter(ThreadLocalState& local_state,
+                                   std::vector<int32_t>& match,
+                                   std::vector<int32_t>& no_match,
+                                   std::vector<int32_t>& match_left,
+                                   std::vector<int32_t>& match_right) {
+    if (filter_ == literal(true)) {
+      return Status::OK();
+    }
+    ARROW_DCHECK_EQ(match_left.size(), match_right.size());
+
+    ExecBatch concatenated({}, match_left.size());
+
+    ARROW_ASSIGN_OR_RAISE(ExecBatch left_key, 
local_state.exec_batch_keys.Decode(
+                                                  match_left.size(), 
match_left.data()));
+    ARROW_ASSIGN_OR_RAISE(
+        ExecBatch right_key,
+        hash_table_keys_.Decode(match_right.size(), match_right.data()));
+
+    ExecBatch left_payload;
+    if (!schema_mgr_->LeftPayloadIsEmpty()) {
+      ARROW_ASSIGN_OR_RAISE(left_payload, 
local_state.exec_batch_payloads.Decode(
+                                              match_left.size(), 
match_left.data()));
+    }
+
+    ExecBatch right_payload;
+    if (!schema_mgr_->RightPayloadIsEmpty()) {
+      ARROW_ASSIGN_OR_RAISE(right_payload, hash_table_payloads_.Decode(
+                                               match_right.size(), 
match_right.data()));
+    }
+
+    auto AppendFields = [&concatenated](const SchemaProjectionMap& to_key,
+                                        const SchemaProjectionMap& to_pay,
+                                        const ExecBatch& key, const ExecBatch& 
payload) {
+      ARROW_DCHECK(to_key.num_cols == to_pay.num_cols);
+      for (int i = 0; i < to_key.num_cols; i++) {
+        if (to_key.get(i) != SchemaProjectionMap::kMissingField) {
+          int key_idx = to_key.get(i);
+          concatenated.values.push_back(key.values[key_idx]);
+        } else if (to_pay.get(i) != SchemaProjectionMap::kMissingField) {
+          int pay_idx = to_pay.get(i);
+          concatenated.values.push_back(payload.values[pay_idx]);
+        }
+      }
+    };
+
+    SchemaProjectionMap left_to_key = schema_mgr_->proj_maps[0].map(
+        HashJoinProjection::FILTER, HashJoinProjection::KEY);
+    SchemaProjectionMap left_to_pay = schema_mgr_->proj_maps[0].map(
+        HashJoinProjection::FILTER, HashJoinProjection::PAYLOAD);
+    SchemaProjectionMap right_to_key = schema_mgr_->proj_maps[1].map(
+        HashJoinProjection::FILTER, HashJoinProjection::KEY);
+    SchemaProjectionMap right_to_pay = schema_mgr_->proj_maps[1].map(
+        HashJoinProjection::FILTER, HashJoinProjection::PAYLOAD);
+
+    AppendFields(left_to_key, left_to_pay, left_key, left_payload);
+    AppendFields(right_to_key, right_to_pay, right_key, right_payload);
+
+    ARROW_ASSIGN_OR_RAISE(Datum mask,
+                          ExecuteScalarExpression(filter_, concatenated, 
ctx_));
+
+    size_t num_probed_rows = match.size() + no_match.size();
+    if (mask.is_scalar()) {
+      const auto& mask_scalar = mask.scalar_as<BooleanScalar>();

Review comment:
       Added a test to cover it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to