westonpace commented on code in PR #38380: URL: https://github.com/apache/arrow/pull/38380#discussion_r1373430790
########## cpp/src/arrow/acero/concurrent_queue.h: ########## @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> +#include "arrow/acero/backpressure_handler.h" + +namespace arrow::acero { + +/** + * Simple implementation for an unbound concurrent queue Review Comment: ```suggestion * Simple implementation for a thread-safe, blocking, unbounded multi-consumer / multi-producer concurrent queue ``` ########## cpp/src/arrow/acero/concurrent_queue.h: ########## @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> +#include "arrow/acero/backpressure_handler.h" + +namespace arrow::acero { + +/** + * Simple implementation for an unbound concurrent queue + */ +template <class T> +class ConcurrentQueue { + public: + T Pop() { + std::unique_lock<std::mutex> lock(mutex_); + cond_.wait(lock, [&] { return !queue_.empty(); }); + return PopUnlocked(); + } + + T PopUnlocked() { + auto item = queue_.front(); + queue_.pop(); + return item; + } + + void Push(const T& item) { + std::unique_lock<std::mutex> lock(mutex_); + return PushUnlocked(item); + } + + void PushUnlocked(const T& item) { + queue_.push(item); + cond_.notify_one(); + } + + void Clear() { + std::unique_lock<std::mutex> lock(mutex_); + ClearUnlocked(); + } + + void ClearUnlocked() { queue_ = std::queue<T>(); } + + std::optional<T> TryPop() { + std::unique_lock<std::mutex> lock(mutex_); + return TryPopUnlocked(); + } + + std::optional<T> TryPopUnlocked() { + // Try to pop the oldest value from the queue (or return nullopt if none) + if (queue_.empty()) { + return std::nullopt; + } else { + auto item = queue_.front(); Review Comment: Do we need a `std::move` somewhere? Are we assuming this queue will only be used for trivially copyable values? ########## cpp/src/arrow/acero/concurrent_queue.h: ########## @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> +#include "arrow/acero/backpressure_handler.h" + +namespace arrow::acero { + +/** + * Simple implementation for an unbound concurrent queue + */ +template <class T> +class ConcurrentQueue { + public: + T Pop() { + std::unique_lock<std::mutex> lock(mutex_); + cond_.wait(lock, [&] { return !queue_.empty(); }); + return PopUnlocked(); + } + + T PopUnlocked() { + auto item = queue_.front(); + queue_.pop(); + return item; + } + + void Push(const T& item) { + std::unique_lock<std::mutex> lock(mutex_); + return PushUnlocked(item); + } + + void PushUnlocked(const T& item) { + queue_.push(item); + cond_.notify_one(); + } + + void Clear() { + std::unique_lock<std::mutex> lock(mutex_); + ClearUnlocked(); + } + + void ClearUnlocked() { queue_ = std::queue<T>(); } + + std::optional<T> TryPop() { + std::unique_lock<std::mutex> lock(mutex_); + return TryPopUnlocked(); + } + + std::optional<T> TryPopUnlocked() { + // Try to pop the oldest value from the queue (or return nullopt if none) + if (queue_.empty()) { + return std::nullopt; + } else { + auto item = queue_.front(); + queue_.pop(); + return item; + } + } + + bool Empty() const { + std::unique_lock<std::mutex> lock(mutex_); + return queue_.empty(); + 
} + + // Un-synchronized access to front + // For this to be "safe": + // 1) the caller logically guarantees that queue is not empty + // 2) pop/try_pop cannot be called concurrently with this + const T& UnsyncFront() const { return queue_.front(); } + + size_t UnsyncSize() const { return queue_.size(); } + + protected: + std::mutex& GetMutex() { return mutex_; } + + private: + std::queue<T> queue_; + mutable std::mutex mutex_; + std::condition_variable cond_; +}; + +template <typename T> +class BackpressureConcurrentQueue : public ConcurrentQueue<T> { + private: + struct DoHandle { + explicit DoHandle(BackpressureConcurrentQueue& queue) + : queue_(queue), start_size_(queue_.UnsyncSize()) {} + + ~DoHandle() { + size_t end_size = queue_.UnsyncSize(); Review Comment: Can we put a brief comment here that this is safe because `~DoHandle()` should always run while the lock is held? Also, is it simpler to just make `queue_` protected? ########## cpp/src/arrow/acero/time_series_util.cc: ########## @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/array/data.h" + +#include "arrow/acero/time_series_util.h" +#include "arrow/util/logging.h" + +namespace arrow::acero { + +// normalize the value to 64-bits while preserving ordering of values Review Comment: ```suggestion // normalize the value to unsigned 64-bits while preserving ordering of values ``` Normalizing to 64 bits is easy. I think the trick you are applying here is that you are normalizing to an _unsigned_ value, so you have to fiddle with negative values so they compare correctly, right? Also, we can probably remove this comment since you have the same comment in the header file. ########## cpp/src/arrow/acero/sorted_merge_node_test.cc: ########## @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> Review Comment: We generally try to avoid using the `api.h` headers internally, as they are very large and slow down compilation. It's sometimes done in tests, but even then it is nice if we can avoid it. ########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> Review Comment: ```suggestion ``` ########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <arrow/api.h> +#include <atomic> +#include <mutex> +#include <sstream> +#include <thread> +#include <tuple> +#include <unordered_map> +#include <vector> +#include "arrow/acero/concurrent_queue.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template <typename Callable> +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector<std::string> GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector<std::string> labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template <typename T, typename V = typename T::value_type> +inline typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template <typename T, typename V = typename T::value_type> +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, val); +} + +} // namespace + +namespace arrow::acero { + +namespace sorted_merge { + +// Each slice is associated with a single input source, so we only need 1 record +// batch per slice +using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>; +using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>; Review Comment: Hmm...I thought the template parameter `<1>` was the # of inputs and not the # of batches. I could very easily be misunderstanding though. 
########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <atomic> +#include <mutex> +#include <sstream> +#include <thread> +#include <tuple> +#include <unordered_map> +#include <vector> +#include "arrow/acero/concurrent_queue.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template <typename Callable> +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector<std::string> GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector<std::string> labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template <typename T, typename V = typename T::value_type> +inline 
typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template <typename T, typename V = typename T::value_type> +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, val); +} + +} // namespace + +namespace arrow::acero { + +namespace sorted_merge { Review Comment: This could probably be an anonymous namespace? ########## cpp/build-support/lint_cpp_cli.py: ########## @@ -77,6 +77,7 @@ def lint_file(path): EXCLUSIONS = _paths('''\ + arrow/acero/concurrent_queue.h Review Comment: Generally we work around this by playing games (pimpl, virtual methods) to abstract details away from the header files and keep `<mutex>` in the .cc files. However, that won't work here since you're dealing with a templated class. There are two options I can think of. 1. Make the `concurrent_queue.h` header "internal". The lint check only applies to external headers, i.e. those that are included (directly or transitively) by one of the `api.h` headers. At the moment I'm not even sure how this check failed, since you are only including the header from `asof_join_node.cc`, so this seems a simple enough fix. 2. You can use `src/arrow/util/mutex.h` instead of `<mutex>`. It adds a small function-call overhead but hides the details of the `<mutex>` header. Either way, I hope we can avoid listing this file here. ########## cpp/src/arrow/acero/concurrent_queue.h: ########## @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> +#include "arrow/acero/backpressure_handler.h" + +namespace arrow::acero { + +/** + * Simple implementation for an unbound concurrent queue + */ +template <class T> +class ConcurrentQueue { + public: + T Pop() { + std::unique_lock<std::mutex> lock(mutex_); + cond_.wait(lock, [&] { return !queue_.empty(); }); + return PopUnlocked(); + } + + T PopUnlocked() { + auto item = queue_.front(); + queue_.pop(); + return item; + } + + void Push(const T& item) { + std::unique_lock<std::mutex> lock(mutex_); + return PushUnlocked(item); + } + + void PushUnlocked(const T& item) { + queue_.push(item); + cond_.notify_one(); + } + + void Clear() { + std::unique_lock<std::mutex> lock(mutex_); + ClearUnlocked(); + } + + void ClearUnlocked() { queue_ = std::queue<T>(); } + + std::optional<T> TryPop() { + std::unique_lock<std::mutex> lock(mutex_); + return TryPopUnlocked(); + } + + std::optional<T> TryPopUnlocked() { + // Try to pop the oldest value from the queue (or return nullopt if none) + if (queue_.empty()) { + return std::nullopt; + } else { + auto item = queue_.front(); + queue_.pop(); + return item; + } + } + + bool Empty() const { + std::unique_lock<std::mutex> lock(mutex_); + return queue_.empty(); + } + + // Un-synchronized access to front + // For this to be "safe": + // 1) the caller logically guarantees that queue is not empty + // 2) pop/try_pop cannot be called concurrently with this + const T& UnsyncFront() const { return queue_.front(); } + + size_t UnsyncSize() 
const { return queue_.size(); } Review Comment: Can these methods be protected or are they needed by the asof join node? ########## cpp/src/arrow/acero/concurrent_queue.h: ########## @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> +#include "arrow/acero/backpressure_handler.h" + +namespace arrow::acero { + +/** + * Simple implementation for an unbound concurrent queue + */ +template <class T> +class ConcurrentQueue { + public: + T Pop() { + std::unique_lock<std::mutex> lock(mutex_); + cond_.wait(lock, [&] { return !queue_.empty(); }); + return PopUnlocked(); + } + + T PopUnlocked() { Review Comment: Can we document these methods? Should the unlocked variants be public? ########## cpp/src/arrow/acero/time_series_util.h: ########## @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" + +namespace arrow::acero { + +// normalize the value to 64-bits while preserving ordering of values +template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true> +inline uint64_t NormalizeTime(T t); + +uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row); Review Comment: Can you document this? ########## cpp/src/arrow/acero/exec_plan.cc: ########## @@ -18,6 +18,7 @@ #include "arrow/acero/exec_plan.h" #include <atomic> +#include <iostream> Review Comment: ```suggestion ``` ########## cpp/src/arrow/acero/sorted_merge_node_test.cc: ########## @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <gtest/gtest.h> + +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/map_node.h" +#include "arrow/acero/options.h" +#include "arrow/acero/test_nodes.h" +#include "arrow/compute/ordering.h" +#include "arrow/result.h" +#include "arrow/table.h" +#include "arrow/testing/generator.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" + +namespace arrow::acero { + +std::shared_ptr<Table> TestTable(int start, int step, int rows_per_batch, + int num_batches) { + return gen::Gen({{"timestamp", gen::Step(start, step, /*signed_int=*/true)}, + {"str", gen::Random(utf8())}}) + ->FailOnError() + ->Table(rows_per_batch, num_batches); +} + +void CheckMerging() { + auto table1 = TestTable( + /*start=*/0, + /*step=*/2, + /*rows_per_batch=*/2, + /*num_batches=*/3); + auto table2 = TestTable( + /*start=*/1, + /*step=*/2, + /*rows_per_batch=*/3, + /*num_batches=*/2); + auto table3 = TestTable( + /*start=*/3, + /*step=*/3, + /*rows_per_batch=*/6, + /*num_batches=*/1); + std::vector<Declaration::Input> src_decls; + src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table1))); + src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table2))); + src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table3))); + + auto ops = OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")})); + + Declaration sorted_merge{"sorted_merge", src_decls, ops}; + // We can't use threads for sorted merging since it relies on + // ascending deterministic order of timestamps + ASSERT_OK_AND_ASSIGN(auto output, + DeclarationToTable(sorted_merge, /*use_threads=*/false)); + ASSERT_EQ(output->num_rows(), 18); + + Int32Builder expected_ts_builder; + for (auto i : {0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 9, 9, 10, 11, 12, 15, 18}) { + ASSERT_OK(expected_ts_builder.Append(i)); + } + 
ASSERT_OK_AND_ASSIGN(auto expected_timestamps, expected_ts_builder.Finish()); + auto chunked_array = + std::make_shared<arrow::ChunkedArray>(std::move(expected_timestamps)); + ASSERT_TRUE(chunked_array->Equals(output->column(0))) + << chunked_array->ToString() << " " << output->column(0)->ToString(); +} + +TEST(FetchNode, Basic) { CheckMerging(); } Review Comment: Are you planning on adding more tests? Why the indirection through `CheckMerging()` here? ########## cpp/src/arrow/acero/time_series_util.h: ########## @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" + +namespace arrow::acero { Review Comment: This change doesn't seem strictly related to the sorted merge node, does it? It's probably fine to do a bit of unrelated refactoring as part of this PR. I just want to verify my understanding. ########## cpp/src/arrow/acero/time_series_util.h: ########## @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" + +namespace arrow::acero { + +// normalize the value to 64-bits while preserving ordering of values Review Comment: ```suggestion // normalize the value to unsigned 64-bits while preserving ordering of values ``` ########## cpp/src/arrow/acero/time_series_util.h: ########## @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" + +namespace arrow::acero { + +// normalize the value to 64-bits while preserving ordering of values +template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true> +inline uint64_t NormalizeTime(T t); Review Comment: Inline without definition? An `inline` function must be defined in every translation unit in which it is used, so this declaration on its own is only usable from the file that provides the definition. ########## cpp/src/arrow/acero/unmaterialized_table.h: ########## @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#pragma once + +#include <optional> +#include <vector> +#include "arrow/array/builder_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/memory_pool.h" +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" +#include "arrow/util/logging.h" + +namespace arrow::acero { + +struct CompositeEntry { + RecordBatch* batch; + uint64_t start; + uint64_t end; +}; + +template <size_t MAX_COMPOSITE_TABLES> +struct UnmaterializedSlice { + // A slice is represented by a [start, end) range of rows in a collection of record + // batches, where end-start is the same length + + CompositeEntry components[MAX_COMPOSITE_TABLES]; + size_t num_components; + + inline int64_t Size() const { + if (num_components == 0) { + return 0; + } + return components[0].end - components[0].start; + } +}; + +/// A table of composite reference rows. Rows maintain pointers to the +/// constituent record batches, but the overall table retains shared_ptr +/// references to ensure memory remains resident while the table is live. +/// +/// The main reason for this is that, especially for wide tables, some operations +/// such as sorted_merge or asof_join are effectively row-oriented, rather than +/// column-oriented. Separating the join part from the columnar materialization +/// part simplifies the logic around data types and increases efficiency. +/// +/// We don't put the shared_ptr's into the rows for efficiency reasons, so the caller +/// must manually call addRecordBatchRef to maintain the lifetime of the stored +/// record batches. +template <size_t MAX_COMPOSITE_TABLES> +class UnmaterializedCompositeTable { Review Comment: I'm assuming this is just exported as-is from the asof join node? ########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <atomic> +#include <mutex> +#include <sstream> +#include <thread> +#include <tuple> +#include <unordered_map> +#include <vector> +#include "arrow/acero/concurrent_queue.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template <typename Callable> +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector<std::string> GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector<std::string> labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template <typename T, typename V = typename T::value_type> +inline typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template <typename T, typename V = typename 
T::value_type> +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, val); +} Review Comment: You've put other common helper utilities in their own files to avoid duplication. Why not these? ########## cpp/src/arrow/acero/unmaterialized_table.h: ########## @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <optional> +#include <vector> +#include "arrow/array/builder_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/memory_pool.h" +#include "arrow/record_batch.h" +#include "arrow/type_traits.h" +#include "arrow/util/logging.h" + +namespace arrow::acero { + +struct CompositeEntry { + RecordBatch* batch; + uint64_t start; + uint64_t end; +}; + +template <size_t MAX_COMPOSITE_TABLES> +struct UnmaterializedSlice { Review Comment: Let's document what these are ########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <atomic> +#include <mutex> +#include <sstream> +#include <thread> +#include <tuple> +#include <unordered_map> +#include <vector> +#include "arrow/acero/concurrent_queue.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template <typename Callable> +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector<std::string> GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector<std::string> labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template <typename T, typename V = typename T::value_type> +inline typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template <typename T, typename V = typename T::value_type> +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, 
val); +} + +} // namespace + +namespace arrow::acero { + +namespace sorted_merge { + +// Each slice is associated with a single input source, so we only need 1 record +// batch per slice +using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>; +using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>; + +using row_index_t = uint64_t; +using time_unit_t = uint64_t; +using col_index_t = int; + +#define NEW_TASK true +#define POISON_PILL false + +class BackpressureController : public BackpressureControl { + public: + BackpressureController(ExecNode* node, ExecNode* output, + std::atomic<int32_t>& backpressure_counter) + : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} + + void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); } + void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); } + + private: + ExecNode* node_; + ExecNode* output_; + std::atomic<int32_t>& backpressure_counter_; +}; + +/// InputState correponds to an input. Input record batches are queued up in InputState +/// until processed and turned into output record batches. 
+class InputState { + public: + InputState(size_t index, BackpressureHandler handler, + const std::shared_ptr<arrow::Schema>& schema, const int time_col_index) + : index_(index), + queue_(std::move(handler)), + schema_(schema), + time_col_index_(time_col_index), + time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {} + + template <typename PtrType> + static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input, + arrow::acero::ExecNode* output, + std::atomic<int32_t>& backpressure_counter, + const std::shared_ptr<arrow::Schema>& schema, + const col_index_t time_col_index) { + constexpr size_t low_threshold = 4, high_threshold = 8; + std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control = + std::make_unique<BackpressureController>(input, output, backpressure_counter); + ARROW_ASSIGN_OR_RAISE(auto handler, + BackpressureHandler::Make(input, low_threshold, high_threshold, + std::move(backpressure_control))); + return PtrType(new InputState(index, std::move(handler), schema, time_col_index)); + } + + bool IsTimeColumn(col_index_t i) const { + DCHECK_LT(i, schema_->num_fields()); + return (i == time_col_index_); + } + + // Gets the latest row index, assuming the queue isn't empty + row_index_t GetLatestRow() const { return latest_ref_row_; } + + bool Empty() const { + // cannot be empty if ref row is >0 -- can avoid slow queue lock + // below + if (latest_ref_row_ > 0) { + return false; + } + return queue_.Empty(); + } + + size_t index() const { return index_; } + + int total_batches() const { return total_batches_; } + + // Gets latest batch (precondition: must not be empty) + const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const { + return queue_.UnsyncFront(); + } + +#define LATEST_VAL_CASE(id, val) \ + case arrow::Type::id: { \ + using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \ + using CType = typename arrow::TypeTraits<T>::CType; \ + return val(data->GetValues<CType>(1)[row]); \ + } + + 
inline time_unit_t GetLatestTime() const { + return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_, + latest_ref_row_); + } + +#undef LATEST_VAL_CASE + + bool Finished() const { return batches_processed_ == total_batches_; } + + arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>> + Advance() { + // Advance the row until a new time is encountered or the record batch + // ends. This will return a range of {-1, -1} and a nullptr if there is + // no input Review Comment: Time? We're not dealing with time here right? ########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <arrow/api.h> +#include <atomic> +#include <mutex> +#include <sstream> +#include <thread> +#include <tuple> +#include <unordered_map> +#include <vector> +#include "arrow/acero/concurrent_queue.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template <typename Callable> +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector<std::string> GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector<std::string> labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template <typename T, typename V = typename T::value_type> +inline typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template <typename T, typename V = typename T::value_type> +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, val); +} + +} // namespace + +namespace arrow::acero { + +namespace sorted_merge { + +// Each slice is associated with a single input source, so we only need 1 record +// batch per slice +using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>; +using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>; + +using row_index_t = uint64_t; +using time_unit_t = uint64_t; +using col_index_t = int; + +#define NEW_TASK true +#define POISON_PILL false Review Comment: Can we use constants instead of macros? Also, then we can use `kNewTask` and `kPoisonPill`. 
########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/api.h> +#include <atomic> +#include <mutex> +#include <sstream> +#include <thread> +#include <tuple> +#include <unordered_map> +#include <vector> +#include "arrow/acero/concurrent_queue.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template <typename Callable> +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector<std::string> GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector<std::string> labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template <typename T, typename V = typename T::value_type> +inline 
typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template <typename T, typename V = typename T::value_type> +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, val); +} + +} // namespace + +namespace arrow::acero { + +namespace sorted_merge { + +// Each slice is associated with a single input source, so we only need 1 record +// batch per slice +using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>; +using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>; + +using row_index_t = uint64_t; +using time_unit_t = uint64_t; +using col_index_t = int; + +#define NEW_TASK true +#define POISON_PILL false + +class BackpressureController : public BackpressureControl { + public: + BackpressureController(ExecNode* node, ExecNode* output, + std::atomic<int32_t>& backpressure_counter) + : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} + + void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); } + void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); } + + private: + ExecNode* node_; + ExecNode* output_; + std::atomic<int32_t>& backpressure_counter_; +}; + +/// InputState correponds to an input. Input record batches are queued up in InputState +/// until processed and turned into output record batches. 
+class InputState { + public: + InputState(size_t index, BackpressureHandler handler, + const std::shared_ptr<arrow::Schema>& schema, const int time_col_index) + : index_(index), + queue_(std::move(handler)), + schema_(schema), + time_col_index_(time_col_index), + time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {} + + template <typename PtrType> + static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input, + arrow::acero::ExecNode* output, + std::atomic<int32_t>& backpressure_counter, + const std::shared_ptr<arrow::Schema>& schema, + const col_index_t time_col_index) { + constexpr size_t low_threshold = 4, high_threshold = 8; + std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control = + std::make_unique<BackpressureController>(input, output, backpressure_counter); + ARROW_ASSIGN_OR_RAISE(auto handler, + BackpressureHandler::Make(input, low_threshold, high_threshold, + std::move(backpressure_control))); + return PtrType(new InputState(index, std::move(handler), schema, time_col_index)); + } + + bool IsTimeColumn(col_index_t i) const { + DCHECK_LT(i, schema_->num_fields()); + return (i == time_col_index_); + } + + // Gets the latest row index, assuming the queue isn't empty + row_index_t GetLatestRow() const { return latest_ref_row_; } + + bool Empty() const { + // cannot be empty if ref row is >0 -- can avoid slow queue lock + // below + if (latest_ref_row_ > 0) { + return false; + } + return queue_.Empty(); + } + + size_t index() const { return index_; } + + int total_batches() const { return total_batches_; } + + // Gets latest batch (precondition: must not be empty) + const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const { + return queue_.UnsyncFront(); + } + +#define LATEST_VAL_CASE(id, val) \ + case arrow::Type::id: { \ + using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \ + using CType = typename arrow::TypeTraits<T>::CType; \ + return val(data->GetValues<CType>(1)[row]); \ + } + + 
inline time_unit_t GetLatestTime() const { + return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_, + latest_ref_row_); + } + +#undef LATEST_VAL_CASE + + bool Finished() const { return batches_processed_ == total_batches_; } + + arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>> + Advance() { + // Advance the row until a new time is encountered or the record batch + // ends. This will return a range of {-1, -1} and a nullptr if there is + // no input + + bool active = + (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty(); + + if (!active) { + return std::make_pair(UnmaterializedSlice(), nullptr); + } + + row_index_t start = latest_ref_row_; + row_index_t end = latest_ref_row_; + time_unit_t startTime = GetLatestTime(); + std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront(); + auto rows_in_batch = (row_index_t)batch->num_rows(); + + while (GetLatestTime() == startTime) { + end = ++latest_ref_row_; + if (latest_ref_row_ >= rows_in_batch) { + // hit the end of the batch, need to get the next batch if + // possible. 
+ ++batches_processed_; + latest_ref_row_ = 0; + active &= !queue_.TryPop(); + if (active) { + DCHECK_GT(queue_.UnsyncFront()->num_rows(), + 0); // empty batches disallowed, sanity check + } + break; + } + } + + UnmaterializedSlice slice; + slice.num_components = 1; + slice.components[0] = CompositeEntry{batch.get(), start, end}; + return std::make_pair(slice, batch); + } + + arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) { + if (rb->num_rows() > 0) { + queue_.Push(rb); + } else { + ++batches_processed_; // don't enqueue empty batches, just record + // as processed + } + return arrow::Status::OK(); + } + + const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; } + + void set_total_batches(int n) { + DCHECK_GE(n, 0); + DCHECK_EQ(total_batches_, -1) << "Set total batch more than once"; + total_batches_ = n; + } + + private: + size_t index_; + // Pending record batches. The latest is the front. Batches cannot be empty. + BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_; + // Schema associated with the input + std::shared_ptr<arrow::Schema> schema_; + // Total number of batches (only int because InputFinished uses int) + std::atomic<int> total_batches_{-1}; + // Number of batches processed so far (only int because InputFinished uses + // int) + std::atomic<int> batches_processed_{0}; + // Index of the time col + col_index_t time_col_index_; + // Type id of the time column + arrow::Type::type time_type_id_; + // Index of the latest row reference within; if >0 then queue_ cannot be + // empty Must be < queue_.front()->num_rows() if queue_ is non-empty + row_index_t latest_ref_row_ = 0; + // Time of latest row + time_unit_t latest_time_ = std::numeric_limits<time_unit_t>::lowest(); +}; + +struct InputStateComparator { + bool operator()(const std::shared_ptr<InputState>& lhs, + const std::shared_ptr<InputState>& rhs) const { + // True if lhs is ahead of time of rhs + if (lhs->Finished()) { + return false; + } + 
if (rhs->Finished()) { + return false; + } + time_unit_t lFirst = lhs->GetLatestTime(); + time_unit_t rFirst = rhs->GetLatestTime(); + return lFirst > rFirst; + } +}; + +class SortedMergeNode : public ExecNode { + static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024; + + public: + SortedMergeNode(arrow::acero::ExecPlan* plan, + std::vector<arrow::acero::ExecNode*> inputs, + std::shared_ptr<arrow::Schema> output_schema, + arrow::Ordering new_ordering) + : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)), + ordering_(std::move(new_ordering)), + input_counter(inputs_.size()), + output_counter(inputs_.size()), + process_thread() { + SetLabel("sorted_merge"); + } + + ~SortedMergeNode() override { + process_queue.Push( + POISON_PILL); // poison pill + // We might create a temporary (such as to inspect the output + // schema), in which case there isn't anything to join + if (process_thread.joinable()) { + process_thread.join(); + } + } + + static arrow::Result<arrow::acero::ExecNode*> Make( + arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs, + const arrow::acero::ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast<int>(inputs.size()), + "SortedMergeNode")); + + if (inputs.size() < 1) { + return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs"); + } + + const auto schema = inputs.at(0)->output_schema(); + for (const auto& input : inputs) { + if (!input->output_schema()->Equals(schema)) { + return Status::Invalid( + "SortedMergeNode input schemas must all " + "match, first schema " + "was: ", + schema->ToString(), " got schema: ", input->output_schema()->ToString()); + } + } + + const auto& order_options = + arrow::internal::checked_cast<const OrderByNodeOptions&>(options); + + if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { + return Status::Invalid("`ordering` must be an explicit non-empty ordering"); + } + + 
std::shared_ptr<Schema> output_schema = inputs[0]->output_schema(); + return plan->EmplaceNode<SortedMergeNode>( + plan, std::move(inputs), std::move(output_schema), order_options.ordering); + } + + const char* kind_name() const override { return "SortedMergeNode"; } + + const arrow::Ordering& ordering() const override { return ordering_; } + + arrow::Status Init() override { + auto inputs = this->inputs(); + for (size_t i = 0; i < inputs.size(); i++) { + ExecNode* input = inputs[i]; + const auto& schema = input->output_schema(); + const auto& sort_key = ordering_.sort_keys()[0]; + if (sort_key.order != arrow::compute::SortOrder::Ascending) { + return Status::Invalid("Only ascending sort order is supported"); + } + + const auto& ref = sort_key.target; + if (!ref.IsName()) { + return Status::Invalid("Ordering must be a name. ", ref.ToString(), + " is not a name"); + } Review Comment: Why? ########## cpp/src/arrow/acero/sorted_merge_node.cc: ########## @@ -0,0 +1,606 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <arrow/api.h> +#include <atomic> +#include <mutex> +#include <sstream> +#include <thread> +#include <tuple> +#include <unordered_map> +#include <vector> +#include "arrow/acero/concurrent_queue.h" +#include "arrow/acero/exec_plan.h" +#include "arrow/acero/options.h" +#include "arrow/acero/query_context.h" +#include "arrow/acero/time_series_util.h" +#include "arrow/acero/unmaterialized_table.h" +#include "arrow/acero/util.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace { +template <typename Callable> +struct Defer { + Callable callable; + explicit Defer(Callable callable_) : callable(std::move(callable_)) {} + ~Defer() noexcept { callable(); } +}; + +std::vector<std::string> GetInputLabels( + const arrow::acero::ExecNode::NodeVector& inputs) { + std::vector<std::string> labels(inputs.size()); + for (size_t i = 0; i < inputs.size(); i++) { + labels[i] = "input_" + std::to_string(i) + "_label"; + } + return labels; +} + +template <typename T, typename V = typename T::value_type> +inline typename T::const_iterator std_find(const T& container, const V& val) { + return std::find(container.begin(), container.end(), val); +} + +template <typename T, typename V = typename T::value_type> +inline bool std_has(const T& container, const V& val) { + return container.end() != std_find(container, val); +} + +} // namespace + +namespace arrow::acero { + +namespace sorted_merge { + +// Each slice is associated with a single input source, so we only need 1 record +// batch per slice +using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>; +using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>; + +using row_index_t = uint64_t; +using time_unit_t = uint64_t; +using col_index_t = int; + +#define NEW_TASK true +#define POISON_PILL false + +class BackpressureController : public BackpressureControl { + public: + BackpressureController(ExecNode* 
node, ExecNode* output, + std::atomic<int32_t>& backpressure_counter) + : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} + + void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); } + void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); } + + private: + ExecNode* node_; + ExecNode* output_; + std::atomic<int32_t>& backpressure_counter_; +}; + +/// InputState correponds to an input. Input record batches are queued up in InputState +/// until processed and turned into output record batches. +class InputState { + public: + InputState(size_t index, BackpressureHandler handler, + const std::shared_ptr<arrow::Schema>& schema, const int time_col_index) + : index_(index), + queue_(std::move(handler)), + schema_(schema), + time_col_index_(time_col_index), + time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {} + + template <typename PtrType> + static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input, + arrow::acero::ExecNode* output, + std::atomic<int32_t>& backpressure_counter, + const std::shared_ptr<arrow::Schema>& schema, + const col_index_t time_col_index) { + constexpr size_t low_threshold = 4, high_threshold = 8; + std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control = + std::make_unique<BackpressureController>(input, output, backpressure_counter); + ARROW_ASSIGN_OR_RAISE(auto handler, + BackpressureHandler::Make(input, low_threshold, high_threshold, + std::move(backpressure_control))); + return PtrType(new InputState(index, std::move(handler), schema, time_col_index)); + } + + bool IsTimeColumn(col_index_t i) const { + DCHECK_LT(i, schema_->num_fields()); + return (i == time_col_index_); + } + + // Gets the latest row index, assuming the queue isn't empty + row_index_t GetLatestRow() const { return latest_ref_row_; } + + bool Empty() const { + // cannot be empty if ref row is >0 -- can avoid slow queue lock + // below + if 
(latest_ref_row_ > 0) { + return false; + } + return queue_.Empty(); + } + + size_t index() const { return index_; } + + int total_batches() const { return total_batches_; } + + // Gets latest batch (precondition: must not be empty) + const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const { + return queue_.UnsyncFront(); + } + +#define LATEST_VAL_CASE(id, val) \ + case arrow::Type::id: { \ + using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \ + using CType = typename arrow::TypeTraits<T>::CType; \ + return val(data->GetValues<CType>(1)[row]); \ + } + + inline time_unit_t GetLatestTime() const { + return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_, + latest_ref_row_); + } + +#undef LATEST_VAL_CASE + + bool Finished() const { return batches_processed_ == total_batches_; } + + arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>> + Advance() { + // Advance the row until a new time is encountered or the record batch + // ends. This will return a range of {-1, -1} and a nullptr if there is + // no input + + bool active = + (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty(); + + if (!active) { + return std::make_pair(UnmaterializedSlice(), nullptr); + } + + row_index_t start = latest_ref_row_; + row_index_t end = latest_ref_row_; + time_unit_t startTime = GetLatestTime(); + std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront(); + auto rows_in_batch = (row_index_t)batch->num_rows(); + + while (GetLatestTime() == startTime) { + end = ++latest_ref_row_; + if (latest_ref_row_ >= rows_in_batch) { + // hit the end of the batch, need to get the next batch if + // possible. 
+ ++batches_processed_; + latest_ref_row_ = 0; + active &= !queue_.TryPop(); + if (active) { + DCHECK_GT(queue_.UnsyncFront()->num_rows(), + 0); // empty batches disallowed, sanity check + } + break; + } + } + + UnmaterializedSlice slice; + slice.num_components = 1; + slice.components[0] = CompositeEntry{batch.get(), start, end}; + return std::make_pair(slice, batch); + } + + arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) { + if (rb->num_rows() > 0) { + queue_.Push(rb); + } else { + ++batches_processed_; // don't enqueue empty batches, just record + // as processed + } + return arrow::Status::OK(); + } + + const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; } + + void set_total_batches(int n) { + DCHECK_GE(n, 0); + DCHECK_EQ(total_batches_, -1) << "Set total batch more than once"; + total_batches_ = n; + } + + private: + size_t index_; + // Pending record batches. The latest is the front. Batches cannot be empty. + BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_; + // Schema associated with the input + std::shared_ptr<arrow::Schema> schema_; + // Total number of batches (only int because InputFinished uses int) + std::atomic<int> total_batches_{-1}; + // Number of batches processed so far (only int because InputFinished uses + // int) + std::atomic<int> batches_processed_{0}; + // Index of the time col + col_index_t time_col_index_; + // Type id of the time column + arrow::Type::type time_type_id_; + // Index of the latest row reference within; if >0 then queue_ cannot be + // empty Must be < queue_.front()->num_rows() if queue_ is non-empty + row_index_t latest_ref_row_ = 0; + // Time of latest row + time_unit_t latest_time_ = std::numeric_limits<time_unit_t>::lowest(); +}; + +struct InputStateComparator { + bool operator()(const std::shared_ptr<InputState>& lhs, + const std::shared_ptr<InputState>& rhs) const { + // True if lhs is ahead of time of rhs + if (lhs->Finished()) { + return false; + } + 
if (rhs->Finished()) { + return false; + } + time_unit_t lFirst = lhs->GetLatestTime(); + time_unit_t rFirst = rhs->GetLatestTime(); + return lFirst > rFirst; + } +}; + +class SortedMergeNode : public ExecNode { + static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024; + + public: + SortedMergeNode(arrow::acero::ExecPlan* plan, + std::vector<arrow::acero::ExecNode*> inputs, + std::shared_ptr<arrow::Schema> output_schema, + arrow::Ordering new_ordering) + : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)), + ordering_(std::move(new_ordering)), + input_counter(inputs_.size()), + output_counter(inputs_.size()), + process_thread() { + SetLabel("sorted_merge"); + } + + ~SortedMergeNode() override { + process_queue.Push( + POISON_PILL); // poison pill + // We might create a temporary (such as to inspect the output + // schema), in which case there isn't anything to join + if (process_thread.joinable()) { + process_thread.join(); + } + } + + static arrow::Result<arrow::acero::ExecNode*> Make( + arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs, + const arrow::acero::ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast<int>(inputs.size()), + "SortedMergeNode")); + + if (inputs.size() < 1) { + return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs"); + } + + const auto schema = inputs.at(0)->output_schema(); + for (const auto& input : inputs) { + if (!input->output_schema()->Equals(schema)) { + return Status::Invalid( + "SortedMergeNode input schemas must all " + "match, first schema " + "was: ", + schema->ToString(), " got schema: ", input->output_schema()->ToString()); + } + } + + const auto& order_options = + arrow::internal::checked_cast<const OrderByNodeOptions&>(options); + + if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) { + return Status::Invalid("`ordering` must be an explicit non-empty ordering"); + } + + 
std::shared_ptr<Schema> output_schema = inputs[0]->output_schema(); + return plan->EmplaceNode<SortedMergeNode>( + plan, std::move(inputs), std::move(output_schema), order_options.ordering); + } + + const char* kind_name() const override { return "SortedMergeNode"; } + + const arrow::Ordering& ordering() const override { return ordering_; } + + arrow::Status Init() override { + auto inputs = this->inputs(); + for (size_t i = 0; i < inputs.size(); i++) { + ExecNode* input = inputs[i]; + const auto& schema = input->output_schema(); + const auto& sort_key = ordering_.sort_keys()[0]; + if (sort_key.order != arrow::compute::SortOrder::Ascending) { + return Status::Invalid("Only ascending sort order is supported"); + } Review Comment: Maybe `NotSupported` instead of `Invalid` to indicate we can do it someday? It seems we should eventually be able to support descending order easily. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
