yihua commented on code in PR #18458:
URL: https://github.com/apache/hudi/pull/18458#discussion_r3035019439
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java:
##########
@@ -293,6 +333,24 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<Checkpoint> lastC
return new InputBatch<>(
eventsDataset,
r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ } else if (skipInvalidJsonRecords) {
+ String corruptCol = "_corrupt_record";
Review Comment:
🤖 I wonder if `asNullable()` changing every field in the schema to nullable
could cause issues downstream. After filtering out the corrupt records and
dropping `_corrupt_record`, the returned DataFrame still has a fully-nullable
schema, whereas the caller's `schemaProvider` likely has non-nullable fields.
Have you checked whether downstream write-path schema validation or schema
evolution logic can handle all-nullable schemas here?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java:
##########
@@ -136,6 +146,21 @@ private JavaRDD<GenericRecord> transformJsonToGenericRdd(InputBatch<JavaRDD<Stri
errorTableWriter.get().addErrorEvents(javaRDD.filter(x -> x.isRight()).map(x ->
new ErrorEvent<>(x.right().get(), ErrorEvent.ErrorReason.JSON_AVRO_DESERIALIZATION_FAILURE)));
return javaRDD.filter(x -> x.isLeft()).map(x -> x.left().get());
Review Comment:
🤖 Collecting the entire partition into an `ArrayList` before returning the
iterator forces every record in the partition onto the heap at once, which can
cause an OOM on large partitions. A `flatMap` emits records one at a time and
avoids that buffering, something like
`rdd.flatMap(json -> { Either<GenericRecord, String> e =
convertor.fromJsonWithError(json); if (e.isRight()) { LOG.warn(...,
e.right().get()); return Collections.emptyIterator(); } return
Collections.singletonList(e.left().get()).iterator(); })`.
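To make the shape of that suggestion concrete, here is a minimal, Spark-free
sketch. It is not the PR's code: plain `java.util.stream` stands in for the RDD
API, and `SkipInvalidRecordsSketch`, `parseOrNull`, `toZeroOrOne`, and
`keepValid` are hypothetical names, with integer parsing standing in for
`convertor.fromJsonWithError`. The point is only that each input maps to zero
or one outputs, so nothing has to be buffered per partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SkipInvalidRecordsSketch {

  // Hypothetical stand-in for the JSON-to-Avro conversion: returns the parsed
  // value, or null when the input is not a valid record.
  static Integer parseOrNull(String raw) {
    try {
      return Integer.valueOf(raw.trim());
    } catch (NumberFormatException e) {
      return null;
    }
  }

  // Same shape as the lambda in the review comment: one input maps to an
  // iterator over zero or one outputs. Rejected inputs are recorded in
  // `rejected` here; the real code would log them instead.
  static Iterator<Integer> toZeroOrOne(String raw, List<String> rejected) {
    Integer parsed = parseOrNull(raw);
    if (parsed == null) {
      rejected.add(raw);
      return Collections.emptyIterator();
    }
    return Collections.singletonList(parsed).iterator();
  }

  // Lazily drops invalid records; no intermediate list is built here.
  static Stream<Integer> keepValid(Stream<String> input, List<String> rejected) {
    return input.flatMap(raw -> {
      Iterator<Integer> it = toZeroOrOne(raw, rejected);
      return it.hasNext() ? Stream.of(it.next()) : Stream.<Integer>empty();
    });
  }

  public static void main(String[] args) {
    List<String> rejected = new ArrayList<>();
    // The terminal collect is only for the demo; a real consumer would
    // process the elements as they arrive.
    List<Integer> valid =
        keepValid(Stream.of("1", "oops", "3"), rejected).collect(Collectors.toList());
    System.out.println(valid + " rejected=" + rejected);
  }
}
```

In the actual change the same per-record function would be passed to
`rdd.flatMap(...)`, assuming the converter can be called per record as the
comment above suggests.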
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java:
##########
@@ -293,6 +333,24 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<Checkpoint> lastC
return new InputBatch<>(
eventsDataset,
r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ } else if (skipInvalidJsonRecords) {
+ String corruptCol = "_corrupt_record";
+ StructType dataType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema)
+ .add(new StructField(corruptCol, DataTypes.StringType, true, Metadata.empty()));
+ StructType nullableStruct = dataType.asNullable();
+ Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> {
+ Dataset<Row> df = source.getSparkSession().read()
+ .option("columnNameOfCorruptRecord", corruptCol)
+ .schema(nullableStruct)
Review Comment:
🤖 This `count()` triggers a full Spark action on `df`. Since `df` isn't
cached, Spark will recompute it from `rdd` when the caller eventually consumes
the returned dataset, reading and parsing the Kafka RDD a second time. The
count is only used for a log message, so the data gets scanned twice purely
for observability. Could you either call `df.cache()` before the count (and
`df.unpersist()` after the filter), or drop the count and log a generic
warning after the filter instead (e.g. by checking the filtered dataset's size
lazily, or just logging unconditionally)?