yjshen commented on a change in pull request #1010:
URL: https://github.com/apache/arrow-datafusion/pull/1010#discussion_r714511626



##########
File path: datafusion/src/datasource/file_format/mod.rs
##########
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods for the various file formats
+
+pub mod avro;
+pub mod csv;
+pub mod json;
+pub mod parquet;
+
+use std::pin::Pin;
+use std::sync::Arc;
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::{create_max_min_accs, get_col_stats};
+use crate::error::Result;
+use crate::logical_plan::Expr;
+use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
+
+use async_trait::async_trait;
+use futures::Stream;
+
+/// A stream of String that can be used across await calls
+pub type StringStream = Pin<Box<dyn Stream<Item = String> + Send + Sync>>;
+
+/// Convert a vector into a stream
+pub fn string_stream(strings: Vec<String>) -> StringStream {
+    Box::pin(futures::stream::iter(strings))
+}
+
+/// This trait abstracts all the file-format-specific implementations
+/// from the `TableProvider`. This helps code reuse across
+/// providers that support the same file formats.
+#[async_trait]
+pub trait FileFormat: Send + Sync {
+    /// Open the files at the paths provided by iterator and infer the
+    /// common schema
+    async fn infer_schema(&self, paths: StringStream) -> Result<SchemaRef>;
+
+    /// Open the file at the given path and infer its statistics
+    async fn infer_stats(&self, path: &str) -> Result<Statistics>;
+
+    /// Take a list of files and convert it to the appropriate executor
+    /// according to this file format.
+    /// TODO group params into TableDescription(schema,files,stats) and
+    /// ScanOptions(projection,batch_size,filters) to avoid too_many_arguments
+    #[allow(clippy::too_many_arguments)]
+    async fn create_executor(

Review comment:
       The name `executor` seems vague to me. Since it creates the physical 
plan for a scan, could we name it more precisely? 

##########
File path: datafusion/src/datasource/listing.rs
##########
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses the `ObjectStore` listing capability
+//! to get the list of files to process.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt};
+
+use crate::{
+    datasource::file_format::{self, PartitionedFile},
+    error::Result,
+    logical_plan::Expr,
+    physical_plan::{common, ExecutionPlan, Statistics},
+};
+
+use super::{
+    datasource::TableProviderFilterPushDown, file_format::FileFormat, TableProvider,
+};
+
+/// Options for creating a `ListingTable`
+pub struct ListingOptions {
+    /// A suffix on which files should be filtered (leave empty to
+    /// keep all files on the path)
+    pub file_extension: String,
+    /// The file format
+    pub format: Arc<dyn FileFormat>,
+    /// The expected partition column names.
+    /// For example `Vec["a", "b"]` means that the first two levels of
+    /// partitioning should be named "a" and "b":
+    /// - If there is a third level of partitioning it will be ignored.
+    /// - Files that don't follow this partitioning will be ignored.

Review comment:
       Regarding the comments here, we haven't thought about how to deal with 
default partition semantics yet. 
   
   I.e., when inserting into a table with a NULL partition column value, how 
do we deal with a=20210922/b=NULL/1.parquet? How do we differentiate whether 
the NULL is a valid string value or denotes that the value doesn't exist?
   
    I recall Hive has `__HIVE_DEFAULT_PARTITION__` for this purpose.
   
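
   For illustration, here is a minimal sketch (not DataFusion code; all names 
are hypothetical) of how a listing could parse Hive-style partition path 
segments, treating the `__HIVE_DEFAULT_PARTITION__` sentinel as a missing 
value while keeping the literal string "NULL" a valid one:

```rust
// Hypothetical sketch: `__HIVE_DEFAULT_PARTITION__` marks a NULL partition
// value, distinct from the literal string "NULL".
const HIVE_DEFAULT_PARTITION: &str = "__HIVE_DEFAULT_PARTITION__";

/// Parse one `key=value` path segment into (column, Option<value>),
/// mapping the Hive default-partition sentinel to `None`.
fn parse_partition_segment(segment: &str) -> Option<(String, Option<String>)> {
    let (key, value) = segment.split_once('=')?;
    let value = if value == HIVE_DEFAULT_PARTITION {
        None // the value was NULL when the partition was written
    } else {
        Some(value.to_string()) // includes the literal string "NULL"
    };
    Some((key.to_string(), value))
}

fn main() {
    assert_eq!(
        parse_partition_segment("a=20210922"),
        Some(("a".to_string(), Some("20210922".to_string())))
    );
    // The sentinel denotes a missing value...
    assert_eq!(
        parse_partition_segment("b=__HIVE_DEFAULT_PARTITION__"),
        Some(("b".to_string(), None))
    );
    // ...while the literal string "NULL" stays a valid value.
    assert_eq!(
        parse_partition_segment("b=NULL"),
        Some(("b".to_string(), Some("NULL".to_string())))
    );
}
```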

##########
File path: datafusion/src/datasource/file_format/mod.rs
##########
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods for the various file formats
+
+pub mod avro;
+pub mod csv;
+pub mod json;
+pub mod parquet;
+
+use std::pin::Pin;
+use std::sync::Arc;
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::{create_max_min_accs, get_col_stats};
+use crate::error::Result;
+use crate::logical_plan::Expr;
+use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
+
+use async_trait::async_trait;
+use futures::Stream;
+
+/// A stream of String that can be used accross await calls
+pub type StringStream = Pin<Box<dyn Stream<Item = String> + Send + Sync>>;
+
+/// Convert a vector into a stream
+pub fn string_stream(strings: Vec<String>) -> StringStream {
+    Box::pin(futures::stream::iter(strings))
+}
+
+/// This trait abstracts all the file-format-specific implementations
+/// from the `TableProvider`. This helps code reuse across
+/// providers that support the same file formats.
+#[async_trait]
+pub trait FileFormat: Send + Sync {
+    /// Open the files at the paths provided by iterator and infer the
+    /// common schema
+    async fn infer_schema(&self, paths: StringStream) -> Result<SchemaRef>;

Review comment:
       Maybe take a `FileMetaStream` as the argument instead?

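   To make the suggestion concrete, a hypothetical `FileMeta` that the 
listing could stream to `infer_schema` instead of bare path strings (the 
struct and its fields are assumptions for illustration, not an actual 
DataFusion type):

```rust
/// Hypothetical metadata the listing phase already has for each file;
/// streaming it along would let `infer_schema` avoid re-fetching it
/// from the object store.
#[derive(Debug, Clone, PartialEq)]
pub struct FileMeta {
    /// Full object store path of the file
    pub path: String,
    /// File size in bytes, useful for planning and statistics
    pub size: u64,
}

// The trait method could then take a stream of these, e.g.
// `async fn infer_schema(&self, files: FileMetaStream) -> Result<SchemaRef>;`
// with `FileMetaStream = Pin<Box<dyn Stream<Item = FileMeta> + Send + Sync>>`.

fn main() {
    let meta = FileMeta {
        path: "bucket/data/1.parquet".to_string(),
        size: 1024,
    };
    assert_eq!(meta.path, "bucket/data/1.parquet");
    assert_eq!(meta.size, 1024);
}
```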
##########
File path: datafusion/src/datasource/listing.rs
##########
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses the `ObjectStore` listing capability
+//! to get the list of files to process.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt};
+
+use crate::{
+    datasource::file_format::{self, PartitionedFile},
+    error::Result,
+    logical_plan::Expr,
+    physical_plan::{common, ExecutionPlan, Statistics},
+};
+
+use super::{
+    datasource::TableProviderFilterPushDown, file_format::FileFormat, TableProvider,
+};
+
+/// Options for creating a `ListingTable`
+pub struct ListingOptions {
+    /// A suffix on which files should be filtered (leave empty to
+    /// keep all files on the path)
+    pub file_extension: String,
+    /// The file format
+    pub format: Arc<dyn FileFormat>,
+    /// The expected partition column names.
+    /// For example `Vec["a", "b"]` means that the first two levels of
+    /// partitioning should be named "a" and "b":
+    /// - If there is a third level of partitioning it will be ignored.
+    /// - Files that don't follow this partitioning will be ignored.
+    /// Note that only `DataType::Utf8` is supported for the column type.
+    /// TODO implement case where partitions.len() > 0
+    pub partitions: Vec<String>,

Review comment:
       Does it make sense to make `partitions` a schema instead of a 
`Vec<String>`, and introduce a cast if needed from the path string to its 
partition values (`Vec<ScalarValue>`)?

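   As a sketch of this idea (stand-in types only: `PartitionType` and 
`PartitionValue` below are placeholders for arrow's `DataType` and 
DataFusion's `ScalarValue`, not real APIs), a typed partition column lets the 
path component be cast once at listing time:

```rust
// Hypothetical stand-ins for arrow `DataType` / DataFusion `ScalarValue`.
#[derive(Debug, PartialEq)]
enum PartitionType {
    Utf8,
    Int32,
}

#[derive(Debug, PartialEq)]
enum PartitionValue {
    Utf8(String),
    Int32(i32),
}

/// Cast one path component according to the declared partition column type;
/// returns `None` when the string does not parse as the declared type.
fn cast_partition_value(raw: &str, ty: &PartitionType) -> Option<PartitionValue> {
    match ty {
        PartitionType::Utf8 => Some(PartitionValue::Utf8(raw.to_string())),
        PartitionType::Int32 => raw.parse().ok().map(PartitionValue::Int32),
    }
}

fn main() {
    // With a schema, "20210922" can become a typed value instead of a string.
    assert_eq!(
        cast_partition_value("20210922", &PartitionType::Int32),
        Some(PartitionValue::Int32(20210922))
    );
    assert_eq!(
        cast_partition_value("us-east", &PartitionType::Utf8),
        Some(PartitionValue::Utf8("us-east".to_string()))
    );
    // A non-numeric component fails the cast rather than silently passing.
    assert_eq!(cast_partition_value("oops", &PartitionType::Int32), None);
}
```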



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

