This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ec35634c feat: Add experimental support for native Parquet writes 
(#2812)
1ec35634c is described below

commit 1ec35634cfb30936a5034f19c9aab6290abf9b9b
Author: Andy Grove <[email protected]>
AuthorDate: Wed Nov 26 06:47:11 2025 -0700

    feat: Add experimental support for native Parquet writes (#2812)
---
 .github/workflows/pr_build_linux.yml               |   1 +
 .github/workflows/pr_build_macos.yml               |   1 +
 .../main/scala/org/apache/comet/CometConf.scala    |  11 +
 docs/source/user-guide/latest/configs.md           |   1 +
 docs/source/user-guide/latest/operators.md         |  41 +--
 native/core/src/execution/operators/mod.rs         |   2 +
 .../core/src/execution/operators/parquet_writer.rs | 278 +++++++++++++++++++++
 native/core/src/execution/planner.rs               |  34 ++-
 native/proto/src/proto/operator.proto              |   7 +
 .../org/apache/comet/rules/CometExecRule.scala     |  27 +-
 .../serde/operator/CometDataWritingCommand.scala   | 179 +++++++++++++
 .../spark/sql/comet/CometNativeWriteExec.scala     | 112 +++++++++
 .../comet/parquet/CometParquetWriterSuite.scala    | 155 ++++++++++++
 13 files changed, 827 insertions(+), 22 deletions(-)

diff --git a/.github/workflows/pr_build_linux.yml 
b/.github/workflows/pr_build_linux.yml
index 28999c5b8..43e1f776b 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -118,6 +118,7 @@ jobs:
               org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
           - name: "parquet"
             value: |
+              org.apache.comet.parquet.CometParquetWriterSuite
               org.apache.comet.parquet.ParquetReadV1Suite
               org.apache.comet.parquet.ParquetReadV2Suite
               org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
diff --git a/.github/workflows/pr_build_macos.yml 
b/.github/workflows/pr_build_macos.yml
index 313727319..88dfd9f92 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -83,6 +83,7 @@ jobs:
               org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
           - name: "parquet"
             value: |
+              org.apache.comet.parquet.CometParquetWriterSuite
               org.apache.comet.parquet.ParquetReadV1Suite
               org.apache.comet.parquet.ParquetReadV2Suite
               org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 0484b64f1..1e5d19ee2 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -100,6 +100,17 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(true)
 
+  // Opt-in switch for the experimental native Parquet writer; off unless explicitly enabled.
+  val COMET_NATIVE_PARQUET_WRITE_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.write.enabled")
+      .category(CATEGORY_TESTING)
+      .doc(
+        "Whether to enable native Parquet write through Comet. " +
+          "When enabled, Comet will intercept Parquet write operations and execute them " +
+          "natively. This feature is highly experimental and only partially implemented. " +
+          "It should not be used in production.")
+      .booleanConf
+      .createWithDefault(false)
+
   val SCAN_NATIVE_COMET = "native_comet"
   val SCAN_NATIVE_DATAFUSION = "native_datafusion"
   val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
diff --git a/docs/source/user-guide/latest/configs.md 
b/docs/source/user-guide/latest/configs.md
index 1e77032f7..a1c3212c2 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -142,6 +142,7 @@ These settings can be used to determine which parts of the 
plan are accelerated
 | `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap 
mode. Required for running Spark SQL tests. It can be overridden by the 
environment variable `ENABLE_COMET_ONHEAP`. | false |
 | `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used 
for Comet native execution when running Spark in on-heap mode. Available pool 
types are `greedy`, `fair_spill`, `greedy_task_shared`, 
`fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and 
`unbounded`. | greedy_task_shared |
 | `spark.comet.memoryOverhead` | The amount of additional memory to be 
allocated per executor process for Comet, in MiB, when running Spark in on-heap 
mode. | 1024 MiB |
+| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write 
through Comet. When enabled, Comet will intercept Parquet write operations and 
execute them natively. This feature is highly experimental and only partially 
implemented. It should not be used in production. | false |
 | `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow 
columnar conversion. When this is turned on, Comet will convert operators in 
`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format 
before processing. This is an experimental feature and has known issues with 
non-UTC timezones. | false |
 | `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list 
of operators that will be converted to Arrow columnar format when 
`spark.comet.sparkToColumnar.enabled` is true. | 
Range,InMemoryTableScan,RDDScan |
 | `spark.comet.testing.strict` | Experimental option to enable strict testing, 
which will fail tests that could be more comprehensive, such as checking for a 
specific fallback reason. It can be overridden by the environment variable 
`ENABLE_COMET_STRICT_TESTING`. | false |
diff --git a/docs/source/user-guide/latest/operators.md 
b/docs/source/user-guide/latest/operators.md
index fdfbcef68..f5f2d9724 100644
--- a/docs/source/user-guide/latest/operators.md
+++ b/docs/source/user-guide/latest/operators.md
@@ -22,25 +22,26 @@
 The following Spark operators are currently replaced with native versions. 
Query stages that contain any operators
 not supported by Comet will fall back to regular Spark execution.
 
-| Operator                | Spark-Compatible? | Compatibility Notes            
                                                                                
    |
-| ----------------------- | ----------------- | 
------------------------------------------------------------------------------------------------------------------
 |
-| BatchScanExec           | Yes               | Supports Parquet files and 
Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more 
information. |
-| BroadcastExchangeExec   | Yes               |                                
                                                                                
    |
-| BroadcastHashJoinExec   | Yes               |                                
                                                                                
    |
-| ExpandExec              | Yes               |                                
                                                                                
    |
-| FileSourceScanExec      | Yes               | Supports Parquet files. See 
the [Comet Compatibility Guide] for more information.                           
       |
-| FilterExec              | Yes               |                                
                                                                                
    |
-| GlobalLimitExec         | Yes               |                                
                                                                                
    |
-| HashAggregateExec       | Yes               |                                
                                                                                
    |
-| LocalLimitExec          | Yes               |                                
                                                                                
    |
-| LocalTableScanExec      | No                | Experimental and disabled by 
default.                                                                        
      |
-| ObjectHashAggregateExec | Yes               | Supports a limited number of 
aggregates, such as `bloom_filter_agg`.                                         
      |
-| ProjectExec             | Yes               |                                
                                                                                
    |
-| ShuffleExchangeExec     | Yes               |                                
                                                                                
    |
-| ShuffledHashJoinExec    | Yes               |                                
                                                                                
    |
-| SortExec                | Yes               |                                
                                                                                
    |
-| SortMergeJoinExec       | Yes               |                                
                                                                                
    |
-| UnionExec               | Yes               |                                
                                                                                
    |
-| WindowExec              | No                | Disabled by default due to 
known correctness issues.                                                       
        |
+| Operator                          | Spark-Compatible? | Compatibility Notes  
                                                                                
              |
+| --------------------------------- | ----------------- | 
------------------------------------------------------------------------------------------------------------------
 |
+| BatchScanExec                     | Yes               | Supports Parquet 
files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for 
more information. |
+| BroadcastExchangeExec             | Yes               |                      
                                                                                
              |
+| BroadcastHashJoinExec             | Yes               |                      
                                                                                
              |
+| ExpandExec                        | Yes               |                      
                                                                                
              |
+| FileSourceScanExec                | Yes               | Supports Parquet 
files. See the [Comet Compatibility Guide] for more information.                
                  |
+| FilterExec                        | Yes               |                      
                                                                                
              |
+| GlobalLimitExec                   | Yes               |                      
                                                                                
              |
+| HashAggregateExec                 | Yes               |                      
                                                                                
              |
+| InsertIntoHadoopFsRelationCommand | No                | Experimental support 
for native Parquet writes. Disabled by default.                                 
              |
+| LocalLimitExec                    | Yes               |                      
                                                                                
              |
+| LocalTableScanExec                | No                | Experimental and 
disabled by default.                                                            
                  |
+| ObjectHashAggregateExec           | Yes               | Supports a limited 
number of aggregates, such as `bloom_filter_agg`.                               
                |
+| ProjectExec                       | Yes               |                      
                                                                                
              |
+| ShuffleExchangeExec               | Yes               |                      
                                                                                
              |
+| ShuffledHashJoinExec              | Yes               |                      
                                                                                
              |
+| SortExec                          | Yes               |                      
                                                                                
              |
+| SortMergeJoinExec                 | Yes               |                      
                                                                                
              |
+| UnionExec                         | Yes               |                      
                                                                                
              |
+| WindowExec                        | No                | Disabled by default 
due to known correctness issues.                                                
               |
 
 [Comet Compatibility Guide]: compatibility.md
diff --git a/native/core/src/execution/operators/mod.rs 
b/native/core/src/execution/operators/mod.rs
index b3998e2f6..b01f7857b 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -29,6 +29,8 @@ mod copy;
 mod expand;
 pub use expand::ExpandExec;
 mod iceberg_scan;
+mod parquet_writer;
+pub use parquet_writer::ParquetWriterExec;
 mod scan;
 
 /// Error returned during executing operators.
diff --git a/native/core/src/execution/operators/parquet_writer.rs 
b/native/core/src/execution/operators/parquet_writer.rs
new file mode 100644
index 000000000..5536e30dc
--- /dev/null
+++ b/native/core/src/execution/operators/parquet_writer.rs
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet writer operator for writing RecordBatches to Parquet files
+
+use std::{
+    any::Any,
+    fmt,
+    fmt::{Debug, Formatter},
+    fs::File,
+    sync::Arc,
+};
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion::{
+    error::{DataFusionError, Result},
+    execution::context::TaskContext,
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, 
PlanProperties,
+        SendableRecordBatchStream, Statistics,
+    },
+};
+use futures::TryStreamExt;
+use parquet::{
+    arrow::ArrowWriter,
+    basic::{Compression, ZstdLevel},
+    file::properties::WriterProperties,
+};
+
+use crate::execution::shuffle::CompressionCodec;
+
+/// Parquet writer operator that writes input batches to a Parquet file.
+///
+/// Each executed partition drains its input stream into a single file named
+/// `part-{partition_id:05}.parquet` inside `output_path`, then yields an empty stream.
+#[derive(Debug)]
+pub struct ParquetWriterExec {
+    /// Input execution plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Output directory; a leading `file://` or `file:` prefix is stripped before use
+    output_path: String,
+    /// Compression codec
+    compression: CompressionCodec,
+    /// Partition ID (from Spark TaskContext); used to name the part file
+    partition_id: i32,
+    /// Column names to use in the output Parquet file, applied positionally to the
+    /// input schema's fields
+    column_names: Vec<String>,
+    /// Metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Cache for plan properties
+    cache: PlanProperties,
+}
+
+impl ParquetWriterExec {
+    /// Builds a writer on top of `input`.
+    ///
+    /// The plan properties are derived from the input: its schema seeds the equivalence
+    /// properties and its partitioning is reused as-is, so every input partition ends up
+    /// writing its own file. Emission is `Final` and the plan is `Bounded`.
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        output_path: String,
+        compression: CompressionCodec,
+        partition_id: i32,
+        column_names: Vec<String>,
+    ) -> Result<Self> {
+        // Keep the child's partitioning: one output file per input partition
+        let partitioning = input.output_partitioning().clone();
+        let eq_properties = EquivalenceProperties::new(input.schema());
+        let cache = PlanProperties::new(
+            eq_properties,
+            partitioning,
+            EmissionType::Final,
+            Boundedness::Bounded,
+        );
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        Ok(Self {
+            input,
+            output_path,
+            compression,
+            partition_id,
+            column_names,
+            metrics,
+            cache,
+        })
+    }
+
+    /// Translates the configured codec into the Parquet crate's `Compression` value.
+    ///
+    /// Fails only when the Zstd level is rejected by `ZstdLevel::try_new`.
+    fn compression_to_parquet(&self) -> Result<Compression> {
+        let parquet_codec = match self.compression {
+            CompressionCodec::None => Compression::UNCOMPRESSED,
+            CompressionCodec::Zstd(level) => Compression::ZSTD(ZstdLevel::try_new(level)?),
+            CompressionCodec::Lz4Frame => Compression::LZ4,
+            CompressionCodec::Snappy => Compression::SNAPPY,
+        };
+        Ok(parquet_codec)
+    }
+}
+
+impl DisplayAs for ParquetWriterExec {
+    /// Renders this operator for plan display.
+    ///
+    /// `Default` and `Verbose` produce a single line with the output path and codec.
+    /// `TreeRender` produces one `key=value` line per attribute instead of panicking,
+    /// so the plan can be displayed in tree format.
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "ParquetWriterExec: path={}, compression={:?}",
+                    self.output_path, self.compression
+                )
+            }
+            DisplayFormatType::TreeRender => {
+                // Tree rendering expects only the node's details, one key=value per line
+                writeln!(f, "path={}", self.output_path)?;
+                writeln!(f, "compression={:?}", self.compression)
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for ParquetWriterExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "ParquetWriterExec"
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    /// Reports the input's statistics unchanged.
+    fn statistics(&self) -> Result<Statistics> {
+        self.input.partition_statistics(None)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    /// Reports the input schema. The stream returned by `execute` is tagged with the
+    /// renamed output schema, but it never yields a batch.
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    /// Rebuilds the operator over a replacement child; any child count other than one is
+    /// rejected with an internal error.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(ParquetWriterExec::try_new(
+                Arc::clone(&children[0]),
+                self.output_path.clone(),
+                self.compression.clone(),
+                self.partition_id,
+                self.column_names.clone(),
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "ParquetWriterExec requires exactly one child".to_string(),
+            )),
+        }
+    }
+
+    /// Writes every batch of input partition `partition` to
+    /// `{output_path}/part-{partition_id:05}.parquet` and returns a stream that completes
+    /// without yielding any batch once the file has been closed.
+    ///
+    /// The output directory and file are created eagerly, before the returned stream is
+    /// polled; the batches themselves are written as the stream is driven.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when a non-empty `column_names` does not have one entry per input
+    /// field, when the compression settings are invalid, or when the output directory,
+    /// file or Parquet writer cannot be created. Write and close failures surface through
+    /// the returned stream.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        use arrow::record_batch::RecordBatch;
+        use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+
+        let input_schema = self.schema();
+        let rename_columns = !self.column_names.is_empty();
+
+        // Report a mismatch as an error rather than a panic, and do it before the child
+        // plan is started or anything is created on disk.
+        if rename_columns && self.column_names.len() != input_schema.fields().len() {
+            return Err(DataFusionError::Internal(format!(
+                "ParquetWriterExec received {} column names for an input with {} fields",
+                self.column_names.len(),
+                input_schema.fields().len()
+            )));
+        }
+
+        let input = self.input.execute(partition, context)?;
+        let compression = self.compression_to_parquet()?;
+
+        // Create output schema with correct column names
+        let output_schema = if rename_columns {
+            // Replace the generic column names (col_0, col_1, etc.) with the actual names
+            let fields: Vec<_> = input_schema
+                .fields()
+                .iter()
+                .zip(self.column_names.iter())
+                .map(|(field, name)| Arc::new(field.as_ref().clone().with_name(name)))
+                .collect();
+            Arc::new(arrow::datatypes::Schema::new(fields))
+        } else {
+            // No column names provided, use input schema as-is
+            Arc::clone(&input_schema)
+        };
+
+        // Strip file:// or file: prefix if present
+        let local_path = self
+            .output_path
+            .strip_prefix("file://")
+            .or_else(|| self.output_path.strip_prefix("file:"))
+            .unwrap_or(&self.output_path)
+            .to_string();
+
+        // Create output directory
+        std::fs::create_dir_all(&local_path).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to create output directory '{}': {}",
+                local_path, e
+            ))
+        })?;
+
+        // Generate part file name for this partition
+        let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id);
+
+        // Create the Parquet file
+        let file = File::create(&part_file).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to create output file '{}': {}",
+                part_file, e
+            ))
+        })?;
+
+        // Configure writer properties
+        let props = WriterProperties::builder()
+            .set_compression(compression)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props))
+            .map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?;
+
+        // Clone schema for use in async closure
+        let schema_for_write = Arc::clone(&output_schema);
+
+        // Write batches
+        let write_task = async move {
+            let mut stream = input;
+
+            while let Some(batch) = stream.try_next().await? {
+                // Re-wrap the columns under the output schema so the file carries the
+                // requested column names
+                let batch = if rename_columns {
+                    RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec())
+                        .map_err(|e| {
+                            DataFusionError::Execution(format!(
+                                "Failed to rename batch columns: {}",
+                                e
+                            ))
+                        })?
+                } else {
+                    batch
+                };
+
+                writer.write(&batch).map_err(|e| {
+                    DataFusionError::Execution(format!("Failed to write batch: {}", e))
+                })?;
+            }
+
+            // Closing consumes the writer and finishes the file
+            writer.close().map_err(|e| {
+                DataFusionError::Execution(format!("Failed to close writer: {}", e))
+            })?;
+
+            // Return empty stream to indicate completion
+            Ok::<_, DataFusionError>(futures::stream::empty())
+        };
+
+        // Execute the write task and convert to a stream
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            output_schema,
+            futures::stream::once(write_task).try_flatten(),
+        )))
+    }
+}
diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index 0fe04a5a4..b0746a6f8 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -22,7 +22,7 @@ use crate::{
     errors::ExpressionError,
     execution::{
         expressions::subquery::Subquery,
-        operators::{ExecutionError, ExpandExec, ScanExec},
+        operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec},
         serde::to_arrow_datatype,
         shuffle::ShuffleWriterExec,
     },
@@ -1448,6 +1448,38 @@ impl PhysicalPlanner {
                     )),
                 ))
             }
+            OpStruct::ParquetWriter(writer) => {
+                assert_eq!(children.len(), 1);
+                let (scans, child) = self.create_plan(&children[0], inputs, 
partition_count)?;
+
+                let codec = match writer.compression.try_into() {
+                    Ok(SparkCompressionCodec::None) => 
Ok(CompressionCodec::None),
+                    Ok(SparkCompressionCodec::Snappy) => 
Ok(CompressionCodec::Snappy),
+                    Ok(SparkCompressionCodec::Zstd) => 
Ok(CompressionCodec::Zstd(3)),
+                    Ok(SparkCompressionCodec::Lz4) => 
Ok(CompressionCodec::Lz4Frame),
+                    _ => Err(GeneralError(format!(
+                        "Unsupported parquet compression codec: {:?}",
+                        writer.compression
+                    ))),
+                }?;
+
+                let parquet_writer = Arc::new(ParquetWriterExec::try_new(
+                    Arc::clone(&child.native_plan),
+                    writer.output_path.clone(),
+                    codec,
+                    self.partition,
+                    writer.column_names.clone(),
+                )?);
+
+                Ok((
+                    scans,
+                    Arc::new(SparkPlan::new(
+                        spark_plan.plan_id,
+                        parquet_writer,
+                        vec![Arc::clone(&child)],
+                    )),
+                ))
+            }
             OpStruct::Expand(expand) => {
                 assert_eq!(children.len(), 1);
                 let (scans, child) = self.create_plan(&children[0], inputs, 
partition_count)?;
diff --git a/native/proto/src/proto/operator.proto 
b/native/proto/src/proto/operator.proto
index 94661a20e..a95832709 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -49,6 +49,7 @@ message Operator {
     Window window = 110;
     NativeScan native_scan = 111;
     IcebergScan iceberg_scan = 112;
+    ParquetWriter parquet_writer = 113;
   }
 }
 
@@ -236,6 +237,12 @@ message ShuffleWriter {
   bool tracing_enabled = 7;
 }
 
+// Operator that writes the output of its single child to Parquet.
+message ParquetWriter {
+  // Location the output is written to
+  string output_path = 1;
+  // Compression codec applied to the written data
+  CompressionCodec compression = 2;
+  // Column names to use in the output; note that field number 3 is not assigned
+  repeated string column_names = 4;
+}
+
 enum AggregateMode {
   Partial = 0;
   Final = 1;
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 4baedc919..124188b64 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -32,7 +32,7 @@ import 
org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, Comet
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec}
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec}
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
ShuffledHashJoinExec, SortMergeJoinExec}
@@ -48,6 +48,7 @@ import org.apache.comet.serde.{CometOperatorSerde, 
Compatible, Incompatible, Ope
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, 
supportedDataType}
 import org.apache.comet.serde.operator._
+import org.apache.comet.serde.operator.CometDataWritingCommand
 
 object CometExecRule {
 
@@ -70,6 +71,13 @@ object CometExecRule {
       classOf[LocalTableScanExec] -> CometLocalTableScanExec,
       classOf[WindowExec] -> CometWindowExec)
 
+  /**
+   * Serde handlers for write commands, keyed by the Spark plan class they replace.
+   *
+   * DataWritingCommandExec is handled separately in convertNode since it doesn't follow the
+   * standard pattern of having CometNativeExec children.
+   */
+  val writeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
+    Map(classOf[DataWritingCommandExec] -> CometDataWritingCommand)
+
   /**
    * Sinks that have a native plan of ScanExec.
    */
@@ -218,6 +226,23 @@ case class CometExecRule(session: SparkSession) extends 
Rule[SparkPlan] {
         val nativeOp = operator2Proto(cometOp)
         CometScanWrapper(nativeOp.get, cometOp)
 
+      // Handle DataWritingCommandExec specially since it doesn't follow the 
standard pattern
+      case exec: DataWritingCommandExec =>
+        CometExecRule.writeExecs.get(classOf[DataWritingCommandExec]) match {
+          case Some(handler) if isOperatorEnabled(handler, exec) =>
+            val builder = 
OperatorOuterClass.Operator.newBuilder().setPlanId(exec.id)
+            handler
+              .asInstanceOf[CometOperatorSerde[DataWritingCommandExec]]
+              .convert(exec, builder)
+              .map(nativeOp =>
+                handler
+                  .asInstanceOf[CometOperatorSerde[DataWritingCommandExec]]
+                  .createExec(nativeOp, exec))
+              .getOrElse(exec)
+          case _ =>
+            exec
+        }
+
       // For AQE broadcast stage on a Comet broadcast exchange
       case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
         newPlanWithProto(s, CometSinkPlaceHolder(_, s, s))
diff --git 
a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
 
b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
new file mode 100644
index 000000000..46d01c887
--- /dev/null
+++ 
b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde.operator
+
+import java.util.Locale
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.comet.{CometNativeExec, CometNativeWriteExec}
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import 
org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, 
WriteFilesExec}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
+import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.{CometOperatorSerde, Incompatible, 
OperatorOuterClass, SupportLevel, Unsupported}
+import org.apache.comet.serde.OperatorOuterClass.Operator
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+
+/**
+ * CometOperatorSerde implementation for DataWritingCommandExec that converts Parquet write
+ * operations to use Comet's native Parquet writer.
+ *
+ * The compression codec is resolved from the `compression` write option, then the
+ * `parquet.compression` write option, then the session's Parquet compression setting, so that
+ * a codec requested through either option is never silently replaced by the session default.
+ */
+object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec] {
+
+  /** Lower-cased codec names the native writer accepts, mapped to their protobuf values. */
+  private val supportedCompressionCodecs: Map[String, OperatorOuterClass.CompressionCodec] =
+    Map(
+      "none" -> OperatorOuterClass.CompressionCodec.None,
+      "snappy" -> OperatorOuterClass.CompressionCodec.Snappy,
+      "lz4" -> OperatorOuterClass.CompressionCodec.Lz4,
+      "zstd" -> OperatorOuterClass.CompressionCodec.Zstd)
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] =
+    Some(CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED)
+
+  /**
+   * Reports whether the write command can be handled natively.
+   *
+   * @param op
+   *   The write command node to inspect
+   * @return
+   *   `Unsupported` with a reason when the command cannot be converted, otherwise `Incompatible`
+   *   because native Parquet writes are experimental
+   */
+  override def getSupportLevel(op: DataWritingCommandExec): SupportLevel = {
+    op.cmd match {
+      case cmd: InsertIntoHadoopFsRelationCommand =>
+        cmd.fileFormat match {
+          case _: ParquetFileFormat =>
+            unsupportedReason(cmd) match {
+              case Some(reason) =>
+                Unsupported(Some(reason))
+              case None =>
+                Incompatible(Some("Parquet write support is highly experimental"))
+            }
+          case _ =>
+            Unsupported(Some("Only Parquet writes are supported"))
+        }
+      case other =>
+        Unsupported(Some(s"Unsupported write command: ${other.getClass}"))
+    }
+  }
+
+  /** Returns the first reason a Parquet insert cannot be written natively, if there is one. */
+  private def unsupportedReason(cmd: InsertIntoHadoopFsRelationCommand): Option[String] = {
+    if (!cmd.outputPath.toString.startsWith("file:")) {
+      Some("Only local filesystem output paths are supported")
+    } else if (cmd.bucketSpec.isDefined) {
+      Some("Bucketed writes are not supported")
+    } else if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
+      Some("Partitioned writes are not supported")
+    } else if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) {
+      Some("Complex types are not supported")
+    } else {
+      val codec = parseCompressionCodec(cmd)
+      if (supportedCompressionCodecs.contains(codec)) {
+        None
+      } else {
+        Some(s"Unsupported compression codec: $codec")
+      }
+    }
+  }
+
+  /**
+   * Builds the native plan for the write: a ParquetWriter operator whose only child is a Scan
+   * operator describing the schema of the query output.
+   *
+   * @param op
+   *   The write command node being converted
+   * @param builder
+   *   Unused; the writer operator is assembled with its own builder
+   * @param childOp
+   *   Unused; the child Scan operator is created here
+   * @return
+   *   The native writer operator, or `None` (with the reason recorded on `op`) when the command
+   *   cannot be converted
+   */
+  override def convert(
+      op: DataWritingCommandExec,
+      builder: Operator.Builder,
+      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
+
+    try {
+      val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand]
+      val output = cmd.query.output
+
+      // Types that cannot be serialized are dropped here, which the length check below detects
+      val fieldTypes = output.flatMap(attr => serializeDataType(attr.dataType))
+
+      if (fieldTypes.length != output.length) {
+        withInfo(op, "Cannot serialize data types for native write")
+        None
+      } else {
+        val codecName = parseCompressionCodec(cmd)
+        supportedCompressionCodecs.get(codecName) match {
+          case Some(codec) =>
+            val scanOp = OperatorOuterClass.Scan
+              .newBuilder()
+              .setSource(cmd.query.nodeName)
+              .setArrowFfiSafe(false)
+            fieldTypes.foreach(fieldType => scanOp.addFields(fieldType))
+
+            val scanOperator = Operator
+              .newBuilder()
+              .setPlanId(op.id)
+              .setScan(scanOp.build())
+              .build()
+
+            val writerOp = OperatorOuterClass.ParquetWriter
+              .newBuilder()
+              .setOutputPath(cmd.outputPath.toString)
+              .setCompression(codec)
+              .addAllColumnNames(output.map(_.name).asJava)
+              .build()
+
+            val writerOperator = Operator
+              .newBuilder()
+              .setPlanId(op.id)
+              .addChildren(scanOperator)
+              .setParquetWriter(writerOp)
+              .build()
+
+            Some(writerOperator)
+          case None =>
+            withInfo(op, s"Unsupported compression codec: $codecName")
+            None
+        }
+      }
+    } catch {
+      case e: Exception =>
+        withInfo(
+          op,
+          "Failed to convert DataWritingCommandExec to native execution: " +
+            s"${e.getMessage}")
+        None
+    }
+  }
+
+  /**
+   * Creates the Comet write node that replaces the Spark write command.
+   *
+   * @param nativeOp
+   *   The native writer operator produced by `convert`
+   * @param op
+   *   The original write command node
+   */
+  override def createExec(nativeOp: Operator, op: DataWritingCommandExec): CometNativeExec = {
+    val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand]
+
+    // Get the child plan from the WriteFilesExec or use the child directly
+    val childPlan = op.child match {
+      case writeFiles: WriteFilesExec =>
+        // The WriteFilesExec child should already be a Comet operator
+        writeFiles.child
+      case other =>
+        // Fallback: use the child directly
+        other
+    }
+
+    CometNativeWriteExec(nativeOp, childPlan, cmd.outputPath.toString)
+  }
+
+  /**
+   * Resolves the lower-cased compression codec name for the write.
+   *
+   * Precedence, highest first: the `compression` option, the `parquet.compression` option, then
+   * the session's Parquet compression setting. Option keys are matched case-insensitively.
+   */
+  private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand): String = {
+    def writeOption(key: String): Option[String] =
+      cmd.options.collectFirst { case (name, value) if name.equalsIgnoreCase(key) => value }
+
+    writeOption("compression")
+      .orElse(writeOption("parquet.compression"))
+      .getOrElse(
+        SQLConf.get.getConfString(
+          SQLConf.PARQUET_COMPRESSION.key,
+          SQLConf.PARQUET_COMPRESSION.defaultValueString))
+      .toLowerCase(Locale.ROOT)
+  }
+
+}
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
new file mode 100644
index 000000000..2617e8c60
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.CometExecIterator
+import org.apache.comet.serde.OperatorOuterClass.Operator
+
+/**
+ * Comet physical operator for native Parquet write operations.
+ *
+ * This operator writes data to Parquet files using the native Comet engine. It wraps the child
+ * operator and adds a ParquetWriter operator on top.
+ *
+ * @param nativeOp
+ *   The native operator representing the write operation
+ * @param child
+ *   The child operator providing the data to write
+ * @param outputPath
+ *   The path where the Parquet file will be written
+ */
+case class CometNativeWriteExec(nativeOp: Operator, child: SparkPlan, outputPath: String)
+    extends CometNativeExec
+    with UnaryExecNode {
+
+  override def originalPlan: SparkPlan = child
+
+  /** The protobuf-encoded native write plan. */
+  override def serializedPlanOpt: SerializedPlan =
+    SerializedPlan(Some(nativeOp.toByteArray))
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+
+  override def nodeName: String = "CometNativeWrite"
+
+  /**
+   * Runs the native write and returns an RDD with no rows; every batch produced by the native
+   * side is closed and discarded.
+   */
+  override def doExecute(): RDD[InternalRow] = {
+    doExecuteColumnar().mapPartitions { batches =>
+      // Consume all batches (they should be empty)
+      batches.foreach(_.close())
+      Iterator.empty
+    }
+  }
+
+  /**
+   * Feeds each partition of the child's columnar output through a CometExecIterator that runs
+   * the native write plan.
+   *
+   * A row-based child is not supported: its partitions fail with
+   * UnsupportedOperationException when they are computed.
+   */
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    // Get the input data from the child operator
+    val childRDD = if (child.supportsColumnar) {
+      child.executeColumnar()
+    } else {
+      // If child doesn't support columnar, convert to columnar
+      child.execute().mapPartitionsInternal { _ =>
+        // TODO this could delegate to CometRowToColumnar, but maybe Comet
+        // does not need to support this case?
+        throw new UnsupportedOperationException(
+          "Row-based child operators not yet supported for native write")
+      }
+    }
+
+    // Capture metadata before the transformation
+    val numPartitions = childRDD.getNumPartitions
+    val numOutputCols = child.output.length
+    // Serialize the native plan once here rather than once per partition inside the closure
+    val planBytes = nativeOp.toByteArray
+
+    // Execute native write operation
+    childRDD.mapPartitionsInternal { iter =>
+      val nativeMetrics = CometMetricNode.fromCometPlan(this)
+
+      new CometExecIterator(
+        CometExec.newIterId,
+        Seq(iter),
+        numOutputCols,
+        planBytes,
+        nativeMetrics,
+        numPartitions,
+        org.apache.spark.TaskContext.getPartitionId(),
+        None,
+        Seq.empty)
+
+    }
+  }
+}
diff --git 
a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala 
b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
new file mode 100644
index 000000000..e4b8b5385
--- /dev/null
+++ 
b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet
+
+import java.io.File
+
+import scala.util.Random
+
+import org.apache.spark.sql.{CometTestBase, DataFrame}
+import org.apache.spark.sql.comet.CometNativeWriteExec
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.comet.CometConf
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, 
SchemaGenOptions}
+
+/** Tests for the experimental native Parquet write path. */
+class CometParquetWriterSuite extends CometTestBase {
+
+  test("basic parquet write") {
+    // no support for fully native scan as input yet
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+
+    withTempPath { dir =>
+      val outputPath = new File(dir, "output.parquet").getAbsolutePath
+
+      // Create test data and write it to a temp parquet file first
+      withTempPath { inputDir =>
+        val inputPath = new File(inputDir, "input.parquet").getAbsolutePath
+        val schema = FuzzDataGenerator.generateSchema(
+          SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false))
+        val generatedDf = FuzzDataGenerator.generateDataFrame(
+          new Random(42),
+          spark,
+          schema,
+          1000,
+          DataGenOptions(generateNegativeZero = false))
+        withSQLConf(
+          CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "false",
+          SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Denver") {
+          generatedDf.write.parquet(inputPath)
+        }
+
+        withSQLConf(
+          CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+          SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
+          CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
+          CometConf.COMET_EXEC_ENABLED.key -> "true") {
+          val inputDf = spark.read.parquet(inputPath)
+
+          // Perform native write and verify that CometNativeWriteExec was used
+          val writeExecution = writeAndCapturePlan(inputDf, outputPath)
+          assertHasNativeWrite(writeExecution)
+
+          // Verify the data was written correctly
+          val resultDf = spark.read.parquet(outputPath)
+          assert(resultDf.count() == 1000, "Expected 1000 rows to be written")
+
+          // Verify multiple part files were created
+          val outputDir = new File(outputPath)
+          val partFiles = outputDir.listFiles().filter(_.getName.startsWith("part-"))
+          // With 1000 rows and default parallelism, we should get multiple partitions
+          assert(partFiles.length > 1, "Expected multiple part files to be created")
+
+          // read with and without Comet and compare. DataFrames are planned lazily, so each
+          // read has to be executed while its config override is still active; withSQLConf
+          // returns Unit, hence the var used to carry the Spark result out of the block.
+          var sparkRows: Seq[org.apache.spark.sql.Row] = Seq.empty
+          withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false") {
+            val sparkDf: DataFrame = spark.read.parquet(outputPath)
+            sparkRows = sparkDf.collect().toSeq
+          }
+          withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true") {
+            val cometDf: DataFrame = spark.read.parquet(outputPath)
+            checkAnswer(cometDf, sparkRows)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes `df` as Parquet to `outputPath` and returns the QueryExecution reported for the
+   * write by a temporarily registered QueryExecutionListener.
+   *
+   * Fails the test if the listener has not reported a write within the timeout.
+   */
+  private def writeAndCapturePlan(df: DataFrame, outputPath: String): QueryExecution = {
+    // The callback may run on a different thread than the test (hence the polling below), so
+    // the captured plan is published through an AtomicReference rather than a plain var.
+    val capturedPlan = new java.util.concurrent.atomic.AtomicReference[QueryExecution]()
+
+    val listener = new org.apache.spark.sql.util.QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        // Capture plans from write operations
+        if (funcName == "save" || funcName.contains("command")) {
+          capturedPlan.set(qe)
+        }
+      }
+
+      override def onFailure(
+          funcName: String,
+          qe: QueryExecution,
+          exception: Exception): Unit = {}
+    }
+
+    spark.listenerManager.register(listener)
+
+    try {
+      df.write.parquet(outputPath)
+
+      // Wait for listener to be called with timeout
+      val maxWaitTimeMs = 15000
+      val checkIntervalMs = 100
+      val maxIterations = maxWaitTimeMs / checkIntervalMs
+      var iterations = 0
+
+      while (capturedPlan.get() == null && iterations < maxIterations) {
+        Thread.sleep(checkIntervalMs)
+        iterations += 1
+      }
+
+      val execution = capturedPlan.get()
+      assert(
+        execution != null,
+        s"Listener was not called within ${maxWaitTimeMs}ms - no execution plan captured")
+      execution
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  /** Asserts that the executed plan of `qe` contains a CometNativeWriteExec node. */
+  private def assertHasNativeWrite(qe: QueryExecution): Unit = {
+    val executedPlan = qe.executedPlan
+    val hasNativeWrite = executedPlan.exists {
+      case _: CometNativeWriteExec => true
+      case d: DataWritingCommandExec =>
+        d.child.exists {
+          case _: CometNativeWriteExec => true
+          case _ => false
+        }
+      case _ => false
+    }
+
+    assert(
+      hasNativeWrite,
+      s"Expected CometNativeWriteExec in the plan, but got:\n${executedPlan.treeString}")
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to