icexelloss commented on code in PR #13880:
URL: https://github.com/apache/arrow/pull/13880#discussion_r954258192


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -137,18 +173,88 @@ struct MemoStore {
   }
 };
 
+// a specialized higher-performance variation of Hashing64 logic from hash_join_node
+// the code here avoids recreating objects that are independent of each batch processed
+class KeyHasher {
+  static constexpr int kMiniBatchLength = util::MiniBatch::kMiniBatchLength;
+
+ public:
+  explicit KeyHasher(const vec_col_index_t& indices)
+      : indices_(indices),
+        metadata_(indices.size()),
+        batch_(NULLPTR),
+        hashes_(),
+        ctx_(),
+        column_arrays_(),
+        stack_() {
+    ctx_.stack = &stack_;
+    column_arrays_.resize(indices.size());
+  }
+
+  Status Init(ExecContext* exec_context, const std::shared_ptr<arrow::Schema>& schema) {
+    ctx_.hardware_flags = exec_context->cpu_info()->hardware_flags();
+    const auto& fields = schema->fields();
+    for (size_t k = 0; k < metadata_.size(); k++) {
+      ARROW_ASSIGN_OR_RAISE(metadata_[k],
+                            ColumnMetadataFromDataType(fields[indices_[k]]->type()));
+    }
+    return stack_.Init(exec_context->memory_pool(),
+                       4 * kMiniBatchLength * sizeof(uint32_t));
+  }
+
+  const std::vector<HashType>& HashesFor(const RecordBatch* batch) {
+    if (batch_ == batch) {
+      return hashes_;
+    }
+    batch_ = NULLPTR;  // invalidate cached hashes for batch
+    size_t batch_length = batch->num_rows();
+    hashes_.resize(batch_length);
+    for (int64_t i = 0; i < static_cast<int64_t>(batch_length); i += kMiniBatchLength) {
+      int64_t length = std::min(static_cast<int64_t>(batch_length - i),
+                                static_cast<int64_t>(kMiniBatchLength));
+      for (size_t k = 0; k < indices_.size(); k++) {
+        auto array_data = batch->column_data(indices_[k]);
+        column_arrays_[k] =
+            ColumnArrayFromArrayDataAndMetadata(array_data, metadata_[k], i, length);
+      }
+      Hashing64::HashMultiColumn(column_arrays_, &ctx_, hashes_.data() + i);
+    }
+    batch_ = batch;
+    return hashes_;
+  }
+
+ private:
+  vec_col_index_t indices_;
+  std::vector<KeyColumnMetadata> metadata_;
+  const RecordBatch* batch_;
+  std::vector<HashType> hashes_;
+  LightContext ctx_;
+  std::vector<KeyColumnArray> column_arrays_;
+  util::TempVectorStack stack_;
+};
+
 class InputState {
   // InputState corresponds to an input
   // Input record batches are queued up in InputState until processed and
   // turned into output record batches.
 
  public:
-  InputState(const std::shared_ptr<arrow::Schema>& schema,
-             const std::string& time_col_name, const std::string& key_col_name)
+  InputState(bool must_hash, bool nullable_by_key, KeyHasher* key_hasher,
+             const std::shared_ptr<arrow::Schema>& schema,
+             const col_index_t time_col_index, const vec_col_index_t& key_col_index)
       : queue_(),
         schema_(schema),
-        time_col_index_(schema->GetFieldIndex(time_col_name)),
-        key_col_index_(schema->GetFieldIndex(key_col_name)) {}
+        time_col_index_(time_col_index),
+        key_col_index_(key_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()),
+        key_type_id_(schema_->num_fields()),
+        key_hasher_(key_hasher),
+        must_hash_(must_hash),

Review Comment:
   What is this used for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to