westonpace commented on code in PR #15288:
URL: https://github.com/apache/arrow/pull/15288#discussion_r1070181367
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -322,26 +306,16 @@ void TestSourceSink(
std::string source_factory_name,
std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
to_elements) {
- ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1));
- ExecContext exec_context(default_memory_pool(), executor.get());
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context));
- AsyncGenerator<std::optional<ExecBatch>> sink_gen;
-
auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches));
auto element_it_maker = [&elements]() {
return MakeVectorIterator<ElementType>(elements);
};
-
- ASSERT_OK(Declaration::Sequence({
- {source_factory_name,
- OptionsType{exp_batches.schema,
element_it_maker}},
- {"sink", SinkNodeOptions{&sink_gen}},
- })
- .AddToPlan(plan.get()));
-
- ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
-
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+ Declaration plan(source_factory_name,
+ OptionsType{exp_batches.schema, element_it_maker});
+ ASSERT_OK_AND_ASSIGN(auto result,
+ DeclarationToExecBatches(std::move(plan),
/*use_threads=*/false));
Review Comment:
Any particular reason for `use_threads=false` here?
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -1046,26 +994,21 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) {
auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1);
auto minmax_opts = std::make_shared<ScalarAggregateOptions>();
- auto expected_result = ExecBatch::Make(
- {ScalarFromJSON(struct_({field("min", int32()), field("max",
int32())}),
- R"({"min": -8, "max": 12})")});
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+ auto ty = struct_({field("min", int32()), field("max", int32())});
+ auto expected_table = TableFromJSON(schema({field("struct", ty)}), {R"([
+ [{"min": -8, "max": 12}]
+ ])"});
// NOTE: Test `ScalarAggregateNode` by omitting `keys` attribute
- ASSERT_OK(Declaration::Sequence(
- {{"source",
- SourceNodeOptions{input.schema, input.gen(parallel,
/*slow=*/false)}},
- {"aggregate", AggregateNodeOptions{
- /*aggregates=*/{{"min_max",
std::move(minmax_opts),
- "i32", "min_max"}},
- /*keys=*/{}}},
- {"sink", SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
-
- ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
-
Finishes(ResultWith(UnorderedElementsAreArray({*expected_result}))));
+ Declaration plan = Declaration::Sequence(
+ {{"source", SourceNodeOptions{input.schema, input.gen(parallel,
/*slow=*/false)}},
+ {"aggregate",
+ AggregateNodeOptions{
+ /*aggregates=*/{{"min_max", std::move(minmax_opts), "i32",
"min_max"}},
+ /*keys=*/{}}}});
+ ASSERT_OK_AND_ASSIGN(auto result_table,
+ DeclarationToTable(std::move(plan), parallel));
+ AssertTablesEqual(*result_table, *expected_table);
Review Comment:
```suggestion
// No need to ignore order since there is only 1 row
AssertTablesEqual(*result_table, *expected_table);
```
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -928,52 +894,34 @@ TEST(ExecPlanExecution, StressSourceSinkStopped) {
TEST(ExecPlanExecution, SourceFilterSink) {
auto basic_data = MakeBasicBatches();
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- AsyncGenerator<std::optional<ExecBatch>> sink_gen;
-
- ASSERT_OK(Declaration::Sequence(
- {
- {"source", SourceNodeOptions{basic_data.schema,
-
basic_data.gen(/*parallel=*/false,
-
/*slow=*/false)}},
- {"filter", FilterNodeOptions{equal(field_ref("i32"),
literal(6))}},
- {"sink", SinkNodeOptions{&sink_gen}},
- })
- .AddToPlan(plan.get()));
-
- ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
- Finishes(ResultWith(UnorderedElementsAreArray(
- {ExecBatchFromJSON({int32(), boolean()}, "[]"),
- ExecBatchFromJSON({int32(), boolean()}, "[[6,
false]]")}))));
+ Declaration plan = Declaration::Sequence(
+ {{"source", SourceNodeOptions{basic_data.schema,
basic_data.gen(/*parallel=*/false,
+
/*slow=*/false)}},
+ {"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}});
+ ASSERT_OK_AND_ASSIGN(auto result,
+ DeclarationToExecBatches(std::move(plan),
/*user_threads=*/false));
Review Comment:
```suggestion
DeclarationToExecBatches(std::move(plan),
/*use_threads=*/false));
```
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -1046,26 +994,21 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) {
auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1);
auto minmax_opts = std::make_shared<ScalarAggregateOptions>();
- auto expected_result = ExecBatch::Make(
- {ScalarFromJSON(struct_({field("min", int32()), field("max",
int32())}),
- R"({"min": -8, "max": 12})")});
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+ auto ty = struct_({field("min", int32()), field("max", int32())});
+ auto expected_table = TableFromJSON(schema({field("struct", ty)}), {R"([
Review Comment:
```suggestion
auto min_max_type = struct_({field("min", int32()), field("max",
int32())});
auto expected_table = TableFromJSON(schema({field("struct",
min_max_type)}), {R"([
```
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -1232,30 +1167,22 @@ TEST(ExecPlanExecution,
SourceFilterProjectGroupedSumTopK) {
}
TEST(ExecPlanExecution, SourceScalarAggSink) {
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- AsyncGenerator<std::optional<ExecBatch>> sink_gen;
-
auto basic_data = MakeBasicBatches();
- ASSERT_OK(
- Declaration::Sequence(
- {
- {"source",
- SourceNodeOptions{basic_data.schema,
basic_data.gen(/*parallel=*/false,
-
/*slow=*/false)}},
- {"aggregate", AggregateNodeOptions{
- /*aggregates=*/{{"sum", nullptr, "i32",
"sum(i32)"},
- {"any", nullptr, "bool",
"any(bool)"}},
- }},
- {"sink", SinkNodeOptions{&sink_gen}},
- })
- .AddToPlan(plan.get()));
-
- ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
- Finishes(ResultWith(UnorderedElementsAreArray({
- ExecBatchFromJSON({int64(), boolean()},
- {ArgShape::SCALAR, ArgShape::SCALAR},
"[[22, true]]"),
- }))));
+ Declaration plan = Declaration::Sequence(
+ {{"source", SourceNodeOptions{basic_data.schema,
+ basic_data.gen(/*parallel=*/false,
/*slow=*/false)}},
+ {"aggregate", AggregateNodeOptions{
+ /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"},
+ {"any", nullptr, "bool",
"any(bool)"}},
+ }}});
+ auto exp_batches = {
+ ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR,
ArgShape::SCALAR},
+ "[[22, true]]"),
Review Comment:
```suggestion
"[[22, true]]")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org