lidavidm commented on code in PR #13782:
URL: https://github.com/apache/arrow/pull/13782#discussion_r973584495


##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,17 +38,91 @@ namespace dataset {
 
 using RecordBatchGenerator = 
std::function<Future<std::shared_ptr<RecordBatch>>()>;
 
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+  /// \brief The path to the column to load
+  FieldPath path;
+  /// \brief The type of the column in the dataset schema
+  ///
+  /// A format may choose to ignore this field completely.  For example, when
+  /// reading from IPC the reader can just return the column in the data type
+  /// that is stored on disk.  There is no point in doing anything special.
+  ///
+  /// However, some formats may be capable of casting on the fly.  For example,
+  /// when reading from CSV, if we know the target type of the column, we can
+  /// convert from string to the target type as we read.
+  DataType* requested_type;
+  /// \brief The index in the output selection of this column
+  int selection_index;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+/// The fragment scan request is derived from ScanV2Options.  The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+  /// \brief A row filter
+  ///
+  /// The filter expression should be written against the fragment schema.
+  ///
+  /// \see ScanV2Options for details on how this filter should be applied
+  compute::Expression filter = compute::literal(true);
+
+  /// \brief The columns to scan
+  ///
+  /// These indices refer to the fragment schema
+  ///
+  /// Note: This is NOT a simple list of top-level column indices.
+  /// For more details \see ScanV2Options
+  ///
+  /// If possible a fragment should only read from disk the data needed
+  /// to satisfy these columns.  If a format cannot partially read a nested
+  /// column (e.g. JSON) then it must apply the column selection (in memory)
+  /// before returning the scanned batch.
+  std::vector<FragmentSelectionColumn> columns;
+  /// \brief Options specific to the format being scanned
+  FragmentScanOptions* format_scan_options;
+};
+
+class FragmentScanner {

Review Comment:
   ARROW_DS_EXPORT/docstring here?



##########
cpp/src/arrow/dataset/scanner_test.cc:
##########
@@ -54,6 +57,751 @@ using internal::Iota;
 
 namespace dataset {
 
+// The basic evolution strategy doesn't really need any info from the dataset
+// or the fragment other than the schema so we just make a dummy 
dataset/fragment
+// here.
+std::unique_ptr<Dataset> MakeDatasetFromSchema(std::shared_ptr<Schema> sch) {
+  return ::arrow::internal::make_unique<InMemoryDataset>(std::move(sch),

Review Comment:
   We can use std::make_unique now!



##########
cpp/src/arrow/compute/exec/expression.h:
##########
@@ -277,6 +279,53 @@ ARROW_EXPORT Expression or_(Expression lhs, Expression 
rhs);
 ARROW_EXPORT Expression or_(const std::vector<Expression>&);
 ARROW_EXPORT Expression not_(Expression operand);
 
+/// Modify an Expression with pre-order and post-order visitation.
+/// `pre` will be invoked on each Expression. `pre` will visit Calls before 
their
+/// arguments, `post_call` will visit Calls (and no other Expressions) after 
their
+/// arguments. Visitors should return the Identical expression to indicate no 
change; this
+/// will prevent unnecessary construction in the common case where a 
modification is not
+/// possible/necessary/...
+///
+/// If an argument was modified, `post_call` visits a reconstructed Call with 
the modified
+/// arguments but also receives a pointer to the unmodified Expression as a 
second
+/// argument. If no arguments were modified the unmodified Expression* will be 
nullptr.
+template <typename PreVisit, typename PostVisitCall>
+Result<Expression> Modify(Expression expr, const PreVisit& pre,

Review Comment:
   nit: did we have to move this out of the internal header?



##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,17 +38,91 @@ namespace dataset {
 
 using RecordBatchGenerator = 
std::function<Future<std::shared_ptr<RecordBatch>>()>;
 
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+  /// \brief The path to the column to load
+  FieldPath path;
+  /// \brief The type of the column in the dataset schema
+  ///
+  /// A format may choose to ignore this field completely.  For example, when
+  /// reading from IPC the reader can just return the column in the data type
+  /// that is stored on disk.  There is no point in doing anything special.
+  ///
+  /// However, some formats may be capable of casting on the fly.  For example,
+  /// when reading from CSV, if we know the target type of the column, we can
+  /// convert from string to the target type as we read.
+  DataType* requested_type;
+  /// \brief The index in the output selection of this column
+  int selection_index;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+/// The fragment scan request is derived from ScanV2Options.  The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+  /// \brief A row filter
+  ///
+  /// The filter expression should be written against the fragment schema.
+  ///
+  /// \see ScanV2Options for details on how this filter should be applied
+  compute::Expression filter = compute::literal(true);
+
+  /// \brief The columns to scan
+  ///
+  /// These indices refer to the fragment schema
+  ///
+  /// Note: This is NOT a simple list of top-level column indices.
+  /// For more details \see ScanV2Options
+  ///
+  /// If possible a fragment should only read from disk the data needed
+  /// to satisfy these columns.  If a format cannot partially read a nested
+  /// column (e.g. JSON) then it must apply the column selection (in memory)
+  /// before returning the scanned batch.
+  std::vector<FragmentSelectionColumn> columns;
+  /// \brief Options specific to the format being scanned
+  FragmentScanOptions* format_scan_options;
+};
+
+class FragmentScanner {
+ public:
+  /// This instance will only be destroyed after all ongoing scan futures
+  /// have been completed.
+  ///
+  /// This means any callbacks created as part of the scan can safely
+  /// capture `this`
+  virtual ~FragmentScanner() = default;
+  /// \brief Scan a batch of data from the file
+  /// \param batch_number The index of the batch to read
+  virtual Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) = 0;
+  /// \brief Calculate an estimate of how many data bytes the given batch will 
represent
+  ///
+  /// "Data bytes" should be the total size of all the buffers once the data 
has been
+  /// decoded into the Arrow format.
+  virtual int64_t EstimatedDataBytes(int batch_number) = 0;
+  /// \brief The number of batches in the fragment to scan
+  virtual int NumBatches() = 0;

Review Comment:
   How would something like CSV implement this?



##########
cpp/src/arrow/dataset/scan_node.cc:
##########
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tracing_internal.h"
+#include "arrow/util/unreachable.h"
+
+namespace cp = arrow::compute;
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace dataset {
+
+namespace {
+
+Result<std::shared_ptr<Schema>> OutputSchemaFromOptions(const ScanV2Options& 
options) {
+  return FieldPath::GetAll(*options.dataset->schema(), options.columns);
+}
+
+// In the future we should support async scanning of fragments.  The
+// Dataset class doesn't support this yet but we pretend it does here to
+// ease future adoption of the feature.
+AsyncGenerator<std::shared_ptr<Fragment>> GetFragments(Dataset* dataset,
+                                                       cp::Expression 
predicate) {
+  // In the future the dataset should be responsible for figuring out
+  // the I/O context.  This will allow different I/O contexts to be used
+  // when scanning different datasets.  For example, if we are scanning a
+  // union of a remote dataset and a local dataset.
+  const auto& io_context = io::default_io_context();
+  auto io_executor = io_context.executor();
+  Future<std::shared_ptr<FragmentIterator>> fragments_it_fut =
+      DeferNotOk(io_executor->Submit(
+          [dataset, predicate]() -> Result<std::shared_ptr<FragmentIterator>> {
+            ARROW_ASSIGN_OR_RAISE(FragmentIterator fragments_iter,
+                                  dataset->GetFragments(predicate));
+            return 
std::make_shared<FragmentIterator>(std::move(fragments_iter));
+          }));
+  Future<AsyncGenerator<std::shared_ptr<Fragment>>> fragments_gen_fut =
+      fragments_it_fut.Then([](const std::shared_ptr<FragmentIterator>& 
fragments_it)
+                                -> 
Result<AsyncGenerator<std::shared_ptr<Fragment>>> {
+        ARROW_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<Fragment>> fragments,
+                              fragments_it->ToVector());
+        return MakeVectorGenerator(std::move(fragments));
+      });
+  return MakeFromFuture(std::move(fragments_gen_fut));
+}
+
+/// \brief A node that scans a dataset
+///
+/// The scan node has three groups of io-tasks and one task.
+///
+/// The first io-task (listing) fetches the fragments from the dataset.  This 
may be a
+/// simple iteration of paths or, if the dataset is described with wildcards, 
this may
+/// involve I/O for listing and walking directory paths.  There is one listing 
io-task per
+/// dataset.
+///
+/// The next step is to fetch the metadata for the fragment.  For some formats 
(e.g. CSV)
+/// this may be quite simple (get the size of the file).  For other formats 
(e.g. parquet)
+/// this is more involved and requires reading data.  There is one metadata 
io-task per
+/// fragment.  The metadata io-task creates an AsyncGenerator<RecordBatch> 
from the
+/// fragment.
+///
+/// Once the metadata io-task is done we can issue read io-tasks.  Each read 
io-task
+/// requests a single batch of data from the disk by pulling the next Future 
from the
+/// generator.
+///
+/// Finally, when the future is fulfilled, we issue a pipeline task to drive 
the batch
+/// through the pipeline.
+///
+/// Most of these tasks are io-tasks.  They take very few CPU resources and 
they run on
+/// the I/O thread pool.  These io-tasks are invisible to the exec plan and so 
we need to
+/// do some custom scheduling.  We limit how many fragments we read from at 
any one time.
+/// This is referred to as "fragment readahead".
+///
+/// Within a fragment there is usually also some amount of "row readahead".  
This row
+/// readahead is handled by the fragment (and not the scanner) because the 
exact details
+/// of how it is performed depend on the underlying format.
+///
+/// When a scan node is aborted (StopProducing) we send a cancel signal to any 
active
+/// fragments.  On destruction we continue consuming the fragments until they 
complete
+/// (which should be fairly quick since we cancelled the fragment).  This 
ensures the
+/// I/O work is completely finished before the node is destroyed.
+class ScanNode : public cp::ExecNode {
+ public:
+  ScanNode(cp::ExecPlan* plan, ScanV2Options options,
+           std::shared_ptr<Schema> output_schema)
+      : cp::ExecNode(plan, {}, {}, std::move(output_schema),
+                     /*num_outputs=*/1),
+        options_(options),
+        fragments_throttle_(
+            util::AsyncTaskScheduler::MakeThrottle(options_.fragment_readahead 
+ 1)),
+        batches_throttle_(
+            
util::AsyncTaskScheduler::MakeThrottle(options_.target_bytes_readahead + 1)) {
+  }
+
+  static Result<ScanV2Options> NormalizeAndValidate(const ScanV2Options& 
options,
+                                                    compute::ExecContext* ctx) 
{
+    ScanV2Options normalized(options);
+    if (!normalized.dataset) {
+      return Status::Invalid("Scan options must include a dataset");
+    }
+
+    if (options.fragment_readahead < 0) {
+      return Status::Invalid(
+          "Fragment readahead may not be less than 0.  Set to 0 to disable 
readahead");
+    }
+
+    if (options.target_bytes_readahead < 0) {
+      return Status::Invalid(
+          "Batch readahead may not be less than 0.  Set to 0 to disable 
readahead");
+    }
+
+    if (!normalized.filter.is_valid()) {
+      normalized.filter = compute::literal(true);
+    }
+
+    if (normalized.filter.call() && normalized.filter.IsBound()) {
+      // There is no easy way to make sure a filter was bound against the same
+      // function registry as the one in ctx so we just require it to be 
unbound
+      // FIXME - Do we care if it was bound to a different function registry?
+      return Status::Invalid("Scan filter must be unbound");
+    } else if (!normalized.filter.IsBound()) {
+      ARROW_ASSIGN_OR_RAISE(normalized.filter,
+                            normalized.filter.Bind(*options.dataset->schema(), 
ctx));
+    }  // Else we must have some simple filter like literal(true) which might 
be bound
+       // but we don't care
+
+    return std::move(normalized);
+  }
+
+  static Result<cp::ExecNode*> Make(cp::ExecPlan* plan, 
std::vector<cp::ExecNode*> inputs,
+                                    const cp::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "ScanNode"));
+    const auto& scan_options = checked_cast<const ScanV2Options&>(options);
+    ARROW_ASSIGN_OR_RAISE(ScanV2Options normalized_options,
+                          NormalizeAndValidate(scan_options, 
plan->exec_context()));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> output_schema,
+                          OutputSchemaFromOptions(normalized_options));
+    return plan->EmplaceNode<ScanNode>(plan, std::move(normalized_options),
+                                       std::move(output_schema));
+  }
+
+  const char* kind_name() const override { return "ScanNode"; }
+
+  [[noreturn]] static void NoInputs() {
+    Unreachable("no inputs; this should never be called");
+  }
+  [[noreturn]] void InputReceived(cp::ExecNode*, cp::ExecBatch) override { 
NoInputs(); }
+  [[noreturn]] void ErrorReceived(cp::ExecNode*, Status) override { 
NoInputs(); }
+  [[noreturn]] void InputFinished(cp::ExecNode*, int) override { NoInputs(); }
+
+  Status Init() override {
+    // batch_output_ =
+    //     ::arrow::internal::make_unique<UnorderedBatchOutputStrategy>(this,
+    //     outputs_[0]);
+    return Status::OK();
+  }
+
+  struct ScanState {
+    std::mutex mutex;
+    std::shared_ptr<FragmentScanner> fragment_scanner;
+    std::unique_ptr<FragmentEvolutionStrategy> fragment_evolution;
+    FragmentScanRequest scan_request;
+  };
+
+  struct ScanBatchTask : util::AsyncTaskScheduler::Task {
+    ScanBatchTask(ScanNode* node, ScanState* scan_state, int batch_index)
+        : node_(node), scan_(scan_state), batch_index_(batch_index) {
+      int64_t cost = 
scan_state->fragment_scanner->EstimatedDataBytes(batch_index_);
+      // It's possible, though probably a bad idea, for a single batch of a 
fragment
+      // to be larger than 2GiB.  In that case, it doesn't matter much if we 
underestimate
+      // because the largest the throttle can be is 2GiB and thus we will be 
in "one batch
+      // at a time" mode anyways which is the best we can do in this case.
+      cost_ = static_cast<int>(
+          std::min(cost, 
static_cast<int64_t>(std::numeric_limits<int>::max())));
+    }
+
+    Result<Future<>> operator()(util::AsyncTaskScheduler* scheduler) override {
+      // Prevent concurrent calls to ScanBatch which might not be thread safe
+      std::lock_guard<std::mutex> lk(scan_->mutex);
+      return scan_->fragment_scanner->ScanBatch(batch_index_)
+          .Then([this](const std::shared_ptr<RecordBatch> batch) {

Review Comment:
   nit: was this meant to be const&?



##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -274,7 +282,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
       AbortUnlocked(submit_result.status(), std::move(lk));
       return;
     }
-    submit_result->AddCallback([this, cost](const Status& st) {
+    // FIXME(C++17, move into lambda?)

Review Comment:
   should be able to do this now? 



##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,17 +38,91 @@ namespace dataset {
 
 using RecordBatchGenerator = 
std::function<Future<std::shared_ptr<RecordBatch>>()>;
 
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+  /// \brief The path to the column to load
+  FieldPath path;
+  /// \brief The type of the column in the dataset schema
+  ///
+  /// A format may choose to ignore this field completely.  For example, when
+  /// reading from IPC the reader can just return the column in the data type
+  /// that is stored on disk.  There is no point in doing anything special.
+  ///
+  /// However, some formats may be capable of casting on the fly.  For example,
+  /// when reading from CSV, if we know the target type of the column, we can
+  /// convert from string to the target type as we read.
+  DataType* requested_type;
+  /// \brief The index in the output selection of this column
+  int selection_index;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+/// The fragment scan request is derived from ScanV2Options.  The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+  /// \brief A row filter
+  ///
+  /// The filter expression should be written against the fragment schema.
+  ///
+  /// \see ScanV2Options for details on how this filter should be applied
+  compute::Expression filter = compute::literal(true);
+
+  /// \brief The columns to scan
+  ///
+  /// These indices refer to the fragment schema
+  ///
+  /// Note: This is NOT a simple list of top-level column indices.
+  /// For more details \see ScanV2Options
+  ///
+  /// If possible a fragment should only read from disk the data needed
+  /// to satisfy these columns.  If a format cannot partially read a nested
+  /// column (e.g. JSON) then it must apply the column selection (in memory)
+  /// before returning the scanned batch.
+  std::vector<FragmentSelectionColumn> columns;
+  /// \brief Options specific to the format being scanned
+  FragmentScanOptions* format_scan_options;
+};
+
+class FragmentScanner {
+ public:
+  /// This instance will only be destroyed after all ongoing scan futures
+  /// have been completed.
+  ///
+  /// This means any callbacks created as part of the scan can safely
+  /// capture `this`
+  virtual ~FragmentScanner() = default;
+  /// \brief Scan a batch of data from the file
+  /// \param batch_number The index of the batch to read
+  virtual Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) = 0;
+  /// \brief Calculate an estimate of how many data bytes the given batch will 
represent
+  ///
+  /// "Data bytes" should be the total size of all the buffers once the data 
has been
+  /// decoded into the Arrow format.
+  virtual int64_t EstimatedDataBytes(int batch_number) = 0;
+  /// \brief The number of batches in the fragment to scan
+  virtual int NumBatches() = 0;

Review Comment:
   Should NumBatches and EstimatedDataBytes be `const`?



##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,17 +38,91 @@ namespace dataset {
 
 using RecordBatchGenerator = 
std::function<Future<std::shared_ptr<RecordBatch>>()>;
 
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+  /// \brief The path to the column to load
+  FieldPath path;
+  /// \brief The type of the column in the dataset schema
+  ///
+  /// A format may choose to ignore this field completely.  For example, when
+  /// reading from IPC the reader can just return the column in the data type
+  /// that is stored on disk.  There is no point in doing anything special.
+  ///
+  /// However, some formats may be capable of casting on the fly.  For example,
+  /// when reading from CSV, if we know the target type of the column, we can
+  /// convert from string to the target type as we read.
+  DataType* requested_type;
+  /// \brief The index in the output selection of this column
+  int selection_index;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+/// The fragment scan request is derived from ScanV2Options.  The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+  /// \brief A row filter
+  ///
+  /// The filter expression should be written against the fragment schema.
+  ///
+  /// \see ScanV2Options for details on how this filter should be applied
+  compute::Expression filter = compute::literal(true);
+
+  /// \brief The columns to scan
+  ///
+  /// These indices refer to the fragment schema
+  ///
+  /// Note: This is NOT a simple list of top-level column indices.
+  /// For more details \see ScanV2Options
+  ///
+  /// If possible a fragment should only read from disk the data needed
+  /// to satisfy these columns.  If a format cannot partially read a nested
+  /// column (e.g. JSON) then it must apply the column selection (in memory)
+  /// before returning the scanned batch.
+  std::vector<FragmentSelectionColumn> columns;
+  /// \brief Options specific to the format being scanned
+  FragmentScanOptions* format_scan_options;
+};
+
+class FragmentScanner {
+ public:
+  /// This instance will only be destroyed after all ongoing scan futures
+  /// have been completed.
+  ///
+  /// This means any callbacks created as part of the scan can safely
+  /// capture `this`
+  virtual ~FragmentScanner() = default;
+  /// \brief Scan a batch of data from the file
+  /// \param batch_number The index of the batch to read
+  virtual Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) = 0;
+  /// \brief Calculate an estimate of how many data bytes the given batch will 
represent
+  ///
+  /// "Data bytes" should be the total size of all the buffers once the data 
has been
+  /// decoded into the Arrow format.
+  virtual int64_t EstimatedDataBytes(int batch_number) = 0;
+  /// \brief The number of batches in the fragment to scan
+  virtual int NumBatches() = 0;
+};
+
+struct InspectedFragment {
+  explicit InspectedFragment(std::vector<std::string> column_names)
+      : column_names(std::move(column_names)) {}
+  std::vector<std::string> column_names;

Review Comment:
   Hmm, why don't we care about the types here?



##########
cpp/src/arrow/dataset/scan_node.cc:
##########
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/dataset/scanner.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tracing_internal.h"
+#include "arrow/util/unreachable.h"
+
+namespace cp = arrow::compute;
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace dataset {
+
+namespace {
+
+Result<std::shared_ptr<Schema>> OutputSchemaFromOptions(const ScanV2Options& 
options) {
+  return FieldPath::GetAll(*options.dataset->schema(), options.columns);
+}
+
+// In the future we should support async scanning of fragments.  The
+// Dataset class doesn't support this yet but we pretend it does here to
+// ease future adoption of the feature.
+AsyncGenerator<std::shared_ptr<Fragment>> GetFragments(Dataset* dataset,
+                                                       cp::Expression 
predicate) {
+  // In the future the dataset should be responsible for figuring out
+  // the I/O context.  This will allow different I/O contexts to be used
+  // when scanning different datasets.  For example, if we are scanning a
+  // union of a remote dataset and a local dataset.
+  const auto& io_context = io::default_io_context();
+  auto io_executor = io_context.executor();
+  Future<std::shared_ptr<FragmentIterator>> fragments_it_fut =
+      DeferNotOk(io_executor->Submit(
+          [dataset, predicate]() -> Result<std::shared_ptr<FragmentIterator>> {
+            ARROW_ASSIGN_OR_RAISE(FragmentIterator fragments_iter,
+                                  dataset->GetFragments(predicate));
+            return 
std::make_shared<FragmentIterator>(std::move(fragments_iter));
+          }));
+  Future<AsyncGenerator<std::shared_ptr<Fragment>>> fragments_gen_fut =
+      fragments_it_fut.Then([](const std::shared_ptr<FragmentIterator>& 
fragments_it)
+                                -> 
Result<AsyncGenerator<std::shared_ptr<Fragment>>> {
+        ARROW_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<Fragment>> fragments,
+                              fragments_it->ToVector());
+        return MakeVectorGenerator(std::move(fragments));
+      });
+  return MakeFromFuture(std::move(fragments_gen_fut));
+}
+
+/// \brief A node that scans a dataset
+///
+/// The scan node has three groups of io-tasks and one task.
+///
+/// The first io-task (listing) fetches the fragments from the dataset.  This 
may be a
+/// simple iteration of paths or, if the dataset is described with wildcards, 
this may
+/// involve I/O for listing and walking directory paths.  There is one listing 
io-task per
+/// dataset.
+///
+/// The next step is to fetch the metadata for the fragment.  For some formats 
(e.g. CSV)
+/// this may be quite simple (get the size of the file).  For other formats 
(e.g. parquet)
+/// this is more involved and requires reading data.  There is one metadata 
io-task per
+/// fragment.  The metadata io-task creates an AsyncGenerator<RecordBatch> 
from the
+/// fragment.
+///
+/// Once the metadata io-task is done we can issue read io-tasks.  Each read 
io-task
+/// requests a single batch of data from the disk by pulling the next Future 
from the
+/// generator.
+///
+/// Finally, when the future is fulfilled, we issue a pipeline task to drive 
the batch
+/// through the pipeline.
+///
+/// Most of these tasks are io-tasks.  They take very few CPU resources and 
they run on
+/// the I/O thread pool.  These io-tasks are invisible to the exec plan and so 
we need to
+/// do some custom scheduling.  We limit how many fragments we read from at 
any one time.
+/// This is referred to as "fragment readahead".
+///
+/// Within a fragment there is usually also some amount of "row readahead".  
This row
+/// readahead is handled by the fragment (and not the scanner) because the 
exact details
+/// of how it is performed depend on the underlying format.
+///
+/// When a scan node is aborted (StopProducing) we send a cancel signal to any 
active
+/// fragments.  On destruction we continue consuming the fragments until they 
complete
+/// (which should be fairly quick since we cancelled the fragment).  This 
ensures the
+/// I/O work is completely finished before the node is destroyed.
+class ScanNode : public cp::ExecNode {
+ public:
+  ScanNode(cp::ExecPlan* plan, ScanV2Options options,
+           std::shared_ptr<Schema> output_schema)
+      : cp::ExecNode(plan, {}, {}, std::move(output_schema),
+                     /*num_outputs=*/1),
+        options_(options),
+        fragments_throttle_(
+            util::AsyncTaskScheduler::MakeThrottle(options_.fragment_readahead 
+ 1)),
+        batches_throttle_(
+            
util::AsyncTaskScheduler::MakeThrottle(options_.target_bytes_readahead + 1)) {
+  }
+
+  static Result<ScanV2Options> NormalizeAndValidate(const ScanV2Options& 
options,
+                                                    compute::ExecContext* ctx) 
{
+    ScanV2Options normalized(options);
+    if (!normalized.dataset) {
+      return Status::Invalid("Scan options must include a dataset");
+    }
+
+    if (options.fragment_readahead < 0) {
+      return Status::Invalid(
+          "Fragment readahead may not be less than 0.  Set to 0 to disable 
readahead");
+    }
+
+    if (options.target_bytes_readahead < 0) {
+      return Status::Invalid(
+          "Batch readahead may not be less than 0.  Set to 0 to disable 
readahead");
+    }
+
+    if (!normalized.filter.is_valid()) {
+      normalized.filter = compute::literal(true);
+    }
+
+    if (normalized.filter.call() && normalized.filter.IsBound()) {
+      // There is no easy way to make sure a filter was bound against the same
+      // function registry as the one in ctx so we just require it to be 
unbound
+      // FIXME - Do we care if it was bound to a different function registry?
+      return Status::Invalid("Scan filter must be unbound");
+    } else if (!normalized.filter.IsBound()) {
+      ARROW_ASSIGN_OR_RAISE(normalized.filter,
+                            normalized.filter.Bind(*options.dataset->schema(), 
ctx));
+    }  // Else we must have some simple filter like literal(true) which might 
be bound
+       // but we don't care
+
+    return std::move(normalized);
+  }
+
+  static Result<cp::ExecNode*> Make(cp::ExecPlan* plan, 
std::vector<cp::ExecNode*> inputs,
+                                    const cp::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "ScanNode"));
+    const auto& scan_options = checked_cast<const ScanV2Options&>(options);
+    ARROW_ASSIGN_OR_RAISE(ScanV2Options normalized_options,
+                          NormalizeAndValidate(scan_options, 
plan->exec_context()));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> output_schema,
+                          OutputSchemaFromOptions(normalized_options));
+    return plan->EmplaceNode<ScanNode>(plan, std::move(normalized_options),
+                                       std::move(output_schema));
+  }
+
+  const char* kind_name() const override { return "ScanNode"; }
+
+  [[noreturn]] static void NoInputs() {
+    Unreachable("no inputs; this should never be called");
+  }
+  [[noreturn]] void InputReceived(cp::ExecNode*, cp::ExecBatch) override { 
NoInputs(); }
+  [[noreturn]] void ErrorReceived(cp::ExecNode*, Status) override { 
NoInputs(); }
+  [[noreturn]] void InputFinished(cp::ExecNode*, int) override { NoInputs(); }
+
+  Status Init() override {
+    // batch_output_ =

Review Comment:
   Commented code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to