westonpace commented on code in PR #13782:
URL: https://github.com/apache/arrow/pull/13782#discussion_r974188037
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,17 +38,91 @@ namespace dataset {
using RecordBatchGenerator =
std::function<Future<std::shared_ptr<RecordBatch>>()>;
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+ /// \brief The path to the column to load
+ FieldPath path;
+ /// \brief The type of the column in the dataset schema
+ ///
+ /// A format may choose to ignore this field completely. For example, when
+ /// reading from IPC the reader can just return the column in the data type
+ /// that is stored on disk. There is no point in doing anything special.
+ ///
+ /// However, some formats may be capable of casting on the fly. For example,
+ /// when reading from CSV, if we know the target type of the column, we can
+ /// convert from string to the target type as we read.
+ DataType* requested_type;
+ /// \brief The index in the output selection of this column
+ int selection_index;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+/// The fragment scan request is derived from ScanV2Options. The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+ /// \brief A row filter
+ ///
+ /// The filter expression should be written against the fragment schema.
+ ///
+ /// \see ScanV2Options for details on how this filter should be applied
+ compute::Expression filter = compute::literal(true);
+
+ /// \brief The columns to scan
+ ///
+ /// These indices refer to the fragment schema
+ ///
+ /// Note: This is NOT a simple list of top-level column indices.
+ /// For more details \see ScanV2Options
+ ///
+ /// If possible a fragment should only read from disk the data needed
+ /// to satisfy these columns. If a format cannot partially read a nested
+ /// column (e.g. JSON) then it must apply the column selection (in memory)
+ /// before returning the scanned batch.
+ std::vector<FragmentSelectionColumn> columns;
+ /// \brief Options specific to the format being scanned
+ FragmentScanOptions* format_scan_options;
+};
+
+class FragmentScanner {
+ public:
+ /// This instance will only be destroyed after all ongoing scan futures
+ /// have been completed.
+ ///
+ /// This means any callbacks created as part of the scan can safely
+ /// capture `this`
+ virtual ~FragmentScanner() = default;
+ /// \brief Scan a batch of data from the file
+ /// \param batch_number The index of the batch to read
+ virtual Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) = 0;
+ /// \brief Calculate an estimate of how many data bytes the given batch will
+ /// represent
+ ///
+ /// "Data bytes" should be the total size of all the buffers once the data
+ /// has been decoded into the Arrow format.
+ virtual int64_t EstimatedDataBytes(int batch_number) = 0;
+ /// \brief The number of batches in the fragment to scan
+ virtual int NumBatches() = 0;
Review Comment:
> How would something like CSV implement this?
It's the # of batches, not the # of rows, so for CSV it should just be
`ceil(file_size_bytes / block_size)`. That being said, this is a change from
the previous fragment implementation, which didn't require fragments to know
the # of batches ahead of time. It will be a problem for something like the
Python iterator-based scanner.
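For reference, a minimal sketch of that estimate (hypothetical helper name, not part of this PR):

```cpp
#include <cstdint>

// Hypothetical helper: a CSV fragment scanner could derive NumBatches()
// from the file size and the configured read block size.
int EstimateCsvNumBatches(int64_t file_size_bytes, int64_t block_size) {
  // Integer ceil(file_size_bytes / block_size) without floating point.
  return static_cast<int>((file_size_bytes + block_size - 1) / block_size);
}
```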
It is possible to make this work without knowing the # of batches up front.
However, it creates a bit more complexity for two reasons:
1. It makes it harder to know the batch index (I'm also working on ordered
execution) because you can't know the batch index until previous fragments have
been fully scanned (whereas this implementation only requires that previous
fragments have been inspected).
2. It makes it difficult to know when we should stop spawning tasks to read
further in the current fragment and start spawning tasks to read the next
fragment. If we do not know `NumBatches`, we have to rely on the fragment
scanner returning an end-of-fragment future quickly. However, this should also
be possible; a rough sketch is below.
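To illustrate reason 2, this is roughly what that alternative interface might look like (hypothetical, not what this PR proposes):

```cpp
#include <memory>

#include "arrow/record_batch.h"
#include "arrow/util/future.h"

// Hypothetical streaming variant: the scanner does not know the batch count
// up front, so it signals end-of-fragment by resolving to nullptr. The scan
// node would have to wait on this future before it could decide whether to
// start reading the next fragment.
class StreamingFragmentScanner {
 public:
  virtual ~StreamingFragmentScanner() = default;
  // Resolves to the next batch, or to nullptr once the fragment is exhausted.
  virtual arrow::Future<std::shared_ptr<arrow::RecordBatch>> ScanNext() = 0;
};
```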
My hope is that an iterator-based dataset could simply be an iterator-based
source node, and so it's ok for the scan node to have this expectation. If we
were adapting Flight to the scan node, do you know if this is something that
could cause a problem? Does a Flight stream have a known # of batches before
we even start consuming it? Or do we not know the # of batches until we have
finished reading it?
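For context, here is roughly how I expect the scan node to drive this interface; because `NumBatches()` is known before scanning starts, every batch gets a deterministic index before any read completes (a sketch only, with simplified names):

```cpp
#include <memory>

#include "arrow/dataset/dataset.h"  // for the FragmentScanner from this PR
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/future.h"

// Sketch of the driving loop: spawn one read task per batch, each tagged with
// its index up front, which is what makes ordered execution straightforward.
void ScheduleFragment(arrow::dataset::FragmentScanner* scanner) {
  int num_batches = scanner->NumBatches();
  for (int i = 0; i < num_batches; ++i) {
    scanner->ScanBatch(i).AddCallback(
        [i](const arrow::Result<std::shared_ptr<arrow::RecordBatch>>& batch) {
          // Deliver `batch` downstream, tagged with index `i`.
        });
  }
  // Once all num_batches tasks are spawned we can immediately start
  // inspecting the next fragment; no end-of-fragment signal is needed.
}
```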
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]