ad1happy2go commented on code in PR #9059:
URL: https://github.com/apache/hudi/pull/9059#discussion_r1245397685
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -639,7 +639,13 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
List<HoodieRecord> avroRecords = new ArrayList<>();
while (genericRecordIterator.hasNext()) {
- GenericRecord genRec = genericRecordIterator.next();
+ GenericRecord genRec = null;
+ try {
+ genRec = genericRecordIterator.next();
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Handling exception for transaction topic - " + e.getMessage());
+ break;
Review Comment:
@danielfordfc Thanks for testing. I did test multiple rounds of producing
records transactionally, but I kept the delta streamer running in continuous
mode the whole time, without stopping and rerunning the job. I will try to
reproduce the stop-and-rerun scenario as well. If the issue shows up there,
fixing support for transactional topics may not be straightforward: we may need
to explore alternative approaches and possibly rewrite this code to handle them
similarly to how Spark Structured Streaming does.
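For reference, the patch under review stops reading the current batch at the first `IllegalArgumentException` thrown by `genericRecordIterator.next()` and logs a warning. The standalone sketch below reproduces only that control flow so the behavior being tested is easy to see; it uses `java.util.logging` instead of Hudi's logger, and the `failingAt` iterator is a hypothetical stand-in that simulates the failure. It is illustrative, not Hudi code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.logging.Logger;

final class TruncatingDrain {
  private static final Logger LOG = Logger.getLogger(TruncatingDrain.class.getName());

  // Collects elements until the iterator is exhausted or next() throws
  // IllegalArgumentException; on that exception it logs and stops, like the patch.
  static <T> List<T> drainUntilFailure(Iterator<T> it) {
    List<T> out = new ArrayList<>();
    while (it.hasNext()) {
      T item;
      try {
        item = it.next();
      } catch (IllegalArgumentException e) {
        LOG.warning("Handling exception for transaction topic - " + e.getMessage());
        break; // remaining elements of this batch are not collected
      }
      out.add(item);
    }
    return out;
  }

  // Hypothetical test iterator: yields 0..size-1 but throws at index failAt.
  static Iterator<Integer> failingAt(int failAt, int size) {
    return new Iterator<Integer>() {
      private int i = 0;

      @Override
      public boolean hasNext() {
        return i < size;
      }

      @Override
      public Integer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        int current = i++;
        if (current == failAt) {
          throw new IllegalArgumentException("simulated failure at element " + current);
        }
        return current;
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(drainUntilFailure(failingAt(2, 5))); // prints [0, 1]
  }
}
```

Everything after the failing element in that batch is left out of the returned list, which is why the stop-and-rerun scenario is worth checking separately.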
@WarFox, I also couldn't find a reliable way to detect a transactional topic
from the consumer side. So, if the fix works, I think adding a configuration
option is a good way to handle this special case.
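One way the configuration mentioned above could work, sketched under assumptions: a boolean property, off by default, decides whether the `IllegalArgumentException` is rethrown (the pre-patch behavior) or the batch is cut short with a warning. The key `example.kafka.source.tolerate.transactional.gaps` is hypothetical and is not an existing Hudi option, and `failingAt` is again a made-up test iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.logging.Logger;

final class ConfigGatedDrain {
  private static final Logger LOG = Logger.getLogger(ConfigGatedDrain.class.getName());

  // Hypothetical key for illustration only; not a real Hudi config.
  static final String TOLERATE_KEY = "example.kafka.source.tolerate.transactional.gaps";

  // Drains the iterator; whether a failing next() stops the batch or
  // propagates is decided by the (hypothetical) config flag.
  static <T> List<T> drain(Iterator<T> it, Properties props) {
    boolean tolerate = Boolean.parseBoolean(props.getProperty(TOLERATE_KEY, "false"));
    List<T> out = new ArrayList<>();
    while (it.hasNext()) {
      try {
        out.add(it.next());
      } catch (IllegalArgumentException e) {
        if (!tolerate) {
          throw e; // default: surface the error exactly as before the patch
        }
        LOG.warning("Stopping batch early on transactional topic: " + e.getMessage());
        break;
      }
    }
    return out;
  }

  // Hypothetical test iterator: yields 0..size-1 but throws at index failAt.
  static Iterator<Integer> failingAt(int failAt, int size) {
    return new Iterator<Integer>() {
      private int i = 0;

      @Override
      public boolean hasNext() {
        return i < size;
      }

      @Override
      public Integer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        int current = i++;
        if (current == failAt) {
          throw new IllegalArgumentException("simulated failure at element " + current);
        }
        return current;
      }
    };
  }

  public static void main(String[] args) {
    Properties lenient = new Properties();
    lenient.setProperty(TOLERATE_KEY, "true");
    System.out.println(drain(failingAt(2, 5), lenient)); // prints [0, 1]
    try {
      drain(failingAt(2, 5), new Properties());
    } catch (IllegalArgumentException e) {
      System.out.println("rethrown: " + e.getMessage());
    }
  }
}
```

Keeping the flag off by default means ordinary (non-transactional) topics still fail loudly on unexpected errors; only users who opt in get the lenient handling.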
--
This is an automated message from the Apache Git Service.