westonpace commented on a change in pull request #9607:
URL: https://github.com/apache/arrow/pull/9607#discussion_r605537486
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -150,18 +199,43 @@ class ARROW_DS_EXPORT Scanner {
Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanOptions>
scan_options)
: fragment_(std::move(fragment)), scan_options_(std::move(scan_options))
{}
- /// \brief The Scan operator returns a stream of ScanTask. The caller is
+ /// \brief The Scan operator returns a stream of ScanTask futures. The caller is
Review comment:
Done.
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -150,18 +199,43 @@ class ARROW_DS_EXPORT Scanner {
Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanOptions>
scan_options)
: fragment_(std::move(fragment)), scan_options_(std::move(scan_options))
{}
- /// \brief The Scan operator returns a stream of ScanTask. The caller is
+ /// \brief The Scan operator returns a stream of ScanTask futures. The caller is
/// responsible to dispatch/schedule said tasks. Tasks should be safe to run
/// in a concurrent fashion and outlive the iterator.
+ PositionedRecordBatchGenerator ScanUnorderedAsync();
+
+ /// \brief The scan tasks returned in this version will be
Review comment:
Done.
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -72,24 +104,251 @@ Result<FragmentIterator> Scanner::GetFragments() {
return GetFragmentsFromDatasets({dataset_}, scan_options_->filter);
}
+Result<FragmentIterator> Scanner::GetFragments() {
+ auto fut = GetFragmentsAsync();
+ fut.Wait();
Review comment:
Fixed.
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -166,60 +425,67 @@ Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish()
{
return std::make_shared<Scanner>(dataset_, scan_options_);
}
-static inline RecordBatchVector FlattenRecordBatchVector(
- std::vector<RecordBatchVector> nested_batches) {
- RecordBatchVector flattened;
-
- for (auto& task_batches : nested_batches) {
- for (auto& batch : task_batches) {
- flattened.emplace_back(std::move(batch));
- }
- }
-
- return flattened;
-}
-
struct TableAssemblyState {
/// Protecting mutating accesses to batches
std::mutex mutex{};
- std::vector<RecordBatchVector> batches{};
+ std::vector<std::vector<RecordBatchVector>> batches{};
+ int scan_task_id = 0;
- void Emplace(RecordBatchVector b, size_t position) {
+ void Emplace(std::shared_ptr<RecordBatch> batch, size_t fragment_index,
+ size_t task_index, size_t record_batch_index) {
std::lock_guard<std::mutex> lock(mutex);
- if (batches.size() <= position) {
- batches.resize(position + 1);
+ if (batches.size() <= fragment_index) {
+ batches.resize(fragment_index + 1);
+ }
+ if (batches[fragment_index].size() <= task_index) {
+ batches[fragment_index].resize(task_index + 1);
+ }
+ if (batches[fragment_index][task_index].size() <= record_batch_index) {
+ batches[fragment_index][task_index].resize(record_batch_index + 1);
}
- batches[position] = std::move(b);
+ batches[fragment_index][task_index][record_batch_index] = std::move(batch);
}
};
+struct TaggedRecordBatch {
Review comment:
Thanks. Removed.
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -166,60 +425,67 @@ Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish()
{
return std::make_shared<Scanner>(dataset_, scan_options_);
}
-static inline RecordBatchVector FlattenRecordBatchVector(
- std::vector<RecordBatchVector> nested_batches) {
- RecordBatchVector flattened;
-
- for (auto& task_batches : nested_batches) {
- for (auto& batch : task_batches) {
- flattened.emplace_back(std::move(batch));
- }
- }
-
- return flattened;
-}
-
struct TableAssemblyState {
/// Protecting mutating accesses to batches
std::mutex mutex{};
- std::vector<RecordBatchVector> batches{};
+ std::vector<std::vector<RecordBatchVector>> batches{};
+ int scan_task_id = 0;
- void Emplace(RecordBatchVector b, size_t position) {
+ void Emplace(std::shared_ptr<RecordBatch> batch, size_t fragment_index,
+ size_t task_index, size_t record_batch_index) {
std::lock_guard<std::mutex> lock(mutex);
- if (batches.size() <= position) {
- batches.resize(position + 1);
+ if (batches.size() <= fragment_index) {
+ batches.resize(fragment_index + 1);
+ }
+ if (batches[fragment_index].size() <= task_index) {
+ batches[fragment_index].resize(task_index + 1);
+ }
+ if (batches[fragment_index][task_index].size() <= record_batch_index) {
+ batches[fragment_index][task_index].resize(record_batch_index + 1);
}
- batches[position] = std::move(b);
+ batches[fragment_index][task_index][record_batch_index] = std::move(batch);
}
};
+struct TaggedRecordBatch {
+ std::shared_ptr<RecordBatch> record_batch;
+};
+
Result<std::shared_ptr<Table>> Scanner::ToTable() {
- ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
- auto task_group = scan_options_->TaskGroup();
+ auto table_fut = ToTableAsync();
+ table_fut.Wait();
Review comment:
Yep. Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org