This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e15d26  ARROW-9521: [Rust][DataFusion] Handle custom CSV file extensions
0e15d26 is described below

commit 0e15d262c6a3df243501e0f6ed2f4f82efc1f642
Author: Daniel Russo <[email protected]>
AuthorDate: Tue Aug 11 08:57:21 2020 -0600

    ARROW-9521: [Rust][DataFusion] Handle custom CSV file extensions
    
    [ARROW-9521](https://issues.apache.org/jira/browse/ARROW-9521)
    
    This patch adds an option to `CsvReadOptions` for specifying a custom file extension.
    
    I am new to the project and to Rust, so this is a small contribution to one of the `beginner` issues. Any suggestions at all, I am happy to make the changes!
    
    Closes #7918 from drusso/ARROW-9521
    
    Authored-by: Daniel Russo <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/datafusion/src/datasource/csv.rs              |  5 ++-
 rust/datafusion/src/execution/context.rs           | 46 +++++++++++++++++++---
 rust/datafusion/src/execution/physical_plan/csv.rs | 22 ++++++++++-
 3 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index a419286..225ebfb 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -54,6 +54,7 @@ pub struct CsvFile {
     schema: SchemaRef,
     has_header: bool,
     delimiter: u8,
+    file_extension: String,
 }
 
 impl CsvFile {
@@ -69,6 +70,7 @@ impl CsvFile {
             schema,
             has_header: options.has_header,
             delimiter: options.delimiter,
+            file_extension: String::from(options.file_extension),
         })
     }
 }
@@ -88,7 +90,8 @@ impl TableProvider for CsvFile {
             CsvReadOptions::new()
                 .schema(&self.schema)
                 .has_header(self.has_header)
-                .delimiter(self.delimiter),
+                .delimiter(self.delimiter)
+                .file_extension(self.file_extension.as_str()),
             projection.clone(),
             batch_size,
         )?;
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index f822bc1..dcbfe7b 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -1098,6 +1098,31 @@ mod tests {
     }
 
     #[test]
+    fn query_csv_with_custom_partition_extension() -> Result<()> {
+        let tmp_dir = TempDir::new("query_csv_with_custom_partition_extension")?;
+
+        // The main stipulation of this test: use a file extension that isn't .csv.
+        let file_extension = ".tst";
+
+        let mut ctx = ExecutionContext::new();
+        let schema = populate_csv_partitions(&tmp_dir, 2, file_extension)?;
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new()
+                .schema(&schema)
+                .file_extension(file_extension),
+        )?;
+        let results = collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test")?;
+
+        assert_eq!(results.len(), 1);
+        assert_eq!(results[0].num_rows(), 1);
+        assert_eq!(test::format_batch(&results[0]), vec!["10,110,20"]);
+
+        Ok(())
+    }
+
+    #[test]
     fn scalar_udf() -> Result<()> {
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, false),
@@ -1223,10 +1248,12 @@ mod tests {
         ctx.write_csv(physical_plan.as_ref(), out_dir)
     }
 
-    /// Generate a partitioned CSV file and register it with an execution context
-    fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result<ExecutionContext> {
-        let mut ctx = ExecutionContext::new();
-
+    /// Generate CSV partitions within the supplied directory
+    fn populate_csv_partitions(
+        tmp_dir: &TempDir,
+        partition_count: usize,
+        file_extension: &str,
+    ) -> Result<SchemaRef> {
         // define schema for data source (csv file)
         let schema = Arc::new(Schema::new(vec![
             Field::new("c1", DataType::UInt32, false),
@@ -1235,7 +1262,7 @@ mod tests {
 
         // generate a partitioned file
         for partition in 0..partition_count {
-            let filename = format!("partition-{}.csv", partition);
+            let filename = format!("partition-{}.{}", partition, file_extension);
             let file_path = tmp_dir.path().join(&filename);
             let mut file = File::create(file_path)?;
 
@@ -1246,6 +1273,15 @@ mod tests {
             }
         }
 
+        Ok(schema)
+    }
+
+    /// Generate a partitioned CSV file and register it with an execution context
+    fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result<ExecutionContext> {
+        let mut ctx = ExecutionContext::new();
+
+        let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
         // register csv file with the execution context
         ctx.register_csv(
             "test",
diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs
index 0d9963b..c53b21a 100644
--- a/rust/datafusion/src/execution/physical_plan/csv.rs
+++ b/rust/datafusion/src/execution/physical_plan/csv.rs
@@ -43,6 +43,9 @@ pub struct CsvReadOptions<'a> {
     pub schema: Option<&'a Schema>,
     /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
     pub schema_infer_max_records: usize,
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".csv".
+    pub file_extension: &'a str,
 }
 
 impl<'a> CsvReadOptions<'a> {
@@ -53,6 +56,7 @@ impl<'a> CsvReadOptions<'a> {
             schema: None,
             schema_infer_max_records: 1000,
             delimiter: b',',
+            file_extension: ".csv",
         }
     }
 
@@ -68,6 +72,12 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
+    /// Specify the file extension for CSV file selection
+    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
+        self.file_extension = file_extension;
+        self
+    }
+
     /// Configure delimiter setting with Option, None value will be ignored
     pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
         match delimiter {
@@ -103,6 +113,8 @@ pub struct CsvExec {
     has_header: bool,
     /// An optional column delimiter. Defaults to `b','`
     delimiter: Option<u8>,
+    /// File extension
+    file_extension: String,
     /// Optional projection for which columns to load
     projection: Option<Vec<usize>>,
     /// Schema after the projection has been applied
@@ -134,6 +146,7 @@ impl CsvExec {
             schema: Arc::new(schema),
             has_header: options.has_header,
             delimiter: Some(options.delimiter),
+            file_extension: String::from(options.file_extension),
             projection,
             projected_schema: Arc::new(projected_schema),
             batch_size,
@@ -143,7 +156,8 @@ impl CsvExec {
     /// Infer schema for given CSV dataset
     pub fn try_infer_schema(path: &str, options: &CsvReadOptions) -> Result<Schema> {
         let mut filenames: Vec<String> = vec![];
-        common::build_file_list(path, &mut filenames, ".csv")?;
+        common::build_file_list(path, &mut filenames, options.file_extension)?;
+
         if filenames.is_empty() {
             return Err(ExecutionError::General("No files found".to_string()));
         }
@@ -166,7 +180,11 @@ impl ExecutionPlan for CsvExec {
     /// Get the partitions for this execution plan. Each partition can be executed in parallel.
     fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
         let mut filenames: Vec<String> = vec![];
-        common::build_file_list(&self.path, &mut filenames, ".csv")?;
+        common::build_file_list(
+            &self.path,
+            &mut filenames,
+            self.file_extension.as_str(),
+        )?;
         let partitions = filenames
             .iter()
             .map(|filename| {

Reply via email to