This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1fcad936 feat(datafusion): Support `INSERT INTO` partitioned tables (#1827)
1fcad936 is described below

commit 1fcad93641603917829cdd6d60f4570062394855
Author: Shawn Chang <[email protected]>
AuthorDate: Wed Nov 12 02:49:54 2025 -0800

    feat(datafusion): Support `INSERT INTO` partitioned tables (#1827)
    
    ## Which issue does this PR close?
    
    - Closes #1828
    - Related to #1540
    
    ## What changes are included in this PR?
    - Use a projection to compute partition values for record batches
    - Repartition inputs for `table_provider::insert_into`
    - Initialize the partition splitter in `TaskWriter`'s constructor
    - Use `TaskWriter` in `IcebergWriteExec` to support partitioned data
    
    
    ## Are these changes tested?
    Added an integration test (`test_insert_into_partitioned`).
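    
    For reference, a minimal end-to-end sketch of the new behavior, adapted from that test. It assumes an Iceberg catalog is already registered with DataFusion as `catalog`, and that `ns.partitioned_table` exists and is partitioned by `identity(category)`:
    
    ```rust
    use datafusion::error::Result;
    use datafusion::execution::context::SessionContext;
    
    // Hypothetical names: `catalog` and `ns.partitioned_table` stand in for a
    // registered IcebergCatalogProvider and an identity-partitioned table.
    async fn insert_partitioned(ctx: &SessionContext) -> Result<()> {
        // Each distinct `category` value is routed by the new TaskWriter path
        // into its own partition directory (e.g. data/category=books/).
        ctx.sql(
            "INSERT INTO catalog.ns.partitioned_table VALUES \
             (1, 'books', 'novel'), (2, 'electronics', 'phone')",
        )
        .await?
        .collect()
        .await?;
        Ok(())
    }
    ```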
---
 .../src/arrow/record_batch_partition_splitter.rs   |  20 +--
 .../datafusion/src/physical_plan/repartition.rs    |   1 -
 .../datafusion/src/physical_plan/write.rs          |  48 ++++---
 crates/integrations/datafusion/src/table/mod.rs    |  43 ++++--
 crates/integrations/datafusion/src/task_writer.rs  |  71 ++++------
 .../tests/integration_datafusion_test.rs           | 152 ++++++++++++++++++++-
 6 files changed, 239 insertions(+), 96 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
index dcdd9c68..7b83621f 100644
--- a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
+++ b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
@@ -59,7 +59,7 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new(
+    pub fn try_new(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
         calculator: Option<PartitionValueCalculator>,
@@ -87,12 +87,12 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new_with_computed_values(
+    pub fn try_new_with_computed_values(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
     ) -> Result<Self> {
         let calculator = PartitionValueCalculator::try_new(&partition_spec, &iceberg_schema)?;
-        Self::new(iceberg_schema, partition_spec, Some(calculator))
+        Self::try_new(iceberg_schema, partition_spec, Some(calculator))
     }
 
     /// Create a new RecordBatchPartitionSplitter expecting pre-computed partition values.
@@ -108,11 +108,11 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new_with_precomputed_values(
+    pub fn try_new_with_precomputed_values(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
     ) -> Result<Self> {
-        Self::new(iceberg_schema, partition_spec, None)
+        Self::try_new(iceberg_schema, partition_spec, None)
     }
 
     /// Split the record batch into multiple record batches based on the partition spec.
@@ -261,9 +261,11 @@ mod tests {
                 .build()
                 .unwrap(),
         );
-        let partition_splitter =
-            RecordBatchPartitionSplitter::new_with_computed_values(schema.clone(), partition_spec)
-                .expect("Failed to create splitter");
+        let partition_splitter = RecordBatchPartitionSplitter::try_new_with_computed_values(
+            schema.clone(),
+            partition_spec,
+        )
+        .expect("Failed to create splitter");
 
         let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
         let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]);
@@ -392,7 +394,7 @@ mod tests {
         ]));
 
         // Create splitter expecting pre-computed partition column
-        let partition_splitter = RecordBatchPartitionSplitter::new_with_precomputed_values(
+        let partition_splitter = RecordBatchPartitionSplitter::try_new_with_precomputed_values(
             schema.clone(),
             partition_spec,
         )
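
For context, a minimal sketch of the two renamed constructors; the import path for `RecordBatchPartitionSplitter` is assumed from this file's location and may differ from the actual public re-export:

```rust
use iceberg::Result;
use iceberg::spec::{PartitionSpecRef, SchemaRef};
// Assumed path: the type lives in crates/iceberg/src/arrow in this diff.
use iceberg::arrow::RecordBatchPartitionSplitter;

// `try_new_with_computed_values` derives partition values from source columns;
// `try_new_with_precomputed_values` expects each batch to already carry a
// `_partition` column.
fn build_splitters(
    schema: SchemaRef,
    spec: PartitionSpecRef,
) -> Result<(RecordBatchPartitionSplitter, RecordBatchPartitionSplitter)> {
    let computed =
        RecordBatchPartitionSplitter::try_new_with_computed_values(schema.clone(), spec.clone())?;
    let precomputed = RecordBatchPartitionSplitter::try_new_with_precomputed_values(schema, spec)?;
    Ok((computed, precomputed))
}
```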
diff --git a/crates/integrations/datafusion/src/physical_plan/repartition.rs b/crates/integrations/datafusion/src/physical_plan/repartition.rs
index 95cdc847..8ad87fd1 100644
--- a/crates/integrations/datafusion/src/physical_plan/repartition.rs
+++ b/crates/integrations/datafusion/src/physical_plan/repartition.rs
@@ -86,7 +86,6 @@ use iceberg::spec::{TableMetadata, TableMetadataRef, Transform};
 ///     NonZeroUsize::new(4).unwrap(),
 /// )?;
 /// ```
-#[allow(dead_code)]
 pub(crate) fn repartition(
     input: Arc<dyn ExecutionPlan>,
     table_metadata: TableMetadataRef,
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index b9d1f02d..9eb53c23 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     execute_input_stream,
 };
 use futures::StreamExt;
-use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
+use iceberg::arrow::FieldMatchMode;
 use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -44,12 +44,12 @@ use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Error, ErrorKind};
 use parquet::file::properties::WriterProperties;
 use uuid::Uuid;
 
 use crate::physical_plan::DATA_FILES_COL_NAME;
+use crate::task_writer::TaskWriter;
 use crate::to_datafusion_error;
 
 /// An execution plan node that writes data to an Iceberg table.
@@ -205,18 +205,6 @@ impl ExecutionPlan for IcebergWriteExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        if !self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .is_unpartitioned()
-        {
-            // TODO add support for partitioned tables
-            return Err(DataFusionError::NotImplemented(
-                "IcebergWriteExec does not support partitioned tables yet".to_string(),
-            ));
-        }
-
         let partition_type = self.table.metadata().default_partition_type().clone();
         let format_version = self.table.metadata().format_version();
 
@@ -277,31 +265,41 @@ impl ExecutionPlan for IcebergWriteExec {
         );
         let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
 
+        // Create TaskWriter
+        // TODO: Make fanout_enabled configurable via table properties
+        let fanout_enabled = true;
+        let schema = self.table.metadata().current_schema().clone();
+        let partition_spec = self.table.metadata().default_partition_spec().clone();
+        let task_writer = TaskWriter::try_new(
+            data_file_writer_builder,
+            fanout_enabled,
+            schema.clone(),
+            partition_spec,
+        )
+        .map_err(to_datafusion_error)?;
+
         // Get input data
         let data = execute_input_stream(
             Arc::clone(&self.input),
-            Arc::new(
-                schema_to_arrow_schema(self.table.metadata().current_schema())
-                    .map_err(to_datafusion_error)?,
-            ),
+            self.input.schema(), // input schema may have projected column `_partition`
             partition,
             Arc::clone(&context),
         )?;
 
         // Create write stream
         let stream = futures::stream::once(async move {
-            let mut writer = data_file_writer_builder
-                // todo specify partition key when partitioning writer is supported
-                .build(None)
-                .await
-                .map_err(to_datafusion_error)?;
+            let mut task_writer = task_writer;
             let mut input_stream = data;
 
             while let Some(batch) = input_stream.next().await {
-                writer.write(batch?).await.map_err(to_datafusion_error)?;
+                let batch = batch?;
+                task_writer
+                    .write(batch)
+                    .await
+                    .map_err(to_datafusion_error)?;
             }
 
-            let data_files = writer.close().await.map_err(to_datafusion_error)?;
+            let data_files = task_writer.close().await.map_err(to_datafusion_error)?;
 
             // Convert builders to data files and then to JSON strings
             let data_files_strs: Vec<String> = data_files
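
`fanout_enabled` is hard-coded to `true` above, with a TODO to read it from table properties. One possible shape for that follow-up, sketched with a hypothetical `write.fanout.enabled` property key (an assumption, not part of this commit):

```rust
use iceberg::table::Table;

// Sketch only: falls back to the commit's current behavior (fanout on)
// when the hypothetical property is absent.
fn fanout_enabled(table: &Table) -> bool {
    table
        .metadata()
        .properties()
        .get("write.fanout.enabled") // hypothetical property key
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(true)
}
```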
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index a8c49837..42a3baad 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -19,6 +19,7 @@ pub mod metadata_table;
 pub mod table_provider_factory;
 
 use std::any::Any;
+use std::num::NonZeroUsize;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -38,6 +39,8 @@ use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 use metadata_table::IcebergMetadataTableProvider;
 
 use crate::physical_plan::commit::IcebergCommitExec;
+use crate::physical_plan::project::project_with_partition;
+use crate::physical_plan::repartition::repartition;
 use crate::physical_plan::scan::IcebergTableScan;
 use crate::physical_plan::write::IcebergWriteExec;
 
@@ -170,32 +173,42 @@ impl TableProvider for IcebergTableProvider {
 
     async fn insert_into(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
         _insert_op: InsertOp,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        if !self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .is_unpartitioned()
-        {
-            // TODO add insert into support for partitioned tables
-            return Err(DataFusionError::NotImplemented(
-                "IcebergTableProvider::insert_into does not support partitioned tables yet"
-                    .to_string(),
-            ));
-        }
-
         let Some(catalog) = self.catalog.clone() else {
             return Err(DataFusionError::Execution(
                 "Catalog cannot be none for insert_into".to_string(),
             ));
         };
 
+        let partition_spec = self.table.metadata().default_partition_spec();
+
+        // Step 1: Project partition values for partitioned tables
+        let plan_with_partition = if !partition_spec.is_unpartitioned() {
+            project_with_partition(input, &self.table)?
+        } else {
+            input
+        };
+
+        // Step 2: Repartition for parallel processing
+        let target_partitions =
+            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
+                DataFusionError::Configuration(
+                    "target_partitions must be greater than 0".to_string(),
+                )
+            })?;
+
+        let repartitioned_plan = repartition(
+            plan_with_partition,
+            self.table.metadata_ref(),
+            target_partitions,
+        )?;
+
         let write_plan = Arc::new(IcebergWriteExec::new(
             self.table.clone(),
-            input,
+            repartitioned_plan,
             self.schema.clone(),
         ));
 
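
The `target_partitions` guard above is equivalent to this standalone sketch, where `n` stands in for `state.config().target_partitions()`:

```rust
use std::num::NonZeroUsize;

// DataFusion exposes target_partitions as a plain usize; repartition()
// takes a NonZeroUsize, so validate before building the plan.
fn validate_target_partitions(n: usize) -> Result<NonZeroUsize, String> {
    NonZeroUsize::new(n).ok_or_else(|| "target_partitions must be greater than 0".to_string())
}
```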
diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
index d27b2e6f..5329f264 100644
--- a/crates/integrations/datafusion/src/task_writer.rs
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -34,7 +34,7 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
 ///
 /// TaskWriter coordinates writing data to Iceberg tables by:
 /// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered)
-/// - Lazily initializing the partition splitter on first write
+/// - Initializing the partition splitter in the constructor for partitioned tables
 /// - Routing data to the underlying writer
 /// - Collecting all written data files
 ///
@@ -63,23 +63,17 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
 /// // Close and get data files
 /// let data_files = task_writer.close().await?;
 /// ```
-#[allow(dead_code)]
 pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
     /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
     writer: SupportedWriter<B>,
-    /// Lazily initialized partition splitter for partitioned tables
+    /// Partition splitter for partitioned tables (initialized in constructor)
     partition_splitter: Option<RecordBatchPartitionSplitter>,
-    /// Iceberg schema reference
-    schema: SchemaRef,
-    /// Partition specification reference
-    partition_spec: PartitionSpecRef,
 }
 
 /// Internal enum to hold the different writer types.
 ///
 /// This enum allows TaskWriter to work with different partitioning strategies
 /// while maintaining a unified interface.
-#[allow(dead_code)]
 enum SupportedWriter<B: IcebergWriterBuilder> {
     /// Writer for unpartitioned tables
     Unpartitioned(UnpartitionedWriter<B>),
@@ -89,7 +83,6 @@ enum SupportedWriter<B: IcebergWriterBuilder> {
     Clustered(ClusteredWriter<B>),
 }
 
-#[allow(dead_code)]
 impl<B: IcebergWriterBuilder> TaskWriter<B> {
     /// Create a new TaskWriter.
     ///
@@ -125,12 +118,12 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
     ///     partition_spec,
     /// );
     /// ```
-    pub fn new(
+    pub fn try_new(
         writer_builder: B,
         fanout_enabled: bool,
         schema: SchemaRef,
         partition_spec: PartitionSpecRef,
-    ) -> Self {
+    ) -> Result<Self> {
         let writer = if partition_spec.is_unpartitioned() {
             SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder))
         } else if fanout_enabled {
@@ -139,17 +132,28 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
             SupportedWriter::Clustered(ClusteredWriter::new(writer_builder))
         };
 
-        Self {
+        // Initialize partition splitter in constructor for partitioned tables
+        let partition_splitter = if !partition_spec.is_unpartitioned() {
+            Some(
+                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
+                    schema.clone(),
+                    partition_spec.clone(),
+                )?,
+            )
+        } else {
+            None
+        };
+
+        Ok(Self {
             writer,
-            partition_splitter: None,
-            schema,
-            partition_spec,
-        }
+            partition_splitter,
+        })
     }
 
     /// Write a RecordBatch to the TaskWriter.
     ///
-    /// For the first write to a partitioned table, this method initializes the partition splitter.
+    /// For partitioned tables, uses the partition splitter to split
+    /// the batch by partition key and route each partition to the underlying writer.
     /// For unpartitioned tables, data is written directly without splitting.
     ///
     /// # Parameters
@@ -163,7 +167,6 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
     /// # Errors
     ///
     /// This method will return an error if:
-    /// - Partition splitter initialization fails
     /// - Splitting the batch by partition fails
     /// - Writing to the underlying writer fails
     ///
@@ -183,29 +186,9 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
                 writer.write(batch).await
             }
             SupportedWriter::Fanout(writer) => {
-                // Initialize splitter on first write if needed
-                if self.partition_splitter.is_none() {
-                    self.partition_splitter =
-                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
-                            self.schema.clone(),
-                            self.partition_spec.clone(),
-                        )?);
-                }
-
-                // Split and write partitioned data
                 Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
             }
             SupportedWriter::Clustered(writer) => {
-                // Initialize splitter on first write if needed
-                if self.partition_splitter.is_none() {
-                    self.partition_splitter =
-                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
-                            self.schema.clone(),
-                            self.partition_spec.clone(),
-                        )?);
-                }
-
-                // Split and write partitioned data
                 Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
             }
         }
@@ -214,13 +197,13 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
     /// Helper method to split and write partitioned data.
     ///
     /// This method handles the common logic for both FanoutWriter and ClusteredWriter:
-    /// - Splits the batch by partition key using the provided splitter
-    /// - Writes each partition to the underlying writer
+    /// - Splits the batch by partition key using the partition splitter
+    /// - Writes each partition to the underlying writer with its corresponding partition key
     ///
     /// # Parameters
     ///
     /// * `writer` - The underlying PartitioningWriter (FanoutWriter or ClusteredWriter)
-    /// * `partition_splitter` - The partition splitter (must be initialized)
+    /// * `partition_splitter` - The partition splitter
     /// * `batch` - The RecordBatch to write
     ///
     /// # Returns
@@ -393,7 +376,7 @@ mod tests {
         let partition_spec = Arc::new(PartitionSpec::builder(schema.clone()).build()?);
 
         let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
-        let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec);
+        let mut task_writer = TaskWriter::try_new(writer_builder, false, schema, partition_spec)?;
 
         // Write data
         let batch = RecordBatch::try_new(arrow_schema, vec![
@@ -459,7 +442,7 @@ mod tests {
         );
 
         let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
-        let mut task_writer = TaskWriter::new(writer_builder, true, schema, partition_spec);
+        let mut task_writer = TaskWriter::try_new(writer_builder, true, schema, partition_spec)?;
 
         // Create partition column
         let partition_field = Field::new("region", DataType::Utf8, false).with_metadata(
@@ -502,7 +485,7 @@ mod tests {
         );
 
         let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
-        let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec);
+        let mut task_writer = TaskWriter::try_new(writer_builder, false, schema, partition_spec)?;
 
         // Create partition column
         let partition_field = Field::new("region", DataType::Utf8, false).with_metadata(
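
Taken together, a sketch of the fallible construction flow after this change, following the unit tests above. `TaskWriter` is `pub(crate)`, so this only compiles inside the DataFusion integration crate; `close` is assumed to return the collected data files, as the doc example suggests:

```rust
use arrow_array::RecordBatch;
use iceberg::Result;
use iceberg::spec::{DataFile, PartitionSpecRef, SchemaRef};
use iceberg::writer::IcebergWriterBuilder;

use crate::task_writer::TaskWriter;

// `try_new` (not `new`) so a bad schema/spec pair fails fast: the partition
// splitter is now built eagerly in the constructor.
async fn write_all<B: IcebergWriterBuilder>(
    writer_builder: B,
    schema: SchemaRef,
    partition_spec: PartitionSpecRef,
    batches: Vec<RecordBatch>,
) -> Result<Vec<DataFile>> {
    // fanout_enabled = true selects FanoutWriter for unsorted partitioned input.
    let mut task_writer = TaskWriter::try_new(writer_builder, true, schema, partition_spec)?;
    for batch in batches {
        task_writer.write(batch).await?;
    }
    task_writer.close().await
}
```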
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index cb4987a9..fdf5b17d 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -27,9 +27,13 @@ use datafusion::execution::context::SessionContext;
 use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 use expect_test::expect;
 use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
-use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
+use iceberg::spec::{
+    NestedField, PrimitiveType, Schema, StructType, Transform, Type, UnboundPartitionSpec,
+};
 use iceberg::test_utils::check_record_batches;
-use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation};
+use iceberg::{
+    Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation, TableIdent,
+};
 use iceberg_datafusion::IcebergCatalogProvider;
 use tempfile::TempDir;
 
@@ -810,3 +814,147 @@ async fn test_insert_into_nested() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_insert_into_partitioned() -> Result<()> {
+    let iceberg_catalog = get_iceberg_catalog().await;
+    let namespace = NamespaceIdent::new("test_partitioned_write".to_string());
+    set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+    // Create a schema with a partition column
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(3, "value", Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    // Create partition spec with identity transform on category
+    let partition_spec = UnboundPartitionSpec::builder()
+        .with_spec_id(0)
+        .add_partition_field(2, "category", Transform::Identity)?
+        .build();
+
+    // Create the partitioned table
+    let creation = TableCreation::builder()
+        .name("partitioned_table".to_string())
+        .location(temp_path())
+        .schema(schema)
+        .partition_spec(partition_spec)
+        .properties(HashMap::new())
+        .build();
+
+    iceberg_catalog.create_table(&namespace, creation).await?;
+
+    let client = Arc::new(iceberg_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("catalog", catalog);
+
+    // Insert data with multiple partition values in a single batch
+    let df = ctx
+        .sql(
+            r#"
+            INSERT INTO catalog.test_partitioned_write.partitioned_table 
+            VALUES 
+                (1, 'electronics', 'laptop'),
+                (2, 'electronics', 'phone'),
+                (3, 'books', 'novel'),
+                (4, 'books', 'textbook'),
+                (5, 'clothing', 'shirt')
+            "#,
+        )
+        .await
+        .unwrap();
+
+    let batches = df.collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    let batch = &batches[0];
+    let rows_inserted = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .unwrap();
+    assert_eq!(rows_inserted.value(0), 5);
+
+    // Refresh catalog to get updated table
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+    ctx.register_catalog("catalog", catalog);
+
+    // Query the table to verify data
+    let df = ctx
+        .sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table 
ORDER BY id")
+        .await
+        .unwrap();
+
+    let batches = df.collect().await.unwrap();
+
+    // Verify the data - note that _partition column should NOT be present
+    check_record_batches(
+        batches,
+        expect![[r#"
+            Field { name: "id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+            Field { name: "category", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+            Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }"#]],
+        expect![[r#"
+            id: PrimitiveArray<Int32>
+            [
+              1,
+              2,
+              3,
+              4,
+              5,
+            ],
+            category: StringArray
+            [
+              "electronics",
+              "electronics",
+              "books",
+              "books",
+              "clothing",
+            ],
+            value: StringArray
+            [
+              "laptop",
+              "phone",
+              "novel",
+              "textbook",
+              "shirt",
+            ]"#]],
+        &[],
+        Some("id"),
+    );
+
+    // Verify that data files exist under correct partition paths
+    let table_ident = TableIdent::new(namespace.clone(), "partitioned_table".to_string());
+    let table = client.load_table(&table_ident).await?;
+    let table_location = table.metadata().location();
+    let file_io = table.file_io();
+
+    // List files under each expected partition path
+    let electronics_path = format!("{}/data/category=electronics", table_location);
+    let books_path = format!("{}/data/category=books", table_location);
+    let clothing_path = format!("{}/data/category=clothing", table_location);
+
+    // Verify partition directories exist and contain data files
+    assert!(
+        file_io.exists(&electronics_path).await?,
+        "Expected partition directory: {}",
+        electronics_path
+    );
+    assert!(
+        file_io.exists(&books_path).await?,
+        "Expected partition directory: {}",
+        books_path
+    );
+    assert!(
+        file_io.exists(&clothing_path).await?,
+        "Expected partition directory: {}",
+        clothing_path
+    );
+
+    Ok(())
+}
