rtpsw commented on code in PR #13880:
URL: https://github.com/apache/arrow/pull/13880#discussion_r954265680


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -137,18 +173,88 @@ struct MemoStore {
   }
 };
 
+// a specialized higher-performance variation of Hashing64 logic from hash_join_node
+// the code here avoids recreating objects that are independent of each batch processed
+class KeyHasher {
+  static constexpr int kMiniBatchLength = util::MiniBatch::kMiniBatchLength;
+
+ public:
+  explicit KeyHasher(const vec_col_index_t& indices)
+      : indices_(indices),
+        metadata_(indices.size()),
+        batch_(NULLPTR),
+        hashes_(),
+        ctx_(),
+        column_arrays_(),
+        stack_() {
+    ctx_.stack = &stack_;
+    column_arrays_.resize(indices.size());
+  }
+
+  Status Init(ExecContext* exec_context, const std::shared_ptr<arrow::Schema>& schema) {
+    ctx_.hardware_flags = exec_context->cpu_info()->hardware_flags();
+    const auto& fields = schema->fields();
+    for (size_t k = 0; k < metadata_.size(); k++) {
+      ARROW_ASSIGN_OR_RAISE(metadata_[k],
+                            ColumnMetadataFromDataType(fields[indices_[k]]->type()));
+    }
+    return stack_.Init(exec_context->memory_pool(),
+                       4 * kMiniBatchLength * sizeof(uint32_t));
+  }
+
+  const std::vector<HashType>& HashesFor(const RecordBatch* batch) {
+    if (batch_ == batch) {
+      return hashes_;
+    }
+    batch_ = NULLPTR;  // invalidate cached hashes for batch
+    size_t batch_length = batch->num_rows();
+    hashes_.resize(batch_length);
+    for (int64_t i = 0; i < static_cast<int64_t>(batch_length); i += kMiniBatchLength) {
+      int64_t length = std::min(static_cast<int64_t>(batch_length - i),
+                                static_cast<int64_t>(kMiniBatchLength));
+      for (size_t k = 0; k < indices_.size(); k++) {
+        auto array_data = batch->column_data(indices_[k]);
+        column_arrays_[k] =
+            ColumnArrayFromArrayDataAndMetadata(array_data, metadata_[k], i, length);
+      }
+      Hashing64::HashMultiColumn(column_arrays_, &ctx_, hashes_.data() + i);
+    }
+    batch_ = batch;
+    return hashes_;
+  }
+
+ private:
+  vec_col_index_t indices_;
+  std::vector<KeyColumnMetadata> metadata_;
+  const RecordBatch* batch_;
+  std::vector<HashType> hashes_;
+  LightContext ctx_;
+  std::vector<KeyColumnArray> column_arrays_;
+  util::TempVectorStack stack_;
+};
+
 class InputState {
  // InputState corresponds to an input
   // Input record batches are queued up in InputState until processed and
   // turned into output record batches.
 
  public:
-  InputState(const std::shared_ptr<arrow::Schema>& schema,
-             const std::string& time_col_name, const std::string& key_col_name)
+  InputState(bool must_hash, bool nullable_by_key, KeyHasher* key_hasher,
+             const std::shared_ptr<arrow::Schema>& schema,
+             const col_index_t time_col_index, const vec_col_index_t& key_col_index)
       : queue_(),
         schema_(schema),
-        time_col_index_(schema->GetFieldIndex(time_col_name)),
-        key_col_index_(schema->GetFieldIndex(key_col_name)) {}
+        time_col_index_(time_col_index),
+        key_col_index_(key_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()),
+        key_type_id_(schema_->num_fields()),
+        key_hasher_(key_hasher),
+        must_hash_(must_hash),

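As a usage note on the `KeyHasher` added above: it is constructed once per input and then queried per batch, with `HashesFor` recomputing only when it sees a new batch pointer. The sketch below illustrates that contract; the wrapper function, the chosen key column index, and the `DCHECK` are illustrative assumptions rather than code from this PR, and it presumes the file-local typedefs `vec_col_index_t` and `HashType` are in scope (i.e. the snippet lives in `asof_join_node.cc`).

```cpp
// Illustrative only: shows the intended call pattern for KeyHasher.
Status HashOneBatch(ExecContext* exec_context,
                    const std::shared_ptr<arrow::Schema>& schema,
                    const std::shared_ptr<RecordBatch>& batch) {
  vec_col_index_t key_indices = {1};  // hypothetical: key is column 1
  KeyHasher key_hasher(key_indices);  // created once per input, reused across batches
  ARROW_RETURN_NOT_OK(key_hasher.Init(exec_context, schema));

  // First call computes the per-row hashes for this batch and caches them.
  const std::vector<HashType>& hashes = key_hasher.HashesFor(batch.get());
  // A repeated call with the same batch pointer returns the cached vector.
  const std::vector<HashType>& again = key_hasher.HashesFor(batch.get());
  DCHECK_EQ(&hashes, &again);
  return Status::OK();
}
```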
Review Comment:
   See [docs for these fields](https://github.com/apache/arrow/pull/13880/files#diff-5789a42aebc3bd0f5a6db687c78ab0c3eef3e316982f99fe9e0ea21fabed354cR456-R463). Basically, the type id fields are used for faster access to the key values, and the hash fields are used when the hashing path, rather than the [faster](https://github.com/apache/arrow/pull/13880/files#diff-5789a42aebc3bd0f5a6db687c78ab0c3eef3e316982f99fe9e0ea21fabed354cR306) primitive-type [path](https://github.com/apache/arrow/pull/13880/files#diff-5789a42aebc3bd0f5a6db687c78ab0c3eef3e316982f99fe9e0ea21fabed354cR328), is taken.
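To make the two paths concrete, here is a hedged sketch of the kind of dispatch being described. The member names `must_hash_`, `key_type_id_`, `key_col_index_`, and `key_hasher_` appear in the hunk above; the function name, the handled type cases, and the `uint64_t` key representation are assumptions for illustration, not the PR's actual code.

```cpp
// Hypothetical InputState member, sketching the dispatch described above.
uint64_t KeyForRow(const RecordBatch* batch, int64_t row) const {
  if (must_hash_) {
    // Hashing path: one 64-bit hash per row, computed (and cached) per batch
    // by the shared KeyHasher.
    return key_hasher_->HashesFor(batch)[row];
  }
  // Primitive-type fast path: must_hash_ being false means (in this sketch)
  // there is a single fixed-width integer key column, so its values can be
  // read directly from buffer 1 of the ArrayData, with key_type_id_ picking
  // the typed accessor.
  const auto& data = batch->column_data(key_col_index_[0]);
  if (key_type_id_ == Type::INT32) {
    return static_cast<uint64_t>(data->GetValues<int32_t>(1)[row]);
  }
  return static_cast<uint64_t>(data->GetValues<int64_t>(1)[row]);
}
```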



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
