danielfordfc opened a new issue, #8258: URL: https://github.com/apache/hudi/issues/8258
**Describe the problem you faced**

Deltastreamer is unable to read a **transactional topic** (one being produced to by a [transactional producer, like kafka-streams](https://medium.com/lydtech-consulting/kafka-streams-transactions-exactly-once-messaging-82194b50900a)). With `AllowNonConsecutiveOffsets=true` we see:

```
Caused by: java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 after polling for 1000
```

and with `AllowNonConsecutiveOffsets=false`:

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (192.168.1.240 executor driver): java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:143)
```

Full stack traces are linked below. Our configuration is as follows:

> Note: this works fine on non-transactional topics, and on compacted topics (`cleanup.policy=compact`) when setting `AllowNonConsecutiveOffsets=true`.

> Note: this isn't a networking or timeout issue, as we'll discuss below.
```properties
hoodie.datasource.write.recordkey.field=viewtime
hoodie.datasource.write.partitionpath.field=pageid
hoodie.deltastreamer.source.kafka.topic=${topic}
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/${topic}-value/versions/latest
schema.registry.url=http://localhost:8081/

# Kafka Consumer props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest

# Consumer Group
group.id=hudi-deltastreamer-${topic}
#isolation.level=read_committed   <-- tried adjusting this with no effect
#enable.auto.commit=false         <-- tried adjusting this with no effect

# spark.properties
spark.streaming.kafka.allowNonConsecutiveOffsets=true   <-- we use this by default as some of our topics are compacted
spark.streaming.kafka.consumer.poll.ms=1000             <-- to make it fail faster, from the default of 120,000
spark.executor.cores=1
```

```bash
spark-submit \
  --master local[1] \
  --num-executors=1 \
  --executor-cores=1 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
  --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
  --properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \
  ~/Workspace/github.com/apache/hudi/target/hudi-utilities-bundle_2.12-0.12.1.jar \
  --op INSERT \
  --props /tmp/hoodie-conf-${topic}.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field viewtime \
  --table-type COPY_ON_WRITE \
  --target-base-path file://${target_base_path} \
  --target-table $target_table
```

We've identified that this is because the `get()` / `compactedNext()` methods used by the `InternalKafkaConsumer` fail to poll records/batches for these transactional topics.
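The failing check can be illustrated with a toy model (this is *not* Hudi/Spark code; `poll` and `spark_get` are made-up names that just mimic the shape of `InternalKafkaConsumer.get()`). Transaction commit markers occupy real offsets but are control records, which the broker never hands to a consumer, so a poll positioned at a marker offset can legitimately return nothing:

```python
# Toy simulation of the failure mode (NOT Hudi/Spark code): a partition log
# where each entry is either a data record or a transaction commit marker.
# Consumers never see control records, so polling at a marker offset returns
# nothing -- which trips Spark's require(...) check.

DATA, MARKER = "data", "marker"

def poll(log, position):
    """Return the records a consumer would actually see at or after `position`
    (control records / commit markers are filtered out by the broker)."""
    return [(off, kind) for off, kind in log if off >= position and kind == DATA]

def spark_get(log, requested_offset):
    """Mimic the shape of InternalKafkaConsumer.get(): seek, poll, and
    require that the poll returned data."""
    records = poll(log, requested_offset)
    if not records:
        raise ValueError("requirement failed: Failed to get records for offset "
                         f"{requested_offset} after polling")
    return records[0]

# Partition 0 from our experiment: offsets 0-4 hold data, offset 5 is the
# commit marker, so the end offset is 6 and Spark computes the range 0 -> 6.
log = [(0, DATA), (1, DATA), (2, DATA), (3, DATA), (4, DATA), (5, MARKER)]

for offset in range(5):
    assert spark_get(log, offset)[0] == offset  # offsets 0-4 are readable

try:
    spark_get(log, 5)  # the range says offset 5 exists, but the poll is empty
except ValueError as err:
    print(err)
```

This matches the debug logs below: five successful `Get ... requested N` calls, then a `Polled [] 0` followed by the `requirement failed` exception.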
We create one simple non-compacted topic that we'll write to non-transactionally, and one that we'll write to transactionally:

```shell
kafka-topics --bootstrap-server localhost:9092 --create --topic my-transactional-topic --partitions 5
kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews --partitions 5
```

The following messages can be consumed when produced **without** a transactional producer, but not with one:

```
[{:viewtime 100 :userid "User_0" :pageid "Page_0"}
 {:viewtime 101 :userid "User_1" :pageid "Page_1"}
 {:viewtime 102 :userid "User_2" :pageid "Page_2"}
 ...etc...
 {:viewtime 115 :userid "User_15" :pageid "Page_15"}]
```

**Produced 16 messages non-transactionally** -- as you can see, the end/"next available" offset is the one after the last offset containing data in each partition.

**Produced 16 messages transactionally** -- as you can see, the end/"next available" offset is 2 more than the last offset containing data in each partition, because the end of that batch of writes placed a commit marker/offset message in each partition.

And we see the stack traces mentioned at the bottom:
[hoodie-allow-consecutive-off-false.log](https://github.com/apache/hudi/files/11028874/hoodie-allow-consecutive-off-false.log)
[hoodie-allow-consecutive-off-true.log](https://github.com/apache/hudi/files/11028875/hoodie-allow-consecutive-off-true.log)

**Extra information gathered from running this locally**

Below is a dive into our local example showing how we get a poll of `[topic-partition] 5`, followed by a poll of `[] 0`, followed by the crash, when `AllowNonConsecutiveOffsets=true`. Interestingly, when setting `AllowNonConsecutiveOffsets=false`, the initial poll for partition 0 (which, from the screenshot above, has offsets 0->4 as valid messages and offset 5 as the commit marker) fetches those first 5 messages, then fails on the next poll.
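The gap between the last data offset and the end offset can be captured with a tiny, purely illustrative helper (`end_offset` is a made-up function, not a Kafka API; the numbers match what we observed in our experiments):

```python
# Purely illustrative offset arithmetic (not a Kafka API): every committed
# transaction appends one control record (commit marker) per partition it
# wrote to, so a partition's end/"next available" offset is
#   number of data records + number of commit markers.

def end_offset(num_records, num_commits=0):
    """Next-available offset for a partition with `num_records` data records
    and `num_commits` transaction commit markers."""
    return num_records + num_commits

# Non-transactional: 16 data records at offsets 0-15, end offset is 16.
assert end_offset(16) == 16

# Partition 0 of the 5-partition topic: data at 0-4, marker at 5, end is 6.
assert end_offset(5, num_commits=1) == 6

# Single-partition topic, one transaction of 16: data 0-15, marker 16, end 17.
assert end_offset(16, num_commits=1) == 17

# Two transactions of 16: data 0-15 and 17-32, markers at 16 and 33, end 34.
assert end_offset(32, num_commits=2) == 34
```

Spark's `KafkaRDD` computes its offset ranges from these end offsets, so the ranges it requests always include marker offsets that no poll can ever return.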
```
23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic, partition 0 offsets 0 -> 6
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=511066e5, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic, topicPartition=my-transactional-topic-0)
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 0
23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 0
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-0 0
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-0] 5
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 1
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 2 requested 2
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 3 requested 3
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 4 requested 4
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 5 requested 5
23/03/21 12:48:58 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [] 0
23/03/21 12:48:59 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000.
```

If we create another topic with one partition and write a single batch of 16 records transactionally (so 0->15 is data, 16 is the commit marker, and the end of the topic is 17), we see similar behaviour: [my-transactional-topic-single-partition.log](https://github.com/apache/hudi/files/11028871/my-transactional-topic-single-partition.log)

To rule out the possibility that it's crashing because the endOffset is the "invisible" marker it can't read, we added another 16 records (putting 17->32 as data, 33 as the marker, and 34 as the endOffset). We see a similar issue:

```
requirement failed: Got wrong record for spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 even after seeking to offset 16 got offset 17 instead.
If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
```

[my-transactional-topic-single-partition-32-msgs.log](https://github.com/apache/hudi/files/11028880/my-transactional-topic-single-partition-32-msgs.log)

Changing to `AllowNonConsecutiveOffsets=true` on the above topic yields the following:

```
23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic-single-partition, partition 0 offsets 0 -> 34
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=9903e40, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition, topicPartition=my-transactional-topic-single-partition-0)
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: compacted start spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 starting 0
23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 0
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-single-partition-0 0
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-single-partition-0] 32
23/03/21 13:24:10 INFO org.apache.spark.storage.BlockManager: Removing RDD 6
23/03/21 13:24:11 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [] 0
23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000.
23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Block rdd_2_0 could not be removed as it was not found on disk or in memory
23/03/21 13:24:11 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 4.0 (TID 4) java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000
```

Stack trace for the above: [my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log](https://github.com/apache/hudi/files/11028882/my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log)

**Answers Required**

We know what the problem is; we are just unsure how to fix it. We've taken this to the Hudi office hours before, and the host suggested asking @yihua for advice.

**Usual environment in production (but all of this has been reproduced locally)**

* Hudi version : Deltastreamer on EMR 6.8.0 running Hudi 0.11.1-amzn-0
* Spark version : 3.3.0
* Hive version : 3.1.3
* Hadoop version : Amazon 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No

**Additional context**

Hudi 0.12.1 was used for local testing. Can add more details if required.
**Stacktrace**

Stack traces have been littered throughout, but are pasted here again:

[my-transactional-topic-single-partition-32-msgs.log](https://github.com/apache/hudi/files/11028884/my-transactional-topic-single-partition-32-msgs.log)
[my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log](https://github.com/apache/hudi/files/11028885/my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log)
[hoodie-allow-consecutive-off-false.log](https://github.com/apache/hudi/files/11028886/hoodie-allow-consecutive-off-false.log)
[hoodie-allow-consecutive-off-true.log](https://github.com/apache/hudi/files/11028887/hoodie-allow-consecutive-off-true.log)
[my-transactional-topic-single-partition.log](https://github.com/apache/hudi/files/11028888/my-transactional-topic-single-partition.log)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
