andrei-ionescu opened a new issue #3564: URL: https://github.com/apache/iceberg/issues/3564
Timezone implementation in Spark 3 leads to `Already closed files for partition` error. To reproduce follow the next steps... First, download the source file: [data-dimension-vehicle-20210609T222533Z-4cols-100K.csv](https://github.com/apache/iceberg/files/7546106/data-dimension-vehicle-20210609T222533Z-4cols-100K.csv) and CD where the file was downloaded. Start Spark 3.1.2 shell setting the timezone to `Europe/Bucharest` with: ``` spark-shell --master "local[8]" --driver-memory 4g \ --packages "org.apache.iceberg:iceberg-spark3-runtime:0.10.0,org.apache.iceberg:iceberg-core:0.10.0,org.apache.iceberg:iceberg-common:0.10.0,org.apache.iceberg:iceberg-api:0.10.0,org.apache.iceberg:iceberg-parquet:0.10.0,org.apache.iceberg:iceberg-spark3:0.10.0,org.apache.iceberg:iceberg-bundled-guava:0.10.0" \ --conf "spark.driver.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=Europe/Bucharest" \ --conf "spark.executor.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=Europe/Bucharest" ``` Copy paste the following lines of code: ``` import org.apache.iceberg.{PartitionSpec => IcebergPartitionSpec} import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ import scala.collection.JavaConverters._ import spark.implicits._ val dst = "/tmp/tmp_iceberg_issue" val src = "data-dimension-vehicle-20210609T222533Z-4cols-100K.csv" val schema = StructType(Seq( StructField("licence_code", StringType, true), StructField("vehicle_make", StringType, true), StructField("fuel_type", StringType, true), StructField("dimension_load_date", TimestampType, true))) val ds = spark .read .format("csv") .schema(schema) .load(src) val iceSchema = SparkSchemaUtil.convert(ds.schema) val iceSpec = IcebergPartitionSpec.builderFor(iceSchema) .day("dimension_load_date", "load_date") .identity("fuel_type") .build() val newTbls = new 
HadoopTables(spark.sessionState.newHadoopConf).create(iceSchema, iceSpec, Map.empty[String, String].asJava, dst) ds .sort(to_date(col("dimension_load_date")), col("fuel_type")) .write .format("iceberg") .mode("overwrite") .save(dst) ``` The result is failure due to the `Already closed files for partition` exception: ``` Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357) ... 
97 more Caused by: java.lang.IllegalStateException: Already closed files for partition: load_date=2021-06-09/fuel_type=Diesel at org.apache.iceberg.io.PartitionedWriter.write(PartitionedWriter.java:69) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) ``` The root cause is the difference in how Iceberg's date/time partitioning functions are implemented versus the date/time functions in Spark. The sorting step needed as a workaround for the `Already closed files for partition` error (as seen in Iceberg's documentation [here](https://iceberg.apache.org/#spark-writes/#writing-to-partitioned-tables)), when used on top of date/time functions, takes into account the timezone of the nodes it runs on, while Iceberg's date/time partitioning functions use UTC. There is a workaround for this issue: use the `UTC` timezone at the JVM level so that the timezone Spark uses in its date/time related functions matches the one used by Iceberg. 
So, starting Spark with the following will make it work: ``` spark-shell --master "local[8]" --driver-memory 4g \ --packages "org.apache.iceberg:iceberg-spark3-runtime:0.10.0,org.apache.iceberg:iceberg-core:0.10.0,org.apache.iceberg:iceberg-common:0.10.0,org.apache.iceberg:iceberg-api:0.10.0,org.apache.iceberg:iceberg-parquet:0.10.0,org.apache.iceberg:iceberg-spark3:0.10.0,org.apache.iceberg:iceberg-bundled-guava:0.10.0" \ --conf "spark.driver.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=UTC" \ --conf "spark.executor.extraJavaOptions=-Dpackaging.type=jar -Duser.timezone=UTC" ``` Using `spark.conf.set("spark.sql.session.timeZone", "UTC")` is not always reliable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
