lidavidm commented on a change in pull request #10705:
URL: https://github.com/apache/arrow/pull/10705#discussion_r669103875
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1323,5 +1323,86 @@ TEST(ScanNode, MaterializationOfVirtualColumn) {
Finishes(ResultWith(UnorderedElementsAreArray(expected))));
}
+TEST(ScanNode, MinimalEndToEnd) {
+ // NB: This test is here for didactic purposes
+
+ // Specify a MemoryPool and ThreadPool for the ExecPlan
+ compute::ExecContext exec_context(default_memory_pool(),
internal::GetCpuThreadPool());
+
+ // A ScanNode is constructed from an ExecPlan (into which it is inserted),
+ // a Dataset (whose batches will be scanned), and ScanOptions (to specify a
filter for
+ // predicate pushdown, a projection to skip materialization of unnecessary
columns, ...)
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
+ compute::ExecPlan::Make(&exec_context));
+
+ std::shared_ptr<Dataset> dataset = std::make_shared<InMemoryDataset>(
+ TableFromJSON(schema({field("a", int32()), field("b", boolean())}),
+ {
+ R"([{"a": 1, "b": null},
+ {"a": 2, "b": true}])",
+ R"([{"a": null, "b": true},
+ {"a": 3, "b": false}])",
+ R"([{"a": null, "b": true},
+ {"a": 4, "b": false}])",
+ R"([{"a": 5, "b": null},
+ {"a": 6, "b": false},
+ {"a": 7, "b": false}])",
+ }));
+
+ auto options = std::make_shared<ScanOptions>();
+ // sync scanning is not supported by ScanNode
+ options->use_async = true;
+ // for now, we must replicate the dataset schema here
+ options->dataset_schema = dataset->schema();
+ // specify the filter
+ compute::Expression b_is_true = field_ref("b");
+ ASSERT_OK_AND_ASSIGN(b_is_true, b_is_true.Bind(*dataset->schema()));
+ options->filter = b_is_true;
+ // for now, specify the projection as the full project expression
(eventually this can
+ // just be a list of materialized field names)
+ compute::Expression a_times_2 = call("multiply", {field_ref("a"),
literal(2)});
+ ASSERT_OK_AND_ASSIGN(a_times_2, a_times_2.Bind(*dataset->schema()));
+ options->projection = call("project", {a_times_2},
compute::ProjectOptions{{"a * 2"}});
+
+ // construct the scan node
+ ASSERT_OK_AND_ASSIGN(compute::ExecNode * scan,
+ dataset::MakeScanNode(plan.get(), dataset, options));
+
+ // pipe the scan node into a filter node
+ ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter,
+ compute::MakeFilterNode(scan, "filter", b_is_true));
+
+ // pipe the filter node into a project node
+ // NB: we're using the project node factory which preserves fragment/batch
index
+ // tagging, so we *can* reorder later if we choose. The tags will not appear
in
+ // our output.
+ ASSERT_OK_AND_ASSIGN(compute::ExecNode * project,
+ dataset::MakeAugmentedProjectNode(filter, "project",
{a_times_2}));
+
+ // finally, pipe the project node into a sink node
+ // NB: if we don't need ordering, we could use compute::MakeSinkNode instead
+ ASSERT_OK_AND_ASSIGN(auto sink_gen, dataset::MakeOrderedSinkNode(project,
"sink"));
Review comment:
   Thanks for the example! I do like this setup a lot more since it is
clearer what all the steps are in reading a dataset + everything is neatly
separated. It is not that the current scanner does not separate out the various
stages of its pipeline, but the stages in the scanner are rather tightly coupled,
while this is clearly partitioned.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]