This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 0e15d26 ARROW-9521: [Rust][DataFusion] Handle custom CSV file extensions
0e15d26 is described below
commit 0e15d262c6a3df243501e0f6ed2f4f82efc1f642
Author: Daniel Russo <[email protected]>
AuthorDate: Tue Aug 11 08:57:21 2020 -0600
ARROW-9521: [Rust][DataFusion] Handle custom CSV file extensions
[ARROW-9521](https://issues.apache.org/jira/browse/ARROW-9521)
This patch adds an option to `CsvReadOptions` for specifying a custom file
extension.
I am new to the project and to Rust, so this is a small contribution to one
of the `beginner` issues. Any suggestions at all, I am happy to make the
changes!
Closes #7918 from drusso/ARROW-9521
Authored-by: Daniel Russo <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
rust/datafusion/src/datasource/csv.rs | 5 ++-
rust/datafusion/src/execution/context.rs | 46 +++++++++++++++++++---
rust/datafusion/src/execution/physical_plan/csv.rs | 22 ++++++++++-
3 files changed, 65 insertions(+), 8 deletions(-)
diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index a419286..225ebfb 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -54,6 +54,7 @@ pub struct CsvFile {
schema: SchemaRef,
has_header: bool,
delimiter: u8,
+ file_extension: String,
}
impl CsvFile {
@@ -69,6 +70,7 @@ impl CsvFile {
schema,
has_header: options.has_header,
delimiter: options.delimiter,
+ file_extension: String::from(options.file_extension),
})
}
}
@@ -88,7 +90,8 @@ impl TableProvider for CsvFile {
CsvReadOptions::new()
.schema(&self.schema)
.has_header(self.has_header)
- .delimiter(self.delimiter),
+ .delimiter(self.delimiter)
+ .file_extension(self.file_extension.as_str()),
projection.clone(),
batch_size,
)?;
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index f822bc1..dcbfe7b 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -1098,6 +1098,31 @@ mod tests {
}
#[test]
+ fn query_csv_with_custom_partition_extension() -> Result<()> {
+ let tmp_dir = TempDir::new("query_csv_with_custom_partition_extension")?;
+
+ // The main stipulation of this test: use a file extension that isn't .csv.
+ let file_extension = ".tst";
+
+ let mut ctx = ExecutionContext::new();
+ let schema = populate_csv_partitions(&tmp_dir, 2, file_extension)?;
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new()
+ .schema(&schema)
+ .file_extension(file_extension),
+ )?;
+ let results = collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test")?;
+
+ assert_eq!(results.len(), 1);
+ assert_eq!(results[0].num_rows(), 1);
+ assert_eq!(test::format_batch(&results[0]), vec!["10,110,20"]);
+
+ Ok(())
+ }
+
+ #[test]
fn scalar_udf() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
@@ -1223,10 +1248,12 @@ mod tests {
ctx.write_csv(physical_plan.as_ref(), out_dir)
}
- /// Generate a partitioned CSV file and register it with an execution context
- fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result<ExecutionContext> {
- let mut ctx = ExecutionContext::new();
-
+ /// Generate CSV partitions within the supplied directory
+ fn populate_csv_partitions(
+ tmp_dir: &TempDir,
+ partition_count: usize,
+ file_extension: &str,
+ ) -> Result<SchemaRef> {
// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
@@ -1235,7 +1262,7 @@ mod tests {
// generate a partitioned file
for partition in 0..partition_count {
- let filename = format!("partition-{}.csv", partition);
+ let filename = format!("partition-{}.{}", partition, file_extension);
let file_path = tmp_dir.path().join(&filename);
let mut file = File::create(file_path)?;
@@ -1246,6 +1273,15 @@ mod tests {
}
}
+ Ok(schema)
+ }
+
+ /// Generate a partitioned CSV file and register it with an execution context
+ fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result<ExecutionContext> {
+ let mut ctx = ExecutionContext::new();
+
+ let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
// register csv file with the execution context
ctx.register_csv(
"test",
diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs
index 0d9963b..c53b21a 100644
--- a/rust/datafusion/src/execution/physical_plan/csv.rs
+++ b/rust/datafusion/src/execution/physical_plan/csv.rs
@@ -43,6 +43,9 @@ pub struct CsvReadOptions<'a> {
pub schema: Option<&'a Schema>,
/// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
pub schema_infer_max_records: usize,
+ /// File extension; only files with this extension are selected for data input.
+ /// Defaults to ".csv".
+ pub file_extension: &'a str,
}
impl<'a> CsvReadOptions<'a> {
@@ -53,6 +56,7 @@ impl<'a> CsvReadOptions<'a> {
schema: None,
schema_infer_max_records: 1000,
delimiter: b',',
+ file_extension: ".csv",
}
}
@@ -68,6 +72,12 @@ impl<'a> CsvReadOptions<'a> {
self
}
+ /// Specify the file extension for CSV file selection
+ pub fn file_extension(mut self, file_extension: &'a str) -> Self {
+ self.file_extension = file_extension;
+ self
+ }
+
/// Configure delimiter setting with Option, None value will be ignored
pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
match delimiter {
@@ -103,6 +113,8 @@ pub struct CsvExec {
has_header: bool,
/// An optional column delimiter. Defaults to `b','`
delimiter: Option<u8>,
+ /// File extension
+ file_extension: String,
/// Optional projection for which columns to load
projection: Option<Vec<usize>>,
/// Schema after the projection has been applied
@@ -134,6 +146,7 @@ impl CsvExec {
schema: Arc::new(schema),
has_header: options.has_header,
delimiter: Some(options.delimiter),
+ file_extension: String::from(options.file_extension),
projection,
projected_schema: Arc::new(projected_schema),
batch_size,
@@ -143,7 +156,8 @@ impl CsvExec {
/// Infer schema for given CSV dataset
pub fn try_infer_schema(path: &str, options: &CsvReadOptions) -> Result<Schema> {
let mut filenames: Vec<String> = vec![];
- common::build_file_list(path, &mut filenames, ".csv")?;
+ common::build_file_list(path, &mut filenames, options.file_extension)?;
+
if filenames.is_empty() {
return Err(ExecutionError::General("No files found".to_string()));
}
@@ -166,7 +180,11 @@ impl ExecutionPlan for CsvExec {
/// Get the partitions for this execution plan. Each partition can be executed in parallel.
fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
let mut filenames: Vec<String> = vec![];
- common::build_file_list(&self.path, &mut filenames, ".csv")?;
+ common::build_file_list(
+ &self.path,
+ &mut filenames,
+ self.file_extension.as_str(),
+ )?;
let partitions = filenames
.iter()
.map(|filename| {