luoyuxia commented on code in PR #280:
URL: https://github.com/apache/paimon-rust/pull/280#discussion_r3137621974
##########
crates/integrations/datafusion/src/sql_handler.rs:
##########
@@ -399,6 +421,88 @@ impl PaimonSqlHandler {
crate::delete::execute_delete(&self.ctx, delete, table,
&table_ref).await
}
+ async fn handle_insert_overwrite_partition(&self, insert: &Insert) ->
DFResult<DataFrame> {
+ let table_name = match &insert.table {
+ TableObject::TableName(name) => name.clone(),
+ other => {
+ return Err(DataFusionError::Plan(format!(
+ "Unsupported target table in INSERT OVERWRITE: {other}"
+ )))
+ }
+ };
+ let identifier = self.resolve_table_name(&table_name)?;
+ let table = self
+ .catalog
+ .get_table(&identifier)
+ .await
+ .map_err(to_datafusion_error)?;
+
+ let partition_exprs = insert.partitioned.as_ref().ok_or_else(|| {
+ DataFusionError::Plan("INSERT OVERWRITE PARTITION requires a
PARTITION clause".into())
+ })?;
+ let partition_fields = table.schema().partition_fields();
+ let static_partitions =
+ parse_static_partitions(partition_exprs, &partition_fields,
table.schema().fields())?;
+
+ let source = insert.source.as_ref().ok_or_else(|| {
+ DataFusionError::Plan("INSERT OVERWRITE requires a source
query".into())
+ })?;
+ // Re-parse via to_string(); may lose dialect-specific syntax for
complex queries.
+ let df = self.ctx.sql(&source.to_string()).await?;
+ let mut stream = df.execute_stream().await?;
+
+ let all_fields = table.schema().fields();
+ let expected_source_cols = all_fields
+ .iter()
+ .filter(|f| !static_partitions.contains_key(f.name()))
+ .count();
+
+ let wb = table.new_write_builder();
+ let mut tw = wb
+ .new_write()
+ .map_err(to_datafusion_error)?
+ .with_overwrite();
+ let mut row_count = 0u64;
+ let mut col_checked = false;
+
+ while let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ if batch.num_rows() == 0 {
+ continue;
+ }
+ if !col_checked {
+ if batch.num_columns() != expected_source_cols {
+ return Err(DataFusionError::Plan(format!(
+ "Source query has {} columns, but expected {}
non-partition columns",
+ batch.num_columns(),
+ expected_source_cols
+ )));
+ }
+ col_checked = true;
+ }
+ let augmented = append_partition_columns(
Review Comment:
This path currently treats the source columns positionally, i.e. as "all
non-static table fields in schema order". That breaks `INSERT OVERWRITE ...
PARTITION (...)` when the statement uses an explicit target column list,
because this handler never applies `insert.columns` / `after_columns` before
calling `append_partition_columns`.
For example, if the table schema is `(dt, id, name)` and the statement is:
`INSERT OVERWRITE t (name, id) PARTITION (dt = '2024-01-01') VALUES
('alice', 1)`
then the source columns here are `(name, id)`, but
`append_partition_columns` will consume them as `(id, name)` in table-schema
order. The column-count check above does not catch this, because both lists
have two columns. That means we either get an unexpected cast failure
(`'alice' -> id`) or silently write the wrong values when the types are
compatible.
We should preserve target-column mapping here and reorder/project the source
batch by column name before appending static partition columns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]