nsivabalan commented on code in PR #9852:
URL: https://github.com/apache/hudi/pull/9852#discussion_r1366227106
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -584,16 +573,36 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
              (SchemaProvider) new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
                  new SimpleSchemaProvider(hoodieSparkContext.jsc(), targetSchema, props)))
              .orElse(dataAndCheckpoint.getSchemaProvider());
+ }
+
+      // Short circuit for bulk insert to gain performance benefits of row writing
+      if (recordType == HoodieRecordType.SPARK) {
+        rowDatasetOptional = transformed;
+        noNewData = (!rowDatasetOptional.isPresent()) || (rowDatasetOptional.get().isEmpty());
+      } else {
         // Rewrite transformed records into the expected target schema
-        avroRDDOptional = transformed.map(t -> getTransformedRDD(t, reconcileSchema, schemaProvider.getTargetSchema()));
+        avroRDDOptional = transformed.map(
+            rowDataset -> getTransformedRDD(rowDataset, reconcileSchema, schemaProvider.getTargetSchema()));
+        noNewData = (!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty());
}
} else {
- // Pull the data from the source & prepare the write
-      InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
-          formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
-      avroRDDOptional = dataAndCheckpoint.getBatch();
-      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      schemaProvider = dataAndCheckpoint.getSchemaProvider();
+ if (recordType == HoodieRecordType.SPARK) {
+      if (recordType == HoodieRecordType.SPARK) {
+        // Pull the data from the source in row format for bulk inserts & prepare the write
Review Comment:
   Can you file a JIRA to introduce proper abstractions for this? We should unify/generify the two different types (JavaRDD<GenericRecord> and Dataset<Row>).
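   To illustrate the suggestion, here is a minimal sketch of one possible shape for such an abstraction: a single generic batch wrapper parameterized over the payload type, so the no-new-data and checkpoint bookkeeping is written once instead of per branch. The names `SourceBatch` and `BatchFormat` are hypothetical, not existing Hudi APIs, and plain `Optional`/`List` stand in for the Spark types.

   ```java
   import java.util.List;
   import java.util.Optional;

   // Hypothetical sketch of a unified batch type. In Hudi, T would be
   // JavaRDD<GenericRecord> for the Avro path or Dataset<Row> for the
   // row-writer path; here List<String> stands in to keep it self-contained.
   public class SourceBatchSketch {

     enum BatchFormat { AVRO_RDD, SPARK_ROW }

     static final class SourceBatch<T> {
       private final Optional<T> batch;
       private final BatchFormat format;
       private final String checkpoint;

       SourceBatch(Optional<T> batch, BatchFormat format, String checkpoint) {
         this.batch = batch;
         this.format = format;
         this.checkpoint = checkpoint;
       }

       // Shared "no new data" check; a real version would also consult an
       // isEmpty() probe on the payload itself.
       boolean hasNewData() {
         return batch.isPresent();
       }

       BatchFormat format() { return format; }
       String checkpointForNextBatch() { return checkpoint; }
     }

     public static void main(String[] args) {
       SourceBatch<List<String>> rowBatch =
           new SourceBatch<>(Optional.of(List.of("r1")), BatchFormat.SPARK_ROW, "ckpt-1");
       SourceBatch<List<String>> emptyBatch =
           new SourceBatch<>(Optional.empty(), BatchFormat.AVRO_RDD, "ckpt-1");
       System.out.println(rowBatch.hasNewData());   // true
       System.out.println(emptyBatch.hasNewData()); // false
     }
   }
   ```

   With something like this, StreamSync could switch on `format()` only at the write boundary rather than duplicating the fetch and empty-batch logic in both branches.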
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -553,17 +546,13 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
rowDataset -> {
          Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = HoodieSparkUtils.safeCreateRDD(rowDataset,
              HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-              Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
+ Option.of(schemaProvider.getTargetSchema()));
Review Comment:
   We have two variables here: schemaProvider and userProvidedSchemaProvider. Did you change this intentionally?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]