ad1happy2go commented on code in PR #9059:
URL: https://github.com/apache/hudi/pull/9059#discussion_r1245397685


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -639,7 +639,13 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
             BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
             List<HoodieRecord> avroRecords = new ArrayList<>();
             while (genericRecordIterator.hasNext()) {
-              GenericRecord genRec = genericRecordIterator.next();
+              GenericRecord genRec = null;
+              try {
+                genRec = genericRecordIterator.next();
+              } catch (IllegalArgumentException e) {
+                LOG.warn("Handling exception for transaction topic  -  " + e.getMessage());
+                break;

Review Comment:
   @danielfordfc Thanks for testing. I did test multiple rounds of producing records transactionally, but I kept the DeltaStreamer running in continuous mode the whole time, without stopping and rerunning the job. I will try to reproduce the stop-and-rerun scenario as well. If that turns out to be the cause, fixing support for transactional topics may not be so straightforward: we may need to explore alternative approaches and potentially rewrite the code to handle these topics the way Spark Structured Streaming does.
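For context, in a transactional Kafka topic the commit/abort control markers occupy offsets that are never returned to the consuming application, so a partition's offset range can contain gaps. The sketch below is a hypothetical, self-contained illustration (plain Java, no Kafka or Spark dependency) contrasting a strict reader, which expects one record per offset and throws `IllegalArgumentException` on a gap, with a gap-tolerant reader that jumps to the next visible offset and stops at the end of the range. Treating the strict reader as the source of the exception caught in the diff is an assumption, and the gap-tolerant reader only shows the general idea of skipping gaps; it is not Spark Structured Streaming's actual implementation. `GapTolerantRangeReader`, `OffsetRecord`, `readRange` and `readRangeStrict` are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch, not Hudi or Spark code: reads the offset range
 * [fromOffset, untilOffset) from one partition, modelled as a sorted map of
 * offset -> value in which some offsets hold no visible record (as with
 * transaction control markers in a transactional Kafka topic).
 */
public class GapTolerantRangeReader {

  /** Minimal stand-in for a consumed record; not a Kafka class. */
  public static final class OffsetRecord {
    public final long offset;
    public final String value;

    OffsetRecord(long offset, String value) {
      this.offset = offset;
      this.value = value;
    }
  }

  /** Strict reader: expects one visible record per offset and fails on a gap. */
  public static List<OffsetRecord> readRangeStrict(
      NavigableMap<Long, String> partition, long fromOffset, long untilOffset) {
    List<OffsetRecord> out = new ArrayList<>();
    for (long offset = fromOffset; offset < untilOffset; offset++) {
      String value = partition.get(offset);
      if (value == null) {
        throw new IllegalArgumentException("No record at expected offset " + offset);
      }
      out.add(new OffsetRecord(offset, value));
    }
    return out;
  }

  /** Gap-tolerant reader: jumps to the next visible offset and stops at untilOffset. */
  public static List<OffsetRecord> readRange(
      NavigableMap<Long, String> partition, long fromOffset, long untilOffset) {
    List<OffsetRecord> out = new ArrayList<>();
    long next = fromOffset;
    while (next < untilOffset) {
      // First visible record at or after `next`; this skips over any gap.
      Map.Entry<Long, String> entry = partition.ceilingEntry(next);
      if (entry == null || entry.getKey() >= untilOffset) {
        break; // The rest of the range contains no visible records.
      }
      out.add(new OffsetRecord(entry.getKey(), entry.getValue()));
      next = entry.getKey() + 1;
    }
    return out;
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> partition = new TreeMap<>();
    // Offsets 3 and 7 are missing, as if taken by commit markers.
    long[] visible = {0, 1, 2, 4, 5, 6};
    for (long offset : visible) {
      partition.put(offset, "value-" + offset);
    }
    System.out.println(readRange(partition, 0L, 8L).size()); // prints 6
    try {
      readRangeStrict(partition, 0L, 8L);
    } catch (IllegalArgumentException e) {
      // prints "strict reader failed: No record at expected offset 3"
      System.out.println("strict reader failed: " + e.getMessage());
    }
  }
}
```

The gap-tolerant version never assumes that `untilOffset - fromOffset` records exist, which is what lets it cope with a range whose last offset is a control marker.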
   
   @WarFox, I also couldn't find a reliable way to detect this on the consumer side. So, if the fix works, I believe adding that configuration would be a good way to handle this special case.
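One way to read the configuration suggestion is to keep the `catch`/`break` from the diff but only take that path when the user opts in. The sketch below assumes a hypothetical boolean property, `example.deltastreamer.kafka.transactional.topic.enabled` (invented for illustration, not an existing Hudi config), read from `java.util.Properties`. With the default of `false`, the `IllegalArgumentException` is rethrown exactly as before. `TransactionalTopicGuard` and `drain` are likewise hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical sketch: gates the "stop on IllegalArgumentException" behaviour
 * from the diff behind an opt-in property. Not actual Hudi code.
 */
public class TransactionalTopicGuard {

  // Hypothetical key, invented for illustration; not a real Hudi config.
  public static final String ALLOW_TXN_GAPS_KEY =
      "example.deltastreamer.kafka.transactional.topic.enabled";

  public static <T> List<T> drain(Iterator<T> it, Properties props) {
    boolean tolerateGaps =
        Boolean.parseBoolean(props.getProperty(ALLOW_TXN_GAPS_KEY, "false"));
    List<T> out = new ArrayList<>();
    while (it.hasNext()) {
      T next;
      try {
        next = it.next();
      } catch (IllegalArgumentException e) {
        if (!tolerateGaps) {
          throw e; // Default: keep the existing fail-fast behaviour.
        }
        // Opted in: treat the failure as the end of the readable range,
        // mirroring the `break` in the diff.
        break;
      }
      out.add(next);
    }
    return out;
  }
}
```

Defaulting to `false` keeps fail-fast behaviour for non-transactional topics, where the exception may signal a genuine problem. Note that `break` abandons the rest of the iterator, so it only avoids skipping records if the exception occurs at the very end of the offset range; that is an assumption worth verifying in the stop-and-rerun scenario.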



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
