LucasRoesler opened a new issue, #7657:
URL: https://github.com/apache/iceberg/issues/7657

   ### Apache Iceberg version
   
   1.2.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   ## Short version
   I am unable to append data into a table after trying to optimize it. 
   
   Spark will attempt to load and sort the entire table before inserting the new batch, even though we are partitioning by `loaded_at`, which should never require touching more than one partition.
   
   I am not sure how to correctly investigate the table and possibly fix it. 
Are there any written recommendations for how to verify the table metadata or 
to figure out why Spark is trying to sort the entire table?
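   
   So far, the only way I know to inspect the table is to query the Iceberg metadata tables from Spark, along these lines (the `glue_catalog.db` prefix is a placeholder for our actual catalog and database):
   
   ```sql
   -- Recent snapshots, to see which write/maintenance operations were committed.
   SELECT committed_at, snapshot_id, operation, summary
   FROM glue_catalog.db.loading_zone.snapshots
   ORDER BY committed_at DESC;
   
   -- Per-partition record and file counts.
   SELECT * FROM glue_catalog.db.loading_zone.partitions;
   
   -- Individual data files, to spot anything unusually large or misplaced.
   SELECT file_path, partition, record_count, file_size_in_bytes
   FROM glue_catalog.db.loading_zone.files;
   ```
   
   I am not sure what in this output would explain the full-table sort, though.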
   
   
   ## Long Version 
   We have a Spark job that uses Spark Structured Streaming to load data from Kafka into an Iceberg table called `loading_zone`. This Kafka topic receives data on a nearly continuous basis. The Spark job does some light parsing and anonymization of the data and then writes it to the `loading_zone` table, which is partitioned by a `loaded_at` timestamp using an hourly transform. As of this writing, the table contained approximately 2.8 billion records.
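   
   For reference, the table definition looks roughly like the following; the schema is trimmed and the catalog/database names are placeholders:
   
   ```sql
   -- Rough shape of the loading_zone table (trimmed schema; names are placeholders).
   CREATE TABLE glue_catalog.db.loading_zone (
       event_id   string,
       payload    string,
       loaded_at  timestamp
   )
   USING iceberg
   PARTITIONED BY (hours(loaded_at))
   TBLPROPERTIES ('format-version' = '2');
   ```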
   
   We are using AWS Glue for the catalog and S3 for storage, so that we can 
also query the data from Athena.
   
   Last week I tried to follow some of the recommendations about table maintenance here:
   
   * https://iceberg.apache.org/docs/latest/maintenance/
   * https://iceberg.apache.org/docs/latest/spark-procedures/
     
   For various reasons, one of the procedures was cancelled, and we thought the table was still in a good state. Unfortunately, we didn't detect the issue immediately, and I don't know exactly which procedure was cancelled, but I believe it was `rewrite_data_files`.
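   
   From memory, the cancelled run looked roughly like the following; the strategy and arguments are my best guess, and the catalog/database names are placeholders:
   
   ```sql
   -- Best guess at the cancelled maintenance call (arguments are approximate).
   CALL glue_catalog.system.rewrite_data_files(
       table => 'db.loading_zone',
       strategy => 'sort',
       sort_order => 'loaded_at'
   );
   ```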
   
   On Monday, I checked the status of the table and noticed that it had not 
loaded any data over the weekend.
   
   I then started checking the pod logs for the Spark job and the Spark UI to 
determine what was happening. Two things immediately stood out:
   
   * The Spark UI reported that it was processing 0 rows per second.
   * The pod logs were very odd: the Kafka consumer was continuously updating its offsets, with every partition of the Kafka topic being updated several times per second. All of the logs came from `org.apache.kafka.clients.consumer.internals.SubscriptionState`. For example:
    
       > "Seeking to EARLIEST offset of partition mpathic-event-18"
       > "Seeking to EARLIEST offset of partition mpathic-event-23"
       > "Resetting offset for partition mpathic-event-16 to position 
FetchPosition{offset=183777608, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[contextstore-kafka-1.contextstore-kafka-headless.customer-insights.svc.cluster.local:9092
 (id: 1 rack: null)], epoch=125}}."
       > "Seeking to LATEST offset of partition mpathic-event-16"
   
   Seeking is not unusual in itself, but at that rate it makes no sense when we have a 1-minute trigger for the Spark stream.
   
   At first we thought we had an issue in the Kafka cluster, but we were able to rule that out because several other consumers had no issues with the same data. We could also rule out any additional deployments or configuration changes in Kafka, as well as unusual data flow; in fact, the total volume and rate of data in the topic over the weekend was lower than earlier in the week, when the job was working fine. We decided Kafka was not the source of the problem.
   
   Eventually we started looking into the Spark UI to get more information about the job. We noticed that:
   
   1. the job was using a lot of disk space and essentially no memory
   2. the batch duration was extremely long, effectively the entire lifetime of the job
   3. Kubernetes would eventually kill the pod because of the high disk usage, after approximately 90 minutes (sometimes longer, sometimes shorter)
   
   We tried reducing the size of the batches, but nothing helped. 
   
   
   Finally we looked at the DataFrame metrics and query planning and noticed that it was scanning _a lot_ of data: 69 million rows and growing, which made no sense because we had limited the batch size to only a couple thousand Kafka records.
   
   We looked at the query plan and saw that it was doing a Sort:
   
   ```
   == Physical Plan ==
   AppendData (5)
   +- Sort (4)
      +- Exchange (3)
         +- Project (2)
            +- * Scan ExistingRDD (1)
   ```
   
   This is also not unusual in itself, because data needs to be sorted within a partition. Eventually we realized that it must be trying to sort the entire table, not just the partition being written.
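   
   The only table-level checks I have thought of so far are along these lines, though I am not sure they would surface whatever changed (catalog/database names are placeholders):
   
   ```sql
   -- Table properties currently set on the table.
   SHOW TBLPROPERTIES glue_catalog.db.loading_zone;
   
   -- Full table description, including the partition spec.
   DESCRIBE TABLE EXTENDED glue_catalog.db.loading_zone;
   ```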
   
   In the end we pointed the Spark job at a new table so that we didn't lose anything from Kafka. Once we switched to the new table, the job started working again.
   
   
   I would like to recover and merge these two tables together, but I am not sure how to properly debug the table or its metadata. I don't understand why the Spark job changed its behavior and needed to scan/sort the entire table.
   
   
   I also have questions about the best (most efficient) way to merge the tables together. Should I just use something like
   
   ```sql
   REPLACE TABLE loading_zone
   USING iceberg
   PARTITIONED BY (hours(loaded_at))
   TBLPROPERTIES (
       "format-version" = "2"
   )
    AS SELECT * FROM loading_zone UNION ALL SELECT * FROM loading_zone_b
   ```
   
   or would something like 
[`add_files`](https://iceberg.apache.org/docs/latest/spark-procedures/#add_files)
 be better?
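   
   For reference, my reading of the docs is that the call would look something like the following, though I am not sure `add_files` is meant to be used when the source is itself an Iceberg table (identifiers are placeholders):
   
   ```sql
   -- Sketch of an add_files call; I have not run this.
   CALL glue_catalog.system.add_files(
       table => 'db.loading_zone',
       source_table => 'db.loading_zone_b'
   );
   ```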

