This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8afccc9510 Allow dropping qualified columns (#19549)
8afccc9510 is described below

commit 8afccc9510fc4d9d3511bf45ad46871687a11f40
Author: Nick <[email protected]>
AuthorDate: Fri Jan 16 07:28:59 2026 -0500

    Allow dropping qualified columns (#19549)
    
    ## Which issue does this PR close?
    
    - Closes #19548
    
    ## Rationale for this change
    
    The explanation is in the issue. The motivation comes more concretely
    from datafusion-python:
    https://github.com/apache/datafusion-python/issues/1305#issuecomment-3649891506
    
    ## What changes are included in this PR?
    
    * Adds the test from the issue to highlight the expected behavior
    * Expands `drop_columns` to coerce its inputs into fully qualified
    columns, supporting the full range of column varieties (see the usage
    sketch after this list)
    * Also adds a helper that extracts the qualified columns, including the
    associated table reference, for a dataframe, to simplify use of the
    qualified drop-columns support
    * The helper is potentially the most controversial part. I could see a
    nicer API being `df.col(<name>)` to match the expr version, but then we
    would probably repeat the lookup of the underlying table name on every
    call unless there is caching somewhere. Maybe that performance impact
    isn't significant.
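    
    A minimal usage sketch of the expanded API, assuming a registered table
    named `t` with columns `a` and `b` (the table, file, and column names
    here are illustrative, not part of this PR):
    
    ```rust
    use datafusion::error::Result;
    use datafusion::prelude::*;
    
    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.register_csv("t", "tests/data/example.csv", CsvReadOptions::new())
            .await?;
        let df = ctx.table("t").await?;
    
        // Unqualified names behave as before: every field with a matching
        // name is dropped, regardless of which table it came from.
        let df = df.drop_columns(&["a"])?;
    
        // Qualified names are now coerced into fully qualified columns, so
        // a column from one specific table can be dropped (e.g. after a
        // join that produces same-named columns from both sides).
        let df = df.drop_columns(&["t.b"])?;
    
        df.show().await?;
        Ok(())
    }
    ```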
    
    ## Are these changes tested?
    
    Yes, some additional tests are provided.
    
    ## Are there any user-facing changes?
    
    I had to update the `drop_columns(&[])` test since the element type can
    no longer be inferred. I'm not sure that is representative of any
    actual use case, though, since I expect the more common pattern is a
    vector that might happen to be empty, in which case the type would
    already be specified.
    
    Columns whose names contain dots must now be quoted, as in other
    places, e.g. `"\"f.col1\""` to disambiguate from the qualified
    reference `"f.col1"`. A sketch of both changes follows.
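    
    A short sketch of both user-facing changes; `df` stands in for any
    existing `DataFrame`, and the column names are illustrative:
    
    ```rust
    // The element type of an empty slice can no longer be inferred, so it
    // now needs an explicit annotation.
    let no_drops: Vec<&str> = vec![];
    let df = df.drop_columns(&no_drops)?;
    
    // A column whose name literally contains a dot must now be quoted, to
    // disambiguate it from the qualified reference "f.col1" (table `f`,
    // column `col1`).
    let df = df.drop_columns(&["\"f.col1\""])?;
    ```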
---
 datafusion-cli/src/functions.rs        |   2 +-
 datafusion/core/src/dataframe/mod.rs   |  70 ++++++++++++++++++++--
 datafusion/core/tests/dataframe/mod.rs | 106 ++++++++++++++++++++++++++++++++-
 3 files changed, 169 insertions(+), 9 deletions(-)

diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index e50339d296..cef057545c 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -426,7 +426,7 @@ impl TableFunctionImpl for ParquetMetadataFunc {
                 compression_arr.push(format!("{:?}", column.compression()));
                 // need to collect into Vec to format
                 let encodings: Vec<_> = column.encodings().collect();
-                encodings_arr.push(format!("{:?}", encodings));
+                encodings_arr.push(format!("{encodings:?}"));
                 index_page_offset_arr.push(column.index_page_offset());
                 dictionary_page_offset_arr.push(column.dictionary_page_offset());
                 data_page_offset_arr.push(column.data_page_offset());
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index fe760760ee..1e9f72501e 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -447,15 +447,31 @@ impl DataFrame {
     /// # Ok(())
     /// # }
     /// ```
-    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
+    pub fn drop_columns<T>(self, columns: &[T]) -> Result<DataFrame>
+    where
+        T: Into<Column> + Clone,
+    {
         let fields_to_drop = columns
             .iter()
-            .flat_map(|name| {
-                self.plan
-                    .schema()
-                    .qualified_fields_with_unqualified_name(name)
+            .flat_map(|col| {
+                let column: Column = col.clone().into();
+                match column.relation.as_ref() {
+                    Some(_) => {
+                        // qualified_field_from_column returns Result<(Option<&TableReference>, &FieldRef)>
+                        vec![self.plan.schema().qualified_field_from_column(&column)]
+                    }
+                    None => {
+                        // qualified_fields_with_unqualified_name returns Vec<(Option<&TableReference>, &FieldRef)>
+                        self.plan
+                            .schema()
+                            .qualified_fields_with_unqualified_name(&column.name)
+                            .into_iter()
+                            .map(Ok)
+                            .collect::<Vec<_>>()
+                    }
+                }
             })
-            .collect::<Vec<_>>();
+            .collect::<Result<Vec<_>, _>>()?;
         let expr: Vec<Expr> = self
             .plan
             .schema()
@@ -2465,6 +2481,48 @@ impl DataFrame {
             .collect()
     }
 
+    /// Find qualified columns for this dataframe from names
+    ///
+    /// # Arguments
+    /// * `names` - Unqualified names to find.
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # use datafusion_common::ScalarValue;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// ctx.register_csv("first_table", "tests/data/example.csv", 
CsvReadOptions::new())
+    ///     .await?;
+    /// let df = ctx.table("first_table").await?;
+    /// ctx.register_csv("second_table", "tests/data/example.csv", 
CsvReadOptions::new())
+    ///     .await?;
+    /// let df2 = ctx.table("second_table").await?;
+    /// let join_expr = df.find_qualified_columns(&["a"])?.iter()
+    ///     .zip(df2.find_qualified_columns(&["a"])?.iter())
+    ///     .map(|(col1, col2)| col(*col1).eq(col(*col2)))
+    ///     .collect::<Vec<Expr>>();
+    /// let df3 = df.join_on(df2, JoinType::Inner, join_expr)?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn find_qualified_columns(
+        &self,
+        names: &[&str],
+    ) -> Result<Vec<(Option<&TableReference>, &FieldRef)>> {
+        let schema = self.logical_plan().schema();
+        names
+            .iter()
+            .map(|name| {
+                schema
+                    .qualified_field_from_column(&Column::from_name(*name))
+                    .map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
+            })
+            .collect()
+    }
+
     /// Helper for creating DataFrame.
     /// # Example
     /// ```
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index c09db37191..1ae6ef5c4a 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -534,7 +534,8 @@ async fn drop_columns_with_nonexistent_columns() -> Result<()> {
 async fn drop_columns_with_empty_array() -> Result<()> {
     // build plan using Table API
     let t = test_table().await?;
-    let t2 = t.drop_columns(&[])?;
+    let drop_columns = vec![] as Vec<&str>;
+    let t2 = t.drop_columns(&drop_columns)?;
     let plan = t2.logical_plan().clone();
 
     // build query using SQL
@@ -549,6 +550,107 @@ async fn drop_columns_with_empty_array() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn drop_columns_qualified() -> Result<()> {
+    // build plan using Table API
+    let mut t = test_table().await?;
+    t = t.select_columns(&["c1", "c2", "c11"])?;
+    let mut t2 = test_table_with_name("another_table").await?;
+    t2 = t2.select_columns(&["c1", "c2", "c11"])?;
+    let mut t3 = t.join_on(
+        t2,
+        JoinType::Inner,
+        [col("aggregate_test_100.c1").eq(col("another_table.c1"))],
+    )?;
+    t3 = t3.drop_columns(&["another_table.c2", "another_table.c11"])?;
+
+    let plan = t3.logical_plan().clone();
+
+    let sql = "SELECT aggregate_test_100.c1, aggregate_test_100.c2, 
aggregate_test_100.c11, another_table.c1 FROM (SELECT c1, c2, c11 FROM 
aggregate_test_100) INNER JOIN (SELECT c1, c2, c11 FROM another_table) ON 
aggregate_test_100.c1 = another_table.c1";
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx, "aggregate_test_100").await?;
+    register_aggregate_csv(&ctx, "another_table").await?;
+    let sql_plan = ctx.sql(sql).await?.into_unoptimized_plan();
+
+    // the two plans should be identical
+    assert_same_plan(&plan, &sql_plan);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn drop_columns_qualified_find_qualified() -> Result<()> {
+    // build plan using Table API
+    let mut t = test_table().await?;
+    t = t.select_columns(&["c1", "c2", "c11"])?;
+    let mut t2 = test_table_with_name("another_table").await?;
+    t2 = t2.select_columns(&["c1", "c2", "c11"])?;
+    let mut t3 = t.join_on(
+        t2.clone(),
+        JoinType::Inner,
+        [col("aggregate_test_100.c1").eq(col("another_table.c1"))],
+    )?;
+    t3 = t3.drop_columns(&t2.find_qualified_columns(&["c2", "c11"])?)?;
+
+    let plan = t3.logical_plan().clone();
+
+    let sql = "SELECT aggregate_test_100.c1, aggregate_test_100.c2, 
aggregate_test_100.c11, another_table.c1 FROM (SELECT c1, c2, c11 FROM 
aggregate_test_100) INNER JOIN (SELECT c1, c2, c11 FROM another_table) ON 
aggregate_test_100.c1 = another_table.c1";
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx, "aggregate_test_100").await?;
+    register_aggregate_csv(&ctx, "another_table").await?;
+    let sql_plan = ctx.sql(sql).await?.into_unoptimized_plan();
+
+    // the two plans should be identical
+    assert_same_plan(&plan, &sql_plan);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_find_qualified_names() -> Result<()> {
+    let t = test_table().await?;
+    let column_names = ["c1", "c2", "c3"];
+    let columns = t.find_qualified_columns(&column_names)?;
+
+    // Expected results for each column
+    let binding = TableReference::bare("aggregate_test_100");
+    let expected = [
+        (Some(&binding), "c1"),
+        (Some(&binding), "c2"),
+        (Some(&binding), "c3"),
+    ];
+
+    // Verify we got the expected number of results
+    assert_eq!(
+        columns.len(),
+        expected.len(),
+        "Expected {} columns, got {}",
+        expected.len(),
+        columns.len()
+    );
+
+    // Iterate over the results and check each one individually
+    for (i, (actual, expected)) in columns.iter().zip(expected.iter()).enumerate() {
+        let (actual_table_ref, actual_field_ref) = actual;
+        let (expected_table_ref, expected_field_name) = expected;
+
+        // Check table reference
+        assert_eq!(
+            actual_table_ref, expected_table_ref,
+            "Column {i}: expected table reference {expected_table_ref:?}, got {actual_table_ref:?}"
+        );
+
+        // Check field name
+        assert_eq!(
+            actual_field_ref.name(),
+            *expected_field_name,
+            "Column {i}: expected field name '{expected_field_name}', got '{actual_field_ref}'"
+        );
+    }
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn drop_with_quotes() -> Result<()> {
     // define data with a column name that has a "." in it:
@@ -594,7 +696,7 @@ async fn drop_with_periods() -> Result<()> {
     let ctx = SessionContext::new();
     ctx.register_batch("t", batch)?;
 
-    let df = ctx.table("t").await?.drop_columns(&["f.c1"])?;
+    let df = ctx.table("t").await?.drop_columns(&["\"f.c1\""])?;
 
     let df_results = df.collect().await?;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
