[ https://issues.apache.org/jira/browse/ARROW-12311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17491181#comment-17491181 ]

Weston Pace commented on ARROW-12311:
-------------------------------------

I spent some time today thinking about scan options.  I'd like to propose a 
simplification.  Note that this simplification requires changes to the existing 
behavior of the scanner itself in some substantial ways.

Here are my proposed options (with a bullet list of changes below):
{noformat}
/// Scan-specific options, which can be changed between scans of the same dataset.
struct ARROW_DS_EXPORT ScanOptions {
  /// A row filter
  ///
  /// This is an opportunistic pushdown filter.  Filtering capabilities will
  /// vary between fragments.
  ///
  /// Each fragment will do its best to filter the data based on the information
  /// (partitioning guarantees, statistics) available to it.  If it is able to
  /// apply some filtering then it will indicate what filtering it was able to
  /// apply by attaching a guarantee to the batch.
  ///
  /// For example, if a filter is x < 50 && y > 40 then a batch may be able to
  /// apply a guarantee x < 50.  Post-scan filtering would then only need to
  /// consider y > 40 (for this specific batch).  The next batch may not be able
  /// to attach any guarantee and both clauses would need to be applied to that batch.
  ///
  /// The filter expression may not be compatible with the schema of the underlying
  /// file / statistics.  For example, a filter expression may be x > literal(-50,
  /// int32()).  The underlying data type might be uint64.  If possible, the reader may
  /// attempt to apply the filter (in this case the guarantee x > -50 can be attached to
  /// the batch since we know all values will be >= 0 and thus x > -50 is true).
  ///
  /// If a reader is not capable of handling mismatched types in a filter then it should
  /// ignore the filter.
  ///
  /// A single guarantee-aware filtering operation should generally be applied to all
  /// resulting batches.  The scan node consumer is responsible for doing this.
  ///
  /// Note: If you are using a Scanner (as opposed to creating a scan node) there
  /// are a number of convenience methods such as Scanner::Head, Scanner::ScanBatches,
  /// etc.  These "convenience" methods will create an exec plan that includes a
  /// scan node, a filter node, and a projection node.  So if you are using these
  /// methods you do not need to worry about this final filtering step.
  compute::Expression filter = compute::literal(true);

  /// A mapping describing the desired output schema
  ///
  /// If a fragment contains a field that is not included in this mapping the field
  /// should not be returned.  If possible the column should not be loaded from disk.
  ///
  /// If a fragment does not contain a field specified by the mapping then this is
  /// not an error and the fragment should not return anything.  In particular, the
  /// reader should not attempt to create an all-null placeholder.
  ///
  /// If a reference is a nested reference then readers should only return the
  /// desired subset of the column.  For example, if the physical schema has a
  /// field with the type "points": struct<{"real": double, "imaginary": double}>
  /// and the ref is "points" then the entire column should be loaded and the data
  /// type of the column will be struct<...>.  If the ref is "points.real" then
  /// only the "real" portion of the column should be loaded and the data type of
  /// the column will be double.
  ///
  /// If the reader does not support partial loading of nested columns (e.g. CSV) then
  /// the reader should load the entire column and then select the relevant nested
  /// portion.  Note that a single source nested column may end up populating several
  /// output columns.  Readers should take care not to load the source column from disk
  /// multiple times.
  ///
  /// The output field type is included for opportunistic purposes.  If the underlying
  /// column type does not match then this is not an error and the reader should return
  /// the underlying column as is.  In particular, the reader should not cast.  However,
  /// if the column type is unknown then the reader should attempt to infer the column
  /// based on the desired output type.
  ///
  /// For example, consider the mapping: [ "items": {"values": int64 } ].  If the fragment
  /// is a parquet file with a column named "items" that has a float64 data type then the
  /// reader should emit the column "values":float64.
  ///
  /// If the fragment is a CSV file with a column named "items" it should attempt to parse
  /// the column as int64, inserting null/na or erroring (depending on the reader options)
  /// if it cannot parse a particular cell.
  ///
  /// The scan node itself is responsible for a final casting of data from whatever
  /// physical type the reader was able to produce to the desired output type.  This means
  /// that a scan node will always emit batches according to the output mapping's schema.
  /// If a cast is impossible then the error_on_incompatible_types option will take
  /// effect.
  ///
  /// If empty then no columns will be loaded from the file.  This can be useful
  /// when you need row-count information only.  File readers that support a
  /// metadata-only scan should utilize such a scan when an empty vector is specified.
  std::vector<std::pair<FieldRef, Field>> output_mapping;

  /// If true the scanner will error when a fragment cannot satisfy a type
  /// requested by the output mapping.
  ///
  /// If false the scanner will emit an all-null column (of the requested type) instead.
  bool error_on_incompatible_types = true;

  /// If true the scanner will error when a batch does not include one of the fields
  /// requested in the output_mapping.
  ///
  /// If false the scanner will emit an all-null column (of the requested type) instead.
  bool error_on_missing_fields = false;

  /// Maximum row count for scanned batches.
  int64_t batch_size = kDefaultBatchSize;

  /// How many batches to read ahead within a file
  ///
  /// Set to 0 to disable batch readahead
  ///
  /// Note: May not be supported by all formats
  int32_t batch_readahead = kDefaultBatchReadahead;

  /// How many files to read ahead
  ///
  /// Set to 0 to disable fragment readahead
  int32_t fragment_readahead = kDefaultFragmentReadahead;

  /// A pool from which materialized and scanned arrays will be allocated.
  MemoryPool* pool = arrow::default_memory_pool();

  /// If true the scanner will scan in parallel
  ///
  /// Note: If true, this will use threads from both the cpu_executor and the
  /// io_context.executor
  bool use_threads = false;
};
{noformat}
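
Here's a rough sketch of the FieldRef resolution that the output_mapping above leans 
on, using the existing FieldRef/FieldPath APIs (the mapping itself is still just a 
proposal, and the schema below is made up):
{noformat}
#include <arrow/api.h>

void NestedRefExample() {
  // Physical schema of a hypothetical fragment
  auto physical_schema = arrow::schema(
      {arrow::field("x", arrow::int32()),
       arrow::field("points",
                    arrow::struct_({arrow::field("real", arrow::float64()),
                                    arrow::field("imaginary", arrow::float64())}))});

  // "points.real" refers only to the nested child, not the whole struct
  auto ref = arrow::FieldRef::FromDotPath("points.real").ValueOrDie();
  auto path = ref.FindOne(*physical_schema).ValueOrDie();     // FieldPath {1, 0}
  auto real_field = path.Get(*physical_schema).ValueOrDie();  // field "real": double

  // A reader with nested pushdown would load only this child from disk; one
  // without it would load all of "points" and select the child afterwards.
}
{noformat}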

* The filtering behavior is not really different from what we have today, but it is 
hopefully more explicit.  The only thing we don't handle well today is the case where 
the data type of the filter expression does not match the data type of the column.  
See: ARROW-15146.  (There's a rough sketch of the guarantee-aware post-scan filtering 
step after this list.)
* The projection behavior we have today is very confusing, to the point that I don't 
even know if what I'm proposing is that much of a change.  The actual syntax is 
different.  The old syntax (projection_schema + projection + dataset_schema) was rather 
confusing, so I think it's worth the cost of introducing a new almost-schema vector of 
field ref/field pairs.  Unlike the filtering, I don't think it is acceptable to leave 
some projection for the exec plan.  I've learned more about relational algebra and it 
seems that nodes should really have a consistent output type known at plan creation 
time.  Therefore, I think the scan node should be responsible for the final projection.  
However, I think the scan node can take care of the casting to keep the reader logic 
simple.  So basically, readers will do the best they can, and the scan node will finish 
the rest.
* Users often have clunky datasets that are full of inconsistencies.  As such, 
I introduced {{error_on_incompatible_types}} and {{error_on_missing_fields}} to 
allow the user to get nulls instead of errors.
* Similar to the above, we should be set up to handle datasets that have different 
files in different formats.  I've removed fragment_scan_options.  This should be a 
property of the fragment.  If we want to make it easier for users then I think they can 
provide a default set of options during dataset discovery.  This would also mean that 
FileSystemDataset eventually loses its "format" property, but that can maybe be handled 
in a follow-up.
* I removed the io_context option as that should come from the filesystem.
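
To make the guarantee-aware post-scan filtering a little more concrete, here is roughly 
what I have in mind for the scan node consumer, written against the existing compute 
expression APIs (SimplifyWithGuarantee, ExecuteScalarExpression, Filter).  How the 
guarantee actually arrives alongside the batch is hand-waved here, and the helper name 
is just illustrative; this is a sketch, not compile-tested:
{noformat}
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/compute/exec.h>
#include <arrow/compute/exec/expression.h>

namespace cp = arrow::compute;

// Apply whatever part of `filter` is not already covered by the fragment's
// guarantee, e.g. filter = (x < 50 && y > 40), guarantee = (x < 50).
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ApplyResidualFilter(
    const std::shared_ptr<arrow::RecordBatch>& batch, const cp::Expression& filter,
    const cp::Expression& guarantee) {
  ARROW_ASSIGN_OR_RAISE(auto bound, filter.Bind(*batch->schema()));
  // Drop the clauses the fragment already guaranteed to be true
  ARROW_ASSIGN_OR_RAISE(auto residual, cp::SimplifyWithGuarantee(bound, guarantee));
  if (residual == cp::literal(true)) return batch;
  // Evaluate the residual predicate and keep only the matching rows
  ARROW_ASSIGN_OR_RAISE(arrow::Datum mask,
                        cp::ExecuteScalarExpression(residual, cp::ExecBatch(*batch)));
  ARROW_ASSIGN_OR_RAISE(arrow::Datum filtered, cp::Filter(batch, mask));
  return filtered.record_batch();
}
{noformat}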

> [Python][R] Expose (hide?) ScanOptions
> --------------------------------------
>
>                 Key: ARROW-12311
>                 URL: https://issues.apache.org/jira/browse/ARROW-12311
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Python, R
>            Reporter: Weston Pace
>            Assignee: Weston Pace
>            Priority: Major
>             Fix For: 8.0.0
>
>
> Currently R completely hides the `ScanOptions` class.
> In python the class is exposed but the documentation prefers `dataset.scan` 
> (which hides both the scanner and the scan options).
> However, there is some useful information in the `ScanOptions`.  
> Specifically, the projected schema (which is a product of the dataset schema 
> and the projection expression and not easily recreated) and the materialized 
> fields (the list of fields referenced by either the filter or the projection) 
> which might be useful for reporting purposes.
> Currently R uses the projected schema to convert a list of column names into 
> a partition schema.  Python does not rely on either field.
>  
> Options:
>  - Keep the status quo
>  - Expose the ScanOptions object (which itself is exposed via the Scanner)
>  - Expose the interesting fields via the Scanner
>  
> Currently the C++ design is halfway between the latter two (the projected schema 
> is exposed, and so are the options).  My preference would be the third option.  It 
> raises a further question about how to expose the scanner itself in Python.  Should 
> the user be using ScannerBuilder?  Should they use NewScan?  Should they use 
> the scanner directly at all or should it be hidden?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
