westonpace commented on a change in pull request #9947:
URL: https://github.com/apache/arrow/pull/9947#discussion_r611939212
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator>
ScanTaskIteratorFromRecordBatch(
std::vector<std::shared_ptr<RecordBatch>> batches,
std::shared_ptr<ScanOptions> options);
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+ T value;
+ int index;
+ bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch
originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding
loaded data
+struct TaggedRecordBatch {
+ std::shared_ptr<RecordBatch> record_batch;
+ std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion. This
information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+ Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+ Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator =
std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+} // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+ static dataset::TaggedRecordBatch End() {
+ return dataset::TaggedRecordBatch{NULL, NULL};
+ }
+ static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+ return val.record_batch == NULL;
+ }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+ static dataset::EnumeratedRecordBatch End() {
+ return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1,
false}};
+ }
+ static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+ return val.fragment.value == NULL;
+ }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each
fragment has
+/// a potentially unique schema and possibly even format. It should be
possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a
task to load
+/// the fragment into memory. That task may or may not support parallel
execution of
+/// its own.
///
-/// def Scan():
-/// for fragment in self.dataset.GetFragments(this.options.filter):
-/// for scan_task in fragment.Scan(this.options):
-/// yield scan_task
+/// The scanner is then responsible for creating scan tasks from every
fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked)
but should
+/// return record batches as soon as they are ready to scan. Various readahead
+/// properties control how much data is allowed to be scanned before pausing
to let a
+/// slow consumer catch up.
+///
+/// Today the scanner also delegates projection & filtering although that may
change in
+/// the future.
class ARROW_DS_EXPORT Scanner {
public:
- Scanner(std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions>
scan_options)
- : dataset_(std::move(dataset)), scan_options_(std::move(scan_options)) {}
-
- Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanOptions>
scan_options)
- : fragment_(std::move(fragment)), scan_options_(std::move(scan_options))
{}
+ virtual ~Scanner() = default;
/// \brief The Scan operator returns a stream of ScanTask. The caller is
/// responsible to dispatch/schedule said tasks. Tasks should be safe to run
/// in a concurrent fashion and outlive the iterator.
- Result<ScanTaskIterator> Scan();
-
+ ///
+ /// Note: Not supported by the async scanner
+ /// TODO(ARROW-11797) Deprecate Scan()
+ virtual Result<ScanTaskIterator> Scan();
/// \brief Convert a Scanner into a Table.
///
/// Use this convenience utility with care. This will serially materialize
the
/// Scan result in memory before creating the Table.
- Result<std::shared_ptr<Table>> ToTable();
+ virtual Result<std::shared_ptr<Table>> ToTable() = 0;
+ /// \brief Scan the dataset into a stream of record batches. Each batch is
tagged
+ /// with the fragment it originated from. The batches will arrive in order.
The
+ /// order of fragments is determined by the dataset.
+ ///
+ /// Note: The scanner will perform some readahead but will avoid
materializing too
+/// much in memory (this is governed by the readahead options and
use_threads option).
+ /// If the readahead queue fills up then I/O will pause until the calling
thread catches
+ /// up.
+ virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
+ /// \brief Scan the dataset into a stream of record batches. Unlike
ScanBatches this
+ /// method may allow record batches to be returned out of order. This
allows for more
+/// efficient scanning, since some fragments may be accessed more quickly than
others (e.g. may
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]