This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new aff07d088 feat: Comet Writer should respect object store settings (#3042)
aff07d088 is described below

commit aff07d088de1babbdb8647b1c94fb40b4a7fb763
Author: Oleks V <[email protected]>
AuthorDate: Wed Jan 14 16:14:39 2026 -0800

    feat: Comet Writer should respect object store settings (#3042)
---
 .../core/src/execution/operators/parquet_writer.rs | 120 ++++++++++++---------
 native/core/src/execution/planner.rs               |   7 ++
 native/core/src/parquet/parquet_support.rs         |  22 ++--
 native/proto/src/proto/operator.proto              |   7 ++
 .../serde/operator/CometDataWritingCommand.scala   |  20 +++-
 5 files changed, 111 insertions(+), 65 deletions(-)

diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs
index 2ca1e9cfd..6de1da5b4 100644
--- a/native/core/src/execution/operators/parquet_writer.rs
+++ b/native/core/src/execution/operators/parquet_writer.rs
@@ -19,6 +19,7 @@
 
 use std::{
     any::Any,
+    collections::HashMap,
     fmt,
     fmt::{Debug, Formatter},
     fs::File,
@@ -26,9 +27,12 @@ use std::{
     sync::Arc,
 };
 
-use opendal::{services::Hdfs, Operator};
-use url::Url;
+use opendal::Operator;
 
+use crate::execution::shuffle::CompressionCodec;
+use crate::parquet::parquet_support::{
+    create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
+};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -50,8 +54,7 @@ use parquet::{
     basic::{Compression, ZstdLevel},
     file::properties::WriterProperties,
 };
-
-use crate::execution::shuffle::CompressionCodec;
+use url::Url;
 
 /// Enum representing different types of Arrow writers based on storage backend
 enum ParquetWriter {
@@ -200,6 +203,8 @@ pub struct ParquetWriterExec {
     partition_id: i32,
     /// Column names to use in the output Parquet file
     column_names: Vec<String>,
+    /// Object store configuration options
+    object_store_options: HashMap<String, String>,
     /// Metrics
     metrics: ExecutionPlanMetricsSet,
     /// Cache for plan properties
@@ -218,6 +223,7 @@ impl ParquetWriterExec {
         compression: CompressionCodec,
         partition_id: i32,
         column_names: Vec<String>,
+        object_store_options: HashMap<String, String>,
     ) -> Result<Self> {
         // Preserve the input's partitioning so each partition writes its own file
         let input_partitioning = input.output_partitioning().clone();
@@ -238,6 +244,7 @@ impl ParquetWriterExec {
             compression,
             partition_id,
             column_names,
+            object_store_options,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         })
@@ -255,10 +262,11 @@ impl ParquetWriterExec {
     /// Create an Arrow writer based on the storage scheme
     ///
     /// # Arguments
-    /// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local")
     /// * `output_file_path` - The full path to the output file
     /// * `schema` - The Arrow schema for the Parquet file
     /// * `props` - Writer properties including compression
+    /// * `runtime_env` - Runtime environment for object store registration
+    /// * `object_store_options` - Configuration options for object store
     ///
     /// # Returns
     /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
@@ -267,71 +275,61 @@ impl ParquetWriterExec {
         output_file_path: &str,
         schema: SchemaRef,
         props: WriterProperties,
+        runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+        object_store_options: &HashMap<String, String>,
     ) -> Result<ParquetWriter> {
-        // Determine storage scheme from output_file_path
-        let storage_scheme = if output_file_path.starts_with("hdfs://") {
-            "hdfs"
-        } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
-            "s3"
-        } else {
-            "local"
-        };
+        // Parse URL and match on storage scheme directly
+        let url = Url::parse(output_file_path).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e))
+        })?;
 
-        match storage_scheme {
-            "hdfs" => {
-                // Parse the output_file_path to extract namenode and path
-                // Expected format: hdfs://namenode:port/path/to/file
-                let url = Url::parse(output_file_path).map_err(|e| {
+        if is_hdfs_scheme(&url, object_store_options) {
+            // HDFS storage
+            {
+                // Use prepare_object_store_with_configs to create and register the object store
+                let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
+                    runtime_env,
+                    output_file_path.to_string(),
+                    object_store_options,
+                )
+                .map_err(|e| {
                     DataFusionError::Execution(format!(
-                        "Failed to parse HDFS URL '{}': {}",
+                        "Failed to prepare object store for '{}': {}",
                         output_file_path, e
                     ))
                 })?;
 
-                // Extract namenode (scheme + host + port)
-                let namenode = format!(
-                    "{}://{}{}",
-                    url.scheme(),
-                    url.host_str().unwrap_or("localhost"),
-                    url.port()
-                        .map(|p| format!(":{}", p))
-                        .unwrap_or_else(|| ":9000".to_string())
-                );
-
-                // Extract the path (without the scheme and host)
-                let hdfs_path = url.path().to_string();
-
                 // For remote storage (HDFS, S3), write to an in-memory buffer
                 let buffer = Vec::new();
                 let cursor = Cursor::new(buffer);
                 let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
                     .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create {} writer: {}",
-                            storage_scheme, e
-                        ))
+                        DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
                     })?;
 
-                let builder = Hdfs::default().name_node(&namenode);
-                let op = Operator::new(builder)
-                    .map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Failed to create HDFS operator for '{}' (namenode: {}): {}",
-                            output_file_path, namenode, e
-                        ))
-                    })?
-                    .finish();
+                // Create HDFS operator with configuration options using the helper function
+                let op = create_hdfs_operator(&url).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create HDFS operator for '{}': {}",
+                        output_file_path, e
+                    ))
+                })?;
 
                 // HDFS writer will be created lazily on first write
-                // Use only the path part for the HDFS writer
+                // Use the path from prepare_object_store_with_configs
                 Ok(ParquetWriter::Remote(
                     arrow_parquet_buffer_writer,
                     None,
                     op,
-                    hdfs_path,
+                    object_store_path.to_string(),
                 ))
             }
-            "local" => {
+        } else if output_file_path.starts_with("file://")
+            || output_file_path.starts_with("file:")
+            || !output_file_path.contains("://")
+        {
+            // Local file system
+            {
                 // For a local file system, write directly to file
                 // Strip file:// or file: prefix if present
                 let local_path = output_file_path
@@ -368,10 +366,12 @@ impl ParquetWriterExec {
                 })?;
                 Ok(ParquetWriter::LocalFile(writer))
             }
-            _ => Err(DataFusionError::Execution(format!(
-                "Unsupported storage scheme: {}",
-                storage_scheme
-            ))),
+        } else {
+            // Unsupported storage scheme
+            Err(DataFusionError::Execution(format!(
+                "Unsupported storage scheme in path: {}",
+                output_file_path
+            )))
         }
     }
 }
@@ -435,6 +435,7 @@ impl ExecutionPlan for ParquetWriterExec {
                 self.compression.clone(),
                 self.partition_id,
                 self.column_names.clone(),
+                self.object_store_options.clone(),
             )?)),
             _ => Err(DataFusionError::Internal(
                 "ParquetWriterExec requires exactly one child".to_string(),
@@ -454,6 +455,7 @@ impl ExecutionPlan for ParquetWriterExec {
         let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition);
         let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition);
 
+        let runtime_env = context.runtime_env();
         let input = self.input.execute(partition, context)?;
         let input_schema = self.input.schema();
         let work_dir = self.work_dir.clone();
@@ -488,7 +490,14 @@ impl ExecutionPlan for ParquetWriterExec {
             .set_compression(compression)
             .build();
 
-        let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
+        let object_store_options = self.object_store_options.clone();
+        let mut writer = Self::create_arrow_writer(
+            &part_file,
+            Arc::clone(&output_schema),
+            props,
+            runtime_env,
+            &object_store_options,
+        )?;
 
         // Clone schema for use in async closure
         let schema_for_write = Arc::clone(&output_schema);
@@ -732,10 +741,14 @@ mod tests {
         // Create ParquetWriter using the create_arrow_writer method
         // Use full HDFS URL format
         let full_output_path = format!("hdfs://namenode:9000{}", output_path);
+        let session_ctx = datafusion::prelude::SessionContext::new();
+        let runtime_env = session_ctx.runtime_env();
         let mut writer = ParquetWriterExec::create_arrow_writer(
             &full_output_path,
             create_test_record_batch(1)?.schema(),
             props,
+            runtime_env,
+            &HashMap::new(),
         )?;
 
         // Write 5 batches in a loop
@@ -802,6 +815,7 @@ mod tests {
             CompressionCodec::None,
             0, // partition_id
             column_names,
+            HashMap::new(), // object_store_options
         )?;
 
         // Create a session context and execute the plan
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 93fbb59c1..7d806213d 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1248,6 +1248,12 @@ impl PhysicalPlanner {
                     ))),
                 }?;
 
+                let object_store_options: HashMap<String, String> = writer
+                    .object_store_options
+                    .iter()
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect();
+
                 let parquet_writer = Arc::new(ParquetWriterExec::try_new(
                     Arc::clone(&child.native_plan),
                     writer.output_path.clone(),
@@ -1261,6 +1267,7 @@ impl PhysicalPlanner {
                     codec,
                     self.partition,
                     writer.column_names.clone(),
+                    object_store_options,
                 )?);
 
                 Ok((
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index c9a27d7dc..e7ff5630f 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -358,7 +358,7 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
     }
 }
 
-fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
+pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
     const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
     let scheme = url.scheme();
     if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
@@ -387,20 +387,26 @@ fn create_hdfs_object_store(
     }
 }
 
-// Creates an HDFS object store from a URL using OpenDAL
+// Creates an OpenDAL HDFS Operator from a URL with optional configuration
 #[cfg(feature = "hdfs-opendal")]
-fn create_hdfs_object_store(
-    url: &Url,
-) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+pub(crate) fn create_hdfs_operator(url: &Url) -> Result<opendal::Operator, object_store::Error> {
     let name_node = get_name_node_uri(url)?;
     let builder = opendal::services::Hdfs::default().name_node(&name_node);
 
-    let op = opendal::Operator::new(builder)
+    opendal::Operator::new(builder)
         .map_err(|error| object_store::Error::Generic {
             store: "hdfs-opendal",
             source: error.into(),
-        })?
-        .finish();
+        })
+        .map(|op| op.finish())
+}
+
+// Creates an HDFS object store from a URL using OpenDAL
+#[cfg(feature = "hdfs-opendal")]
+pub(crate) fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let op = create_hdfs_operator(url)?;
     let store = object_store_opendal::OpendalStore::new(op);
     let path = Path::parse(url.path())?;
     Ok((Box::new(store), path))
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 015b5d96b..a1a3c4bed 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -245,6 +245,13 @@ message ParquetWriter {
   optional string job_id = 6;
   // Task attempt ID for this specific task
   optional int32 task_attempt_id = 7;
+  // Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
+  // from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
+  // stores.
+  // The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
+  // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
+  // the map.
+  map<string, string> object_store_options = 8;
 }
 
 enum AggregateMode {
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
index 834932984..1f3c3f40c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -19,6 +19,7 @@
 
 package org.apache.comet.serde.operator
 
+import java.net.URI
 import java.util.Locale
 
 import scala.jdk.CollectionConverters._
@@ -32,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.serializeDataType
@@ -126,14 +128,24 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
           return None
       }
 
-      val writerOp = OperatorOuterClass.ParquetWriter
+      val writerOpBuilder = OperatorOuterClass.ParquetWriter
         .newBuilder()
         .setOutputPath(outputPath)
         .setCompression(codec)
         .addAllColumnNames(cmd.query.output.map(_.name).asJava)
-        // Note: work_dir, job_id, and task_attempt_id will be set at execution time
-        // in CometNativeWriteExec, as they depend on the Spark task context
-        .build()
+      // Note: work_dir, job_id, and task_attempt_id will be set at execution time
+      // in CometNativeWriteExec, as they depend on the Spark task context
+
+      // Collect S3/cloud storage configurations
+      val session = op.session
+      val hadoopConf = session.sessionState.newHadoopConfWithOptions(cmd.options)
+      val objectStoreOptions =
+        NativeConfig.extractObjectStoreOptions(hadoopConf, URI.create(outputPath))
+      objectStoreOptions.foreach { case (key, value) =>
+        writerOpBuilder.putObjectStoreOptions(key, value)
+      }
+
+      val writerOp = writerOpBuilder.build()
 
       val writerOperator = Operator
         .newBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to