icexelloss commented on code in PR #13028:
URL: https://github.com/apache/arrow/pull/13028#discussion_r902954389

##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <numeric>
+#include <random>
+#include <unordered_set>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/thread_pool.h"
+
+using testing::UnorderedElementsAreArray;
+
+namespace arrow {
+namespace compute {
+
+BatchesWithSchema GenerateBatchesFromString(
+    const std::shared_ptr<Schema>& schema,
+    const std::vector<util::string_view>& json_strings, int multiplicity = 1) {
+  BatchesWithSchema out_batches{{}, schema};
+
+  std::vector<ValueDescr> descrs;
+  for (auto&& field : schema->fields()) {
+    descrs.emplace_back(field->type());
+  }
+
+  for (auto&& s : json_strings) {
+    out_batches.batches.push_back(ExecBatchFromJSON(descrs, s));
+  }
+
+  size_t batch_count = out_batches.batches.size();
+  for (int repeat = 1; repeat < multiplicity; ++repeat) {
+    for (size_t i = 0; i < batch_count; ++i) {
+      out_batches.batches.push_back(out_batches.batches[i]);
+    }
+  }
+
+  return out_batches;
+}
+
+void CheckRunOutput(const BatchesWithSchema& l_batches,
+                    const BatchesWithSchema& r0_batches,
+                    const BatchesWithSchema& r1_batches,
+                    const BatchesWithSchema& exp_batches, const FieldRef time,
+                    const FieldRef keys, const int64_t tolerance) {
+  auto exec_ctx =
+      arrow::internal::make_unique<ExecContext>(default_memory_pool(), nullptr);
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+
+  AsofJoinNodeOptions join_options(time, keys, tolerance);
+  Declaration join{"asofjoin", join_options};
+
+  join.inputs.emplace_back(Declaration{
+      "source", SourceNodeOptions{l_batches.schema, l_batches.gen(false, false)}});
+  join.inputs.emplace_back(Declaration{
+      "source", SourceNodeOptions{r0_batches.schema, r0_batches.gen(false, false)}});
+  join.inputs.emplace_back(Declaration{
+      "source", SourceNodeOptions{r1_batches.schema, r1_batches.gen(false, false)}});
+
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ASSERT_OK(Declaration::Sequence({join, {"sink", SinkNodeOptions{&sink_gen}}})
+                .AddToPlan(plan.get()));
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen));
+
+  ASSERT_OK_AND_ASSIGN(auto exp_table,
+                       TableFromExecBatches(exp_batches.schema, exp_batches.batches));
+
+  ASSERT_OK_AND_ASSIGN(auto res_table,
+                       TableFromExecBatches(exp_batches.schema, res));
+
+  AssertTablesEqual(*exp_table, *res_table,
+                    /*same_chunk_layout=*/true, /*flatten=*/true);
+}
+
+void DoRunBasicTest(const std::vector<util::string_view>& l_data,
+                    const std::vector<util::string_view>& r0_data,
+                    const std::vector<util::string_view>& r1_data,
+                    const std::vector<util::string_view>& exp_data, int64_t tolerance) {
+  auto l_schema =
+      schema({field("time", int64()), field("key", int32()), field("l_v0", float64())});
+  auto r0_schema =
+      schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())});
+  auto r1_schema =
+      schema({field("time", int64()), field("key", int32()), field("r1_v0", float32())});
+
+  auto exp_schema = schema({
+      field("time", int64()),
+      field("key", int32()),
+      field("l_v0", float64()),
+      field("r0_v0", float64()),
+      field("r1_v0", float32()),
+  });
+
+  // Test three-table join
+  BatchesWithSchema l_batches, r0_batches, r1_batches, exp_batches;
+  l_batches = GenerateBatchesFromString(l_schema, l_data);
+  r0_batches = GenerateBatchesFromString(r0_schema, r0_data);
+  r1_batches = GenerateBatchesFromString(r1_schema, r1_data);
+  exp_batches = GenerateBatchesFromString(exp_schema, exp_data);
+  CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time", "key",
+                 tolerance);
+}
+
+void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
+                          const std::shared_ptr<Schema>& r_schema) {
+  BatchesWithSchema l_batches = GenerateBatchesFromString(l_schema, {R"([])"});
+  BatchesWithSchema r_batches = GenerateBatchesFromString(r_schema, {R"([])"});
+
+  auto exec_ctx =
+      arrow::internal::make_unique<ExecContext>(default_memory_pool(), nullptr);
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+
+  AsofJoinNodeOptions join_options("time", "key", 0);
+  Declaration join{"asofjoin", join_options};
+  join.inputs.emplace_back(Declaration{
+      "source", SourceNodeOptions{l_batches.schema, l_batches.gen(false, false)}});
+  join.inputs.emplace_back(Declaration{
+      "source", SourceNodeOptions{r_batches.schema, r_batches.gen(false, false)}});
+
+  ASSERT_RAISES(Invalid, join.AddToPlan(plan.get()));

Review Comment:
   Added test


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -361,6 +361,29 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
   Expression filter;
 };
 
+/// \brief Make a node which implements the asof join operation
+///
+/// This node takes one left table and (n-1) right tables, and asof joins them
+/// together. Batches produced by each input must be ordered by the "on" key.
+/// The batch size that this node produces is determined by the left table.
+class ARROW_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
+ public:
+  AsofJoinNodeOptions(FieldRef on_key, FieldRef by_key, int64_t tolerance)
+      : on_key(std::move(on_key)), by_key(std::move(by_key)), tolerance(tolerance) {}
+
+  // "on" key for the join. Each input table must be sorted by the "on" key. Inexact

Review Comment:
   Updated
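
For readers following the diff, here is a sketch of how the DoRunBasicTest helper above could be exercised. It is illustrative and not part of the quoted patch: the test name and the JSON rows are made up, each row follows the [time, key, value] layout of the schemas built inside the helper, and the expected rows assume that every left row is matched to the most recent right row with the same key whose time is at or before the left time and within the tolerance.

TEST(AsofJoinTest, BasicUsageSketch) {
  // Inputs: one batch for the left table and one per right table. Each row is
  // [time, key, value], matching the schemas declared in DoRunBasicTest.
  std::vector<util::string_view> l_data = {R"([[0, 1, 1.0], [1000, 1, 2.0]])"};
  std::vector<util::string_view> r0_data = {R"([[0, 1, 11.0], [900, 1, 12.0]])"};
  std::vector<util::string_view> r1_data = {R"([[500, 1, 101.0]])"};

  // Assumed output: the left row at time 0 picks up the r0 row at time 0 but has
  // no r1 row at or before it (hence the null); the left row at time 1000 picks
  // up the r0 row at 900 and the r1 row at 500, both within the tolerance.
  std::vector<util::string_view> exp_data = {
      R"([[0, 1, 1.0, 11.0, null], [1000, 1, 2.0, 12.0, 101.0]])"};

  DoRunBasicTest(l_data, r0_data, r1_data, exp_data, /*tolerance=*/1000);
}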
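
Along the same lines, a hypothetical negative case for the DoRunInvalidTypeTest helper. It assumes the node rejects a "by" key whose type differs between the left and right inputs; which key types are actually accepted is decided by the node implementation, not by this sketch.

TEST(AsofJoinTest, MismatchedByKeyTypeSketch) {
  // The left "key" column is int32 while the right one is utf8; the helper
  // expects Declaration::AddToPlan to fail with Invalid for such schemas.
  DoRunInvalidTypeTest(
      schema({field("time", int64()), field("key", int32()), field("l_v0", float64())}),
      schema({field("time", int64()), field("key", utf8()), field("r0_v0", float64())}));
}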
