dongkelun commented on a change in pull request #3502:
URL: https://github.com/apache/hudi/pull/3502#discussion_r697981489
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -91,12 +96,21 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
       return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
     }
     JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
   }
   private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
-    return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-            LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
+    if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
+      if (schemaProvider == null) {
+        throw new HoodieException("Please provide a valid schema provider class!");
Review comment:
OK, good idea. Would 'when using ByteArrayDeserializer' be better?
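
For illustration only, here is a minimal standalone sketch of how the guard might read with that wording appended. It is not the PR's code: `SchemaProviderCheck` and `requireSchemaProvider` are hypothetical names, `IllegalStateException` stands in for `HoodieException`, and the schema provider is typed as a plain `Object` so the snippet compiles without Hudi or Kafka on the classpath.

```java
public class SchemaProviderCheck {
    // Fully qualified name of Kafka's ByteArrayDeserializer, written as a string
    // so that this sketch does not need the Kafka client dependency.
    static final String BYTE_ARRAY_DESERIALIZER =
        "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Hypothetical helper: rejects a missing schema provider only when the
    // configured deserializer is ByteArrayDeserializer.
    static void requireSchemaProvider(String deserializerClassName, Object schemaProvider) {
        if (BYTE_ARRAY_DESERIALIZER.equals(deserializerClassName) && schemaProvider == null) {
            // IllegalStateException is a stand-in for HoodieException (assumption).
            throw new IllegalStateException(
                "Please provide a valid schema provider class when using ByteArrayDeserializer!");
        }
    }

    public static void main(String[] args) {
        try {
            requireSchemaProvider(BYTE_ARRAY_DESERIALIZER, null);
        } catch (IllegalStateException e) {
            // Shows the message a user would see with the revised wording.
            System.out.println(e.getMessage());
        }
    }
}
```

With this wording the message names the condition that triggered it, so a user who sees the error knows the schema provider is required specifically because of the deserializer they configured.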