jerry-024 commented on code in PR #292:
URL: https://github.com/apache/paimon-rust/pull/292#discussion_r3151738639


##########
crates/integrations/datafusion/src/sql_handler.rs:
##########
@@ -973,6 +1057,60 @@ fn extract_options(opts: &CreateTableOptions) -> DFResult<Vec<(String, String)>>
         .collect()
 }
 
+/// Parse partition expressions (`col = val, ...`) into partition value maps
+/// suitable for `TableCommit::truncate_partitions`.
+///
+/// All expressions are treated as belonging to a single partition specification.
+/// For multiple partitions, callers should invoke this once per partition clause.
+fn parse_partition_values(
+    exprs: &[SqlExpr],
+    all_fields: &[PaimonDataField],
+    partition_keys: &[String],
+) -> DFResult<Vec<HashMap<String, Option<Datum>>>> {
+    let field_map: HashMap<&str, &PaimonDataField> =
+        all_fields.iter().map(|f| (f.name(), f)).collect();
+
+    let mut partition = HashMap::new();
+    for expr in exprs {
+        let (col_name, val_expr) = match expr {
+            SqlExpr::BinaryOp {
+                left,
+                op: datafusion::sql::sqlparser::ast::BinaryOperator::Eq,
+                right,
+            } => {
+                let col = match left.as_ref() {
+                    SqlExpr::Identifier(ident) => ident.value.clone(),
+                    other => {
+                        return Err(DataFusionError::Plan(format!(
+                            "Expected column name in partition spec, got: {other}"
+                        )))
+                    }
+                };
+                (col, right.as_ref())
+            }
+            other => {
+                return Err(DataFusionError::Plan(format!(
+                    "Expected 'column = value' in partition spec, got: {other}"
+                )))
+            }
+        };
+
+        if !partition_keys.iter().any(|k| k == &col_name) {
+            return Err(DataFusionError::Plan(format!(
+                "Column '{col_name}' is not a partition column"
+            )));
+        }
+
+        let field = field_map.get(col_name.as_str()).ok_or_else(|| {
+            DataFusionError::Plan(format!("Column '{col_name}' not found in table schema"))

Review Comment:
   **HIGH: silent wrong-partition deletion on incomplete specs.**
   
   This builds `partition` only from keys the user supplied. The map flows 
through `TableCommit::truncate_partitions` → `partitions_to_bytes`:
   
   ```rust
   let datum = p.get(key).cloned().flatten();  // missing key → None
   ```
   
   `None` is encoded into the partition filter as `IS NULL`. So on a `(dt, 
region)` partitioned table, `DROP PARTITION (dt = '2026-04-28')` becomes a 
filter `dt='2026-04-28' AND region IS NULL` — it does **not** delete all 
`dt='2026-04-28'` partitions like Hive/Spark prefix semantics would. If a 
NULL-region partition exists, it gets deleted (silent wrong-target). If not, 
silent no-op. Either way, user intent is mishandled with no error.
   
   Two acceptable fixes:
   - **Reject incomplete specs explicitly:** after the loop, `if 
partition.len() != partition_keys.len()` → `Err` listing the missing keys.
   - **Implement true prefix-match deletion:** requires a partition-filter 
primitive that matches on a subset of keys; not just a downstream-API tweak.
   
   For reference, the Spark integration in Java Paimon
   ([`PaimonPartitionManagement.toPaimonPartitions`](https://github.com/apache/paimon/blob/master/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala))
   does `require(partitionFieldCount <= partitionKeys.length)` and projects the
   partition row type so prefix specs work as Hive expects.
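
A minimal sketch of the first fix (rejecting incomplete specs), using only the standard library. The helper names `missing_partition_keys` and `ensure_complete_spec` are hypothetical, the generic `V` stands in for the PR's `Option<Datum>`, and a plain `String` error stands in for `DataFusionError::Plan`. Checking key membership rather than comparing lengths also lets the error name the missing columns.

```rust
use std::collections::HashMap;

/// Returns the partition keys that the user-supplied spec does not mention,
/// in the table's declared partition-key order.
/// `V` is a placeholder for the value type (`Option<Datum>` in the PR).
fn missing_partition_keys<V>(
    partition: &HashMap<String, V>,
    partition_keys: &[String],
) -> Vec<String> {
    partition_keys
        .iter()
        .filter(|k| !partition.contains_key(k.as_str()))
        .cloned()
        .collect()
}

/// Rejects a spec that does not name every partition key.
/// Assumption: a `String` error is used here in place of `DataFusionError::Plan`.
fn ensure_complete_spec<V>(
    partition: &HashMap<String, V>,
    partition_keys: &[String],
) -> Result<(), String> {
    let missing = missing_partition_keys(partition, partition_keys);
    if missing.is_empty() {
        Ok(())
    } else {
        Err(format!(
            "Incomplete partition spec: no value given for partition column(s): {}",
            missing.join(", ")
        ))
    }
}
```

Called after the parsing loop, a check like this would turn `DROP PARTITION (dt = '2026-04-28')` on a `(dt, region)` table into a plan error instead of an implicit `region IS NULL` filter.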



##########
crates/integrations/datafusion/src/sql_handler.rs:
##########
@@ -556,6 +570,76 @@ impl PaimonSqlHandler {
         crate::merge_into::ok_result(&self.ctx, row_count)
     }
 
+    async fn handle_truncate_table(&self, truncate: &Truncate) -> DFResult<DataFrame> {
+        if truncate.table_names.len() > 1 {
+            return Err(DataFusionError::Plan(
+                "TRUNCATE TABLE does not support multiple tables".to_string(),
+            ));
+        }
+        let target = truncate.table_names.first().ok_or_else(|| {
+            DataFusionError::Plan("TRUNCATE TABLE requires a table name".to_string())
+        })?;
+        let identifier = self.resolve_table_name(&target.name)?;
+        let table = self
+            .catalog
+            .get_table(&identifier)
+            .await
+            .map_err(to_datafusion_error)?;
+
+        let wb = table.new_write_builder();
+        let commit = wb.new_commit();

Review Comment:
   **MEDIUM: empty `PARTITION ()` clause falls through to a full-table 
truncate.**
   
   If `truncate.partitions` is `Some(empty_vec)` (a degenerate shape that
   sqlparser may produce for an empty `PARTITION ()` clause), this branch is
   skipped and execution drops into `commit.truncate_table()` below. The entire
   table is wiped even though the user wrote a `PARTITION` clause, which is the
   opposite of their intent.
   
   Suggest tightening:
   
   ```rust
   if let Some(partitions) = &truncate.partitions {
       if partitions.is_empty() {
           return Err(DataFusionError::Plan(
               "PARTITION clause requires at least one column = value".to_string(),
           ));
       }
       // ... existing branch
       return ok_result(...);
   }
   commit.truncate_table().await?;
   ```
   
   This way only `TRUNCATE TABLE t` (no PARTITION clause at all, i.e. 
`truncate.partitions` is `None`) reaches `truncate_table()`.
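
The same three-way decision can also be written as a small pure function, which makes the `None` / empty / non-empty cases testable without a catalog. This is only a sketch: `TruncateTarget` and `classify_truncate` are hypothetical names, `E` stands in for the parsed expression type, and a `String` error stands in for `DataFusionError::Plan`.

```rust
/// What a TRUNCATE statement should act on, decided before any commit is built.
#[derive(Debug, PartialEq)]
enum TruncateTarget<'a, E> {
    /// No PARTITION clause was written at all.
    WholeTable,
    /// A PARTITION clause with at least one expression.
    Partitions(&'a [E]),
}

/// Classifies the optional PARTITION clause.
/// An empty clause is an error rather than a full-table truncate.
fn classify_truncate<E>(partitions: Option<&[E]>) -> Result<TruncateTarget<'_, E>, String> {
    match partitions {
        None => Ok(TruncateTarget::WholeTable),
        Some([]) => Err("PARTITION clause requires at least one column = value".to_string()),
        Some(exprs) => Ok(TruncateTarget::Partitions(exprs)),
    }
}
```

With this shape, `truncate_table()` is reachable only from the `WholeTable` arm.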



##########
crates/integrations/datafusion/src/sql_handler.rs:
##########
@@ -326,6 +330,16 @@ impl PaimonSqlHandler {
                         }
                     }
                 }
+                // DropPartitions is a data operation (not a schema change), so we handle it
+                // separately and return early — it cannot be combined with schema changes.

Review Comment:
   **MEDIUM: `if_exists` dropped at three levels — none honored.**
   
   1. **Inner `AlterTableOperation::DropPartitions { if_exists: _ }`** (this 
match arm): partition-level IF EXISTS is explicitly bound and discarded. The 
doc comment claims this "matches IF EXISTS semantics" because the underlying 
overwrite is a no-op on missing partitions (verified — `truncate_partitions` 
early-returns on empty resolved entries). But that makes plain `DROP PARTITION` 
behave identically to `DROP PARTITION IF EXISTS`. Spark's 
`AlterTableDropPartitionCommand` errors when IF EXISTS is omitted and the 
partition doesn't exist — this PR diverges silently.
   
   2. **Outer `Statement::AlterTable { if_exists, .. }`**: returning early from 
this branch into `handle_drop_partitions` skips whatever outer IF EXISTS 
handling the rest of `handle_alter_table` does. `ALTER TABLE IF EXISTS 
missing_table DROP PARTITION (...)` will fail with a hard error from 
`catalog.get_table(...)` instead of being a silent no-op.
   
   3. **`Statement::Truncate { if_exists, .. }`** (handler at line 570): 
sqlparser parses the flag (confirmed in `sqlparser::ast::Truncate`), but 
`handle_truncate_table` never reads it. `TRUNCATE TABLE IF EXISTS 
missing_table` errors instead of no-oping.
   
   Suggested fix: at each layer, read the flag. When `get_table` returns
   NotFound (or a preliminary `list_tables` check shows the table is absent),
   short-circuit to `ok_result` if `if_exists` is set; otherwise propagate the
   error.
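
One way the table-level short-circuit could look, as a standard-library-only sketch. `CatalogError` and `resolve_if_exists` are hypothetical stand-ins: the real catalog error type and its not-found variant may be shaped differently, so the match arm would need adapting.

```rust
/// Hypothetical stand-in for the catalog error type; only the
/// "table not found" distinction matters for this sketch.
#[derive(Debug, PartialEq)]
enum CatalogError {
    TableNotFound(String),
    Other(String),
}

/// Turns a table lookup into `Ok(None)` when the table is missing and
/// `if_exists` is set. Every other outcome passes through unchanged,
/// so a missing table without IF EXISTS still surfaces as an error.
fn resolve_if_exists<T>(
    lookup: Result<T, CatalogError>,
    if_exists: bool,
) -> Result<Option<T>, CatalogError> {
    match lookup {
        Ok(table) => Ok(Some(table)),
        Err(CatalogError::TableNotFound(_)) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}
```

A handler would then return its empty success result on `Ok(None)` and continue with the commit on `Ok(Some(table))`. This covers only the table-level flags (items 2 and 3); the partition-level `IF EXISTS` in item 1 needs a separate existence check on the resolved partitions.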



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
