kbendick edited a comment on issue #2962:
URL: https://github.com/apache/iceberg/issues/2962#issuecomment-925357611
Hi @prodeezy @hankfanchiu, sorry for the delay in getting back to you on this. I was looking into a different Parquet issue, [PARQUET-2078](https://issues.apache.org/jira/browse/PARQUET-2078), so my apologies for overlooking this one.

I'm also able to reproduce this using Spark 3.1.2 when upgrading from Iceberg 0.9.0 (compiled against Parquet 1.11.0) to Iceberg 0.11.0 (compiled against Parquet 1.11.1). I get the same error:

```
java.lang.IllegalArgumentException: [mapCol, key_value, key] required binary key (STRING) = 2 is not in the store: [] 12
  at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:231)
  at org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:154)
  at org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:487)
  at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:643)
  at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:139)
  at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:110)
  at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:69)
  at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
  at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```
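As an aside, the "store" in that message appears to be the set of column chunks loaded for the current row group (see `ColumnChunkPageReadStore` in the trace), so the reader is asking for a column chunk it never read from the file. If anyone wants to check which column chunks a given data file actually contains, here is a minimal sketch using the unshaded parquet-hadoop API; the file path is a placeholder:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// List the column chunks present in each row group of a Parquet file,
// i.e. the contents of the "store" the error message complains about.
// The file path below is a placeholder; point it at a real data file.
val input = HadoopInputFile.fromPath(
  new Path("/path/to/some-data-file.parquet"), new Configuration())
val reader = ParquetFileReader.open(input)
try {
  reader.getFooter.getBlocks.asScala.zipWithIndex.foreach { case (block, i) =>
    println(s"row group $i (${block.getRowCount} rows):")
    block.getColumns.asScala.foreach(col => println(s"  ${col.getPath}"))
  }
} finally {
  reader.close()
}
```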
Here's my minimal reproduction: I downloaded the Spark 3.1.2 tarball from https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz, untarred it, and created a test table similar to yours. **Note that by default, OSS Spark ships with parquet-1.10.1, so I removed the Parquet jars from /opt/spark/jars; however, Iceberg bundles a shaded Parquet, so it shouldn't (and doesn't) matter.** I tested with and without the OSS Parquet jars in /opt/spark/jars.

```bash
root@spark:/opt/spark# ls -la jars | grep parquet
parquet-column-1.10.1.jar
parquet-common-1.10.1.jar
parquet-encoding-1.10.1.jar
parquet-format-2.4.0.jar
parquet-hadoop-1.10.1.jar
parquet-jackson-1.10.1.jar
root@spark:/opt/spark# rm -f jars/parquet-*
root@spark-box:/opt/spark# ./bin/spark-shell \
  --packages org.apache.iceberg:iceberg-spark3-runtime:0.9.0 \
  --driver-memory 2g \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.hadoop.hive.metastore.uris=thrift://hive:9083
```

```scala
scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> spark.sql("CREATE TABLE IF NOT EXISTS test_parquet_map_regression_iceberg_090(mapCol MAP<STRING, STRUCT<payload: STRUCT<bool: BOOLEAN, dbl: DOUBLE, str: STRING>, str: STRING>>) USING ICEBERG TBLPROPERTIES('type'='hive')")

scala> :paste
// Entering paste mode (ctrl-D to finish)

// NUM_ROWS is assumed to have been defined earlier in the session,
// e.g. val NUM_ROWS = 100L
var df = spark.range(NUM_ROWS)
  .withColumnRenamed("id", "longCol")
  .withColumn("intCol", expr("CAST(longCol AS INT)"))
  .withColumn("dbl", expr("CAST(longCol AS DOUBLE)"))
  .withColumn("str", expr("CAST(longCol AS STRING)"))
  .withColumn("bool", expr("IF(intCol % 2 = 0, true, false)"))
  .withColumn("payload", struct($"bool", $"dbl", $"str"))
  .withColumn("value", struct($"payload", $"str"))
  .withColumn("mapCol", map($"str", $"value"))
  .select("mapCol")

// Exiting paste mode, now interpreting.

df: org.apache.spark.sql.DataFrame = [mapCol: map<string,struct<payload:struct<bool:boolean,dbl:double,str:string>,str:string>>]

scala> df.writeTo("default.test_parquet_map_regression_iceberg_090").append

scala> spark.table("default.test_parquet_map_regression_iceberg_090").show(false)
+-------------------------------+
|mapCol                         |
+-------------------------------+
|{0 -> {{true, 0.0, 0}, 0}}     |
|{1 -> {{false, 1.0, 1}, 1}}    |
|{2 -> {{true, 2.0, 2}, 2}}     |
|{3 -> {{false, 3.0, 3}, 3}}    |
|{4 -> {{true, 4.0, 4}, 4}}     |
|{5 -> {{false, 5.0, 5}, 5}}    |
|{6 -> {{true, 6.0, 6}, 6}}     |
|{7 -> {{false, 7.0, 7}, 7}}    |
|{8 -> {{true, 8.0, 8}, 8}}     |
|{9 -> {{false, 9.0, 9}, 9}}    |
|{10 -> {{true, 10.0, 10}, 10}} |
|{11 -> {{false, 11.0, 11}, 11}}|
|{12 -> {{true, 12.0, 12}, 12}} |
|{13 -> {{false, 13.0, 13}, 13}}|
|{14 -> {{true, 14.0, 14}, 14}} |
|{15 -> {{false, 15.0, 15}, 15}}|
|{16 -> {{true, 16.0, 16}, 16}} |
|{17 -> {{false, 17.0, 17}, 17}}|
|{18 -> {{true, 18.0, 18}, 18}} |
|{19 -> {{false, 19.0, 19}, 19}}|
+-------------------------------+
only showing top 20 rows

scala> :quit
```

I then upgraded from Iceberg 0.9.0 (compiled against Parquet 1.11.0) to Iceberg 0.10.0 (compiled against Parquet 1.11.1):

```bash
root@spark-box:/opt/spark# ./bin/spark-shell \
  --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0 \
  --driver-memory 2g \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.hadoop.hive.metastore.uris=thrift://hive-box:9083
```
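(To feed real file paths into the footer-inspection sketch above, the data files behind each table can be located through Iceberg's `files` metadata table. The exact metadata-table syntax may vary by Iceberg version; this is just an illustration:)

```scala
// List the Parquet data files backing the 0.9.0-written table, so their
// footers can be inspected directly. Table name matches the repro above.
spark.sql("SELECT file_path FROM default.test_parquet_map_regression_iceberg_090.files").show(false)
```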
First, I tried writing a new table (to make sure that data written with the new format can be read back):

```scala
scala> spark.sql("CREATE TABLE IF NOT EXISTS test_parquet_map_regression_iceberg_010(mapCol MAP<STRING, STRUCT<payload: STRUCT<bool: BOOLEAN, dbl: DOUBLE, str: STRING>, str: STRING>>) USING ICEBERG TBLPROPERTIES('type'='hive')")

// ... create df as above ...

scala> spark.table("default.test_parquet_map_regression_iceberg_010").show(false)
+-------------------------------+
|mapCol                         |
+-------------------------------+
|{0 -> {{true, 0.0, 0}, 0}}     |
|{1 -> {{false, 1.0, 1}, 1}}    |
|{2 -> {{true, 2.0, 2}, 2}}     |
|{3 -> {{false, 3.0, 3}, 3}}    |
|{4 -> {{true, 4.0, 4}, 4}}     |
|{5 -> {{false, 5.0, 5}, 5}}    |
|{6 -> {{true, 6.0, 6}, 6}}     |
|{7 -> {{false, 7.0, 7}, 7}}    |
|{8 -> {{true, 8.0, 8}, 8}}     |
|{9 -> {{false, 9.0, 9}, 9}}    |
|{10 -> {{true, 10.0, 10}, 10}} |
|{11 -> {{false, 11.0, 11}, 11}}|
|{12 -> {{true, 12.0, 12}, 12}} |
|{13 -> {{false, 13.0, 13}, 13}}|
|{14 -> {{true, 14.0, 14}, 14}} |
|{15 -> {{false, 15.0, 15}, 15}}|
|{16 -> {{true, 16.0, 16}, 16}} |
|{17 -> {{false, 17.0, 17}, 17}}|
|{18 -> {{true, 18.0, 18}, 18}} |
|{19 -> {{false, 19.0, 19}, 19}}|
+-------------------------------+
only showing top 20 rows
```

So Parquet 1.11.1 data can be written and then read. However, the older data written with Parquet 1.11.0 cannot be read with the newer Iceberg version:

```scala
scala> spark.table("default.test_parquet_map_regression_iceberg_090").show(false)
21/09/22 21:49:24 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.IllegalArgumentException: [mapCol, map, key] required binary key (STRING) = 2 is not in the store: [] 12
  at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:231)
  at org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:185)
  at org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:529)
  at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:685)
  at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:142)
  at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
  at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:81)
  at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
  at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```

So this issue has actually been present for this particular map data since the Iceberg 0.9.1 -> Iceberg 0.10.0 upgrade (which is when we went from Parquet 1.11.0 to Parquet 1.11.1).

We should investigate with Iceberg 0.12.0 now that it's out; I will do that now. I'm also going to check whether this issue affects OSS Spark and not just Iceberg (I suspect that it does not, as Spark 3.1.2 is still on Parquet 1.10.1, but I will double check).

cc @rdblue
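One more note for anyone digging into this: the failing column path differs between the two traces above (`[mapCol, key_value, key]` vs `[mapCol, map, key]`), so dumping and comparing the Parquet schemas of an old (0.9.0-written) and a new (0.10.0-written) data file might narrow things down quickly. A minimal sketch, again using the unshaded parquet-hadoop API with placeholder paths:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Print the Parquet message type stored in a file's footer, so the map
// column's structure (group and field names) can be compared across files.
def printSchema(path: String): Unit = {
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(path), new Configuration()))
  try println(reader.getFooter.getFileMetaData.getSchema)
  finally reader.close()
}

// Placeholder paths: substitute real data files from the two tables.
printSchema("/path/to/test_parquet_map_regression_iceberg_090/data/old-file.parquet")
printSchema("/path/to/test_parquet_map_regression_iceberg_010/data/new-file.parquet")
```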
