pitrou commented on code in PR #34392:
URL: https://github.com/apache/arrow/pull/34392#discussion_r1196711606
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -239,24 +344,45 @@ struct MemoStore {
// the time of the current entry, defaulting to 0.
// when entries with a time less than T are removed, the current time is
updated to the
// time of the next (by-time) and now-current entry or to T if no such entry
exists.
- OnType current_time_;
+ std::atomic<OnType> current_time_;
// current entry per key
std::unordered_map<ByType, Entry> entries_;
// future entries per key
std::unordered_map<ByType, std::queue<Entry>> future_entries_;
// current and future (distinct) times of existing entries
std::deque<OnType> times_;
+#ifndef NDEBUG
+ // Owning node
+ AsofJoinNode* node_;
+ // Index of owning input
+ size_t index_;
+#endif
void swap(MemoStore& memo) {
+#ifndef NDEBUG
+ std::swap(node_, memo.node_);
+ std::swap(index_, memo.index_);
+#endif
std::swap(no_future_, memo.no_future_);
- std::swap(current_time_, memo.current_time_);
+ current_time_ =
memo.current_time_.exchange(static_cast<OnType>(current_time_));
entries_.swap(memo.entries_);
future_entries_.swap(memo.future_entries_);
times_.swap(memo.times_);
}
+ bool UpdateTime(OnType ts) {
+ OnType prev_time = current_time_;
+ bool update = prev_time < ts;
+ while (prev_time < ts && current_time_.compare_exchange_weak(prev_time,
ts)) {
+ // intentionally empty - standard CAS loop
Review Comment:
I'm not sure I understand. A standard CAS loop would retry when the CAS
operation fails, right? Here, it retries when the CAS succeeds...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]