adigerber opened a new issue, #7226:
URL: https://github.com/apache/iceberg/issues/7226

   ### Apache Iceberg version
   
   1.2.0 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   According to the documentation, when using Iceberg one should set `spark.sql.extensions` to `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`. However, setting this property causes an exception to be thrown when writing to an Iceberg table partitioned by `days(ts)` using Spark Structured Streaming.
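   
   For reference, the same property can also be set outside of application code, e.g. in `spark-defaults.conf`; this snippet is shown only to make the configuration under discussion concrete, and the property name and value are exactly as documented:
   
   ```
   spark.sql.extensions  org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   ```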
   
   The exception that is thrown is:
   
   ```
   Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
   === Streaming Query ===
   Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
   Current Committed Offsets: {}
   Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}

   Current State: ACTIVE
   Thread State: RUNNABLE

   Logical Plan:
   WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
   +- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]
   ```
   
   Code to reproduce:
   
   ```scala
   package com.example
   
   import org.apache.spark.sql.execution.streaming.MemoryStream
   import org.apache.spark.sql.streaming.Trigger
   import org.apache.spark.sql.{Encoder, Encoders, SQLContext, SparkSession}
   
   import java.nio.file.Files
   import java.sql.Timestamp
   
   case class Bla(ts: Timestamp, a: String, b: Double)
   
   object MinEx {
     def main(args: Array[String]): Unit = {
       val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
       val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString
   
       // Setting spark.sql.extensions here is what triggers the failure.
       val spark = SparkSession.builder()
         .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
         .config("spark.sql.catalog.spark_catalog.type", "hadoop")
         .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
         .config("spark.sql.warehouse.dir", warehouseDir)
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .appName("BugRepro")
         .master("local[*]")
         .enableHiveSupport()
         .getOrCreate()
   
       // Iceberg table partitioned by the days(ts) transform.
       spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")
   
       implicit val sqlContext: SQLContext = spark.sqlContext
       implicit val encoder: Encoder[Bla] = Encoders.product[Bla]
       val memStream = MemoryStream[Bla]
       val now = System.currentTimeMillis()
       val day = 86400000L // one day in milliseconds
       memStream.addData(List(
         Bla(new Timestamp(now), "test", 12.34),
         Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
         Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
         Bla(new Timestamp(now - 2 * day), "test 2d", 55.34)
       ))
   
       // Single-batch streaming write into the Iceberg table.
       memStream.toDF()
         .writeStream
         .format("iceberg")
         .outputMode("append")
         .option("path", "test_iceberg_table")
         .option("fanout-enabled", true)
         .option("checkpointLocation", checkpointDir)
         .trigger(Trigger.Once())
         .start()
         .awaitTermination()
     }
   }
   ```
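   
   The issue does not state the Spark or Scala version, so the build definition below is only a sketch under the assumption of Spark 3.3 on Scala 2.12, which matches the `iceberg-spark-runtime-3.3` artifact published for Iceberg 1.2.0:
   
   ```scala
   // build.sbt -- a sketch; the Spark and Scala versions are assumptions,
   // only Iceberg 1.2.0 is stated in the issue.
   ThisBuild / scalaVersion := "2.12.17"
   
   libraryDependencies ++= Seq(
     "org.apache.spark"   %% "spark-sql"                 % "3.3.2",
     "org.apache.spark"   %% "spark-hive"                % "3.3.2", // for enableHiveSupport()
     "org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % "1.2.0"
   )
   ```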
   
   The code works as expected when the builder call that configures `spark.sql.extensions` is commented out, as sketched below.
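   
   For comparison, a builder that is identical except for that one call succeeds. This is only a sketch of the observation above, not a recommended configuration, since it disables the Iceberg SQL extensions:
   
   ```scala
   // Same builder as the repro, with spark.sql.extensions commented out;
   // with this configuration the streaming write completes successfully.
   val spark = SparkSession.builder()
     .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
     .config("spark.sql.catalog.spark_catalog.type", "hadoop")
     .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
     .config("spark.sql.warehouse.dir", warehouseDir)
     // .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
     .appName("BugRepro")
     .master("local[*]")
     .enableHiveSupport()
     .getOrCreate()
   ```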

