andygrove opened a new issue, #3104:
URL: https://github.com/apache/datafusion-comet/issues/3104
## What is the problem the feature request solves?
> **Note:** This issue was generated with AI assistance. The specification
details have been extracted from Spark documentation and may need verification.
Comet does not currently support the Spark `current_batch_timestamp`
function, so queries that use it fall back to Spark's JVM execution
instead of running natively on DataFusion.
The `CurrentBatchTimestamp` expression represents a timestamp value that
remains constant for the duration of a streaming batch. It is designed to
prevent the optimizer from pushing it down below stateful operators, and it
allows `IncrementalExecution` to substitute a literal value for it during
streaming query execution.
Supporting this expression would allow more Spark workloads to benefit from
Comet's native acceleration.
## Describe the potential solution
### Spark Specification
**Syntax:**
```sql
current_batch_timestamp()
```
**Arguments:** The SQL function takes no arguments; the following are
constructor parameters of the underlying `CurrentBatchTimestamp` Catalyst
expression:

| Parameter | Type | Description |
|-----------|------|-------------|
| timestampMs | Long | The batch timestamp in milliseconds since the Unix epoch |
| dataType | DataType | The target data type (`TimestampType`, `TimestampNTZType`, or `DateType`) |
| timeZoneId | Option[String] | Optional timezone identifier for timezone-aware conversions |
**Return Type:** Returns one of the following data types based on the
configured `dataType` parameter:
- `TimestampType` - Returns timestamp in microseconds
- `TimestampNTZType` - Returns timezone-naive timestamp in microseconds
- `DateType` - Returns date as days since epoch
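The return-type mapping above boils down to two unit conversions from the
epoch-milliseconds batch timestamp. A minimal sketch in plain Rust (not Comet
or DataFusion code; the function names are illustrative):

```rust
const MICROS_PER_MILLI: i64 = 1_000;
const MILLIS_PER_DAY: i64 = 86_400_000;

/// TimestampType / TimestampNTZType: Spark stores timestamps as microseconds.
fn to_micros(timestamp_ms: i64) -> i64 {
    timestamp_ms * MICROS_PER_MILLI
}

/// DateType: Spark stores dates as days since the Unix epoch.
/// `div_euclid` floors toward negative infinity, so pre-epoch
/// timestamps still map to the correct day.
fn to_epoch_days(timestamp_ms: i64) -> i32 {
    timestamp_ms.div_euclid(MILLIS_PER_DAY) as i32
}

fn main() {
    let ms: i64 = 1_700_000_000_000; // 2023-11-14T22:13:20Z
    println!("{}", to_micros(ms));     // 1700000000000000
    println!("{}", to_epoch_days(ms)); // 19675
}
```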
**Supported Data Types:**
This expression supports conversion to the following output data types:
- TimestampType (with timezone)
- TimestampNTZType (timezone-naive)
- DateType
**Edge Cases:**
- Null handling: Expression is marked as non-nullable (`nullable = false`)
- Empty input: Returns the configured timestamp value regardless of input
row content
- Timezone conversion: When no timeZoneId is provided, defaults to system
timezone for conversions
- Batch consistency: Same timestamp value is returned for all rows within a
single batch
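The batch-consistency edge case can be modeled directly: the timestamp is
captured once when the batch starts, and evaluation returns that same value
for every row. A hypothetical sketch in plain Rust (the struct and method are
illustrative, not DataFusion's `PhysicalExpr` API):

```rust
// Illustrative model: the timestamp is fixed at batch start, so every row
// in the batch observes the identical value (non-nullable by construction).
struct CurrentBatchTimestamp {
    timestamp_ms: i64, // captured once per streaming batch
}

impl CurrentBatchTimestamp {
    /// Evaluate over a batch of `num_rows` rows, producing Spark-style
    /// microsecond timestamps: one identical value per row.
    fn evaluate(&self, num_rows: usize) -> Vec<i64> {
        vec![self.timestamp_ms * 1_000; num_rows]
    }
}

fn main() {
    let expr = CurrentBatchTimestamp { timestamp_ms: 1_700_000_000_000 };
    let col = expr.evaluate(3);
    // Same value for all rows within the batch.
    assert!(col.iter().all(|&v| v == col[0]));
}
```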
**Examples:**
```sql
-- Returns current batch timestamp as timestamp type
SELECT current_batch_timestamp()
-- Can be used in streaming queries to get batch processing time
SELECT id, value, current_batch_timestamp() AS batch_time FROM streaming_table
```
```scala
// DataFrame API usage in a streaming context
import org.apache.spark.sql.functions._

val streamingDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load()

val withBatchTime = streamingDF.select(
  col("value"),
  expr("current_batch_timestamp()").as("batch_time")
)
```
### Implementation Approach
See the [Comet guide on adding new
expressions](https://datafusion.apache.org/comet/contributor-guide/adding_a_new_expression.html)
for detailed instructions.
1. **Scala Serde**: Add expression handler in
`spark/src/main/scala/org/apache/comet/serde/`
2. **Register**: Add to appropriate map in `QueryPlanSerde.scala`
3. **Protobuf**: Add message type in `native/proto/src/proto/expr.proto` if
needed
4. **Rust**: Implement in `native/spark-expr/src/` (check if DataFusion has
built-in support first)
## Additional context
**Difficulty:** Medium
**Spark Expression Class:**
`org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp`
**Related:**
- `CurrentTimestamp` - Returns current system timestamp
- `Literal` - Static literal values
- `TimeZoneAwareExpression` - Base trait for timezone-aware expressions
- `Nondeterministic` - Trait for expressions that return different values
across evaluations
---
*This issue was auto-generated from Spark reference documentation.*
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]