pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r422366002
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
}
case ROW: {
InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
- return new InputBatch<>(Option.ofNullable(r.getBatch().map(
- rdd -> (
- (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
- // If the source schema is specified through Avro schema,
- // pass in the schema for the Row-to-Avro conversion
- // to avoid nullability mismatch between Avro schema and Row schema
- ? AvroConversionUtils.createRdd(
- rdd, r.getSchemaProvider().getSourceSchema(),
- HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
- : AvroConversionUtils.createRdd(
- rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
- ))
- .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ if (r.getBatch().isPresent()) {
Review comment:
> I'm referring to fetchNewDataInAvroFormat() being called when
> transformer is not set in case of ROW type source and no new data
Yeah, I got that. I was actually thinking along the following lines: Avro
relies on schemas, so the original thinking behind writing
`fetchNewDataInAvroFormat()` might have been that the schema provider should
be pre-specified.
> it conflicts with ROW source getting implicit schema. It seems like a
> usability issue.
But the above point is valid. I guess you are correct :)
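
To make the case under discussion concrete, here is a minimal, self-contained sketch of the "no new data" branch. It does not use Hudi or Spark: `Batch`, `SchemaProvider`, `toAvroLike`, and the `"inferred"` placeholder are hypothetical stand-ins for illustration only, not the actual `InputBatch` or `AvroConversionUtils` APIs. The point it tries to show is that when the batch is empty, the conversion can return early without touching a schema provider that may not have been specified.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class EmptyBatchSketch {

    /** Hypothetical stand-in for a schema provider; not Hudi's class. */
    interface SchemaProvider {
        String getSourceSchema();
    }

    /** Hypothetical stand-in for an input batch: optional data, a checkpoint, a possibly-null provider. */
    static final class Batch<T> {
        final Optional<T> data;
        final String checkpoint;
        final SchemaProvider schemaProvider; // may be null when the schema is implicit

        Batch(Optional<T> data, String checkpoint, SchemaProvider schemaProvider) {
            this.data = data;
            this.checkpoint = checkpoint;
            this.schemaProvider = schemaProvider;
        }
    }

    /** Converts rows to a tagged form, skipping all schema handling when there is no data. */
    static Batch<List<String>> toAvroLike(Batch<List<String>> rows) {
        if (!rows.data.isPresent()) {
            // No new data: nothing to convert, so the schema provider is never dereferenced.
            return new Batch<>(Optional.empty(), rows.checkpoint, rows.schemaProvider);
        }
        // Data present: use the explicit schema if one was given, otherwise fall back
        // to a placeholder that stands in for a schema inferred from the rows.
        String schema = rows.schemaProvider != null
            ? rows.schemaProvider.getSourceSchema()
            : "inferred";
        List<String> converted = rows.data.get().stream()
            .map(row -> schema + ":" + row)
            .collect(Collectors.toList());
        return new Batch<>(Optional.of(converted), rows.checkpoint, rows.schemaProvider);
    }
}
```

In this sketch the checkpoint is carried through in both branches, so an empty fetch still advances bookkeeping without requiring a schema up front.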
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]