aocsa commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r725048789
########## File path: cpp/src/arrow/dataset/scanner_benchmark.cc ##########
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/plan.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/dataset/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+
+namespace arrow {
+namespace compute {
+
+constexpr auto kSeed = 0x0ff1ce;
+
+void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema, size_t num_batches,
+                               BatchesWithSchema* out_batches, int multiplicity = 1,
+                               int64_t batch_size = 4) {
+  ::arrow::random::RandomArrayGenerator rng_(kSeed);
+  if (num_batches == 0) {
+    auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+    out_batches->batches.push_back(empty_record_batch);
+  } else {
+    for (size_t j = 0; j < num_batches; j++) {
+      out_batches->batches.push_back(
+          ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+    }
+  }
+
+  size_t batch_count = out_batches->batches.size();
+  for (int repeat = 1; repeat < multiplicity; ++repeat) {
+    for (size_t i = 0; i < batch_count; ++i) {
+      out_batches->batches.push_back(out_batches->batches[i]);
+    }
+  }
+  out_batches->schema = schema;
+}
+
+RecordBatchVector GenerateBatches(const std::shared_ptr<Schema>& schema,
+                                  size_t num_batches, size_t batch_size) {
+  BatchesWithSchema input_batches;
+
+  RecordBatchVector batches;
+  GenerateBatchesFromSchema(schema, num_batches, &input_batches, 1, batch_size);
+
+  for (const auto& batch : input_batches.batches) {
+    batches.push_back(batch.ToRecordBatch(schema).MoveValueUnsafe());
+  }
+  return batches;
+}
+
+}  // namespace compute
+
+namespace dataset {
+
+void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executor) {
+  // NB: This test is here for didactic purposes
+
+  // Specify a MemoryPool and ThreadPool for the ExecPlan
+  compute::ExecContext exec_context(
+      default_memory_pool(),
+      use_executor ? ::arrow::internal::GetCpuThreadPool() : nullptr);
+
+  // ensure arrow::dataset node factories are in the registry
+  ::arrow::dataset::internal::Initialize();
+
+  // A ScanNode is constructed from an ExecPlan (into which it is inserted),
+  // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for
+  // predicate pushdown, a projection to skip materialization of unnecessary columns,
+  // ...)
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
+                       compute::ExecPlan::Make(&exec_context));
+
+  std::shared_ptr<Schema> sch = schema({field("a", int32()), field("b", boolean())});
+  RecordBatchVector batches =
+      ::arrow::compute::GenerateBatches(sch, num_batches, batch_size);
+
+  std::shared_ptr<Dataset> dataset = std::make_shared<InMemoryDataset>(sch, batches);
+
+  auto options = std::make_shared<ScanOptions>();
+  // sync scanning is not supported by ScanNode
+  options->use_async = true;
+  // specify the filter
+  compute::Expression b_is_true = field_ref("b");
+  options->filter = b_is_true;
+  // for now, specify the projection as the full project expression (eventually this can
+  // just be a list of materialized field names)
+  compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)});
+  options->projection =
+      call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}});
+
+  // construct the scan node
+  ASSERT_OK_AND_ASSIGN(
+      compute::ExecNode * scan,
+      compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options}));
+
+  // pipe the scan node into a filter node
+  ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter,
+                       compute::MakeExecNode("filter", plan.get(), {scan},
+                                             compute::FilterNodeOptions{b_is_true}));
+
+  // pipe the filter node into a project node
+  // NB: we're using the project node factory which preserves fragment/batch index
+  // tagging, so we *can* reorder later if we choose. The tags will not appear in
+  // our output.
+  ASSERT_OK_AND_ASSIGN(compute::ExecNode * project,
+                       compute::MakeExecNode("augmented_project", plan.get(), {filter},
+                                             compute::ProjectNodeOptions{{a_times_2}}));
+
+  // finally, pipe the project node into a sink node
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+  ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink,
+                       compute::MakeExecNode("ordered_sink", plan.get(), {project},
+                                             compute::SinkNodeOptions{&sink_gen}));
+
+  ASSERT_NE(sink, nullptr);

Review comment:
   Yes, like this: https://github.com/apache/arrow/blob/eee13b0acc3397d132051dcf47e6f5813436bf91/cpp/src/arrow/dataset/scanner.cc#L589. I just wanted to make it explicit in this benchmark that filter and project nodes are used, and that the executor can be enabled or disabled.
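
   For context, here is a minimal sketch (not part of this diff) of how the use_executor toggle could be exposed as a plain google benchmark argument. It assumes this sketch lives in the same file as MinimalEndToEndScan above (so the existing includes apply), and the argument values are illustrative only:

   // Sketch only: drives the scan above from a google benchmark, toggling the
   // executor via the third argument (0 = no executor, 1 = CPU thread pool).
   static void BM_MinimalEndToEndScan(benchmark::State& state) {
     const size_t num_batches = static_cast<size_t>(state.range(0));
     const size_t batch_size = static_cast<size_t>(state.range(1));
     const bool use_executor = state.range(2) != 0;
     for (auto _ : state) {
       arrow::dataset::MinimalEndToEndScan(num_batches, batch_size, use_executor);
     }
   }

   // Run every (num_batches, batch_size) combination with and without the executor.
   BENCHMARK(BM_MinimalEndToEndScan)
       ->ArgsProduct({{100, 1000}, {10, 100}, {0, 1}})
       ->UseRealTime();

   Exposing the executor as a benchmark argument keeps the serial and thread-pool runs side by side in the benchmark output, which makes the comparison the comment describes easy to read off directly.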
