adigerber opened a new issue, #7226:
URL: https://github.com/apache/iceberg/issues/7226
### Apache Iceberg version
1.2.0 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
According to the documentation, when using Iceberg one should set
`spark.sql.extensions` to
`org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`. However,
with this property set, writing to a partitioned Iceberg table with Spark
Structured Streaming throws an exception.
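For reference, this is the property in question, exactly as I set it in the session builder (a minimal excerpt; the full reproduction is below):
```scala
// Excerpt from the reproduction below: enabling the Iceberg SQL extensions
// as described in the documentation.
val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  // ... remaining catalog/warehouse configuration as in the full example ...
  .getOrCreate()
```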
The exception that is thrown is:
```
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}
Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]
```
Code to reproduce:
```scala
package com.example

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

import java.nio.file.Files
import java.sql.Timestamp

case class Bla(ts: Timestamp, a: String, b: Double)

object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      // Commenting out the next line makes the streaming write succeed.
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Iceberg table partitioned by the days(ts) transform.
    spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

    implicit val sqlContext = spark.sqlContext
    implicit val encoder = Encoders.product[Bla]
    val memStream = MemoryStream[Bla]

    val now = System.currentTimeMillis()
    val day = 86400000
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
    ))

    // Streaming write that throws StreamingQueryException when the extensions are enabled.
    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}
```
The code works as expected when the statement that configures
`spark.sql.extensions` is commented out.
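For completeness, this is the only change needed to make the reproduction pass in my tests: the same session builder with the `spark.sql.extensions` line commented out (a sketch, with variables as in the reproduction above):
```scala
// Same builder as in the reproduction; only the extensions line is commented out.
// With this change the streaming write to the days(ts)-partitioned table succeeds.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hadoop")
  .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir) // warehouseDir as above
  .config("spark.sql.warehouse.dir", warehouseDir)
  // .config("spark.sql.extensions",
  //   "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .appName("BugRepro")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
```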