CTTY commented on code in PR #1769: URL: https://github.com/apache/iceberg-rust/pull/1769#discussion_r2493162714
########## crates/integrations/datafusion/src/task_writer.rs: ########## @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! TaskWriter for DataFusion integration. +//! +//! This module provides a high-level writer that handles partitioning and routing +//! of RecordBatch data to Iceberg tables. + +use datafusion::arrow::array::RecordBatch; +use iceberg::Result; +use iceberg::arrow::RecordBatchPartitionSplitter; +use iceberg::spec::{DataFile, PartitionSpecRef, SchemaRef}; +use iceberg::writer::IcebergWriterBuilder; +use iceberg::writer::partitioning::PartitioningWriter; +use iceberg::writer::partitioning::clustered_writer::ClusteredWriter; +use iceberg::writer::partitioning::fanout_writer::FanoutWriter; +use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter; + +/// High-level writer for DataFusion that handles partitioning and routing of RecordBatch data. 
+/// +/// TaskWriter coordinates writing data to Iceberg tables by: +/// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered) +/// - Lazily initializing the partition splitter on first write +/// - Routing data to the underlying writer +/// - Collecting all written data files +/// +/// # Type Parameters +/// +/// * `B` - The IcebergWriterBuilder type used to create underlying writers +/// +/// # Example +/// +/// ```rust,ignore +/// use iceberg::spec::{PartitionSpec, Schema}; +/// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +/// use iceberg_datafusion::writer::task_writer::TaskWriter; +/// +/// // Create a TaskWriter for an unpartitioned table +/// let task_writer = TaskWriter::new( +/// data_file_writer_builder, +/// false, // fanout_enabled +/// schema, +/// partition_spec, +/// ); +/// +/// // Write data +/// task_writer.write(record_batch).await?; +/// +/// // Close and get data files +/// let data_files = task_writer.close().await?; +/// ``` +pub struct TaskWriter<B: IcebergWriterBuilder> { + /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter) + writer: SupportedWriter<B>, + /// Lazily initialized partition splitter for partitioned tables + partition_splitter: Option<RecordBatchPartitionSplitter>, + /// Iceberg schema reference + schema: SchemaRef, + /// Partition specification reference + partition_spec: PartitionSpecRef, +} + +/// Internal enum to hold the different writer types. +/// +/// This enum allows TaskWriter to work with different partitioning strategies +/// while maintaining a unified interface. 
+enum SupportedWriter<B: IcebergWriterBuilder> { + /// Writer for unpartitioned tables + Unpartitioned(UnpartitionedWriter<B>), + /// Writer for partitioned tables with unsorted data (maintains multiple active writers) + Fanout(FanoutWriter<B>), + /// Writer for partitioned tables with sorted data (maintains single active writer) + Clustered(ClusteredWriter<B>), +} + +impl<B: IcebergWriterBuilder> TaskWriter<B> { + /// Create a new TaskWriter. + /// + /// # Parameters + /// + /// * `writer_builder` - The IcebergWriterBuilder to use for creating underlying writers + /// * `fanout_enabled` - If true, use FanoutWriter for partitioned tables; otherwise use ClusteredWriter + /// * `schema` - The Iceberg schema reference + /// * `partition_spec` - The partition specification reference + /// + /// # Returns + /// + /// Returns a new TaskWriter instance. + /// + /// # Writer Selection Logic + /// + /// - If partition_spec is unpartitioned: creates UnpartitionedWriter + /// - If partition_spec is partitioned AND fanout_enabled is true: creates FanoutWriter + /// - If partition_spec is partitioned AND fanout_enabled is false: creates ClusteredWriter + /// + /// # Example + /// + /// ```rust,ignore + /// use iceberg::spec::{PartitionSpec, Schema}; + /// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; + /// use iceberg_datafusion::writer::task_writer::TaskWriter; + /// + /// // Create a TaskWriter for an unpartitioned table + /// let task_writer = TaskWriter::new( + /// data_file_writer_builder, + /// false, // fanout_enabled + /// schema, + /// partition_spec, + /// ); + /// ``` + pub fn new( + writer_builder: B, + fanout_enabled: bool, + schema: SchemaRef, + partition_spec: PartitionSpecRef, + ) -> Self { + let writer = if partition_spec.is_unpartitioned() { + SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder)) + } else if fanout_enabled { + SupportedWriter::Fanout(FanoutWriter::new(writer_builder)) + } else { + 
SupportedWriter::Clustered(ClusteredWriter::new(writer_builder)) + }; + + Self { + writer, + partition_splitter: None, Review Comment: This is because partition_splitter requires the schema of the record batches, which may contain projected columns and differ from the Iceberg schema. It would be safer to just use the schema of the record batches directly to initialize partition_splitter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
