westonpace commented on a change in pull request #9947:
URL: https://github.com/apache/arrow/pull/9947#discussion_r611938066
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
std::vector<std::shared_ptr<RecordBatch>> batches,
std::shared_ptr<ScanOptions> options);
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+ T value;
+ int index;
+ bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding loaded data
+struct TaggedRecordBatch {
+ std::shared_ptr<RecordBatch> record_batch;
+ std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion. This information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+ Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+ Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+} // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+ static dataset::TaggedRecordBatch End() {
+ return dataset::TaggedRecordBatch{NULL, NULL};
+ }
+ static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+ return val.record_batch == NULL;
+ }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+ static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, false}};
+ }
+ static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+ return val.fragment.value == NULL;
+ }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each fragment has
+/// a potentially unique schema and possibly even format. It should be possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a task to load
+/// the fragment into memory. That task may or may not support parallel execution of
+/// its own.
///
-/// def Scan():
-/// for fragment in self.dataset.GetFragments(this.options.filter):
-/// for scan_task in fragment.Scan(this.options):
-/// yield scan_task
+/// The scanner is then responsible for creating scan tasks from every fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) but should
+/// return record batches as soon as they are ready to scan. Various readahead
Review comment:
Done.
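For readers following the thread, here is a minimal sketch of how the end-of-iteration traits quoted above are meant to be consumed. It assumes only the types declared in this hunk plus the existing Iterator<T>::Next() API; the ConsumeTaggedBatches helper and the way the iterator is obtained are illustrative, not part of this change.

// Sketch only: consuming a TaggedRecordBatchIterator and relying on the
// IterationTraits<TaggedRecordBatch> specialization to detect end-of-stream.
#include <iostream>

#include "arrow/dataset/dataset.h"
#include "arrow/dataset/scanner.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace ds = arrow::dataset;

// However the iterator is produced (a synchronous scan, a generator adapter,
// etc.), the consumption loop looks the same.
arrow::Status ConsumeTaggedBatches(ds::TaggedRecordBatchIterator batches) {
  while (true) {
    ARROW_ASSIGN_OR_RAISE(ds::TaggedRecordBatch tagged, batches.Next());
    // A null record_batch is the end sentinel defined by
    // IterationTraits<dataset::TaggedRecordBatch>::IsEnd in the hunk above.
    if (arrow::IterationTraits<ds::TaggedRecordBatch>::IsEnd(tagged)) break;
    // The originating fragment travels with every batch, which is handy for
    // debugging or for attributing rows back to their source.
    std::cout << tagged.record_batch->num_rows() << " rows from a "
              << tagged.fragment->type_name() << " fragment" << std::endl;
  }
  return arrow::Status::OK();
}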
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
std::vector<std::shared_ptr<RecordBatch>> batches,
std::shared_ptr<ScanOptions> options);
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+ T value;
+ int index;
+ bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding loaded data
+struct TaggedRecordBatch {
+ std::shared_ptr<RecordBatch> record_batch;
+ std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion. This information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+ Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+ Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+} // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+ static dataset::TaggedRecordBatch End() {
+ return dataset::TaggedRecordBatch{NULL, NULL};
+ }
+ static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+ return val.record_batch == NULL;
+ }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+ static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, false}};
+ }
+ static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+ return val.fragment.value == NULL;
+ }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each fragment has
+/// a potentially unique schema and possibly even format. It should be possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a task to load
+/// the fragment into memory. That task may or may not support parallel execution of
+/// its own.
///
-/// def Scan():
-/// for fragment in self.dataset.GetFragments(this.options.filter):
-/// for scan_task in fragment.Scan(this.options):
-/// yield scan_task
+/// The scanner is then responsible for creating scan tasks from every fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) but should
+/// return record batches as soon as they are ready to scan. Various readahead
+/// properties control how much data is allowed to be scanned before pausing to let a
+/// slow consumer catchup.
+///
+/// Today the scanner also delegates projection & filtering although that may change in
Review comment:
Done.
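Along the same lines, a hedged sketch of what the positional information on EnumeratedRecordBatch buys a consumer: after an unordered scan, the fragment and batch indices are enough to restore the order a sequential scan would have produced. The RestoreDatasetOrder helper below is illustrative, not part of this diff; the `last` flag is noted in a comment but is not needed for this particular use.

// Sketch only: re-sequencing the results of an unordered scan using the
// Enumerated<> indices carried by EnumeratedRecordBatch.
#include <algorithm>
#include <memory>
#include <vector>

#include "arrow/dataset/scanner.h"
#include "arrow/record_batch.h"

namespace ds = arrow::dataset;

std::vector<std::shared_ptr<arrow::RecordBatch>> RestoreDatasetOrder(
    std::vector<ds::EnumeratedRecordBatch> unordered) {
  // Each element remembers which fragment it came from and where it sat inside
  // that fragment, so sorting on (fragment index, batch index) recovers the
  // order a sequential scan would have produced.  The `last` flag is not needed
  // here; a streaming consumer would use it to learn that a fragment is
  // complete without having to count its batches.
  std::sort(unordered.begin(), unordered.end(),
            [](const ds::EnumeratedRecordBatch& a,
               const ds::EnumeratedRecordBatch& b) {
              if (a.fragment.index != b.fragment.index) {
                return a.fragment.index < b.fragment.index;
              }
              return a.record_batch.index < b.record_batch.index;
            });
  std::vector<std::shared_ptr<arrow::RecordBatch>> ordered;
  ordered.reserve(unordered.size());
  for (const auto& item : unordered) {
    ordered.push_back(item.record_batch.value);
  }
  return ordered;
}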
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]