This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new aff07d088 feat: Comet Writer should respect object store settings (#3042)
aff07d088 is described below
commit aff07d088de1babbdb8647b1c94fb40b4a7fb763
Author: Oleks V <[email protected]>
AuthorDate: Wed Jan 14 16:14:39 2026 -0800
feat: Comet Writer should respect object store settings (#3042)
---
.../core/src/execution/operators/parquet_writer.rs | 120 ++++++++++++---------
native/core/src/execution/planner.rs | 7 ++
native/core/src/parquet/parquet_support.rs | 22 ++--
native/proto/src/proto/operator.proto | 7 ++
.../serde/operator/CometDataWritingCommand.scala | 20 +++-
5 files changed, 111 insertions(+), 65 deletions(-)
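For context on the data flow in this change: the Spark side collects Hadoop-style object store settings, ships them in the ParquetWriter protobuf message, and the native writer consults them when registering the object store. A minimal Rust sketch of the kind of map the native side receives; key names follow Hadoop conventions and the values are illustrative, not taken from this commit:

    use std::collections::HashMap;

    fn main() {
        // Hypothetical options after prefix trimming (see the operator.proto
        // comment below); "fs.comet.libhdfs.schemes" is the key consulted by
        // is_hdfs_scheme in parquet_support.rs.
        let mut object_store_options: HashMap<String, String> = HashMap::new();
        object_store_options.insert(
            "fs.comet.libhdfs.schemes".to_string(),
            "hdfs,viewfs".to_string(),
        );
        object_store_options.insert(
            "fs.defaultFS".to_string(),
            "hdfs://namenode:9000".to_string(),
        );
        assert!(object_store_options.contains_key("fs.comet.libhdfs.schemes"));
    }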
diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs
index 2ca1e9cfd..6de1da5b4 100644
--- a/native/core/src/execution/operators/parquet_writer.rs
+++ b/native/core/src/execution/operators/parquet_writer.rs
@@ -19,6 +19,7 @@
use std::{
any::Any,
+ collections::HashMap,
fmt,
fmt::{Debug, Formatter},
fs::File,
@@ -26,9 +27,12 @@ use std::{
sync::Arc,
};
-use opendal::{services::Hdfs, Operator};
-use url::Url;
+use opendal::Operator;
+use crate::execution::shuffle::CompressionCodec;
+use crate::parquet::parquet_support::{
+ create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
+};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -50,8 +54,7 @@ use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};
-
-use crate::execution::shuffle::CompressionCodec;
+use url::Url;
/// Enum representing different types of Arrow writers based on storage backend
enum ParquetWriter {
@@ -200,6 +203,8 @@ pub struct ParquetWriterExec {
partition_id: i32,
/// Column names to use in the output Parquet file
column_names: Vec<String>,
+ /// Object store configuration options
+ object_store_options: HashMap<String, String>,
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for plan properties
@@ -218,6 +223,7 @@ impl ParquetWriterExec {
compression: CompressionCodec,
partition_id: i32,
column_names: Vec<String>,
+ object_store_options: HashMap<String, String>,
) -> Result<Self> {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();
@@ -238,6 +244,7 @@ impl ParquetWriterExec {
compression,
partition_id,
column_names,
+ object_store_options,
metrics: ExecutionPlanMetricsSet::new(),
cache,
})
@@ -255,10 +262,11 @@ impl ParquetWriterExec {
/// Create an Arrow writer based on the storage scheme
///
/// # Arguments
- /// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local")
/// * `output_file_path` - The full path to the output file
/// * `schema` - The Arrow schema for the Parquet file
/// * `props` - Writer properties including compression
+ /// * `runtime_env` - Runtime environment for object store registration
+ /// * `object_store_options` - Configuration options for object store
///
/// # Returns
/// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
@@ -267,71 +275,61 @@ impl ParquetWriterExec {
output_file_path: &str,
schema: SchemaRef,
props: WriterProperties,
+ runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+ object_store_options: &HashMap<String, String>,
) -> Result<ParquetWriter> {
- // Determine storage scheme from output_file_path
- let storage_scheme = if output_file_path.starts_with("hdfs://") {
- "hdfs"
- } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
- "s3"
- } else {
- "local"
- };
+ // Parse URL and match on storage scheme directly
+ let url = Url::parse(output_file_path).map_err(|e| {
+ DataFusionError::Execution(format!("Failed to parse URL '{}': {}",
output_file_path, e))
+ })?;
- match storage_scheme {
- "hdfs" => {
- // Parse the output_file_path to extract namenode and path
- // Expected format: hdfs://namenode:port/path/to/file
- let url = Url::parse(output_file_path).map_err(|e| {
+ if is_hdfs_scheme(&url, object_store_options) {
+ // HDFS storage
+ {
+ // Use prepare_object_store_with_configs to create and register the object store
+ let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
+ runtime_env,
+ output_file_path.to_string(),
+ object_store_options,
+ )
+ .map_err(|e| {
DataFusionError::Execution(format!(
- "Failed to parse HDFS URL '{}': {}",
+ "Failed to prepare object store for '{}': {}",
output_file_path, e
))
})?;
- // Extract namenode (scheme + host + port)
- let namenode = format!(
- "{}://{}{}",
- url.scheme(),
- url.host_str().unwrap_or("localhost"),
- url.port()
- .map(|p| format!(":{}", p))
- .unwrap_or_else(|| ":9000".to_string())
- );
-
- // Extract the path (without the scheme and host)
- let hdfs_path = url.path().to_string();
-
// For remote storage (HDFS, S3), write to an in-memory buffer
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
.map_err(|e| {
- DataFusionError::Execution(format!(
- "Failed to create {} writer: {}",
- storage_scheme, e
- ))
+ DataFusionError::Execution(format!("Failed to create
HDFS writer: {}", e))
})?;
- let builder = Hdfs::default().name_node(&namenode);
- let op = Operator::new(builder)
- .map_err(|e| {
- DataFusionError::Execution(format!(
- "Failed to create HDFS operator for '{}'
(namenode: {}): {}",
- output_file_path, namenode, e
- ))
- })?
- .finish();
+ // Create HDFS operator with configuration options using the helper function
+ let op = create_hdfs_operator(&url).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to create HDFS operator for '{}': {}",
+ output_file_path, e
+ ))
+ })?;
// HDFS writer will be created lazily on first write
- // Use only the path part for the HDFS writer
+ // Use the path from prepare_object_store_with_configs
Ok(ParquetWriter::Remote(
arrow_parquet_buffer_writer,
None,
op,
- hdfs_path,
+ object_store_path.to_string(),
))
}
- "local" => {
+ } else if output_file_path.starts_with("file://")
+ || output_file_path.starts_with("file:")
+ || !output_file_path.contains("://")
+ {
+ // Local file system
+ {
// For a local file system, write directly to file
// Strip file:// or file: prefix if present
let local_path = output_file_path
@@ -368,10 +366,12 @@ impl ParquetWriterExec {
})?;
Ok(ParquetWriter::LocalFile(writer))
}
- _ => Err(DataFusionError::Execution(format!(
- "Unsupported storage scheme: {}",
- storage_scheme
- ))),
+ } else {
+ // Unsupported storage scheme
+ Err(DataFusionError::Execution(format!(
+ "Unsupported storage scheme in path: {}",
+ output_file_path
+ )))
}
}
}
@@ -435,6 +435,7 @@ impl ExecutionPlan for ParquetWriterExec {
self.compression.clone(),
self.partition_id,
self.column_names.clone(),
+ self.object_store_options.clone(),
)?)),
_ => Err(DataFusionError::Internal(
"ParquetWriterExec requires exactly one child".to_string(),
@@ -454,6 +455,7 @@ impl ExecutionPlan for ParquetWriterExec {
let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition);
let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition);
+ let runtime_env = context.runtime_env();
let input = self.input.execute(partition, context)?;
let input_schema = self.input.schema();
let work_dir = self.work_dir.clone();
@@ -488,7 +490,14 @@ impl ExecutionPlan for ParquetWriterExec {
.set_compression(compression)
.build();
- let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
+ let object_store_options = self.object_store_options.clone();
+ let mut writer = Self::create_arrow_writer(
+ &part_file,
+ Arc::clone(&output_schema),
+ props,
+ runtime_env,
+ &object_store_options,
+ )?;
// Clone schema for use in async closure
let schema_for_write = Arc::clone(&output_schema);
@@ -732,10 +741,14 @@ mod tests {
// Create ParquetWriter using the create_arrow_writer method
// Use full HDFS URL format
let full_output_path = format!("hdfs://namenode:9000{}", output_path);
+ let session_ctx = datafusion::prelude::SessionContext::new();
+ let runtime_env = session_ctx.runtime_env();
let mut writer = ParquetWriterExec::create_arrow_writer(
&full_output_path,
create_test_record_batch(1)?.schema(),
props,
+ runtime_env,
+ &HashMap::new(),
)?;
// Write 5 batches in a loop
@@ -802,6 +815,7 @@ mod tests {
CompressionCodec::None,
0, // partition_id
column_names,
+ HashMap::new(), // object_store_options
)?;
// Create a session context and execute the plan
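The rewritten create_arrow_writer parses the output path as a URL and dispatches on its scheme: HDFS-like schemes get a buffered ArrowWriter plus an OpenDAL operator (created lazily on first write), file:// or scheme-less paths get a direct local-file writer, and anything else is an error. A compilable sketch of just that dispatch order; choose_backend is a hypothetical name, the is_hdfs flag stands in for is_hdfs_scheme(&url, object_store_options), and the string tags stand in for the ParquetWriter variants:

    // Hypothetical distillation of the dispatch above; not code from the commit.
    fn choose_backend(output_file_path: &str, is_hdfs: bool) -> &'static str {
        if is_hdfs {
            "hdfs" // in-memory buffer + OpenDAL operator, flushed to HDFS
        } else if output_file_path.starts_with("file://")
            || output_file_path.starts_with("file:")
            || !output_file_path.contains("://")
        {
            "local" // ArrowWriter writing straight to a local file
        } else {
            "unsupported" // surfaced as DataFusionError::Execution
        }
    }

    fn main() {
        assert_eq!(choose_backend("/tmp/out.parquet", false), "local");
        assert_eq!(choose_backend("s3://bucket/out.parquet", false), "unsupported");
    }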
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 93fbb59c1..7d806213d 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1248,6 +1248,12 @@ impl PhysicalPlanner {
))),
}?;
+ let object_store_options: HashMap<String, String> = writer
+ .object_store_options
+ .iter()
+ .map(|(k, v)| (k.clone(), v.clone()))
+ .collect();
+
let parquet_writer = Arc::new(ParquetWriterExec::try_new(
Arc::clone(&child.native_plan),
writer.output_path.clone(),
@@ -1261,6 +1267,7 @@ impl PhysicalPlanner {
codec,
self.partition,
writer.column_names.clone(),
+ object_store_options,
)?);
Ok((
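The planner change is a plain copy of the protobuf string map into an owned HashMap that ParquetWriterExec holds for the life of the plan. An equivalent standalone sketch (to_owned_options is a hypothetical helper name):

    use std::collections::HashMap;

    // Same shape as the planner snippet above: clone a borrowed string map
    // into an owned HashMap<String, String>.
    fn to_owned_options(proto_options: &HashMap<String, String>) -> HashMap<String, String> {
        proto_options
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }

    fn main() {
        let mut m = HashMap::new();
        m.insert("fs.defaultFS".to_string(), "hdfs://namenode:9000".to_string());
        assert_eq!(to_owned_options(&m), m);
    }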
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index c9a27d7dc..e7ff5630f 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -358,7 +358,7 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
}
}
-fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
+pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
let scheme = url.scheme();
if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
@@ -387,20 +387,26 @@ fn create_hdfs_object_store(
}
}
-// Creates an HDFS object store from a URL using OpenDAL
+// Creates an OpenDAL HDFS Operator from a URL with optional configuration
#[cfg(feature = "hdfs-opendal")]
-fn create_hdfs_object_store(
- url: &Url,
-) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+pub(crate) fn create_hdfs_operator(url: &Url) -> Result<opendal::Operator, object_store::Error> {
let name_node = get_name_node_uri(url)?;
let builder = opendal::services::Hdfs::default().name_node(&name_node);
- let op = opendal::Operator::new(builder)
+ opendal::Operator::new(builder)
.map_err(|error| object_store::Error::Generic {
store: "hdfs-opendal",
source: error.into(),
- })?
- .finish();
+ })
+ .map(|op| op.finish())
+}
+
+// Creates an HDFS object store from a URL using OpenDAL
+#[cfg(feature = "hdfs-opendal")]
+pub(crate) fn create_hdfs_object_store(
+ url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+ let op = create_hdfs_operator(url)?;
let store = object_store_opendal::OpendalStore::new(op);
let path = Path::parse(url.path())?;
Ok((Box::new(store), path))
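This hunk shows only the new visibility of is_hdfs_scheme and the config key it consults; its body is otherwise unchanged and not reproduced here. A hedged sketch of comma-separated scheme matching under that key, as a guess at the behavior rather than code from the commit (is_hdfs_scheme_sketch is a hypothetical name):

    use std::collections::HashMap;

    // Illustrative only: treat a URL scheme as HDFS when it appears in the
    // comma-separated "fs.comet.libhdfs.schemes" list, else fall back to a
    // plain "hdfs" check. The real function may differ in details.
    fn is_hdfs_scheme_sketch(scheme: &str, configs: &HashMap<String, String>) -> bool {
        match configs.get("fs.comet.libhdfs.schemes") {
            Some(list) => list.split(',').any(|s| s.trim() == scheme),
            None => scheme == "hdfs",
        }
    }

    fn main() {
        let mut configs = HashMap::new();
        configs.insert(
            "fs.comet.libhdfs.schemes".to_string(),
            "hdfs,viewfs".to_string(),
        );
        assert!(is_hdfs_scheme_sketch("viewfs", &configs));
        assert!(!is_hdfs_scheme_sketch("s3a", &configs));
    }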
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 015b5d96b..a1a3c4bed 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -245,6 +245,13 @@ message ParquetWriter {
optional string job_id = 6;
// Task attempt ID for this specific task
optional int32 task_attempt_id = 7;
+ // Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
+ // from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
+ // stores.
+ // The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
+ // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
+ // the map.
+ map<string, string> object_store_options = 8;
}
enum AggregateMode {
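The trimming described in the new comment can be pictured as a small key transform. An illustrative sketch only; the actual extraction happens in NativeConfig.extractObjectStoreOptions on the Spark side, and trim_hadoop_prefix is a hypothetical name:

    // Illustrative key normalization matching the proto comment:
    // "spark.hadoop.fs.s3a.access.key" is stored as "fs.s3a.access.key".
    fn trim_hadoop_prefix(key: &str) -> &str {
        key.strip_prefix("spark.hadoop.")
            .or_else(|| key.strip_prefix("hadoop."))
            .unwrap_or(key)
    }

    fn main() {
        assert_eq!(
            trim_hadoop_prefix("spark.hadoop.fs.s3a.access.key"),
            "fs.s3a.access.key"
        );
        assert_eq!(trim_hadoop_prefix("hadoop.fs.s3a.endpoint"), "fs.s3a.endpoint");
    }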
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
index 834932984..1f3c3f40c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -19,6 +19,7 @@
package org.apache.comet.serde.operator
+import java.net.URI
import java.util.Locale
import scala.jdk.CollectionConverters._
@@ -32,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.serializeDataType
@@ -126,14 +128,24 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
return None
}
- val writerOp = OperatorOuterClass.ParquetWriter
+ val writerOpBuilder = OperatorOuterClass.ParquetWriter
.newBuilder()
.setOutputPath(outputPath)
.setCompression(codec)
.addAllColumnNames(cmd.query.output.map(_.name).asJava)
- // Note: work_dir, job_id, and task_attempt_id will be set at execution time
- // in CometNativeWriteExec, as they depend on the Spark task context
- .build()
+ // Note: work_dir, job_id, and task_attempt_id will be set at execution time
+ // in CometNativeWriteExec, as they depend on the Spark task context
+
+ // Collect S3/cloud storage configurations
+ val session = op.session
+ val hadoopConf = session.sessionState.newHadoopConfWithOptions(cmd.options)
+ val objectStoreOptions =
+ NativeConfig.extractObjectStoreOptions(hadoopConf, URI.create(outputPath))
+ objectStoreOptions.foreach { case (key, value) =>
+ writerOpBuilder.putObjectStoreOptions(key, value)
+ }
+
+ val writerOp = writerOpBuilder.build()
val writerOperator = Operator
.newBuilder()
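End to end, a user-set Spark configuration key loses its spark.hadoop. prefix before landing in the ParquetWriter message's object_store_options map, per the proto comment above. A worked example under that assumption (placeholder value; the trimming shown is illustrative, not the NativeConfig implementation):

    use std::collections::HashMap;

    fn main() {
        // Spark-side conf as a user might set it (illustrative key and value).
        let spark_conf = [("spark.hadoop.fs.s3a.access.key", "<access-key>")];
        // After extraction and trimming, the writer's object_store_options
        // map carries the Hadoop-style key.
        let object_store_options: HashMap<&str, &str> = spark_conf
            .iter()
            .map(|(k, v)| (k.trim_start_matches("spark.hadoop."), *v))
            .collect();
        assert_eq!(object_store_options["fs.s3a.access.key"], "<access-key>");
    }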
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]