andygrove opened a new issue, #3193:
URL: https://github.com/apache/datafusion-comet/issues/3193
### What is the problem the feature request solves?
## Summary
Add support for partitioned writes to the native Parquet write path.
## Problem Description
Currently, native Parquet write support is blocked for any write operation
involving partitioned tables. This is enforced in
`CometDataWritingCommand.scala` at lines 66-68:
```scala
if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
return Unsupported(Some("Partitioned writes are not supported"))
}
```
Additionally, dynamic partition overwrite is hardcoded to `false` at line 196:
```scala
java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now
```
This limitation prevents native Parquet writes from being used with
partitioned tables, which are a fundamental pattern in data lakes and modern
data warehousing.
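For context on why the hardcoded flag matters: in Spark's own write path, the dynamic-overwrite decision is passed to the commit protocol when it is instantiated, so always passing `false` disables the mode end to end. A rough sketch, not Comet's actual code (`FileCommitProtocol.instantiate` is Spark's API; everything else below is illustrative):

```scala
import java.util.UUID
import org.apache.spark.internal.io.FileCommitProtocol

// Illustrative sketch: the dynamic-overwrite decision must reach the commit
// protocol at instantiation time. A writer that always passes `false` here
// (as the Comet path currently does) can never perform dynamic overwrite.
def createCommitter(
    committerClassName: String,
    outputPath: String,
    dynamicOverwriteRequested: Boolean): FileCommitProtocol = {
  FileCommitProtocol.instantiate(
    committerClassName,         // e.g. SQLHadoopMapReduceCommitProtocol
    UUID.randomUUID().toString, // job ID
    outputPath,
    dynamicOverwriteRequested)  // currently hardcoded to false in Comet
}
```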
## Current Behavior
When attempting to write partitioned data using native Parquet writes:
- Any operation with `partitionBy()` falls back to Spark's default writer
- Static partition writes (e.g., `INSERT INTO table PARTITION(year=2024)`) fall back
- Dynamic partition writes fall back
- Native write acceleration cannot be used for partitioned tables
## Expected Behavior
Native Parquet writes should support:
- Dynamic partitioning - Automatically partition data based on column values
- Static partitioning - Write to specific partition(s) specified by user
- Dynamic partition overwrite - Overwrite only the partitions present in the
data
- Static partition overwrite - Overwrite specific partitions
- Multi-level partitioning - Support multiple partition columns (e.g.,
year/month/day)
- Partition pruning - Efficient handling of partition directories
## Impact
This is a critical blocker preventing native Parquet writes from being
enabled by default in production. Without partitioned write support:
- Cannot use native writes with Hive tables, Delta Lake, or Apache Iceberg
- Incompatible with standard data lake partitioning patterns (e.g.,
date-based partitioning)
- Cannot leverage partition pruning benefits for query optimization
- Severely limits applicability to enterprise use cases
Partitioned tables are the norm in production Spark deployments, making this
feature essential for production readiness.
## Technical Context
Affected Files:
- `spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:66-68` - where the check blocks partitioned writes
- `spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:196` - dynamic partition overwrite hardcoded to `false`
- `native/core/src/execution/operators/parquet_writer.rs` - native writer implementation that needs partitioning logic
Implementation Requirements:
- Partition column extraction from data rows
- Dynamic directory structure creation (e.g., year=2024/month=01/)
- Partition value encoding in directory names (see the sketch after this list)
- Support for both Hive-style (col=value) and non-Hive-style partitioning
- Integration with FileCommitProtocol for atomic partition writes
- Handling of partition schema vs data schema separation
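
A minimal sketch of the directory-naming piece, assuming Hive-style escaping rules. The object and helper names are hypothetical, and the real logic would live in the native (Rust) writer; this only illustrates the expected layout:

```scala
// Hypothetical helper illustrating Hive-style partition directory naming.
object PartitionPaths {

  // Approximate set of characters that Hive-style layouts percent-encode.
  private val needsEscaping: Set[Char] =
    Set('"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '{', '}', '[', ']', '^')

  /** Percent-encode a single partition value, e.g. "a/b" -> "a%2Fb". */
  def escape(value: String): String =
    value.flatMap { c =>
      if (needsEscaping.contains(c) || c.isControl) f"%%${c.toInt}%02X" else c.toString
    }

  /** Build a relative partition directory such as "year=2024/month=01". */
  def partitionDir(partitionValues: Seq[(String, String)]): String =
    partitionValues
      .map { case (column, value) =>
        val encoded =
          if (value == null || value.isEmpty) "__HIVE_DEFAULT_PARTITION__" else escape(value)
        s"$column=$encoded"
      }
      .mkString("/")
}

// Example: PartitionPaths.partitionDir(Seq("year" -> "2024", "event" -> "a/b"))
// returns "year=2024/event=a%2Fb".
```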
Considerations:
- Partition discovery on read must remain compatible
- Small file problem - many partitions can create many small files (a common mitigation is sketched after this list)
- Partition column ordering matters for query performance
- Special characters in partition values need proper encoding
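
For the small-file consideration above, one standard Spark-level mitigation is to repartition by the partition columns before writing, so each partition directory is produced by fewer tasks and contains fewer, larger files; the native writer should produce the same layout either way:

```scala
import org.apache.spark.sql.functions.col

// Standard Spark-side mitigation for the small-file problem: shuffle rows by the
// partition columns so each partition directory is written by fewer tasks.
df.repartition(col("year"), col("month"))
  .write
  .partitionBy("year", "month")
  .parquet("/path/to/table")
```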
## Related Work
This is part of making native Parquet writes production-ready. Other
blockers include:
- Complex types support (#TBD)
- Cloud storage support (S3/GCS/Azure) (#TBD)
- Complete FileCommitProtocol implementation (#TBD)
## Acceptance Criteria
- Support dynamic partitioning with single partition column
- Support dynamic partitioning with multiple partition columns
- Support static partition writes
- Support dynamic partition overwrite mode
- Support static partition overwrite mode
- Proper Hive-style partition directory naming (col=value)
- Handle special characters in partition values correctly (exercised in the sketch after this list)
- Test coverage for all partitioning modes
- Integration tests with common partition patterns (date-based, etc.)
- Verify compatibility with Hive metastore partition metadata
- Performance benchmarks show native partitioned writes outperform Spark
default
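
As a concrete illustration of the Hive-style naming and special-character criteria above, a test could write a small partitioned dataset and check the on-disk layout and round trip. This is only a sketch; `spark` is assumed to be a `SparkSession` in scope, and the paths are temporary:

```scala
// Hypothetical test sketch: write a tiny dataset with a partition value that
// needs escaping, then check the directory layout and read it back.
val out = java.nio.file.Files.createTempDirectory("partitioned-write-test").toString

spark.range(2)
  .selectExpr("id", "CASE WHEN id = 0 THEN 'a/b' ELSE 'plain' END AS category")
  .write
  .mode("overwrite")
  .partitionBy("category")
  .parquet(out)

// Hive-style naming: each partition directory is "category=<encoded value>"
// (Spark percent-encodes characters such as '/' in the directory name).
val partitionDirs = new java.io.File(out).listFiles().filter(_.isDirectory).map(_.getName)
assert(partitionDirs.nonEmpty && partitionDirs.forall(_.startsWith("category=")))

// Round trip: partition discovery on read recovers the original values.
val readBack = spark.read.parquet(out)
  .select("category").collect().map(_.getString(0)).toSet
assert(readBack == Set("a/b", "plain"))
```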
## Example Use Cases
```scala
// Dynamic partitioning
df.write
  .partitionBy("year", "month", "day")
  .parquet("/path/to/table")

// Static partition write: expressed in SQL, since DataFrameWriter.insertInto
// takes only a table name; the SELECT supplies the non-partition columns
df.createOrReplaceTempView("staging")
spark.sql("INSERT OVERWRITE TABLE `table` PARTITION (year = 2024, month = 1) SELECT * FROM staging")

// Dynamic partition overwrite
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write
  .mode("overwrite")
  .partitionBy("date")
  .parquet("/path/to/table")
```
### Describe the potential solution
_No response_
### Additional context
_No response_