westonpace commented on code in PR #13782:
URL: https://github.com/apache/arrow/pull/13782#discussion_r980546636
##########
cpp/src/arrow/dataset/dataset.h:
##########
@@ -37,11 +38,107 @@ namespace dataset {
using RecordBatchGenerator =
std::function<Future<std::shared_ptr<RecordBatch>>()>;
+/// \brief Description of a column to scan
+struct FragmentSelectionColumn {
+ /// \brief The path to the column to load
+ FieldPath column_reference;
+ /// \brief The requested type of the column
+ ///
+ /// A format may choose to ignore this field completely. For example, when
+ /// reading from IPC the reader can just return the column in the data type
+ /// that is stored on disk. In this case the scan node will cast the column
+ /// to the appropriate type later.
+ ///
+ /// However, some formats may be capable of casting on the fly. For example,
+ /// when reading from CSV, if we know the target type of the column, we can
+ /// convert from string to the target type as we read.
+ DataType* requested_type;
+};
+/// \brief Instructions for scanning a particular fragment
+///
+/// The fragment scan request is dervied from ScanV2Options. The main
+/// difference is that the scan options are based on the dataset schema
+/// while the fragment request is based on the fragment schema.
+struct FragmentScanRequest {
+ /// \brief A row filter
+ ///
+ /// The filter expression should be written against the fragment schema.
+ ///
+ /// \see ScanV2Options for details on how this filter should be applied
+ compute::Expression filter = compute::literal(true);
+
+ /// \brief The columns to scan
+ ///
+ /// These indices refer to the fragment schema
+ ///
+ /// Note: This is NOT a simple list of top-level column indices.
+ /// For more details \see ScanV2Options
+ ///
+ /// If possible a fragment should only read from disk the data needed
+ /// to satisfy these columns. If a format cannot partially read a nested
+ /// column (e.g. JSON) then it must apply the column selection (in memory)
+ /// before returning the scanned batch.
+ std::vector<FragmentSelectionColumn> columns;
+ /// \brief Options specific to the format being scanned
+ FragmentScanOptions* format_scan_options;
+};
+
+/// \brief A collection of reads issued by a FragmentScanner
+struct ReadTask {
+ /// \brief A future which will complete when the read is finished
+ Future<std::shared_ptr<RecordBatch>> task;
+ /// \brief An estimate of the number of data bytes that will
+ /// be read when all the reads have finished
+ ///
+ /// This may be lower than the requested number of data bytes
+ /// if the fragment is almost finished
+ ///
+ /// This may be higher than the requested number of data bytes
+ /// if the fragment had to read more to respect record batch
+ /// boundaries
+ uint64_t num_bytes;
+};
+
+class FragmentScanner {
+ public:
+ /// This instance will only be destroyed after all ongoing scan futures
+ /// have been completed.
+ ///
+ /// This means any callbacks created as part of the scan can safely
+ /// capture `this`
+ virtual ~FragmentScanner() = default;
+ /// \brief Scan further ahead into the fragment
+ /// \param target_num_bytes The number of data bytes the scanner is ready to
receive
+ /// \return Futures for each read operation that has been started
+ ///
+ /// `target_num_bytes` is a hint only and fragments do not need to match this
+ /// exactly. For example, when scanning a parquet file, reads have to align
+ /// with pages and so it is often neccesary to read more than asked for.
+ /// See ScanV2Options for more details.
+ ///
+ /// It is acceptable for a future to complete with an empty batch.
+ ///
+ /// This may be called before the previous read task has finished and the
caller
+ /// should always check HasUnscannedData before each call.
+ virtual ReadTask ScanMore(uint64_t target_num_bytes) = 0;
+
+ /// \brief True if there is still data that has not been scanned
+ ///
+ /// This should return true when there is no more data to start scanning.
+ /// Some scan operations may still be in progress and that is fine.
+ virtual bool HasUnscannedData() = 0;
+};
+
+struct InspectedFragment {
Review Comment:
Added
##########
cpp/src/arrow/dataset/dataset.cc:
##########
@@ -238,5 +277,112 @@ Result<FragmentIterator>
UnionDataset::GetFragmentsImpl(compute::Expression pred
return GetFragmentsFromDatasets(children_, predicate);
}
+namespace {
+
+class BasicFragmentEvolution : public FragmentEvolutionStrategy {
+ public:
+ BasicFragmentEvolution(std::vector<int> ds_to_frag_map, Schema*
dataset_schema)
+ : ds_to_frag_map(std::move(ds_to_frag_map)),
dataset_schema(dataset_schema) {}
+
+ virtual Result<compute::Expression> GetGuarantee(
+ const std::vector<FieldPath>& dataset_schema_selection) const override {
+ std::vector<compute::Expression> missing_fields;
+ for (const FieldPath& path : dataset_schema_selection) {
+ int top_level_field_idx = path[0];
+ if (ds_to_frag_map[top_level_field_idx] < 0) {
+ missing_fields.push_back(compute::equal(
Review Comment:
Changed to `is_null`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]