danielfordfc opened a new issue, #8258:
URL: https://github.com/apache/hudi/issues/8258

   **Describe the problem you faced**
   
   Deltastreamer is unable to read from a **transactional topic** (one being 
produced to by a [transactional producer, like 
kafka-streams](https://medium.com/lydtech-consulting/kafka-streams-transactions-exactly-once-messaging-82194b50900a)).
   
   ```bash
   Caused by: java.lang.IllegalArgumentException: requirement failed:
    Failed to get records for compacted 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 after polling for 1000
    
    OR, when using AllowNonConsecutiveOffsets=false:
    
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 
in stage 4.0 (TID 4) (192.168.1.240 executor driver): 
java.lang.IllegalArgumentException: requirement failed: Failed to get records 
for spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 5 after polling for 1000
           at scala.Predef$.require(Predef.scala:281)
           at 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:143)
   ```
   
   Full stack traces will be linked below.
   
   Our configuration is as follows:
   > Note: this works fine on non-transactional topics, and on compacted topics 
(cleanup.policy=compact) when setting `AllowNonConsecutiveOffsets=true`.
   > Note: this isn't a networking or timeout issue, as we'll discuss below.
   
   ``` bash 
   hoodie.datasource.write.recordkey.field=viewtime
   hoodie.datasource.write.partitionpath.field=pageid
   hoodie.deltastreamer.source.kafka.topic=${topic}
   
   hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/${topic}-value/versions/latest
   schema.registry.url=http://localhost:8081/
   # Kafka Consumer props
   bootstrap.servers=localhost:9092
   auto.offset.reset=earliest
   # Consumer Group
   group.id=hudi-deltastreamer-${topic}
   #isolation.level=read_committed <-- tried adjusting this with no effect
   #enable.auto.commit=false <-- tried adjusting this with no effect
   
   # spark.properties
   # We use this by default as some of our topics are compacted
   spark.streaming.kafka.allowNonConsecutiveOffsets=true
   # To make it fail faster, from default of 120,000
   spark.streaming.kafka.consumer.poll.ms=1000
   spark.executor.cores=1
   
   spark-submit \
     --master local[1] \
     --num-executors=1 \
     --executor-cores=1 \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
     --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
     --properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \
     ~/Workspace/github.com/apache/hudi/target/hudi-utilities-bundle_2.12-0.12.1.jar \
    --op INSERT \
    --props /tmp/hoodie-conf-${topic}.properties \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
    --source-ordering-field viewtime  \
    --table-type COPY_ON_WRITE \
    --target-base-path file://${target_base_path} \
    --target-table $target_table
   ```
   
   We've identified that this is because the `get()` / `compactedNext()` methods 
of the `InternalKafkaConsumer` fail to poll records/batches for these 
transactional topics.
   
   To reproduce, we create two simple non-compacted topics: one that we'll write 
to transactionally, and one that we'll write to non-transactionally:
   
   ``` shell 
   kafka-topics --bootstrap-server localhost:9092 --create --topic my-transactional-topic --partitions 5
   kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews --partitions 5
   ```
   
   ``` shell 
   # The following messages can be consumed when produced **without** a
   # transactional producer, but not when produced with one.
   [{:viewtime 100 :userid "User_0" :pageid "Page_0"}
                  {:viewtime 101 :userid "User_1" :pageid "Page_1"}
                  {:viewtime 102 :userid "User_2" :pageid "Page_2"}
                  ...etc...
                  {:viewtime 115 :userid "User_15" :pageid "Page_15"}]
   ```
   
   **Produced 16 messages non-transactionally** -- as you can see, the end/"next 
available" offset is the one after the last offset containing data in each 
partition
   ![Screenshot 2023-03-21 at 12 36 
09](https://user-images.githubusercontent.com/75728527/226607911-b8a2d0bf-9bf8-470f-b910-626dc2c2a51e.png)
   
   **Produced 16 messages transactionally** -- as you can see, the end/"next 
available" offset is 2 more than the last offset containing data in each 
partition, because the end of that batch of writes placed a commit marker/offset 
message in each partition
   ![Screenshot 2023-03-21 at 12 36 
37](https://user-images.githubusercontent.com/75728527/226607926-3e3c7e93-6f70-414f-96e6-b03a47b3128d.png)
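
The offset arithmetic in the two screenshots can be sketched with a toy model of a single partition. This is an illustration only, not Kafka's actual log format: the `write_transaction`, `last_data_offset` and `end_offset` helpers are hypothetical names, and the 5-record partition mirrors partition 0 of `my-transactional-topic`.

```python
# Toy model of one partition's log. Each entry occupies exactly one offset.
DATA, MARKER = "data", "commit-marker"


def write_transaction(log, values):
    """Append a batch of records, then the commit marker that closes the batch."""
    log.extend((DATA, v) for v in values)
    log.append((MARKER, None))  # takes up an offset, but holds no readable record


def last_data_offset(log):
    """Highest offset that holds an actual record."""
    return max(i for i, (kind, _) in enumerate(log) if kind == DATA)


def end_offset(log):
    """The "next available" offset: one past the last entry, markers included."""
    return len(log)


non_transactional = [(DATA, v) for v in range(100, 105)]
transactional = []
write_transaction(transactional, range(100, 105))

print(last_data_offset(non_transactional), end_offset(non_transactional))  # 4 5
print(last_data_offset(transactional), end_offset(transactional))          # 4 6
```

In the transactional case the end offset is 6 while the last readable record sits at offset 4, which matches the `offsets 0 -> 6` range that Spark computes for partition 0 in the logs further down.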
   
   And we see the stack traces mentioned at the bottom:  
   
[hoodie-allow-consecutive-off-false.log](https://github.com/apache/hudi/files/11028874/hoodie-allow-consecutive-off-false.log)
   
[hoodie-allow-consecutive-off-true.log](https://github.com/apache/hudi/files/11028875/hoodie-allow-consecutive-off-true.log)
   
   
   Notably
   
   **Extra Information gathered from running this locally**
   
   Below is a deeper dive into our local example, showing a poll of 
`[topic-partition] 5`, followed by a poll of `[] 0`, followed by the crash when 
AllowNonConsecutiveOffsets=true
   
   
   Interestingly, in the log below, with AllowNonConsecutiveOffsets=false, the 
initial poll for partition 0 (which, per the screenshot above, has offsets 0->4 
as valid messages and offset 5 as the commit marker) returns those first 5 
messages, and then the next poll fails.
   ```
   23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.KafkaRDD: 
Computing topic my-transactional-topic, partition 0 offsets 0 -> 6
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer 
found, re-using it InternalKafkaConsumer(hash=511066e5, 
groupId=spark-executor-hudi-deltastreamer-my-transactional-topic, 
topicPartition=my-transactional-topic-0)
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 nextOffset 1 requested 0
   23/03/21 12:48:57 INFO 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 0
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to 
my-transactional-topic-0 0
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled 
[my-transactional-topic-0]  5
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 nextOffset 1 requested 1
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 nextOffset 2 requested 2
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 nextOffset 3 requested 3
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 nextOffset 4 requested 4
   23/03/21 12:48:57 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 nextOffset 5 requested 5
   23/03/21 12:48:58 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
   23/03/21 12:48:59 WARN org.apache.spark.storage.BlockManager: Putting block 
rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement 
failed: Failed to get records for 
spark-executor-hudi-deltastreamer-my-transactional-topic 
my-transactional-topic-0 5 after polling for 1000.
   ```
   
   If we create another topic with one partition and write a single batch of 16 
records transactionally (so 0->15 is data, 16 is commit marker, end of topic is 
17), we see similar behaviour.
   
   
[my-transactional-topic-single-partition.log](https://github.com/apache/hudi/files/11028871/my-transactional-topic-single-partition.log)
   
   
   To rule out the possibility that it crashes only because the last offset 
before endOffset is the "invisible" marker that it can't read, we added another 
16 records (putting 17->32 as data, 33 as the marker and 34 as endOffset). We 
see a similar issue, with the following error:
   
   ```
   requirement failed: Got wrong record for 
spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition 
my-transactional-topic-single-partition-0 even after seeking to offset 16 got 
offset 17 instead. If this is a compacted topic, consider enabling 
spark.streaming.kafka.allowNonConsecutiveOffsets
   ```
   
[my-transactional-topic-single-partition-32-msgs.log](https://github.com/apache/hudi/files/11028880/my-transactional-topic-single-partition-32-msgs.log)
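
Both failures seen with `AllowNonConsecutiveOffsets=false` can be reproduced with a small simulation. This is a simplified sketch of the behaviour the logs suggest, not Spark's actual implementation: `FakeConsumer`, `read_consecutive` and `outcome` are hypothetical names, and the real code also re-seeks once before raising the "wrong record" error.

```python
class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer; poll() never returns commit markers."""

    def __init__(self, data_offsets, end_offset):
        self.data_offsets = sorted(data_offsets)
        self.end_offset = end_offset
        self.position = 0

    def seek(self, offset):
        self.position = offset

    def poll(self):
        batch = [o for o in self.data_offsets if o >= self.position]
        self.position = self.end_offset  # everything up to the end has been fetched
        return batch


def read_consecutive(consumer, from_offset, until_offset):
    """Strict mode: expect a record at every offset in [from_offset, until_offset)."""
    consumer.seek(from_offset)
    buffer, records = [], []
    for wanted in range(from_offset, until_offset):
        if not buffer:
            buffer = consumer.poll()
        if not buffer:
            raise ValueError(f"Failed to get records for offset {wanted}")
        got = buffer.pop(0)
        if got != wanted:
            raise ValueError(f"Got wrong record: wanted offset {wanted}, got offset {got}")
        records.append(got)
    return records


def outcome(data_offsets, end_offset):
    """Run a strict read over [0, end_offset) and return the records or the error text."""
    try:
        return read_consecutive(FakeConsumer(data_offsets, end_offset), 0, end_offset)
    except ValueError as err:
        return str(err)


# One transaction: data at 0..4, commit marker at 5, end offset 6.
print(outcome(range(0, 5), 6))
# Two transactions: data at 0..15 and 17..32, markers at 16 and 33, end offset 34.
print(outcome(list(range(0, 16)) + list(range(17, 33)), 34))
# No markers: data at 0..4, end offset 5.
print(outcome(range(0, 5), 5))
```

In this model a marker at the tail of the range produces the "Failed to get records" error at the marker's offset, a marker in the middle produces the "Got wrong record" error, and the marker-free log is read without problems.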
   
   
   Changing to `AllowNonConsecutiveOffsets=true` on the above topic yields the 
following:
   ```
   23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.KafkaRDD: 
Computing topic my-transactional-topic-single-partition, partition 0 offsets 0 
-> 34
   23/03/21 13:24:10 DEBUG 
org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer 
found, re-using it InternalKafkaConsumer(hash=9903e40, 
groupId=spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition,
 topicPartition=my-transactional-topic-single-partition-0)
   23/03/21 13:24:10 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: compacted start 
spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition 
my-transactional-topic-single-partition-0 starting 0
   23/03/21 13:24:10 INFO 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for 
compacted 
spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition 
my-transactional-topic-single-partition-0 0
   23/03/21 13:24:10 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to 
my-transactional-topic-single-partition-0 0
   23/03/21 13:24:10 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled 
[my-transactional-topic-single-partition-0]  32
   23/03/21 13:24:10 INFO org.apache.spark.storage.BlockManager: Removing RDD 6
   23/03/21 13:24:11 DEBUG 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
   23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Putting block 
rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement 
failed: Failed to get records for compacted 
spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition 
my-transactional-topic-single-partition-0 after polling for 1000.
   23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Block rdd_2_0 
could not be removed as it was not found on disk or in memory
   23/03/21 13:24:11 ERROR org.apache.spark.executor.Executor: Exception in 
task 0.0 in stage 4.0 (TID 4)
   java.lang.IllegalArgumentException: requirement failed: Failed to get 
records for compacted 
spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition 
my-transactional-topic-single-partition-0 after polling for 1000
   ```
   Stack trace for the above:
   
[my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log](https://github.com/apache/hudi/files/11028882/my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log)
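
The `AllowNonConsecutiveOffsets=true` failure can be sketched the same way. The model below is an assumption based on the log output above, not a copy of Spark's code: `make_poll` and `read_non_consecutive` are hypothetical names. The reader tolerates gaps between offsets, but keeps asking for more records until it has seen one at `until_offset - 1`.

```python
def make_poll(batches):
    """Return a poll() that yields each batch once, then empty lists (a timed-out poll)."""
    remaining = iter(batches)
    return lambda: next(remaining, [])


def read_non_consecutive(poll, until_offset):
    """Gap-tolerant mode: accept missing offsets, stop once a record reaches until_offset - 1."""
    buffer, offsets = [], []
    while True:
        if not buffer:
            buffer = list(poll())
        if not buffer:
            raise ValueError("Failed to get records for compacted after polling")
        offset = buffer.pop(0)
        offsets.append(offset)
        if offset + 1 >= until_offset:
            return offsets


# Compacted topic: gaps in the middle, but the last offset (7) holds a record.
compacted = [0, 3, 7]
print(read_non_consecutive(make_poll([compacted]), 8))

# Transactional topic: data at 0..15 and 17..32, markers at 16 and 33, end offset 34.
transactional = list(range(0, 16)) + list(range(17, 33))
try:
    read_non_consecutive(make_poll([transactional]), 34)
except ValueError as err:
    print(err)
```

Under these assumptions the compacted topic is read successfully because its final offset holds a record, whereas the transactional topic's final offset is a commit marker, so the reader polls once more after offset 32, gets nothing, and fails. That matches the `Polled [...] 32` followed by `Polled []  0` sequence in the log.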
   
   **Answers Required**
   
   We know what the problem is, but we are unsure how to fix it.
   We've taken this to the Hudi office hours before, and the host suggested 
asking @yihua for advice.
   
   
   **Usual Environment in Production, but all of this has been reproduced 
locally**
   
   Hudi version : Deltastreamer on EMR 6.8.0 running Hudi 0.11.1-amzn-0
   Spark version : 3.3.0
   Hive version : 3.1.3
   Hadoop version : Amazon 3.2.1
   Storage (HDFS/S3/GCS..) : S3
   Running on Docker? (yes/no) : No
   
   **Additional context**
   
   hudi 0.12.1 used for local testing
   Can add more details if required.
   
   **Stacktrace**
   Stack traces are linked throughout the report, but are collected here again:
   
   
[my-transactional-topic-single-partition-32-msgs.log](https://github.com/apache/hudi/files/11028884/my-transactional-topic-single-partition-32-msgs.log)
   
[my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log](https://github.com/apache/hudi/files/11028885/my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log)
   
[hoodie-allow-consecutive-off-false.log](https://github.com/apache/hudi/files/11028886/hoodie-allow-consecutive-off-false.log)
   
[hoodie-allow-consecutive-off-true.log](https://github.com/apache/hudi/files/11028887/hoodie-allow-consecutive-off-true.log)
   
[my-transactional-topic-single-partition.log](https://github.com/apache/hudi/files/11028888/my-transactional-topic-single-partition.log)
   
   
   

