This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e82dc21d1b Feat : added truncate table support (#19633)
e82dc21d1b is described below
commit e82dc21d1b8817654a57f629b788fd0a3e493cfb
Author: Rosai <[email protected]>
AuthorDate: Wed Jan 21 09:18:05 2026 +0530
Feat : added truncate table support (#19633)
## Which issue does this PR close?
- Related to #19617
## Rationale for this change
DataFusion recently added TableProvider hooks for row-level DML
operations such as DELETE and UPDATE, but TRUNCATE TABLE was still
unsupported.
## What changes are included in this PR?
This PR adds planning and integration support for TRUNCATE TABLE in
DataFusion, completing another part of the DML surface alongside
existing DELETE and UPDATE support.
Specifically, it includes:
- SQL parsing support for TRUNCATE TABLE
- Logical plan support via a new WriteOp::Truncate DML operation
- Physical planner routing for TRUNCATE statements
- A new TableProvider::truncate() hook for storage-native
implementations
- Protobuf / DML node support for serializing and deserializing TRUNCATE
operations
- SQL logic tests validating logical and physical planning behavior
The implementation follows the same structure and conventions as the
existing DELETE and UPDATE DML support.
Execution semantics are delegated to individual TableProvider
implementations via the new hook.
## Are these changes tested?
Yes. The PR includes Rust integration tests in `dml_planning.rs`, a protobuf
round-trip case, and SQL logic tests that verify:
- Parsing of TRUNCATE TABLE
- Correct logical plan generation
- Correct physical planner routing
- Clear and consistent errors for providers that do not yet support
TRUNCATE
These tests mirror the existing testing strategy used for unsupported
DELETE and UPDATE operations.
## Are there any user-facing changes?
Yes. Users can now execute TRUNCATE TABLE statements in DataFusion for
tables whose TableProvider supports the new truncate() hook. Tables that
do not support TRUNCATE will return a clear NotImplemented error.
---
datafusion/catalog/src/table.rs | 8 ++
datafusion/core/src/physical_planner.rs | 24 +++++
.../tests/custom_sources_cases/dml_planning.rs | 102 ++++++++++++++++++++-
datafusion/expr/src/logical_plan/dml.rs | 3 +
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 3 +
datafusion/proto/src/generated/prost.rs | 3 +
datafusion/proto/src/logical_plan/from_proto.rs | 1 +
datafusion/proto/src/logical_plan/to_proto.rs | 1 +
.../proto/tests/cases/roundtrip_logical_plan.rs | 1 +
datafusion/sql/src/statement.rs | 50 ++++++++++
datafusion/sqllogictest/test_files/truncate.slt | 85 +++++++++++++++++
12 files changed, 280 insertions(+), 2 deletions(-)
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 1f223852c2..f31d4d52ce 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -353,6 +353,14 @@ pub trait TableProvider: Debug + Sync + Send {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("UPDATE not supported for {} table", self.table_type())
}
+
+ /// Remove all rows from the table.
+ ///
+ /// Should return an [`ExecutionPlan`] producing a single row with a `count` (UInt64)
+ /// column holding the number of rows removed. The default returns a NotImplemented error.
+ async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
+ not_impl_err!("TRUNCATE not supported for {} table", self.table_type())
+ }
}
/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 8434a7ae5e..94c8fd510a 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -655,6 +655,30 @@ impl DefaultPhysicalPlanner {
);
}
}
+ LogicalPlan::Dml(DmlStatement {
+ table_name,
+ target,
+ op: WriteOp::Truncate,
+ ..
+ }) => { // Delegate to the provider's truncate() hook, adding table context on error.
+ if let Some(provider) =
+ target.as_any().downcast_ref::<DefaultTableSource>()
+ {
+ provider
+ .table_provider
+ .truncate(session_state)
+ .await
+ .map_err(|e| {
+ e.context(format!(
+ "TRUNCATE operation on table '{table_name}'"
+ ))
+ })?
+ } else {
+ return exec_err!(
+ "Table source can't be downcasted to DefaultTableSource"
+ );
+ }
+ }
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_or_internal_err!(
!window_expr.is_empty(),
diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
index 84cf97710a..a4033e445c 100644
--- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs
+++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Tests for DELETE and UPDATE planning to verify filter and assignment extraction.
+//! Tests for DELETE, UPDATE, and TRUNCATE planning to verify filter/assignment extraction and provider dispatch.
use std::any::Any;
use std::sync::{Arc, Mutex};
@@ -24,9 +24,10 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
-use datafusion::execution::context::SessionContext;
+use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::logical_expr::Expr;
use datafusion_catalog::Session;
+use datafusion_common::ScalarValue;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::empty::EmptyExec;
@@ -165,6 +166,66 @@ impl TableProvider for CaptureUpdateProvider {
}
}
+/// A TableProvider that records whether truncate() was called; its truncate() plan yields no rows.
+struct CaptureTruncateProvider {
+ schema: SchemaRef,
+ truncate_called: Arc<Mutex<bool>>,
+}
+
+impl CaptureTruncateProvider {
+ fn new(schema: SchemaRef) -> Self {
+ Self {
+ schema,
+ truncate_called: Arc::new(Mutex::new(false)),
+ }
+ }
+
+ fn was_truncated(&self) -> bool {
+ *self.truncate_called.lock().unwrap()
+ }
+}
+
+impl std::fmt::Debug for CaptureTruncateProvider {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("CaptureTruncateProvider")
+ .field("schema", &self.schema)
+ .finish()
+ }
+}
+
+#[async_trait]
+impl TableProvider for CaptureTruncateProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ _projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
+ }
+
+ async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
+ *self.truncate_called.lock().unwrap() = true;
+
+ Ok(Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![
+ Field::new("count", DataType::UInt64, false),
+ ])))))
+ }
+}
+
fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
@@ -269,6 +330,28 @@ async fn test_update_assignments() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn test_truncate_calls_provider() -> Result<()> {
+ // NOTE(review): max_passes = 0 disables optimizer passes; the reason is not stated here.
+ let max_passes = ScalarValue::UInt64(Some(0));
+ let config = SessionConfig::new().set("datafusion.optimizer.max_passes", &max_passes);
+ let ctx = SessionContext::new_with_config(config);
+
+ let provider = Arc::new(CaptureTruncateProvider::new(test_schema()));
+ let table = Arc::clone(&provider) as Arc<dyn TableProvider>;
+ ctx.register_table("t", table)?;
+
+ let df = ctx.sql("TRUNCATE TABLE t").await?;
+ let _batches = df.collect().await?;
+
+ assert!(
+ provider.was_truncated(),
+ "truncate() should be called on the TableProvider"
+ );
+
+ Ok(())
+}
+
#[tokio::test]
async fn test_unsupported_table_delete() -> Result<()> {
let schema = test_schema();
@@ -295,3 +378,18 @@ async fn test_unsupported_table_update() -> Result<()> {
assert!(result.is_err() || result.unwrap().collect().await.is_err());
Ok(())
}
+
+#[tokio::test]
+async fn test_unsupported_table_truncate() -> Result<()> {
+ let ctx = SessionContext::new();
+ let empty_table = datafusion::datasource::empty::EmptyTable::new(test_schema());
+ ctx.register_table("empty_t", Arc::new(empty_table))?;
+
+ // The error may surface while planning or when collect() builds the physical plan.
+ let err = match ctx.sql("TRUNCATE TABLE empty_t").await {
+ Ok(df) => df.collect().await.expect_err("TRUNCATE on EmptyTable should fail"),
+ Err(e) => e,
+ };
+ assert!(err.to_string().contains("TRUNCATE not supported"), "unexpected: {err}");
+ Ok(())
+}
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 6ac3b309aa..b668cbfe2c 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -237,6 +237,8 @@ pub enum WriteOp {
Update,
/// `CREATE TABLE AS SELECT` operation
Ctas,
+ /// `TRUNCATE` operation
+ Truncate,
}
impl WriteOp {
@@ -247,6 +249,7 @@ impl WriteOp {
WriteOp::Delete => "Delete",
WriteOp::Update => "Update",
WriteOp::Ctas => "Ctas",
+ WriteOp::Truncate => "Truncate",
}
}
}
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index b035a4673b..2b5e2368c1 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -278,6 +278,7 @@ message DmlNode{
INSERT_APPEND = 3;
INSERT_OVERWRITE = 4;
INSERT_REPLACE = 5;
+ TRUNCATE = 6;
}
Type dml_type = 1;
LogicalPlanNode input = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 5384cc0aab..842dc7f632 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -5253,6 +5253,7 @@ impl serde::Serialize for dml_node::Type {
Self::InsertAppend => "INSERT_APPEND",
Self::InsertOverwrite => "INSERT_OVERWRITE",
Self::InsertReplace => "INSERT_REPLACE",
+ Self::Truncate => "TRUNCATE",
};
serializer.serialize_str(variant)
}
@@ -5270,6 +5271,7 @@ impl<'de> serde::Deserialize<'de> for dml_node::Type {
"INSERT_APPEND",
"INSERT_OVERWRITE",
"INSERT_REPLACE",
+ "TRUNCATE",
];
struct GeneratedVisitor;
@@ -5316,6 +5318,7 @@ impl<'de> serde::Deserialize<'de> for dml_node::Type {
"INSERT_APPEND" => Ok(dml_node::Type::InsertAppend),
"INSERT_OVERWRITE" => Ok(dml_node::Type::InsertOverwrite),
"INSERT_REPLACE" => Ok(dml_node::Type::InsertReplace),
+ "TRUNCATE" => Ok(dml_node::Type::Truncate),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index c6110865e2..3a7b35509e 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -444,6 +444,7 @@ pub mod dml_node {
InsertAppend = 3,
InsertOverwrite = 4,
InsertReplace = 5,
+ Truncate = 6,
}
impl Type {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -458,6 +459,7 @@ pub mod dml_node {
Self::InsertAppend => "INSERT_APPEND",
Self::InsertOverwrite => "INSERT_OVERWRITE",
Self::InsertReplace => "INSERT_REPLACE",
+ Self::Truncate => "TRUNCATE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@@ -469,6 +471,7 @@ pub mod dml_node {
"INSERT_APPEND" => Some(Self::InsertAppend),
"INSERT_OVERWRITE" => Some(Self::InsertOverwrite),
"INSERT_REPLACE" => Some(Self::InsertReplace),
+ "TRUNCATE" => Some(Self::Truncate),
_ => None,
}
}
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 179fe8bb7d..a653f517b7 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -239,6 +239,7 @@ impl From<protobuf::dml_node::Type> for WriteOp {
}
protobuf::dml_node::Type::InsertReplace =>
WriteOp::Insert(InsertOp::Replace),
protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
+ protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
}
}
}
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 9c326a8f6e..fe63fce6ee 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -729,6 +729,7 @@ impl From<&WriteOp> for protobuf::dml_node::Type {
WriteOp::Delete => protobuf::dml_node::Type::Delete,
WriteOp::Update => protobuf::dml_node::Type::Update,
WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
+ WriteOp::Truncate => protobuf::dml_node::Type::Truncate,
}
}
}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index b9af9fc935..e5c218e5eb 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -413,6 +413,7 @@ async fn roundtrip_logical_plan_dml() -> Result<()> {
"DELETE FROM T1",
"UPDATE T1 SET a = 1",
"CREATE TABLE T2 AS SELECT * FROM T1",
+ "TRUNCATE TABLE T1",
];
for query in queries {
let plan = ctx.sql(query).await?.into_optimized_plan()?;
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index c962c25b51..4981db5537 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1390,6 +1390,56 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
exec_err!("Function name not provided")
}
}
+ Statement::Truncate(ast::Truncate {
+ table_names,
+ partitions,
+ identity,
+ cascade,
+ on_cluster,
+ table: _,
+ }) => {
+ // `table` is ignored so both `TRUNCATE TABLE t` and `TRUNCATE t` are accepted.
+ if table_names.len() != 1 {
+ return not_impl_err!(
+ "TRUNCATE with multiple tables is not supported"
+ );
+ }
+
+ let target = &table_names[0];
+ if target.only {
+ return not_impl_err!("TRUNCATE with ONLY is not supported");
+ }
+ if partitions.is_some() {
+ return not_impl_err!("TRUNCATE with PARTITION is not supported");
+ }
+ if identity.is_some() {
+ return not_impl_err!(
+ "TRUNCATE with RESTART/CONTINUE IDENTITY is not supported"
+ );
+ }
+ if cascade.is_some() {
+ return not_impl_err!(
+ "TRUNCATE with CASCADE/RESTRICT is not supported"
+ );
+ }
+ if on_cluster.is_some() {
+ return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
+ }
+ let table = self.object_name_to_table_reference(target.name.clone())?;
+ let source = self.context_provider.get_table_source(table.clone())?;
+
+ // TRUNCATE does not operate on input rows. The EmptyRelation is a logical placeholder
+ // since the real operation is executed directly by the TableProvider's truncate() hook.
+ Ok(LogicalPlan::Dml(DmlStatement::new(
+ table,
+ source,
+ WriteOp::Truncate,
+ Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ })),
+ )))
+ }
Statement::CreateIndex(CreateIndex {
name,
table_name,
diff --git a/datafusion/sqllogictest/test_files/truncate.slt b/datafusion/sqllogictest/test_files/truncate.slt
new file mode 100644
index 0000000000..5a5d47760d
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/truncate.slt
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Truncate Tests
+##########
+
+statement ok
+create table t1(a int, b varchar, c double, d int);
+
+statement ok
+insert into t1 values (1, 'abc', 3.14, 4), (2, 'def', 2.71, 5);
+
+# Truncate all rows from table
+query TT
+explain truncate table t1;
+----
+logical_plan
+01)Dml: op=[Truncate] table=[t1]
+02)--EmptyRelation: rows=0
+physical_plan_error
+01)TRUNCATE operation on table 't1'
+02)caused by
+03)This feature is not implemented: TRUNCATE not supported for Base table
+
+# Test TRUNCATE with a schema-qualified table name
+statement ok
+create schema test_schema;
+
+statement ok
+create table test_schema.t5(a int);
+
+query TT
+explain truncate table test_schema.t5;
+----
+logical_plan
+01)Dml: op=[Truncate] table=[test_schema.t5]
+02)--EmptyRelation: rows=0
+physical_plan_error
+01)TRUNCATE operation on table 'test_schema.t5'
+02)caused by
+03)This feature is not implemented: TRUNCATE not supported for Base table
+
+# Test TRUNCATE with CASCADE option
+statement error TRUNCATE with CASCADE/RESTRICT is not supported
+TRUNCATE TABLE t1 CASCADE;
+
+# Test TRUNCATE with multiple tables
+statement error TRUNCATE with multiple tables is not supported
+TRUNCATE TABLE t1, t2;
+
+statement error TRUNCATE with PARTITION is not supported
+TRUNCATE TABLE t1 PARTITION (p1);
+
+statement error TRUNCATE with ONLY is not supported
+TRUNCATE ONLY t1;
+
+statement error TRUNCATE with RESTART/CONTINUE IDENTITY is not supported
+TRUNCATE TABLE t1 RESTART IDENTITY;
+
+# Test TRUNCATE without TABLE keyword
+query TT
+explain truncate t1;
+----
+logical_plan
+01)Dml: op=[Truncate] table=[t1]
+02)--EmptyRelation: rows=0
+physical_plan_error
+01)TRUNCATE operation on table 't1'
+02)caused by
+03)This feature is not implemented: TRUNCATE not supported for Base table
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]