yihua commented on code in PR #18458:
URL: https://github.com/apache/hudi/pull/18458#discussion_r3035019439


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java:
##########
@@ -293,6 +333,24 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<Checkpoint> lastC
           return new InputBatch<>(
               eventsDataset,
               r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        } else if (skipInvalidJsonRecords) {
+          String corruptCol = "_corrupt_record";

Review Comment:
   🤖 I wonder whether `asNullable()`, which marks every field in the schema as
nullable, could cause issues downstream. After the corrupt records are filtered
out and `_corrupt_record` is dropped, the returned DataFrame still has a fully
nullable schema, whereas the caller's `schemaProvider` likely declares some
fields non-nullable. Have you checked whether the downstream write-path schema
validation and schema evolution logic can handle an all-nullable schema here?
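A quick way to see how far `asNullable()` widens the schema relative to what the `schemaProvider` declares is to diff the `nullable` flags column by column. The sketch below is a plain-Java model, not Spark code. It assumes each schema has been flattened into an ordered map from column name to its nullable flag; with Spark, the same flags come from `StructType.fields()` and `StructField.nullable()`. `NullabilityDiff` is a hypothetical helper, and unlike Spark's `asNullable()` it ignores nested structs, arrays and maps.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Hudi or Spark). A schema is modeled as an
// ordered map: column name -> nullable flag. Nested types are ignored.
final class NullabilityDiff {

  // Columns the target schema declares non-nullable but the produced schema marks nullable.
  static List<String> widenedColumns(Map<String, Boolean> target, Map<String, Boolean> produced) {
    List<String> widened = new ArrayList<>();
    for (Map.Entry<String, Boolean> col : target.entrySet()) {
      Boolean producedNullable = produced.get(col.getKey());
      // Only columns present in both schemas are compared; added or dropped
      // columns would need a separate check.
      if (!col.getValue() && Boolean.TRUE.equals(producedNullable)) {
        widened.add(col.getKey());
      }
    }
    return widened;
  }

  // Flat analogue of StructType.asNullable(): every column becomes nullable.
  static Map<String, Boolean> asNullable(Map<String, Boolean> schema) {
    Map<String, Boolean> out = new LinkedHashMap<>();
    for (String name : schema.keySet()) {
      out.put(name, true);
    }
    return out;
  }
}
```

An empty result means the widening is harmless for that schema; a non-empty one lists exactly the columns whose downstream handling is worth checking.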



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java:
##########
@@ -136,6 +146,21 @@ private JavaRDD<GenericRecord> transformJsonToGenericRdd(InputBatch<JavaRDD<Stri
         errorTableWriter.get().addErrorEvents(javaRDD.filter(x -> x.isRight()).map(x ->
             new ErrorEvent<>(x.right().get(), ErrorEvent.ErrorReason.JSON_AVRO_DESERIALIZATION_FAILURE)));
         return javaRDD.filter(x -> x.isLeft()).map(x -> x.left().get());

Review Comment:
   🤖 Collecting the entire partition into an `ArrayList` before returning the
iterator forces every record in the partition onto the heap at once, which can
cause an OOM on large partitions. A `flatMap` that emits zero or one record per
input would stream through the partition instead, with something like:
`rdd.flatMap(json -> { Either<GenericRecord, String> e = convertor.fromJsonWithError(json); if (e.isRight()) { LOG.warn(..., e.right().get()); return Collections.emptyIterator(); } return Collections.singletonList(e.left().get()).iterator(); })`.
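To illustrate why emitting zero or one record per input keeps memory flat, here is a plain-Java model of the lazy, per-record skipping that the suggested `flatMap` provides. `SkippingIterator` is a hypothetical name, and its `parse` function (an empty `Optional` meaning "invalid JSON") stands in for `convertor.fromJsonWithError`. At most one parsed record is held as lookahead, rather than the whole partition.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper (not part of Hudi or Spark): yields valid records one at a
// time and skips invalid ones, pulling from the source only as far as needed.
final class SkippingIterator<T> implements Iterator<T> {
  private final Iterator<String> source;
  private final Function<String, Optional<T>> parse; // stand-in for fromJsonWithError
  private T nextValid;  // one-record lookahead: the only parsed record kept in memory
  private long skipped; // invalid records dropped so far

  SkippingIterator(Iterator<String> source, Function<String, Optional<T>> parse) {
    this.source = source;
    this.parse = parse;
  }

  @Override
  public boolean hasNext() {
    // Advance the source only until the next valid record is found.
    while (nextValid == null && source.hasNext()) {
      Optional<T> parsed = parse.apply(source.next());
      if (parsed.isPresent()) {
        nextValid = parsed.get();
      } else {
        skipped++; // this is where a LOG.warn for the bad record would go
      }
    }
    return nextValid != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T value = nextValid;
    nextValid = null;
    return value;
  }

  long skipped() {
    return skipped;
  }
}
```

An iterator shaped like this could also be returned from `JavaRDD.mapPartitions`, although in Spark the function and everything it captures must be serializable; the `flatMap` form suggested above avoids writing a custom iterator at all.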



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java:
##########
@@ -293,6 +333,24 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<Checkpoint> lastC
           return new InputBatch<>(
               eventsDataset,
               r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        } else if (skipInvalidJsonRecords) {
+          String corruptCol = "_corrupt_record";
+          StructType dataType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema)
+              .add(new StructField(corruptCol, DataTypes.StringType, true, Metadata.empty()));
+          StructType nullableStruct = dataType.asNullable();
+          Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> {
+            Dataset<Row> df = source.getSparkSession().read()
+                .option("columnNameOfCorruptRecord", corruptCol)
+                .schema(nullableStruct)

Review Comment:
   🤖 This `count()` triggers a full Spark action on `df`, but since `df` isn't
cached, Spark will recompute `df` from `rdd` when the caller eventually consumes
the returned dataset, reading and parsing the Kafka RDD a second time. The count
only feeds a log message, so the data is scanned twice purely for observability.
Could you either `df.cache()` before the count (and `df.unpersist()` only after
the returned dataset has been consumed, since `filter` is lazy and unpersisting
right after it would drop the cache before it is used), or drop the count and
log a generic warning after the filter instead (e.g. by tallying corrupt rows
during the single consuming pass, or by just logging unconditionally)?
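The cost being flagged can be modeled in plain Java by treating each terminal operation as a Spark action that re-reads an uncached source, and counting the reads. This sketch assumes a hypothetical `CorruptCountPasses` helper, a `Supplier<Stream<String>>` standing in for the uncached `df`, and a made-up `isCorrupt` rule standing in for `_corrupt_record IS NOT NULL`. It contrasts count-then-consume (two reads) with tallying corrupt rows during the single consuming pass (one read), which is roughly the job a Spark `LongAccumulator` would do.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model (not Hudi or Spark code) of an uncached lineage:
// every get() re-reads the source, and `scans` records how often that happens.
final class CorruptCountPasses {

  static Supplier<Stream<String>> source(List<String> rows, AtomicInteger scans) {
    return () -> {
      scans.incrementAndGet();
      return rows.stream();
    };
  }

  // Made-up corruption rule standing in for `_corrupt_record IS NOT NULL`.
  static boolean isCorrupt(String row) {
    return row.startsWith("!");
  }

  // Pattern under review: an eager count for the log line, then the real consumption.
  static List<String> countThenConsume(Supplier<Stream<String>> src, AtomicLong corrupt) {
    corrupt.set(src.get().filter(CorruptCountPasses::isCorrupt).count()); // read 1
    return src.get().filter(r -> !isCorrupt(r)).collect(Collectors.toList()); // read 2
  }

  // Alternative: tally corrupt rows as a side effect of the one consuming pass,
  // and read the tally only after consumption has finished.
  static List<String> countWhileConsuming(Supplier<Stream<String>> src, AtomicLong corrupt) {
    return src.get()
        .filter(r -> {
          if (isCorrupt(r)) {
            corrupt.incrementAndGet();
            return false;
          }
          return true;
        })
        .collect(Collectors.toList()); // single read
  }
}
```

One caveat if this is done with an accumulator in Spark: updates made inside transformations can be applied more than once when tasks are retried or re-executed, so the logged number is best treated as approximate. The `df.cache()` route gives an exact count but keeps the batch in executor storage until it is unpersisted.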



-- 
This is an automated message from the Apache Git Service.