I published a json file to kafka and run the hoodie delta streamer as a
spark job with kafka as main data source. Since am using kafka 1.1 version
i had to make changes to the kafka offset generation class and also the
json kafka source class because Hudi is using  a depreciated class such as
kafkacluster.  Now i can connect to kafka and consume some values but am
facing one problem. that is :


Can not instantiate value of type [map type; class java.util.LinkedHashMap,
[simple type, class java.lang.Object] -> [simple type, class
java.lang.Object]] from String value ('{"volume": 483951, "symbol": "MSFT",
"ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55,
"key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close":
111.72, "open": 111.55, "day": "31"}

'); no single-String constructor/factory method

 at [Source: "{\"volume\": 483951, \"symbol\": \"MSFT\", \"ts\":
\"2018-08-31 09:30:00\", \"month\": \"08\", \"high\": 111.74, \"low\":
111.55, \"key\": \"MSFT_2018-08-31 09\", \"year\": 2018, \"date\":
\"2018/08/31\", \"close\": 111.72, \"open\": 111.55, \"day\": \"31\"}\n";
line: 1, column: 1]

    at
com.uber.hoodie.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:51)

    at
com.uber.hoodie.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:86)


  at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)



Would you please give me some ideas in how to solve this problem.


The toRDD method in the JsonKafkaSource.java looks like below and am using
StringDeserializer.


private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {



   return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(),
offsetRanges, LocationStrategies.PreferConsistent()).map( s ->
s.value().toString());



  }


Thanks for your consideration and looking forward to hearing from you.

Reply via email to