lidavidm commented on code in PR #13782:
URL: https://github.com/apache/arrow/pull/13782#discussion_r936983246
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,11 +38,107 @@ namespace dataset {
using RecordBatchGenerator =
std::function<Future<std::shared_ptr<RecordBatch>>()>;
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
Review Comment:
```suggestion
struct ARROW_DS_EXPORT FragmentSelectionColumn {
```
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,11 +38,107 @@ namespace dataset {
using RecordBatchGenerator =
std::function<Future<std::shared_ptr<RecordBatch>>()>;
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+ /// \brief The path to the column to load
+ FieldPath column_reference;
+ /// \brief The requested type of the column
+ ///
+ /// A format may choose to ignore this field completely. For example, when
+ /// reading from IPC the reader can just return the column in the data type
+ /// that is stored on disk. In this case the scan node will cast the column
+ /// to the appropriate type later.
+ ///
+ /// However, some formats may be capable of casting on the fly. For example,
+ /// when reading from CSV, if we know the target type of the column, we can
+ /// convert from string to the target type as we read.
+ DataType* requested_type;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+ /// The fragment scan request is derived from ScanV2Options. The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
Review Comment:
```suggestion
struct ARROW_DS_EXPORT FragmentScanRequest {
```
##########
cpp/src/arrow/dataset/scan_node.cc:
##########
@@ -0,0 +1,653 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/scanner.h"
+
+#include <functional>
+#include <iostream>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tracing_internal.h"
+#include "arrow/util/unreachable.h"
+
+namespace cp = arrow::compute;
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace dataset {
+
+namespace {
+
+Result<std::shared_ptr<Schema>> OutputSchemaFromOptions(const ScanV2Options&
options) {
+ return FieldPath::GetAll(*options.dataset->schema(), options.columns);
+}
+
+class BatchOutputStrategy {
+ public:
+ BatchOutputStrategy(compute::ExecNode* self, compute::ExecNode* output)
+ : self_(self), output_(output) {}
+ virtual ~BatchOutputStrategy() = default;
+ virtual void DeliverBatch(compute::ExecBatch batch) = 0;
+ virtual void InputFinished() {
+ // std::cout << "BatchOutputFinished: " +
std::to_string(batch_count_.load()) + "\n";
+ output_->InputFinished(self_, batch_count_.load());
+ }
+
+ protected:
+ void SliceAndDoDeliverBatch(compute::ExecBatch batch) {
+ // FIXME - Add slicing
+ batch_count_++;
+ output_->InputReceived(self_, std::move(batch));
+ }
+
+ std::atomic<int32_t> batch_count_{0};
+ compute::ExecNode* self_;
+ compute::ExecNode* output_;
+};
+
+class UnorderedBatchOutputStrategy : public BatchOutputStrategy {
+ public:
+ UnorderedBatchOutputStrategy(compute::ExecNode* self, compute::ExecNode*
output)
+ : BatchOutputStrategy(self, output){};
+ ~UnorderedBatchOutputStrategy() = default;
+ void DeliverBatch(compute::ExecBatch batch) override {
+ return SliceAndDoDeliverBatch(std::move(batch));
+ }
+};
+
+// Unlike other nodes the scanner has to synchronize a number of resources
+// that are external to the plan itself. These things may start tasks that
+// are invisible to the exec plan.
+class ScannerSynchronization {
+ public:
+ ScannerSynchronization(int fragment_readahead)
+ : fragment_readahead_(fragment_readahead) {}
+
+ bool MarkListingCompleted() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ finished_listing_ = true;
+ return fragments_active_ == 0;
+ }
+
+ void MarkListingFailed(Status err) {
+ std::lock_guard<std::mutex> lg(mutex_);
+ finished_listing_ = true;
+ cancelled_ = true;
+ error_ = std::move(err);
+ }
+
+ void Cancel() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (!cancelled_) {
+ cancelled_ = true;
+ error_ = Status::Cancelled("Plan cancelled before scan completed");
+ }
+ }
+
+ void Pause() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ paused_ = true;
+ }
+
+ void ReportFailedFragment(Status st) {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (!cancelled_) {
+ cancelled_ = true;
+ error_ = std::move(st);
+ fragments_active_--;
+ }
+ }
+
+ struct NextFragment {
+ std::shared_ptr<Fragment> fragment;
+ bool should_mark_input_finished;
+ };
+
+ NextFragment FinishFragmentAndGetNext() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (cancelled_ || paused_ || fragments_to_scan_.empty()) {
+ fragments_active_--;
+ // std::cout << "Not continuing fragment inline fragments_active=" +
+ // std::to_string(fragments_active_) + "\n";
+ return {nullptr, /*should_mark_input_finished=*/finished_listing_};
+ }
+ // std::cout << "Continuing fragment inline " +
std::to_string(fragments_active_) +
+ // "\n";
+ std::shared_ptr<Fragment> next = std::move(fragments_to_scan_.front());
+ fragments_to_scan_.pop();
+ return {next, /*should_mark_input_finished=*/false};
+ }
+
+ bool QueueFragment(std::shared_ptr<Fragment> fragment) {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (cancelled_) {
+ return false;
+ }
+ if (!paused_ && fragments_active_ < fragment_readahead_) {
+ // If we have space start scanning the fragment immediately
+ fragments_active_++;
+ // std::cout << "Start scanning immediately: " +
std::to_string(fragments_active_) +
+ // "\n";
+ return true;
+ }
+ fragments_to_scan_.push(std::move(fragment));
+ return false;
+ }
+
+ private:
+ int fragment_readahead_;
+ std::mutex mutex_;
+ std::queue<std::shared_ptr<Fragment>> fragments_to_scan_;
+ Status error_;
+ bool cancelled_ = false;
+ bool finished_listing_ = false;
+ int fragments_active_ = 0;
+ bool paused_ = false;
+};
+
+// In the future we should support async scanning of fragments. The
+// Dataset class doesn't support this yet but we pretend it does here to
+// ease future adoption of the feature.
+AsyncGenerator<std::shared_ptr<Fragment>> GetFragments(Dataset* dataset,
+ cp::Expression
predicate) {
+ // In the future the dataset should be responsible for figuring out
+ // the I/O context. This will allow different I/O contexts to be used
+ // when scanning different datasets. For example, if we are scanning a
+ // union of a remote dataset and a local dataset.
+ const auto& io_context = io::default_io_context();
+ auto io_executor = io_context.executor();
+ Future<std::shared_ptr<FragmentIterator>> fragments_it_fut =
+ DeferNotOk(io_executor->Submit(
+ [dataset, predicate]() -> Result<std::shared_ptr<FragmentIterator>> {
+ // std::cout << "dataset->GetFragments\n";
+ ARROW_ASSIGN_OR_RAISE(FragmentIterator fragments_iter,
+ dataset->GetFragments(predicate));
+ return
std::make_shared<FragmentIterator>(std::move(fragments_iter));
+ }));
+ Future<AsyncGenerator<std::shared_ptr<Fragment>>> fragments_gen_fut =
+ fragments_it_fut.Then([](const std::shared_ptr<FragmentIterator>&
fragments_it)
+ ->
Result<AsyncGenerator<std::shared_ptr<Fragment>>> {
+ ARROW_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<Fragment>> fragments,
+ fragments_it->ToVector());
+ return MakeVectorGenerator(std::move(fragments));
+ });
+ return MakeFromFuture(std::move(fragments_gen_fut));
+}
+
+/// \brief A node that scans a dataset
+///
+/// The scan node has three groups of io-tasks and one task.
+///
+/// The first io-task (listing) fetches the fragments from the dataset. This
may be a
+/// simple iteration of paths or, if the dataset is described with wildcards,
this may
+/// involve I/O for listing and walking directory paths. There is one listing
io-task per
+/// dataset.
+///
+/// The next step is to fetch the metadata for the fragment. For some formats
(e.g. CSV)
+/// this may be quite simple (get the size of the file). For other formats
(e.g. parquet)
+/// this is more involved and requires reading data. There is one metadata
io-task per
+/// fragment. The metadata io-task creates an AsyncGenerator<RecordBatch>
from the
+/// fragment.
+///
+/// Once the metadata io-task is done we can issue read io-tasks. Each read
io-task
+/// requests a single batch of data from the disk by pulling the next Future
from the
+/// generator.
+///
+/// Finally, when the future is fulfilled, we issue a pipeline task to drive
the batch
+/// through the pipeline.
+///
+/// Most of these tasks are io-tasks. They take very few CPU resources and
they run on
+/// the I/O thread pool. These io-tasks are invisible to the exec plan and so
we need to
+/// do some custom scheduling. We limit how many fragments we read from at
any one time.
+/// This is referred to as "fragment readahead".
+///
+/// Within a fragment there is usually also some amount of "row readahead".
This row
+/// readahead is handled by the fragment (and not the scanner) because the
exact details
+/// of how it is performed depend on the underlying format.
+///
+/// When a scan node is aborted (StopProducing) we send a cancel signal to any
active
+/// fragments. On destruction we continue consuming the fragments until they
complete
+/// (which should be fairly quick since we cancelled the fragment). This
ensures the
+/// I/O work is completely finished before the node is destroyed.
+class ScanNode : public cp::ExecNode {
+ public:
+ ScanNode(cp::ExecPlan* plan, ScanV2Options options,
+ std::shared_ptr<Schema> output_schema)
+ : cp::ExecNode(plan, {}, {}, std::move(output_schema),
+ /*num_outputs=*/1),
+ options_(options),
+ synchronization_(options_.fragment_readahead) {}
+
+ static Result<ScanV2Options> NormalizeAndValidate(const ScanV2Options&
options,
+ compute::ExecContext* ctx)
{
+ ScanV2Options normalized(options);
+ if (!normalized.dataset) {
+ return Status::Invalid("Scan options must include a dataset");
+ }
+
+ if (normalized.filter.IsBound()) {
+ // There is no easy way to make sure a filter was bound against the same
+ // function registry as the one in ctx so we just require it to be
unbound
+ // FIXME - Do we care if it was bound to a different function registry?
+ return Status::Invalid("Scan filter must be unbound");
+ } else {
+ ARROW_ASSIGN_OR_RAISE(normalized.filter,
+ normalized.filter.Bind(*options.dataset->schema(),
ctx));
+ }
+
+ return std::move(normalized);
+ }
+
+ static Result<cp::ExecNode*> Make(cp::ExecPlan* plan,
std::vector<cp::ExecNode*> inputs,
+ const cp::ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "ScanNode"));
+ const auto& scan_options = checked_cast<const ScanV2Options&>(options);
+ ARROW_ASSIGN_OR_RAISE(ScanV2Options normalized_options,
+ NormalizeAndValidate(scan_options,
plan->exec_context()));
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> output_schema,
+ OutputSchemaFromOptions(normalized_options));
+ return plan->EmplaceNode<ScanNode>(plan, std::move(normalized_options),
+ std::move(output_schema));
+ }
+
+ const char* kind_name() const override { return "ScanNode"; }
+
+ [[noreturn]] static void NoInputs() {
+ Unreachable("no inputs; this should never be called");
+ }
+ [[noreturn]] void InputReceived(cp::ExecNode*, cp::ExecBatch) override {
NoInputs(); }
+ [[noreturn]] void ErrorReceived(cp::ExecNode*, Status) override {
NoInputs(); }
+ [[noreturn]] void InputFinished(cp::ExecNode*, int) override { NoInputs(); }
+
+ Status Init() override {
+ batch_output_ =
+ ::arrow::internal::make_unique<UnorderedBatchOutputStrategy>(this,
outputs_[0]);
+ return Status::OK();
+ }
+
+ Status StartProducing() override {
+ START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
+ {{"node.kind", kind_name()},
+ {"node.label", label()},
+ {"node.output_schema", output_schema()->ToString()},
+ {"node.detail", ToString()}});
+ END_SPAN_ON_FUTURE_COMPLETION(span_, finished_);
+ // std::cout << "StartProducing\n";
+ ARROW_ASSIGN_OR_RAISE(Future<> list_task, plan_->BeginExternalTask());
+ if (!list_task.is_valid()) {
+ // Cancelled before we even started
+ return Status::OK();
+ }
+ // std::cout << "LIST TASK: BEGIN\n";
+
+ AsyncGenerator<std::shared_ptr<Fragment>> frag_gen =
+ GetFragments(options_.dataset.get(), options_.filter);
+ Future<> visit_task = VisitAsyncGenerator(
+ std::move(frag_gen), [this](const std::shared_ptr<Fragment>& fragment)
{
+ // std::cout << "Queuing fragment: " + fragment->ToString() + "\n";
+ QueueOrStartFragment(fragment);
+ return Status::OK();
+ });
+ visit_task
+ .Then(
+ [this]() {
+ // std::cout << "Marking listing completed\n";
+ if (synchronization_.MarkListingCompleted()) {
+ // It's possible, but probably unlikely, that all fragment
scans have
+ // already finished and so we need to report the total batch
count here in
+ // this case
+ batch_output_->InputFinished();
+ finished_.MarkFinished();
+ }
+ },
+ [this](const Status& err) {
+ synchronization_.MarkListingFailed(err);
+ outputs_[0]->ErrorReceived(this, err);
+ })
+ .AddCallback([list_task](const Status& st) mutable {
+ // std::cout << "LIST TASK: END\n";
+ list_task.MarkFinished();
+ });
+
+ return Status::OK();
+ }
+
+ void PauseProducing(ExecNode* output, int32_t counter) override {
+ // FIXME(TODO)
+ // Need to resurrect AsyncToggle and then all fragment scanners
+ // should share the same toggle
+ }
+
+ void ResumeProducing(ExecNode* output, int32_t counter) override {
+ // FIXME(TODO)
+ }
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ StopProducing();
+ }
+
+ void StopProducing() override { synchronization_.Cancel(); }
+
+ private:
+ struct FragmentScanTask;
+ using ScanTaskPtr = std::list<FragmentScanTask>::iterator;
+ struct FragmentScanTask {
+ std::unique_ptr<FragmentEvolutionStrategy> fragment_evolution;
+ std::shared_ptr<FragmentScanner> fragment_scanner;
+ uint64_t unused_readahead_bytes;
+ bool iterating_tasks = false;
+ // Transitions to true when we've finished listing fragments for this task
+ bool finished = false;
+ // Transitions to true when we receive an error or cancellation is
requested
Review Comment:
Might it make sense to have an enum instead of individual flags?
##########
cpp/src/arrow/dataset/dataset.cc:
##########
@@ -238,5 +277,112 @@ Result<FragmentIterator>
UnionDataset::GetFragmentsImpl(compute::Expression pred
return GetFragmentsFromDatasets(children_, predicate);
}
+namespace {
+
+class BasicFragmentEvolution : public FragmentEvolutionStrategy {
+ public:
+ BasicFragmentEvolution(std::vector<int> ds_to_frag_map, Schema*
dataset_schema)
+ : ds_to_frag_map(std::move(ds_to_frag_map)),
dataset_schema(dataset_schema) {}
+
+ virtual Result<compute::Expression> GetGuarantee(
+ const std::vector<FieldPath>& dataset_schema_selection) const override {
+ std::vector<compute::Expression> missing_fields;
+ for (const FieldPath& path : dataset_schema_selection) {
+ int top_level_field_idx = path[0];
+ if (ds_to_frag_map[top_level_field_idx] < 0) {
+ missing_fields.push_back(compute::equal(
Review Comment:
I think the expression simplification only recognizes `is_null(x)`
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,11 +38,107 @@ namespace dataset {
using RecordBatchGenerator =
std::function<Future<std::shared_ptr<RecordBatch>>()>;
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+ /// \brief The path to the column to load
+ FieldPath column_reference;
+ /// \brief The requested type of the column
+ ///
+ /// A format may choose to ignore this field completely. For example, when
+ /// reading from IPC the reader can just return the column in the data type
+ /// that is stored on disk. In this case the scan node will cast the column
+ /// to the appropriate type later.
+ ///
+ /// However, some formats may be capable of casting on the fly. For example,
+ /// when reading from CSV, if we know the target type of the column, we can
+ /// convert from string to the target type as we read.
+ DataType* requested_type;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+ /// The fragment scan request is derived from ScanV2Options. The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+ /// \brief A row filter
+ ///
+ /// The filter expression should be written against the fragment schema.
+ ///
+ /// \see ScanV2Options for details on how this filter should be applied
+ compute::Expression filter = compute::literal(true);
+
+ /// \brief The columns to scan
+ ///
+ /// These indices refer to the fragment schema
+ ///
+ /// Note: This is NOT a simple list of top-level column indices.
+ /// For more details \see ScanV2Options
+ ///
+ /// If possible a fragment should only read from disk the data needed
+ /// to satisfy these columns. If a format cannot partially read a nested
+ /// column (e.g. JSON) then it must apply the column selection (in memory)
+ /// before returning the scanned batch.
+ std::vector<FragmentSelectionColumn> columns;
+ /// \brief Options specific to the format being scanned
+ FragmentScanOptions* format_scan_options;
+};
+
+/// \brief A collection of reads issued by a FragmentScanner
+struct ReadTask {
+ /// \brief A future which will complete when the read is finished
+ Future<std::shared_ptr<RecordBatch>> task;
+ /// \brief An estimate of the number of data bytes that will
+ /// be read when all the reads have finished
+ ///
+ /// This may be lower than the requested number of data bytes
+ /// if the fragment is almost finished
+ ///
+ /// This may be higher than the requested number of data bytes
+ /// if the fragment had to read more to respect record batch
+ /// boundaries
+ uint64_t num_bytes;
+};
+
+class FragmentScanner {
+ public:
+ /// This instance will only be destroyed after all ongoing scan futures
+ /// have been completed.
+ ///
+ /// This means any callbacks created as part of the scan can safely
+ /// capture `this`
+ virtual ~FragmentScanner() = default;
+ /// \brief Scan further ahead into the fragment
+ /// \param target_num_bytes The number of data bytes the scanner is ready to
receive
+ /// \return Futures for each read operation that has been started
+ ///
+ /// `target_num_bytes` is a hint only and fragments do not need to match this
+ /// exactly. For example, when scanning a parquet file, reads have to align
+ /// with pages and so it is often necessary to read more than asked for.
+ /// See ScanV2Options for more details.
+ ///
+ /// It is acceptable for a future to complete with an empty batch.
+ ///
+ /// This may be called before the previous read task has finished and the
caller
+ /// should always check HasUnscannedData before each call.
+ virtual ReadTask ScanMore(uint64_t target_num_bytes) = 0;
+
+ /// \brief True if there is still data that has not been scanned
+ ///
+ /// This should return false when there is no more data to start scanning.
+ /// Some scan operations may still be in progress and that is fine.
+ virtual bool HasUnscannedData() = 0;
+};
+
+struct InspectedFragment {
Review Comment:
nit: docstring?
##########
cpp/src/arrow/dataset/dataset.cc:
##########
@@ -238,5 +277,112 @@ Result<FragmentIterator>
UnionDataset::GetFragmentsImpl(compute::Expression pred
return GetFragmentsFromDatasets(children_, predicate);
}
+namespace {
+
+class BasicFragmentEvolution : public FragmentEvolutionStrategy {
+ public:
+ BasicFragmentEvolution(std::vector<int> ds_to_frag_map, Schema*
dataset_schema)
+ : ds_to_frag_map(std::move(ds_to_frag_map)),
dataset_schema(dataset_schema) {}
+
+ virtual Result<compute::Expression> GetGuarantee(
+ const std::vector<FieldPath>& dataset_schema_selection) const override {
+ std::vector<compute::Expression> missing_fields;
+ for (const FieldPath& path : dataset_schema_selection) {
+ int top_level_field_idx = path[0];
+ if (ds_to_frag_map[top_level_field_idx] < 0) {
+ missing_fields.push_back(compute::equal(
+ compute::field_ref(top_level_field_idx),
+ compute::literal(
+
MakeNullScalar(dataset_schema->fields()[top_level_field_idx]->type()))));
+ }
+ }
+ if (missing_fields.empty()) {
+ return compute::literal(true);
+ }
+ if (missing_fields.size() == 1) {
+ return missing_fields[0];
+ }
+ return compute::and_(missing_fields);
Review Comment:
nit: std::move?
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,11 +38,107 @@ namespace dataset {
using RecordBatchGenerator =
std::function<Future<std::shared_ptr<RecordBatch>>()>;
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+ /// \brief The path to the column to load
+ FieldPath column_reference;
+ /// \brief The requested type of the column
+ ///
+ /// A format may choose to ignore this field completely. For example, when
+ /// reading from IPC the reader can just return the column in the data type
+ /// that is stored on disk. In this case the scan node will cast the column
+ /// to the appropriate type later.
+ ///
+ /// However, some formats may be capable of casting on the fly. For example,
+ /// when reading from CSV, if we know the target type of the column, we can
+ /// convert from string to the target type as we read.
+ DataType* requested_type;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+ /// The fragment scan request is derived from ScanV2Options. The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+ /// \brief A row filter
+ ///
+ /// The filter expression should be written against the fragment schema.
+ ///
+ /// \see ScanV2Options for details on how this filter should be applied
+ compute::Expression filter = compute::literal(true);
+
+ /// \brief The columns to scan
+ ///
+ /// These indices refer to the fragment schema
+ ///
+ /// Note: This is NOT a simple list of top-level column indices.
+ /// For more details \see ScanV2Options
+ ///
+ /// If possible a fragment should only read from disk the data needed
+ /// to satisfy these columns. If a format cannot partially read a nested
+ /// column (e.g. JSON) then it must apply the column selection (in memory)
+ /// before returning the scanned batch.
Review Comment:
Hmm, wouldn't we want to keep this responsibility in a more common place?
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,11 +38,107 @@ namespace dataset {
using RecordBatchGenerator =
std::function<Future<std::shared_ptr<RecordBatch>>()>;
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+ /// \brief The path to the column to load
+ FieldPath column_reference;
+ /// \brief The requested type of the column
+ ///
+ /// A format may choose to ignore this field completely. For example, when
+ /// reading from IPC the reader can just return the column in the data type
+ /// that is stored on disk. In this case the scan node will cast the column
+ /// to the appropriate type later.
+ ///
+ /// However, some formats may be capable of casting on the fly. For example,
+ /// when reading from CSV, if we know the target type of the column, we can
+ /// convert from string to the target type as we read.
+ DataType* requested_type;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+ /// The fragment scan request is derived from ScanV2Options. The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
Review Comment:
etc.
##########
cpp/src/arrow/dataset/scan_node.cc:
##########
@@ -0,0 +1,653 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/scanner.h"
+
+#include <functional>
+#include <iostream>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tracing_internal.h"
+#include "arrow/util/unreachable.h"
+
+namespace cp = arrow::compute;
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace dataset {
+
+namespace {
+
+Result<std::shared_ptr<Schema>> OutputSchemaFromOptions(const ScanV2Options&
options) {
+ return FieldPath::GetAll(*options.dataset->schema(), options.columns);
+}
+
+class BatchOutputStrategy {
+ public:
+ BatchOutputStrategy(compute::ExecNode* self, compute::ExecNode* output)
+ : self_(self), output_(output) {}
+ virtual ~BatchOutputStrategy() = default;
+ virtual void DeliverBatch(compute::ExecBatch batch) = 0;
+ virtual void InputFinished() {
+ // std::cout << "BatchOutputFinished: " +
std::to_string(batch_count_.load()) + "\n";
+ output_->InputFinished(self_, batch_count_.load());
+ }
+
+ protected:
+ void SliceAndDoDeliverBatch(compute::ExecBatch batch) {
+ // FIXME - Add slicing
+ batch_count_++;
+ output_->InputReceived(self_, std::move(batch));
+ }
+
+ std::atomic<int32_t> batch_count_{0};
+ compute::ExecNode* self_;
+ compute::ExecNode* output_;
+};
+
+class UnorderedBatchOutputStrategy : public BatchOutputStrategy {
+ public:
+ UnorderedBatchOutputStrategy(compute::ExecNode* self, compute::ExecNode*
output)
+ : BatchOutputStrategy(self, output){};
+ ~UnorderedBatchOutputStrategy() = default;
+ void DeliverBatch(compute::ExecBatch batch) override {
+ return SliceAndDoDeliverBatch(std::move(batch));
+ }
+};
+
+// Unlike other nodes the scanner has to synchronize a number of resources
+// that are external to the plan itself. These things may start tasks that
+// are invisible to the exec plan.
+class ScannerSynchronization {
+ public:
+ ScannerSynchronization(int fragment_readahead)
+ : fragment_readahead_(fragment_readahead) {}
+
+ bool MarkListingCompleted() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ finished_listing_ = true;
+ return fragments_active_ == 0;
+ }
+
+ void MarkListingFailed(Status err) {
+ std::lock_guard<std::mutex> lg(mutex_);
+ finished_listing_ = true;
+ cancelled_ = true;
+ error_ = std::move(err);
+ }
+
+ void Cancel() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (!cancelled_) {
+ cancelled_ = true;
+ error_ = Status::Cancelled("Plan cancelled before scan completed");
+ }
+ }
+
+ void Pause() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ paused_ = true;
+ }
+
+ void ReportFailedFragment(Status st) {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (!cancelled_) {
+ cancelled_ = true;
+ error_ = std::move(st);
+ fragments_active_--;
+ }
+ }
+
+ struct NextFragment {
+ std::shared_ptr<Fragment> fragment;
+ bool should_mark_input_finished;
+ };
+
+ NextFragment FinishFragmentAndGetNext() {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (cancelled_ || paused_ || fragments_to_scan_.empty()) {
+ fragments_active_--;
+ // std::cout << "Not continuing fragment inline fragments_active=" +
+ // std::to_string(fragments_active_) + "\n";
+ return {nullptr, /*should_mark_input_finished=*/finished_listing_};
+ }
+ // std::cout << "Continuing fragment inline " +
std::to_string(fragments_active_) +
+ // "\n";
+ std::shared_ptr<Fragment> next = std::move(fragments_to_scan_.front());
+ fragments_to_scan_.pop();
+ return {next, /*should_mark_input_finished=*/false};
+ }
+
+ bool QueueFragment(std::shared_ptr<Fragment> fragment) {
+ std::lock_guard<std::mutex> lg(mutex_);
+ if (cancelled_) {
+ return false;
+ }
+ if (!paused_ && fragments_active_ < fragment_readahead_) {
+ // If we have space start scanning the fragment immediately
+ fragments_active_++;
+ // std::cout << "Start scanning immediately: " +
std::to_string(fragments_active_) +
+ // "\n";
+ return true;
+ }
+ fragments_to_scan_.push(std::move(fragment));
+ return false;
+ }
+
+ private:
+ int fragment_readahead_;
+ std::mutex mutex_;
+ std::queue<std::shared_ptr<Fragment>> fragments_to_scan_;
+ Status error_;
+ bool cancelled_ = false;
+ bool finished_listing_ = false;
+ int fragments_active_ = 0;
+ bool paused_ = false;
+};
+
+// In the future we should support async scanning of fragments. The
+// Dataset class doesn't support this yet but we pretend it does here to
+// ease future adoption of the feature.
+AsyncGenerator<std::shared_ptr<Fragment>> GetFragments(Dataset* dataset,
+ cp::Expression
predicate) {
+ // In the future the dataset should be responsible for figuring out
+ // the I/O context. This will allow different I/O contexts to be used
+ // when scanning different datasets. For example, if we are scanning a
+ // union of a remote dataset and a local dataset.
+ const auto& io_context = io::default_io_context();
+ auto io_executor = io_context.executor();
+ Future<std::shared_ptr<FragmentIterator>> fragments_it_fut =
+ DeferNotOk(io_executor->Submit(
+ [dataset, predicate]() -> Result<std::shared_ptr<FragmentIterator>> {
+ // std::cout << "dataset->GetFragments\n";
+ ARROW_ASSIGN_OR_RAISE(FragmentIterator fragments_iter,
+ dataset->GetFragments(predicate));
+ return
std::make_shared<FragmentIterator>(std::move(fragments_iter));
+ }));
+ Future<AsyncGenerator<std::shared_ptr<Fragment>>> fragments_gen_fut =
+ fragments_it_fut.Then([](const std::shared_ptr<FragmentIterator>&
fragments_it)
+ ->
Result<AsyncGenerator<std::shared_ptr<Fragment>>> {
+ ARROW_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<Fragment>> fragments,
+ fragments_it->ToVector());
+ return MakeVectorGenerator(std::move(fragments));
+ });
+ return MakeFromFuture(std::move(fragments_gen_fut));
+}
+
+/// \brief A node that scans a dataset
Review Comment:
Thanks for the great explanation here!
##########
cpp/src/arrow/dataset/scan_node.cc:
##########
@@ -0,0 +1,653 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/scanner.h"
+
+#include <functional>
+#include <iostream>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tracing_internal.h"
+#include "arrow/util/unreachable.h"
+
+namespace cp = arrow::compute;
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace dataset {
+
+namespace {
+
+Result<std::shared_ptr<Schema>> OutputSchemaFromOptions(const ScanV2Options&
options) {
+ return FieldPath::GetAll(*options.dataset->schema(), options.columns);
+}
+
+class BatchOutputStrategy {
+ public:
+ BatchOutputStrategy(compute::ExecNode* self, compute::ExecNode* output)
+ : self_(self), output_(output) {}
+ virtual ~BatchOutputStrategy() = default;
+ virtual void DeliverBatch(compute::ExecBatch batch) = 0;
+ virtual void InputFinished() {
+ // std::cout << "BatchOutputFinished: " +
std::to_string(batch_count_.load()) + "\n";
Review Comment:
nit: commented code (well, I guess there's lots of that for debugging)
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -59,6 +156,17 @@ class ARROW_DS_EXPORT Fragment : public
std::enable_shared_from_this<Fragment> {
virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) = 0;
+ /// \brief Inspect a fragment to learn basic information
Review Comment:
This is basically to read the footer ahead of time?
So, ParquetFileFragment would attach the actual Parquet footer in the
structure it returns, the Arrow fragment would return the schema, the CSV
fragment would throw its hands up, etc.?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]