lidavidm commented on a change in pull request #9947:
URL: https://github.com/apache/arrow/pull/9947#discussion_r610653229
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -67,7 +68,105 @@ Result<RecordBatchGenerator> ScanTask::ExecuteAsync() {
bool ScanTask::supports_async() const { return false; }
-Result<FragmentIterator> Scanner::GetFragments() {
+Result<ScanTaskIterator> Scanner::Scan() {
+ // TODO(ARROW-12289) This is overridden in SyncScanner and will never be implemented in
+ // AsyncScanner. It is deprecated and will eventually go away.
+ return Status::NotImplemented("This scanner does not support the legacy Scan() method");
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::ScanBatchesUnordered() {
+ // If a scanner doesn't support unordered scanning (i.e. SyncScanner) then we just
+ // fall back to an ordered scan and assign the appropriate tagging
+ ARROW_ASSIGN_OR_RAISE(auto ordered_scan, ScanBatches());
+ return AddPositioningToInOrderScan(std::move(ordered_scan));
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::AddPositioningToInOrderScan(
+ TaggedRecordBatchIterator scan) {
+ ARROW_ASSIGN_OR_RAISE(auto first, scan.Next());
+ if (IsIterationEnd(first)) {
+ return MakeEmptyIterator<EnumeratedRecordBatch>();
+ }
+ struct State {
+ State(TaggedRecordBatchIterator source, TaggedRecordBatch first)
Review comment:
`first` is unused here. (I guess it was meant to initialize `prev_batch`?)
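A minimal sketch of what the comment seems to suggest: seed the one-batch lookahead with `first` so the constructor argument is actually used. It avoids Arrow entirely and uses hypothetical stand-ins (`TaggedBatch`, `EnumeratedBatch`, `LookaheadTagger`, `FromVector`), with a `std::string` in place of the record batch and an `int` fragment id in place of the fragment pointer. Holding one batch back is what lets the iterator know, before emitting a batch, whether it is the last one in its fragment or the last one overall.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the types in the diff (not the Arrow definitions).
template <typename T>
struct Enumerated {
  T value;
  int index;
  bool last;
};

struct TaggedBatch {
  std::string batch;  // stands in for std::shared_ptr<RecordBatch>
  int fragment;       // stands in for std::shared_ptr<Fragment>
};

struct EnumeratedBatch {
  Enumerated<std::string> batch;
  Enumerated<int> fragment;
};

// Pull-style source: returns std::nullopt once exhausted.
using Source = std::function<std::optional<TaggedBatch>()>;

// Hypothetical helper for testing: replays a vector as a Source.
Source FromVector(std::vector<TaggedBatch> items) {
  auto data = std::make_shared<std::vector<TaggedBatch>>(std::move(items));
  auto pos = std::make_shared<std::size_t>(0);
  return [data, pos]() -> std::optional<TaggedBatch> {
    if (*pos >= data->size()) return std::nullopt;
    return (*data)[(*pos)++];
  };
}

class LookaheadTagger {
 public:
  // `first` was already pulled by the caller; it seeds the lookahead slot.
  LookaheadTagger(Source source, TaggedBatch first)
      : source_(std::move(source)), prev_(std::move(first)) {}

  std::optional<EnumeratedBatch> Next() {
    if (!prev_) return std::nullopt;  // final batch was already emitted
    std::optional<TaggedBatch> next = source_();  // peek one batch ahead
    TaggedBatch current = std::move(*prev_);
    bool last_fragment = !next.has_value();
    // Arrow would compare fragment pointers; this sketch compares ids.
    bool last_in_fragment = last_fragment || next->fragment != current.fragment;
    EnumeratedBatch out{{current.batch, batch_index_, last_in_fragment},
                        {current.fragment, fragment_index_, last_fragment}};
    if (last_in_fragment) {
      batch_index_ = 0;  // the next batch starts a new fragment
      ++fragment_index_;
    } else {
      ++batch_index_;
    }
    prev_ = std::move(next);
    return out;
  }

 private:
  Source source_;
  std::optional<TaggedBatch> prev_;
  int batch_index_ = 0;
  int fragment_index_ = 0;
};
```

In this sketch, keeping `prev_` as a `std::optional` also stands in for the diff's `finished` flag and `{nullptr, nullptr}` placeholder: an empty `prev_` means the final batch has already been emitted.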
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
std::vector<std::shared_ptr<RecordBatch>> batches,
std::shared_ptr<ScanOptions> options);
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+ T value;
+ int index;
+ bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding loaded data
+struct TaggedRecordBatch {
+ std::shared_ptr<RecordBatch> record_batch;
+ std::shared_ptr<Fragment> fragment;
+};
Review comment:
N.B. you also need to define `operator==` for `TaggedRecordBatch` if you want to
use range-for over an iterator of these values.
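To illustrate the point, here is a self-contained model (not Arrow's actual `Iterator<T>`) of an iterator that signals its end with a sentinel value, a `TaggedRecordBatch` whose pointers are both null, and whose range-for support detects that end by comparing the current value against the sentinel with `==`. The `RecordBatch`/`Fragment` structs and `SentinelRange`/`MakeRange` are placeholders; the assumption, implied by the comment, is that Arrow's range-for adapter likewise compares values.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Placeholders for arrow::RecordBatch and arrow::dataset::Fragment.
struct RecordBatch {};
struct Fragment {};

struct TaggedRecordBatch {
  std::shared_ptr<RecordBatch> record_batch;
  std::shared_ptr<Fragment> fragment;
};

// The operator the review asks for: two tagged batches are equal when they
// point at the same batch and the same fragment. A default-constructed
// TaggedRecordBatch (both pointers null) then compares equal to the sentinel.
inline bool operator==(const TaggedRecordBatch& a, const TaggedRecordBatch& b) {
  return a.record_batch == b.record_batch && a.fragment == b.fragment;
}

// Simplified model of an iterator whose end is the sentinel value T{}.
template <typename T, typename Fn>
class SentinelRange {
 public:
  explicit SentinelRange(Fn next) : next_(std::move(next)) {}

  class It {
   public:
    It(Fn* next, T value) : next_(next), value_(std::move(value)) {}
    const T& operator*() const { return value_; }
    It& operator++() {
      value_ = (*next_)();
      return *this;
    }
    // Range-for stops when this returns false; it needs T's operator==.
    bool operator!=(const It& other) const { return !(value_ == other.value_); }

   private:
    Fn* next_;
    T value_;
  };

  It begin() { return It(&next_, next_()); }
  It end() { return It(&next_, T{}); }

 private:
  Fn next_;
};

template <typename T, typename Fn>
SentinelRange<T, Fn> MakeRange(Fn next) {
  return SentinelRange<T, Fn>(std::move(next));
}
```

Removing `operator==` from this sketch makes the range-for fail to compile at the `value_ == other.value_` comparison, which is the kind of error the comment is warning about.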
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -67,7 +68,105 @@ Result<RecordBatchGenerator> ScanTask::ExecuteAsync() {
bool ScanTask::supports_async() const { return false; }
-Result<FragmentIterator> Scanner::GetFragments() {
+Result<ScanTaskIterator> Scanner::Scan() {
+ // TODO(ARROW-12289) This is overridden in SyncScanner and will never be implemented in
+ // AsyncScanner. It is deprecated and will eventually go away.
+ return Status::NotImplemented("This scanner does not support the legacy Scan() method");
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::ScanBatchesUnordered() {
+ // If a scanner doesn't support unordered scanning (i.e. SyncScanner) then we just
+ // fall back to an ordered scan and assign the appropriate tagging
+ ARROW_ASSIGN_OR_RAISE(auto ordered_scan, ScanBatches());
+ return AddPositioningToInOrderScan(std::move(ordered_scan));
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::AddPositioningToInOrderScan(
+ TaggedRecordBatchIterator scan) {
+ ARROW_ASSIGN_OR_RAISE(auto first, scan.Next());
+ if (IsIterationEnd(first)) {
+ return MakeEmptyIterator<EnumeratedRecordBatch>();
+ }
+ struct State {
+ State(TaggedRecordBatchIterator source, TaggedRecordBatch first)
+ : source(std::move(source)),
+ batch_index(1),
+ finished(false),
+ prev_batch(TaggedRecordBatch{nullptr, nullptr}) {}
+ TaggedRecordBatchIterator source;
+ int batch_index;
+ int fragment_index;
+ bool finished;
+ TaggedRecordBatch prev_batch;
+ };
+ struct TaggingIterator {
Review comment:
Overall, this state pattern is common enough in iterators/async generators
that I feel like we should encapsulate it instead of defining it ad hoc every
time. (Not for this PR, but as a future task.)
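One possible shape for such a helper, purely as a hypothetical sketch (`PullIterator`, `MakeStatefulIterator`, and `Enumerate` are not existing Arrow APIs): the caller supplies an initial `State` and a step function, and the helper owns the state and exposes a plain pull-style iterator. The sketch is synchronous; an async-generator variant would presumably need the step to return a future instead.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical alias: a pull-style iterator that yields std::nullopt at the end.
template <typename T>
using PullIterator = std::function<std::optional<T>()>;

// Hypothetical helper: owns an ad hoc State and advances it with `step`, so a
// call site supplies only those two pieces instead of declaring a State struct
// plus a wrapper iterator struct each time.
template <typename T, typename State, typename Step>
PullIterator<T> MakeStatefulIterator(State initial, Step step) {
  // shared_ptr keeps one State alive and shared across copies of the iterator.
  auto state = std::make_shared<State>(std::move(initial));
  return [state, step]() mutable -> std::optional<T> { return step(*state); };
}

// Example use: attach a running index to each item of `source`, similar to the
// bookkeeping the State struct in the diff does by hand.
template <typename T>
PullIterator<std::pair<int, T>> Enumerate(PullIterator<T> source) {
  struct State {
    PullIterator<T> source;
    int index;
  };
  return MakeStatefulIterator<std::pair<int, T>>(
      State{std::move(source), 0},
      [](State& s) -> std::optional<std::pair<int, T>> {
        std::optional<T> item = s.source();
        if (!item) return std::nullopt;
        return std::make_pair(s.index++, std::move(*item));
      });
}
```

The design choice here is to keep the state behind a `shared_ptr` inside a `std::function`, which trades a heap allocation and type erasure for call sites that no longer repeat the State/iterator boilerplate.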
--
This is an automated message from the Apache Git Service.