icexelloss commented on code in PR #34392:
URL: https://github.com/apache/arrow/pull/34392#discussion_r1211614943
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -239,24 +344,48 @@ struct MemoStore {
// the time of the current entry, defaulting to 0.
// when entries with a time less than T are removed, the current time is
updated to the
// time of the next (by-time) and now-current entry or to T if no such entry
exists.
- OnType current_time_;
+ std::atomic<OnType> current_time_;
Review Comment:
Can you explain why we changed this to atomic?
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -731,7 +898,9 @@ class InputState {
Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
if (rb->num_rows() > 0) {
- queue_.Push(rb);
+ key_hasher_->Invalidate(); // batch changed - invalidate key hasher's
cache
+ memo_.UpdateTime(GetTime(rb.get(), 0)); // time changed - update in
MemoStore
Review Comment:
Can you explain why we add the UpdateTime call here? How was the MemoStore
time updated before?
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -704,21 +846,27 @@ class InputState {
Rehash();
}
memo_.Store(ts, rb, latest_ref_row_, latest_time, GetLatestKey());
+ // negative tolerance means a last-known entry was stored - set
`updated` to `true`
updated = memo_.no_future_;
ARROW_ASSIGN_OR_RAISE(advanced, Advance());
} while (advanced);
- if (!memo_.no_future_) { // "updated" was not modified in the loop; set
it here
+ if (!memo_.no_future_ && latest_time >= ts) {
Review Comment:
Can you please add comments in the code to explain this? It looks like
something that could be tricky to understand just from the code.
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -856,10 +1030,23 @@ class CompositeReferenceTable {
row.refs[0].row = lhs_latest_row;
AddRecordBatchRef(lhs_latest_batch);
+ DEBUG_SYNC(node_, "Emplace: key=", key, " lhs_latest_row=", lhs_latest_row,
+ " lhs_latest_time=", lhs_latest_time, DEBUG_MANIP(std::endl));
+
// Get the state for that key from all on the RHS -- assumes it's up to
date
// (the RHS state comes from the memoized row references)
for (size_t i = 1; i < in.size(); ++i) {
std::optional<const MemoStore::Entry*> opt_entry =
in[i]->GetMemoEntryForKey(key);
+#ifndef NDEBUG
+ {
+ bool has_entry = opt_entry.has_value();
+ OnType entry_time = has_entry ? (*opt_entry)->time :
TolType::kMinValue;
+ row_index_t entry_row = has_entry ? (*opt_entry)->row : 0;
+ bool accepted = has_entry && tolerance.Accepts(lhs_latest_time,
entry_time);
Review Comment:
What does "accepted" mean here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]