Jefffrey commented on code in PR #17553:
URL: https://github.com/apache/datafusion/pull/17553#discussion_r2361380684


##########
datafusion/core/tests/csv_schema_fix_test.rs:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test for CSV schema inference with different column counts (GitHub issue 
#17516)
+
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use std::fs;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn test_csv_schema_inference_different_column_counts() -> Result<()> {
+    // Create temporary directory for test files
+    let temp_dir = TempDir::new().expect("Failed to create temp dir");
+    let temp_path = temp_dir.path();
+
+    // Create CSV file 1 with 3 columns (simulating older railway services 
format)
+    let csv1_content = r#"service_id,route_type,agency_id
+1,bus,agency1
+2,rail,agency2
+3,bus,agency3
+"#;
+    fs::write(temp_path.join("services_2024.csv"), csv1_content)?;
+
+    // Create CSV file 2 with 6 columns (simulating newer railway services 
format)
+    let csv2_content = 
r#"service_id,route_type,agency_id,stop_platform_change,stop_planned_platform,stop_actual_platform
+4,rail,agency2,true,Platform A,Platform B
+5,bus,agency1,false,Stop 1,Stop 1
+6,rail,agency3,true,Platform C,Platform D
+"#;
+    fs::write(temp_path.join("services_2025.csv"), csv2_content)?;
+
+    // Create DataFusion context
+    let ctx = SessionContext::new();
+
+    // This should now work (previously would have failed with column count 
mismatch)
+    // Enable truncated_rows to handle files with different column counts
+    let df = ctx
+        .read_csv(
+            temp_path.to_str().unwrap(),
+            CsvReadOptions::new().truncated_rows(true),
+        )
+        .await
+        .expect("Should successfully read CSV directory with different column 
counts");
+
+    // Verify the schema contains all 6 columns (union of both files)
+    let df_clone = df.clone();
+    let schema = df_clone.schema();
+    assert_eq!(
+        schema.fields().len(),
+        6,
+        "Schema should contain all 6 columns"
+    );
+
+    // Check that we have all expected columns
+    let field_names: Vec<&str> =
+        schema.fields().iter().map(|f| f.name().as_str()).collect();
+    assert!(field_names.contains(&"service_id"));
+    assert!(field_names.contains(&"route_type"));
+    assert!(field_names.contains(&"agency_id"));
+    assert!(field_names.contains(&"stop_platform_change"));
+    assert!(field_names.contains(&"stop_planned_platform"));
+    assert!(field_names.contains(&"stop_actual_platform"));
+
+    // All fields should be nullable since they don't appear in all files
+    for field in schema.fields() {
+        assert!(
+            field.is_nullable(),
+            "Field {} should be nullable",
+            field.name()
+        );
+    }
+
+    // Verify we can actually read the data
+    let results = df.collect().await?;
+
+    // Calculate total rows across all batches
+    let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+    assert_eq!(total_rows, 6, "Should have 6 total rows across all batches");
+
+    // All batches should have 6 columns (the union schema)
+    for batch in &results {
+        assert_eq!(batch.num_columns(), 6, "All batches should have 6 
columns");
+        assert_eq!(
+            batch.schema().fields().len(),
+            6,
+            "Each batch should use the union schema with 6 fields"
+        );
+    }
+
+    // Verify the actual content of the data
+    // Since we don't know the exact order of rows, just verify the overall 
structure

Review Comment:
   I suggest using the `assert_snapshot!()` macro to check the contents; see a 
reference here:
   
   
https://github.com/apache/datafusion/blob/c1ca3c4d7c97c3c83b1c44fd7d0e3786da863572/datafusion/core/tests/sql/select.rs#L22-L51



##########
datafusion/datasource-csv/src/file_format.rs:
##########
@@ -560,21 +572,27 @@ impl CsvFormat {
                     })
                     .unzip();
             } else {
-                if fields.len() != column_type_possibilities.len() {
-                    return exec_err!(
-                            "Encountered unequal lengths between records on 
CSV file whilst inferring schema. \
-                             Expected {} fields, found {} fields at record {}",
-                            column_type_possibilities.len(),
-                            fields.len(),
-                            record_number + 1
-                        );

Review Comment:
   Would it be beneficial to keep this error message for when the 
`truncated_rows` config is `false`? What does the error message look like right 
now if we read CSV files with differing column counts where this new behaviour 
is not enabled?



##########
datafusion/common/src/config.rs:
##########
@@ -2535,9 +2535,12 @@ config_namespace! {
         // The input regex for Nulls when loading CSVs.
         pub null_regex: Option<String>, default = None
         pub comment: Option<u8>, default = None
-        // Whether to allow truncated rows when parsing.
-        // By default this is set to false and will error if the CSV rows have 
different lengths.
-        // When set to true then it will allow records with less than the 
expected number of columns
+        /// Whether to allow CSV files with varying numbers of columns.
+        /// By default this is set to false and will error if the CSV rows 
have different lengths.
+        /// When set to true:
+        /// - Allows reading multiple CSV files with different column counts
+        /// - Creates a union schema during inference containing all columns 
found across files
+        /// - Files with fewer columns will have missing columns filled with 
null values

Review Comment:
   ```suggestion
           /// Whether to allow truncated rows when parsing, both within a 
single file and across files.
           ///
           /// When set to false (default), reading a single CSV file which has 
rows of different lengths will
           /// error; if reading multiple CSV files with different number of 
columns, it will also fail.
           ///
           /// When set to true, reading a single CSV file with rows of 
different lengths will pad the truncated
           /// rows with null values for the missing columns; if reading 
multiple CSV files with different number
           /// of columns, it creates a union schema containing all columns 
found across the files, and will
           /// pad any files missing columns with null values for their rows.
   ```
   
   Just to make it more obvious that this config has a dual purpose.



##########
datafusion/datasource-csv/src/file_format.rs:
##########
@@ -560,21 +572,27 @@ impl CsvFormat {
                     })
                     .unzip();
             } else {
-                if fields.len() != column_type_possibilities.len() {
-                    return exec_err!(
-                            "Encountered unequal lengths between records on 
CSV file whilst inferring schema. \
-                             Expected {} fields, found {} fields at record {}",
-                            column_type_possibilities.len(),
-                            fields.len(),
-                            record_number + 1
-                        );
+                // Handle files with different numbers of columns by extending 
the schema
+                if fields.len() > column_type_possibilities.len() {
+                    // New columns found - extend our tracking structures
+                    for field in 
fields.iter().skip(column_type_possibilities.len()) {
+                        column_names.push(field.name().clone());
+                        let mut possibilities = HashSet::new();
+                        if records_read > 0 {
+                            possibilities.insert(field.data_type().clone());
+                        }
+                        column_type_possibilities.push(possibilities);
+                    }
                 }
 
-                column_type_possibilities.iter_mut().zip(&fields).for_each(
-                    |(possibilities, field)| {
-                        possibilities.insert(field.data_type().clone());
-                    },
-                );
+                // Update type possibilities for columns that exist in this 
file
+                // Only process fields that exist in both the current file and 
our tracking structures
+                for (field_idx, field) in fields.iter().enumerate() {
+                    if field_idx < column_type_possibilities.len() {
+                        column_type_possibilities[field_idx]
+                            .insert(field.data_type().clone());
+                    }
+                }

Review Comment:
   It might be less confusing if we keep the original code:
   
   ```rust
   column_type_possibilities.iter_mut().zip(&fields).for_each(
       |(possibilities, field)| {
           possibilities.insert(field.data_type().clone());
       },
   );
   ```
   
   But move it before the `Handle files with different numbers of columns by 
extending the schema` check (since zip will terminate on the shortest 
iterator), so we first extend possibilities for existing columns, then 
afterwards add the new columns.
   
   Another way could be to do both in a single iteration; something like
   
   ```
   for idx, field in enumerate(fields)
       if idx < len(possibilities)
           add_possibility
       else
           add_new_field
   ```
   
   Thoughts?



##########
datafusion/datasource-csv/src/file_format.rs:
##########
@@ -587,7 +605,10 @@ impl CsvFormat {
     }
 }
 
-fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> 
Schema {
+pub(crate) fn build_schema_helper(
+    names: Vec<String>,
+    types: &[HashSet<DataType>],
+) -> Schema {

Review Comment:
   ```suggestion
   fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> 
Schema {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to