This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d682d22d30 feat: Support reading CSV files with inconsistent column counts (#17553)
d682d22d30 is described below
commit d682d22d30bb267a849108450b04cf2e96a86aad
Author: EeshanBembi <[email protected]>
AuthorDate: Wed Oct 8 10:07:19 2025 +0530
feat: Support reading CSV files with inconsistent column counts (#17553)
* feat: Support CSV files with inconsistent column counts
Enable DataFusion to read directories containing CSV files with different
numbers of columns by implementing schema union during inference.
Changes:
- Modified CSV schema inference to create union schema from all files
- Extended infer_schema_from_stream to handle varying column counts
- Added tests for schema building logic and integration scenarios
Requires CsvReadOptions::new().truncated_rows(true) to handle files
with fewer columns than the inferred schema.
Fixes #17516
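As a rough usage sketch (the directory path and tokio entry point here are illustrative, not part of this commit; the CsvReadOptions::new().truncated_rows(true) call is the opt-in described above):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Opt in to union-schema inference; files missing columns
        // relative to the inferred schema are padded with nulls.
        let df = ctx
            .read_csv("data/", CsvReadOptions::new().truncated_rows(true))
            .await?;
        df.show().await?;
        Ok(())
    }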
* refactor: Address review comments for CSV union schema feature
Addresses all review feedback from PR #17553 to improve the CSV schema
union implementation that allows reading CSV files with different column
counts.
Changes based on review:
- Moved unit tests from separate tests.rs to bottom of file_format.rs
- Updated documentation wording from "now supports" to "can handle"
- Removed all println statements from integration test
- Added comprehensive assertions for actual row content verification
- Simplified HashSet initialization using HashSet::from([...]) syntax (see the sketch after this list)
- Updated truncated_rows config documentation to reflect expanded purpose
- Removed unnecessary min() calculation in column processing loop
- Fixed clippy warnings by using enumerate() instead of range loop
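For reference, a minimal sketch of that initialization change (the variable name is illustrative):

    use arrow::datatypes::DataType;
    use std::collections::HashSet;

    // Before: build the set imperatively
    let mut types = HashSet::new();
    types.insert(DataType::Int64);

    // After: construct it in a single expression
    let types = HashSet::from([DataType::Int64]);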
Technical improvements:
- Tests now verify null patterns correctly across union schema
- Cleaner iteration logic without redundant bounds checking
- Better documentation explaining union schema behavior
The feature continues to work as designed:
- Creates union schema from all CSV files in a directory
- Files with fewer columns have nulls for missing fields
- Requires explicit opt-in via truncated_rows(true)
- Maintains full backward compatibility
* Apply cargo fmt formatting fixes
* refactor: Address PR review comments for CSV union schema feature
- Remove pub(crate) visibility from build_schema_helper function
- Refactor column type processing to use a zip iterator before the extension logic (see the sketch below)
- Add missing error handling for truncated_rows=false case
- Improve truncated_rows documentation to clarify dual purpose
- Replace manual testing with assert_snapshot for better test coverage
- Fix clippy warnings and ensure all tests pass
Addresses all reviewer feedback from PR #17553 while maintaining
backward compatibility and the original CSV union schema functionality.
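A condensed sketch of that zip-before-extension ordering, adapted from the file_format.rs hunk below (the function name and signature are simplified for illustration):

    use arrow::datatypes::{DataType, Field};
    use std::collections::HashSet;

    fn merge_file_columns(
        column_names: &mut Vec<String>,
        column_type_possibilities: &mut Vec<HashSet<DataType>>,
        fields: &[Field],
    ) {
        // First merge type candidates for the columns both schemas share;
        // zip stops at the shorter side, so no bounds checking is needed.
        column_type_possibilities
            .iter_mut()
            .zip(fields)
            .for_each(|(possibilities, field)| {
                possibilities.insert(field.data_type().clone());
            });
        // Then extend the tracked schema with columns seen for the first time.
        for field in fields.iter().skip(column_type_possibilities.len()) {
            column_names.push(field.name().clone());
            column_type_possibilities.push(HashSet::from([field.data_type().clone()]));
        }
    }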
---
datafusion/common/src/config.rs | 12 ++-
datafusion/core/tests/csv_schema_fix_test.rs | 122 ++++++++++++++++++++++++++
datafusion/datasource-csv/src/file_format.rs | 124 +++++++++++++++++++++++++--
3 files changed, 247 insertions(+), 11 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index adfb1a6efd..39d730eaaf 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -2535,9 +2535,15 @@ config_namespace! {
// The input regex for Nulls when loading CSVs.
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
- // Whether to allow truncated rows when parsing.
- // By default this is set to false and will error if the CSV rows have different lengths.
- // When set to true then it will allow records with less than the expected number of columns
+ /// Whether to allow truncated rows when parsing, both within a single file and across files.
+ ///
+ /// When set to false (default), reading a single CSV file which has rows of different lengths will
+ /// error; if reading multiple CSV files with different numbers of columns, it will also fail.
+ ///
+ /// When set to true, reading a single CSV file with rows of different lengths will pad the truncated
+ /// rows with null values for the missing columns; if reading multiple CSV files with different numbers
+ /// of columns, it creates a union schema containing all columns found across the files, and will
+ /// pad any files missing columns with null values for their rows.
pub truncated_rows: Option<bool>, default = None
}
}
diff --git a/datafusion/core/tests/csv_schema_fix_test.rs b/datafusion/core/tests/csv_schema_fix_test.rs
new file mode 100644
index 0000000000..2e1daa113b
--- /dev/null
+++ b/datafusion/core/tests/csv_schema_fix_test.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test for CSV schema inference with different column counts (GitHub issue #17516)
+
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use datafusion_common::test_util::batches_to_sort_string;
+use insta::assert_snapshot;
+use std::fs;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn test_csv_schema_inference_different_column_counts() -> Result<()> {
+ // Create temporary directory for test files
+ let temp_dir = TempDir::new().expect("Failed to create temp dir");
+ let temp_path = temp_dir.path();
+
+ // Create CSV file 1 with 3 columns (simulating older railway services format)
+ let csv1_content = r#"service_id,route_type,agency_id
+1,bus,agency1
+2,rail,agency2
+3,bus,agency3
+"#;
+ fs::write(temp_path.join("services_2024.csv"), csv1_content)?;
+
+ // Create CSV file 2 with 6 columns (simulating newer railway services format)
+ let csv2_content = r#"service_id,route_type,agency_id,stop_platform_change,stop_planned_platform,stop_actual_platform
+4,rail,agency2,true,Platform A,Platform B
+5,bus,agency1,false,Stop 1,Stop 1
+6,rail,agency3,true,Platform C,Platform D
+"#;
+ fs::write(temp_path.join("services_2025.csv"), csv2_content)?;
+
+ // Create DataFusion context
+ let ctx = SessionContext::new();
+
+ // This should now work (previously would have failed with column count mismatch)
+ // Enable truncated_rows to handle files with different column counts
+ let df = ctx
+ .read_csv(
+ temp_path.to_str().unwrap(),
+ CsvReadOptions::new().truncated_rows(true),
+ )
+ .await
+ .expect("Should successfully read CSV directory with different column counts");
+
+ // Verify the schema contains all 6 columns (union of both files)
+ let df_clone = df.clone();
+ let schema = df_clone.schema();
+ assert_eq!(
+ schema.fields().len(),
+ 6,
+ "Schema should contain all 6 columns"
+ );
+
+ // Check that we have all expected columns
+ let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+ assert!(field_names.contains(&"service_id"));
+ assert!(field_names.contains(&"route_type"));
+ assert!(field_names.contains(&"agency_id"));
+ assert!(field_names.contains(&"stop_platform_change"));
+ assert!(field_names.contains(&"stop_planned_platform"));
+ assert!(field_names.contains(&"stop_actual_platform"));
+
+ // All fields should be nullable since they don't appear in all files
+ for field in schema.fields() {
+ assert!(
+ field.is_nullable(),
+ "Field {} should be nullable",
+ field.name()
+ );
+ }
+
+ // Verify we can actually read the data
+ let results = df.collect().await?;
+
+ // Calculate total rows across all batches
+ let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+ assert_eq!(total_rows, 6, "Should have 6 total rows across all batches");
+
+ // All batches should have 6 columns (the union schema)
+ for batch in &results {
+ assert_eq!(batch.num_columns(), 6, "All batches should have 6 columns");
+ assert_eq!(
+ batch.schema().fields().len(),
+ 6,
+ "Each batch should use the union schema with 6 fields"
+ );
+ }
+
+ // Verify the actual content of the data using snapshot testing
+ assert_snapshot!(batches_to_sort_string(&results), @r"
+ +------------+------------+-----------+----------------------+-----------------------+----------------------+
+ | service_id | route_type | agency_id | stop_platform_change | stop_planned_platform | stop_actual_platform |
+ +------------+------------+-----------+----------------------+-----------------------+----------------------+
+ | 1          | bus        | agency1   |                      |                       |                      |
+ | 2          | rail       | agency2   |                      |                       |                      |
+ | 3          | bus        | agency3   |                      |                       |                      |
+ | 4          | rail       | agency2   | true                 | Platform A            | Platform B           |
+ | 5          | bus        | agency1   | false                | Stop 1                | Stop 1               |
+ | 6          | rail       | agency3   | true                 | Platform C            | Platform D           |
+ +------------+------------+-----------+----------------------+-----------------------+----------------------+
+ ");
+
+ Ok(())
+}
diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs
index 21065f918a..6765939176 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -497,7 +497,20 @@ impl FileFormat for CsvFormat {
impl CsvFormat {
/// Return the inferred schema reading up to records_to_read from a
/// stream of delimited chunks returning the inferred schema and the
- /// number of lines that were read
+ /// number of lines that were read.
+ ///
+ /// This method can handle CSV files with different numbers of columns.
+ /// The inferred schema will be the union of all columns found across all files.
+ /// Files with fewer columns will have missing columns filled with null values.
+ ///
+ /// # Example
+ ///
+ /// If you have two CSV files:
+ /// - `file1.csv`: `col1,col2,col3`
+ /// - `file2.csv`: `col1,col2,col3,col4,col5`
+ ///
+ /// The inferred schema will contain all 5 columns, with files that don't
+ /// have columns 4 and 5 having null values for those columns.
pub async fn infer_schema_from_stream(
&self,
state: &dyn Session,
@@ -560,21 +573,37 @@ impl CsvFormat {
})
.unzip();
} else {
- if fields.len() != column_type_possibilities.len() {
+ if fields.len() != column_type_possibilities.len()
+ && !self.options.truncated_rows.unwrap_or(false)
+ {
return exec_err!(
- "Encountered unequal lengths between records on
CSV file whilst inferring schema. \
- Expected {} fields, found {} fields at record {}",
- column_type_possibilities.len(),
- fields.len(),
- record_number + 1
- );
+ "Encountered unequal lengths between records on CSV
file whilst inferring schema. \
+ Expected {} fields, found {} fields at record {}",
+ column_type_possibilities.len(),
+ fields.len(),
+ record_number + 1
+ );
}
+ // First update type possibilities for existing columns using zip
column_type_possibilities.iter_mut().zip(&fields).for_each(
|(possibilities, field)| {
possibilities.insert(field.data_type().clone());
},
);
+
+ // Handle files with different numbers of columns by extending the schema
+ if fields.len() > column_type_possibilities.len() {
+ // New columns found - extend our tracking structures
+ for field in fields.iter().skip(column_type_possibilities.len()) {
+ column_names.push(field.name().clone());
+ let mut possibilities = HashSet::new();
+ if records_read > 0 {
+ possibilities.insert(field.data_type().clone());
+ }
+ column_type_possibilities.push(possibilities);
+ }
+ }
}
if records_to_read == 0 {
@@ -769,3 +798,82 @@ impl DataSink for CsvSink {
FileSink::write_all(self, data, context).await
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::build_schema_helper;
+ use arrow::datatypes::DataType;
+ use std::collections::HashSet;
+
+ #[test]
+ fn test_build_schema_helper_different_column_counts() {
+ // Test the core schema building logic with different column counts
+ let mut column_names = vec!["col1".to_string(), "col2".to_string(), "col3".to_string()];
+
+ // Simulate adding two more columns from another file
+ column_names.push("col4".to_string());
+ column_names.push("col5".to_string());
+
+ let column_type_possibilities = vec![
+ HashSet::from([DataType::Int64]),
+ HashSet::from([DataType::Utf8]),
+ HashSet::from([DataType::Float64]),
+ HashSet::from([DataType::Utf8]), // col4
+ HashSet::from([DataType::Utf8]), // col5
+ ];
+
+ let schema = build_schema_helper(column_names, &column_type_possibilities);
+
+ // Verify schema has 5 columns
+ assert_eq!(schema.fields().len(), 5);
+ assert_eq!(schema.field(0).name(), "col1");
+ assert_eq!(schema.field(1).name(), "col2");
+ assert_eq!(schema.field(2).name(), "col3");
+ assert_eq!(schema.field(3).name(), "col4");
+ assert_eq!(schema.field(4).name(), "col5");
+
+ // All fields should be nullable
+ for field in schema.fields() {
+ assert!(
+ field.is_nullable(),
+ "Field {} should be nullable",
+ field.name()
+ );
+ }
+ }
+
+ #[test]
+ fn test_build_schema_helper_type_merging() {
+ // Test type merging logic
+ let column_names = vec!["col1".to_string(), "col2".to_string()];
+
+ let column_type_possibilities = vec![
+ HashSet::from([DataType::Int64, DataType::Float64]), // Should resolve to Float64
+ HashSet::from([DataType::Utf8]), // Should remain Utf8
+ ];
+
+ let schema = build_schema_helper(column_names, &column_type_possibilities);
+
+ // col1 should be Float64 due to Int64 + Float64 = Float64
+ assert_eq!(*schema.field(0).data_type(), DataType::Float64);
+
+ // col2 should remain Utf8
+ assert_eq!(*schema.field(1).data_type(), DataType::Utf8);
+ }
+
+ #[test]
+ fn test_build_schema_helper_conflicting_types() {
+ // Test when we have incompatible types - should default to Utf8
+ let column_names = vec!["col1".to_string()];
+
+ let column_type_possibilities = vec![
+ HashSet::from([DataType::Boolean, DataType::Int64, DataType::Utf8]), // Should resolve to Utf8 due to conflicts
+ ];
+
+ let schema = build_schema_helper(column_names, &column_type_possibilities);
+
+ // Should default to Utf8 for conflicting types
+ assert_eq!(*schema.field(0).data_type(), DataType::Utf8);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]