rtpsw commented on code in PR #13880:
URL: https://github.com/apache/arrow/pull/13880#discussion_r954700249
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -74,237 +226,723 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
/*same_chunk_layout=*/true, /*flatten=*/true);
}
-void DoRunBasicTest(const std::vector<util::string_view>& l_data,
- const std::vector<util::string_view>& r0_data,
- const std::vector<util::string_view>& r1_data,
- const std::vector<util::string_view>& exp_data, int64_t
tolerance) {
- auto l_schema =
- schema({field("time", int64()), field("key", int32()), field("l_v0",
float64())});
- auto r0_schema =
- schema({field("time", int64()), field("key", int32()), field("r0_v0",
float64())});
- auto r1_schema =
- schema({field("time", int64()), field("key", int32()), field("r1_v0",
float32())});
-
- auto exp_schema = schema({
- field("time", int64()),
- field("key", int32()),
- field("l_v0", float64()),
- field("r0_v0", float64()),
- field("r1_v0", float32()),
- });
-
- // Test three table join
- BatchesWithSchema l_batches, r0_batches, r1_batches, exp_batches;
- l_batches = MakeBatchesFromString(l_schema, l_data);
- r0_batches = MakeBatchesFromString(r0_schema, r0_data);
- r1_batches = MakeBatchesFromString(r1_schema, r1_data);
- exp_batches = MakeBatchesFromString(exp_schema, exp_data);
- CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time", "key",
- tolerance);
-}
-
-void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
- const std::shared_ptr<Schema>& r_schema) {
- BatchesWithSchema l_batches = MakeBatchesFromString(l_schema, {R"([])"});
- BatchesWithSchema r_batches = MakeBatchesFromString(r_schema, {R"([])"});
-
+#define CHECK_RUN_OUTPUT(by_key_type)
\
+ void CheckRunOutput(
\
+ const BatchesWithSchema& l_batches, const BatchesWithSchema& r0_batches,
\
+ const BatchesWithSchema& r1_batches, const BatchesWithSchema&
exp_batches, \
+ const FieldRef time, by_key_type keys, const int64_t tolerance) {
\
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches,
\
+ AsofJoinNodeOptions(time, keys, tolerance));
\
+ }
+
+EXPAND_BY_KEY_TYPE(CHECK_RUN_OUTPUT)
+
+void DoInvalidPlanTest(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r_batches,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str,
+ bool then_run_plan = false) {
ExecContext exec_ctx;
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
- AsofJoinNodeOptions join_options("time", "key", 0);
Declaration join{"asofjoin", join_options};
join.inputs.emplace_back(Declaration{
"source", SourceNodeOptions{l_batches.schema, l_batches.gen(false,
false)}});
join.inputs.emplace_back(Declaration{
"source", SourceNodeOptions{r_batches.schema, r_batches.gen(false,
false)}});
- ASSERT_RAISES(Invalid, join.AddToPlan(plan.get()));
+ if (then_run_plan) {
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+ ASSERT_OK(Declaration::Sequence({join, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(Invalid,
+
::testing::HasSubstr(expected_error_str),
+ StartAndCollect(plan.get(),
sink_gen));
+ } else {
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr(expected_error_str),
+ join.AddToPlan(plan.get()));
+ }
+}
+
+void DoRunInvalidPlanTest(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r_batches,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str) {
+ DoInvalidPlanTest(l_batches, r_batches, join_options, expected_error_str);
+}
+
+void DoRunInvalidPlanTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str) {
+ BatchesWithSchema l_batches = MakeBatchesFromNumString(l_schema, {R"([])"});
+ BatchesWithSchema r_batches = MakeBatchesFromNumString(r_schema, {R"([])"});
+
+ return DoRunInvalidPlanTest(l_batches, r_batches, join_options,
expected_error_str);
+}
+
+void DoRunInvalidPlanTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema, int64_t
tolerance,
+ const std::string& expected_error_str) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time", "key",
tolerance),
+ expected_error_str);
+}
+
+void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Unsupported type for ");
+}
+
+void DoRunInvalidToleranceTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, -1,
+ "AsOfJoin tolerance must be non-negative but is ");
+}
+
+void DoRunMissingKeysTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table : No
match");
+}
+
+void DoRunEmptyByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time", {}, 0),
+ "AsOfJoin by_key must not be empty");
+}
+
+void DoRunMissingOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("invalid_time",
"key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunMissingByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time",
"invalid_key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunNestedOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions({0, "time"},
"key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunNestedByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time",
FieldRef{0, 1}, 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunAmbiguousOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table :
Multiple matches");
+}
+
+void DoRunAmbiguousByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table :
Multiple matches");
+}
+
+std::string GetJsonString(int n_rows, int n_cols, bool unordered = false) {
Review Comment:
No advantage other than it being the quickest approach I came up with; I try to
spend less effort on test code. I'll fix the name and add documentation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]