icexelloss commented on code in PR #13028:
URL: https://github.com/apache/arrow/pull/13028#discussion_r902952561


##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -0,0 +1,806 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <set>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/util/optional.h>
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/schema_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/make_unique.h"
+
+namespace arrow {
+namespace compute {
+
+// Remove this when multiple keys and/or types are supported
+typedef int32_t KeyType;
+
+// Maximum number of tables that can be joined
+#define MAX_JOIN_TABLES 64
+typedef uint64_t row_index_t;  // type used for row numbers (see MemoStore::Entry::_row)
+typedef int col_index_t;  // presumably a column index -- TODO confirm against usage
+
+/**
+ * Simple implementation of an unbounded concurrent FIFO queue.
+ *
+ * All members except UnsyncFront() take the internal mutex. Pop() blocks until
+ * an item is available; Push() never blocks on capacity.
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  // Block until the queue is non-empty, then remove and return the oldest item.
+  T Pop() {
+    // unique_lock (not lock_guard) because the condition variable must be able
+    // to release and re-acquire the mutex while waiting.
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopFrontLocked();
+  }
+
+  // Append a copy of `item` and wake up one thread blocked in Pop(), if any.
+  void Push(const T& item) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  // Try to pop the oldest value from the queue (or return nullopt if none).
+  // Never waits for an item to arrive.
+  util::optional<T> TryPop() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (queue_.empty()) {
+      return util::nullopt;
+    }
+    return PopFrontLocked();
+  }
+
+  // Return true if the queue holds no items at the time of the call.
+  bool Empty() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Un-synchronized access to front
+  // For this to be "safe":
+  // 1) the caller logically guarantees that queue is not empty
+  // 2) Pop/TryPop cannot be called concurrently with this
+  const T& UnsyncFront() const { return queue_.front(); }
+
+ private:
+  // Move the oldest item out of the queue and discard its slot.
+  // The caller must hold mutex_ and must have checked that the queue is not
+  // empty. Moving (rather than copying) avoids a redundant copy of the element
+  // that is about to be destroyed by pop().
+  T PopFrontLocked() {
+    T item = std::move(queue_.front());
+    queue_.pop();
+    return item;
+  }
+
+  std::queue<T> queue_;
+  mutable std::mutex mutex_;
+  std::condition_variable cond_;
+};
+
+struct MemoStore {
+  // Stores last known values for all the keys
+
+  struct Entry {
+    // Timestamp associated with the entry
+    int64_t _time;
+
+    // Batch associated with the entry (perf is probably OK for this; batches change
+    // rarely)
+    std::shared_ptr<arrow::RecordBatch> _batch;
+
+    // Row associated with the entry
+    row_index_t _row;
+  };
+
+  // Latest entry recorded for each key
+  std::unordered_map<KeyType, Entry> _entries;
+
+  // Record (batch, row, time) as the latest entry for `key`, creating the entry
+  // if the key has not been seen before and overwriting it otherwise.
+  void Store(const std::shared_ptr<RecordBatch>& batch, row_index_t row, int64_t time,
+             KeyType key) {
+    auto& entry = _entries[key];
+    // Only reassign the shared_ptr when the batch actually differs; consecutive
+    // rows usually come from the same batch (the batch shouldn't change that
+    // often), so this skips the assignment in the common case.
+    if (entry._batch != batch) entry._batch = batch;
+    entry._row = row;
+    entry._time = time;
+  }
+
+  // Return a pointer to the entry stored for `key`, or nullopt if there is none.
+  // The pointer refers to an element of _entries.
+  util::optional<const Entry*> GetEntryForKey(KeyType key) const {
+    auto it = _entries.find(key);
+    if (_entries.end() == it) return util::nullopt;
+    return util::optional<const Entry*>(&it->second);
+  }
+
+  // Drop every entry whose timestamp is strictly less than `ts`.
+  void RemoveEntriesWithLesserTime(int64_t ts) {
+    for (auto it = _entries.begin(); it != _entries.end();) {
+      if (it->second._time < ts) {
+        // erase() returns the iterator following the removed element
+        it = _entries.erase(it);
+      } else {
+        ++it;
+      }
+    }
+  }
+};
+
+class InputState {
+  // InputState corresponds to an input
+  // Input record batches are queued up in InputState until processed and
+  // turned into output record batches.
+
+ public:
+  InputState(const std::shared_ptr<arrow::Schema>& schema,
+             const std::string& time_col_name, const std::string& key_col_name)
+      : queue_(),
+        schema_(schema),
+        time_col_index_(
+            schema->GetFieldIndex(time_col_name)),  // TODO: handle missing 
field name

Review Comment:
   I removed this and added logic to handle the missing field in 
`MakeOutputSchema`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to