LucasRoesler opened a new issue, #7657: URL: https://github.com/apache/iceberg/issues/7657
### Apache Iceberg version

1.2.1 (latest release)

### Query engine

Spark

### Please describe the bug 🐞

## Short version

I am unable to append data into a table after trying to optimize it. Spark attempts to load and sort the entire table before inserting the new batch, even though we partition by `loaded_at`, which should never require more than one partition. I am not sure how to correctly investigate the table and possibly fix it. Are there any written recommendations for how to verify the table metadata, or to figure out why Spark is trying to sort the entire table?

## Long Version

We have a Spark job that uses Spark Structured Streaming to load data from Kafka into an Iceberg table called `loading_zone`. The Kafka topic receives data on a nearly continuous basis. The Spark job does some light parsing and anonymization of the data and then writes it to the `loading_zone` table, which is partitioned by a `loaded_at` timestamp using an hourly transform. As of this writing, the table holds approximately 2.8 billion records. We are using AWS Glue for the catalog and S3 for storage, so that we can also query the data from Athena.

Last week I tried to follow some of the table maintenance recommendations here:

* https://iceberg.apache.org/docs/latest/maintenance/
* https://iceberg.apache.org/docs/latest/spark-procedures/

For various reasons one of the procedures was cancelled, and we thought that the table was still in a good state. Unfortunately, we didn't immediately detect the issue, and I don't know exactly which procedure was cancelled, but I believe it was `rewrite_data_files`.
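For context, the call would have looked roughly like the documented example (a sketch only; `glue_catalog` and `db` are placeholder names, and I am not certain of the exact arguments we used):

```sql
-- Sketch only: catalog/database names are placeholders and the exact
-- arguments of the cancelled run may have differed.
CALL glue_catalog.system.rewrite_data_files(
  table => 'db.loading_zone',
  strategy => 'binpack'
);
```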
On Monday, I checked the status of the table and noticed that it had not loaded any data over the weekend. I then started checking the pod logs for the Spark job and the Spark UI to determine what was happening. Two things immediately stood out:

* The Spark UI reported that it was processing 0 rows per second.
* The pod logs were very strange: the Kafka consumer offsets were being updated continuously, with every partition of the Kafka topic updated several times per second. All of the logs came from `org.apache.kafka.clients.consumer.internals.SubscriptionState`, for example:

> "Seeking to EARLIEST offset of partition mpathic-event-18"
> "Seeking to EARLIEST offset of partition mpathic-event-23"
> "Resetting offset for partition mpathic-event-16 to position FetchPosition{offset=183777608, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[contextstore-kafka-1.contextstore-kafka-headless.customer-insights.svc.cluster.local:9092 (id: 1 rack: null)], epoch=125}}."
> "Seeking to LATEST offset of partition mpathic-event-16"

Seeking is not unusual, but at that rate it makes no sense when the Spark stream has a 1 minute trigger. At first we thought we had an issue in the Kafka cluster, but we were able to rule that out because several other consumers had no problems with the same data. We could also rule out any additional deployments or configuration changes in Kafka, as well as unusual data flow; in fact, the total volume and rate of data in the topic over the weekend were lower than earlier in the week, when the job was working fine. We decided Kafka was not the source of the problem.

Eventually we started looking at the Spark UI to get more information about the job. We noticed that:

1. the job was using a lot of disk space and essentially no memory,
2. the batch duration was extremely long, effectively the entire lifetime of the job, and
3. Kubernetes would eventually kill the job because of the high disk usage, after approximately 90 minutes (sometimes longer, sometimes shorter).

We tried reducing the size of the batches, but nothing helped. Finally we looked at the DataFrame metrics and planning and noticed that it was scanning _a lot_ of data: 69 million rows and growing, which made no sense because we had made the batch size only a couple thousand Kafka records. We looked at the query plan and saw that it was doing a Sort:

```
== Physical Plan ==
AppendData (5)
+- Sort (4)
   +- Exchange (3)
      +- Project (2)
         +- * Scan ExistingRDD (1)
```

This by itself is not unusual, because data needs to be sorted within the partition. Finally we realized that it must be trying to sort the entire table, not just the partition.

In the end we changed the Spark job to write to a new table so that we didn't lose anything from Kafka. Once we switched to the new table, the job started working again.

I would like to recover and merge these two tables together, but I am not sure how to really debug the table or its metadata (see the metadata queries sketched at the end of this issue). I don't understand why the Spark job changed its behavior and needed to scan/sort the entire table. I also have questions about the best (most efficient) way to merge the tables together. Should I just use something like

```sql
REPLACE TABLE loading_zone
USING iceberg
PARTITIONED BY (hours(loaded_at))
TBLPROPERTIES (
  "format-version" = "2"
)
AS SELECT * FROM loading_zone
   UNION ALL
   SELECT * FROM loading_zone_b
```

or would something like [`add_files`](https://iceberg.apache.org/docs/latest/spark-procedures/#add_files) be better?
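A third option I can think of is a plain append from the second table back into the first, roughly like this (a sketch only; the catalog and database names are placeholders for our real ones):

```sql
-- Sketch only: catalog/database names are placeholders.
INSERT INTO glue_catalog.db.loading_zone
SELECT * FROM glue_catalog.db.loading_zone_b;
```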

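To make the debugging question above more concrete, this is the kind of metadata inspection I assume is the place to start (a sketch only; the catalog and database names are placeholders):

```sql
-- Sketch only: catalog/database names are placeholders.

-- Recent snapshots and the operations that produced them:
SELECT committed_at, snapshot_id, operation, summary
FROM glue_catalog.db.loading_zone.snapshots
ORDER BY committed_at DESC;

-- Per-partition file and record counts:
SELECT partition, file_count, record_count
FROM glue_catalog.db.loading_zone.partitions;

-- Table properties (e.g. write.distribution-mode):
SHOW TBLPROPERTIES glue_catalog.db.loading_zone;
```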