Erik LaBianca created HUDI-6485:
-----------------------------------

             Summary: Invalid json input record crashes HoodieDeltaStreamer
                 Key: HUDI-6485
                 URL: https://issues.apache.org/jira/browse/HUDI-6485
             Project: Apache Hudi
          Issue Type: Bug
          Components: deltastreamer
    Affects Versions: 0.12.3, 0.13.1
            Reporter: Erik LaBianca


When HoodieDeltaStreamer is configured to decode JSON records, a single 
non-JSON record fails the job, and there is no way to skip that record or move 
past it. ClickHouse KafkaEngine solves this with an optional "skip invalid 
records" mode, which seems like it would be helpful here as well.
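
To make the request concrete, here is a rough sketch of what such a mode could 
look like. It is a language-neutral illustration written in Python, not Hudi 
code: the function name `parse_records` and the `skip_invalid` flag are 
hypothetical, and a real implementation would live in the deltastreamer's 
JSON-to-Avro conversion path.

```python
import json
from typing import Iterable, List, Tuple


def parse_records(
    raw_records: Iterable[str],
    skip_invalid: bool = False,
) -> Tuple[List[dict], List[Tuple[str, str]]]:
    """Parse JSON object records.

    With skip_invalid=False (hypothetical default) the first bad record
    raises, which mirrors the behaviour reported in this issue. With
    skip_invalid=True, bad records are collected with their error message
    and parsing continues.
    """
    parsed: List[dict] = []
    rejected: List[Tuple[str, str]] = []
    for raw in raw_records:
        try:
            value = json.loads(raw)
            if not isinstance(value, dict):
                raise ValueError("expected a JSON object")
        except ValueError as err:  # json.JSONDecodeError subclasses ValueError
            if not skip_invalid:
                raise
            rejected.append((raw, str(err)))
            continue
        parsed.append(value)
    return parsed, rejected


good, bad = parse_records(['{"id": 1}', "testvalue", '{"id": 2}'], skip_invalid=True)
```

Returning the rejected records alongside the parsed ones, rather than dropping 
them silently, would let the caller log them or route them to a side location 
for later inspection.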

It's not clear to me whether `--commit-on-errors` is meant to cover this case, 
but in practice it does not appear to.

On failure, the following stack trace is thrown and the process exits:

23/07/03 14:49:23 INFO DAGScheduler: ShuffleMapStage 2 (mapToPair at HoodieJavaRDD.java:135) failed in 2144.592 s due to Job aborted due to stage failure: Task 14 in stage 2.0 failed 1 times, most recent failure: Lost task 14.0 in stage 2.0 (TID 16) (executor driver): org.apache.hudi.exception.HoodieIOException: Unrecognized token 'testvalue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"testvalue"; line: 1, column: 10]
        at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:96)
        at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:87)
        at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'testvalue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"testvalue"; line: 1, column: 10]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2939)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchToken(ReaderBasedJsonParser.java:2713)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchTrue(ReaderBasedJsonParser.java:2667)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:767)
        at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
        at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:93)
        ... 17 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
