westonpace commented on a change in pull request #11991:
URL: https://github.com/apache/arrow/pull/11991#discussion_r780473351



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2239,10 +2233,6 @@ cdef class Scanner(_Weakrefable):
     use_threads : bool, default True
         If enabled, then maximum parallelism will be used determined by
         the number of available CPU cores.
-    use_async : bool, default False

Review comment:
       This was bugging me, so I went ahead and played around with it.  I
don't see any consequences if I change `bint use_async=True` to
`object use_async = None`, which allows me to emit the deprecation warning on
both `use_async=True` and `use_async=False` (i.e. emit a warning if the user
uses the flag in any way).

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -823,10 +584,77 @@ Result<int64_t> AsyncScanner::CountRows() {
   return total.load();
 }
 
+Result<std::shared_ptr<RecordBatchReader>> AsyncScanner::ToRecordBatchReader() 
{
+  ARROW_ASSIGN_OR_RAISE(auto it, ScanBatches());
+  return 
std::make_shared<ScannerRecordBatchReader>(options()->projected_schema,
+                                                    std::move(it));
+}
+
 const std::shared_ptr<Dataset>& AsyncScanner::dataset() const { return 
dataset_; }
 
+Status NestedFieldRefsNotImplemented() {
+  // TODO(ARROW-11259) Several functions (for example, IpcScanTask::Make) 
assume that
+  // only top level fields will be materialized.
+  return Status::NotImplemented("Nested field references in scans.");
+}
+
 }  // namespace
 
+Result<ProjectionDescr> ProjectionDescr::FromStructExpression(
+    const compute::Expression& projection, const Schema& dataset_schema) {
+  ARROW_ASSIGN_OR_RAISE(compute::Expression bound_expression,
+                        projection.Bind(dataset_schema));
+
+  if (bound_expression.type()->id() != Type::STRUCT) {
+    return Status::Invalid("Projection ", projection.ToString(),
+                           " cannot yield record batches");
+  }
+  std::shared_ptr<Schema> projection_schema =
+      ::arrow::schema(checked_cast<const 
StructType&>(*bound_expression.type()).fields(),
+                      dataset_schema.metadata());
+
+  return ProjectionDescr{std::move(bound_expression), 
std::move(projection_schema)};
+}
+
+Result<ProjectionDescr> ProjectionDescr::FromExpressions(
+    const std::vector<compute::Expression>& exprs, std::vector<std::string> 
names,
+    const Schema& dataset_schema) {
+  compute::MakeStructOptions project_options{std::move(names)};
+
+  for (size_t i = 0; i < exprs.size(); ++i) {
+    if (auto ref = exprs[i].field_ref()) {
+      if (!ref->name()) return NestedFieldRefsNotImplemented();
+
+      // set metadata and nullability for plain field references
+      ARROW_ASSIGN_OR_RAISE(auto field, ref->GetOne(dataset_schema));
+      project_options.field_nullability[i] = field->nullable();
+      project_options.field_metadata[i] = field->metadata();
+    }
+  }
+
+  return ProjectionDescr::FromStructExpression(
+      call("make_struct", std::move(exprs), std::move(project_options)), 
dataset_schema);

Review comment:
       Good catch.  Fixed.  All the callers were moving into it anyway.

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,41 +133,46 @@ struct ARROW_DS_EXPORT ScanOptions {
   // This is used by Fragment implementations to apply the column
   // sub-selection optimization.
   std::vector<std::string> MaterializedFields() const;
-
-  // Return a threaded or serial TaskGroup according to use_threads.
-  std::shared_ptr<::arrow::internal::TaskGroup> TaskGroup() const;
 };
 
-/// \brief Read record batches from a range of a single data fragment. A
-/// ScanTask is meant to be a unit of work to be dispatched. The implementation
-/// must be thread and concurrent safe.
-class ARROW_DS_EXPORT ScanTask {
- public:
-  /// \brief Iterate through sequence of materialized record batches
-  /// resulting from the Scan. Execution semantics are encapsulated in the
-  /// particular ScanTask implementation
-  virtual Result<RecordBatchIterator> Execute() = 0;
-  virtual Future<RecordBatchVector> SafeExecute(::arrow::internal::Executor* 
executor);
-  virtual Future<> SafeVisit(::arrow::internal::Executor* executor,
-                             
std::function<Status(std::shared_ptr<RecordBatch>)> visitor);
+/// \brief Describes a projection
+struct ARROW_DS_EXPORT ProjectionDescr {
+  /// \brief The projection expression itself
+  /// This expression must be a call to make_struct
+  compute::Expression expression;
+  /// \brief The output schema of the projection.
 
-  virtual ~ScanTask() = default;
+  /// This can be calculated from the input schema and the expression but it
+  /// is cached here for convenience.
+  std::shared_ptr<Schema> schema;
 
-  const std::shared_ptr<ScanOptions>& options() const { return options_; }
-  const std::shared_ptr<Fragment>& fragment() const { return fragment_; }
+  /// \brief Create a ProjectionDescr by binding an expression to the dataset 
schema
+  ///
+  /// expression must return a struct type
+  static Result<ProjectionDescr> FromStructExpression(
+      const compute::Expression& expression, const Schema& dataset_schema);
 
- protected:
-  ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<Fragment> 
fragment)
-      : options_(std::move(options)), fragment_(std::move(fragment)) {}
+  /// \brief Create a ProjectionDescr from expressions/names for each field
+  static Result<ProjectionDescr> FromExpressions(
+      const std::vector<compute::Expression>& exprs, std::vector<std::string> 
names,
+      const Schema& dataset_schema);
 
-  std::shared_ptr<ScanOptions> options_;
-  std::shared_ptr<Fragment> fragment_;
+  /// \brief Create a default projection referencing fields in the dataset 
schema
+  static Result<ProjectionDescr> FromNames(std::vector<std::string> names,
+                                           const Schema& dataset_schema);
+
+  /// \brief Make a projection that projects every field in the dataset schema
+  static Result<ProjectionDescr> Default(const Schema& dataset_schema);
 };
 
+/// \brief Utility method to set the projection expression and schema
+ARROW_DS_EXPORT void SetProjection(ScanOptions* options, ProjectionDescr 
projection);

Review comment:
       Yes... but I don't like the idea of "options" classes having methods.  I
think someday down the road (I keep mentioning ARROW-12311 so maybe then) this
will change.  One possible fix would be to have a single `ProjectionDescr`
field in `ScanOptions`, although, as mentioned elsewhere, I'd like to remove
projection options from scan options entirely (changing it instead to "which
columns to load").
   
   So I think I'd rather leave it in the current incorrect state than move it
to an intermediate state.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to