lidavidm commented on a change in pull request #12715:
URL: https://github.com/apache/arrow/pull/12715#discussion_r835546831
##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -559,6 +559,71 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
}
}
+TEST(ExecPlanExecution, SinkNodeOutputSchema) {  // Asserts the first source's and first sink's output_schema() compare equal for a source -> "sink" plan.
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());  // plan that the declaration below is added to
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;  // only passed by address to SinkNodeOptions; never read in this test
+
+ auto basic_data = MakeBasicBatches();  // supplies the schema and batch generator given to the source node
+
+ ASSERT_OK(
+ Declaration::Sequence({
+ {"source", SourceNodeOptions{basic_data.schema,
+
basic_data.gen(true, true)}},
+ {"sink", SinkNodeOptions{&sink_gen}},
+ })
+ .AddToPlan(plan.get()));  // the declaration is only added to the plan; nothing in this test starts the plan
+ ASSERT_EQ(plan->sources()[0]->output_schema(),
plan->sinks()[0]->output_schema());
+}
+
+TEST(ExecPlanExecution, ConsumingSinkNodeOutputSchema) {  // Asserts the first source's and first sink's output_schema() compare equal for a source -> "consuming_sink" plan.
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+ std::atomic<uint32_t> batches_seen{0};  // counter incremented by TestConsumer::Consume
+ Future<> finish = Future<>::Make();  // future handed back by TestConsumer::Finish
+ struct TestConsumer : public SinkNodeConsumer {  // minimal consumer, local to this test
+ TestConsumer(std::atomic<uint32_t>* batches_seen, Future<> finish)
+ : batches_seen(batches_seen), finish(std::move(finish)) {}
+
+ Status Consume(ExecBatch batch) override {  // the batch contents are ignored
+ (*batches_seen)++;  // count each batch passed to Consume
+ return Status::OK();
+ }
+
+ Future<> Finish() override { return finish; }  // returns the future supplied at construction
+
+ std::atomic<uint32_t>* batches_seen;  // points at the counter owned by the test body
+ Future<> finish;
+ };
+ std::shared_ptr<TestConsumer> consumer =
+ std::make_shared<TestConsumer>(&batches_seen, finish);
+
+ auto basic_data = MakeBasicBatches();  // supplies the schema and batch generator given to the source node
+ ASSERT_OK_AND_ASSIGN(
+ auto source,
+ MakeExecNode("source", plan.get(), {},
+ SourceNodeOptions(basic_data.schema, basic_data.gen(true,
true))));
+ ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},  // the source node is the sink's only input
+ ConsumingSinkNodeOptions(consumer)));
+ ASSERT_EQ(plan->sources()[0]->output_schema(),  // NOTE(review): nothing visible starts the plan, so batches_seen and finish look unused beyond building the consumer; confirm
plan->sinks()[0]->output_schema());
+}
+
+TEST(ExecPlanExecution, OrderBySinkNodeOutputSchema) {
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+ auto input_schema = schema({field("a", int32()), field("b", boolean())});
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ auto random_data = MakeRandomBatches(input_schema, 10);
+
+ SortOptions options({SortKey("a", SortOrder::Ascending)});
+ ASSERT_OK(Declaration::Sequence(
+ {
+ {"source",
+ SourceNodeOptions{random_data.schema,
random_data.gen(true, true)}},
+ {"order_by_sink", OrderBySinkNodeOptions{options,
&sink_gen}},
+ })
+ .AddToPlan(plan.get()));
+ ASSERT_EQ(plan->sources()[0]->output_schema(),
plan->sinks()[0]->output_schema());
Review comment:
Is it possible to just add these checks to existing tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]