yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r692658418
##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +52,304 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send +
Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+ /// Path for the file (e.g. URL, filesystem path, etc)
+ pub file_path: String,
+ /// Schema of the file
+ pub schema: Schema,
+ /// Statistics of the file
+ pub statistics: Statistics,
+ // Values of partition columns to be appended to each row
+ // pub partition_value: Option<Vec<ScalarValue>>,
+ // Schema of partition columns
+ // pub partition_schema: Option<Schema>,
+ // We may include row group range here for a more fine-grained parallel
execution
+}
+
+impl From<String> for PartitionedFile {
+ fn from(file_path: String) -> Self {
+ Self {
+ file_path,
+ schema: Schema::empty(),
+ statistics: Default::default(),
+ }
+ }
+}
+
+impl std::fmt::Display for PartitionedFile {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.file_path)
+ }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+ /// The index of the partition among all partitions
+ pub index: usize,
+ /// The contained files of the partition
+ pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let files: Vec<String> = self.files.iter().map(|f|
f.to_string()).collect();
+ write!(f, "{}", files.join(", "))
+ }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with the same schema that exist under a path
+pub struct SourceRootDescriptor {
+ /// All source files in the path
+ pub partition_files: Vec<PartitionedFile>,
+ /// The schema of the files
+ pub schema: SchemaRef,
+}
+
+/// Stream of [`PartitionedFile`] results
+pub type PartitionedFileStream =
+ Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync +
'static>>;
+
+/// Builder for [`SourceRootDescriptor`] inside given path
+#[async_trait]
+pub trait SourceRootDescBuilder: Sync + Send + Debug {
+ /// Construct a [`SourceRootDescriptor`] from the provided path
+ fn get_source_desc(
+ path: &str,
+ object_store: Arc<dyn ObjectStore>,
+ ext: &str,
+ provided_schema: Option<Schema>,
+ collect_statistics: bool,
+ ) -> Result<SourceRootDescriptor> {
+ let mut results: Vec<Result<PartitionedFile>> = Vec::new();
+ futures::executor::block_on(async {
+ match Self::get_source_desc_async(
+ path,
+ object_store,
+ ext,
+ provided_schema,
+ collect_statistics,
+ )
+ .await
+ {
+ Ok(mut stream) => {
+ while let Some(pf) = stream.next().await {
+ results.push(pf);
+ }
+ }
+ Err(e) => {
+ results.push(Err(e));
+ }
+ }
+ });
+
+ let partition_results: Result<Vec<PartitionedFile>> =
+ results.into_iter().collect();
+ let partition_files = partition_results?;
+
+ // build a list of Parquet partitions with statistics and gather all
unique schemas
Review comment:
Yes, `get_source_desc` is used to adapt `async` to `sync`, so that
async does not propagate into the public API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]