danny0405 commented on code in PR #5997:
URL: https://github.com/apache/hudi/pull/5997#discussion_r914365747
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -330,17 +330,23 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
-      return dataStream
-          // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-          .keyBy(HoodieRecord::getRecordKey)
-          .transform(
-              "bucket_assigner",
-              TypeInformation.of(HoodieRecord.class),
-              new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
-          .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
-          // shuffle by fileId(bucket id)
-          .keyBy(record -> record.getCurrentLocation().getFileId())
+
+      DataStream<HoodieRecord> bucketDataStream = dataStream
+          // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
+          .keyBy(HoodieRecord::getRecordKey)
+          .transform(
+              "bucket_assigner",
+              TypeInformation.of(HoodieRecord.class),
+              new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+          .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism));
+
+      bucketDataStream = conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism) ==
+          conf.getInteger(FlinkOptions.WRITE_TASKS) ? bucketDataStream : bucketDataStream
+          // shuffle by fileId(bucket id)
+          .keyBy(record -> record.getCurrentLocation().getFileId());
Review Comment:
But we should figure out a more random algorithm here instead of only fixing the
case where the bucket assigner and the write tasks have the same parallelism.
What about the case where the bucket assigner has parallelism 4 and the write
task has parallelism 6?
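To make the 4-vs-6 case concrete, here is a rough sketch of how a `keyBy(fileId)` might spread file ids over the write subtasks. It re-implements, from memory, the key-group logic of Flink's `KeyGroupRangeAssignment` (a murmur-style hash of `key.hashCode()` modulo the max parallelism, then `keyGroup * parallelism / maxParallelism`). It assumes the default max parallelism of 128, and the class name and `file-N` ids are hypothetical. Treat it as an illustration under those assumptions, not as Flink's or Hudi's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: how keyBy(fileId) might spread file ids over write subtasks. */
public class FileIdShuffleSketch {

    // Assumption: Flink's default max parallelism (128) for small operator parallelism.
    static final int MAX_PARALLELISM = 128;

    // Re-implementation (from memory) of the murmur-style hash Flink applies to key.hashCode().
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the murmur hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    /** Maps a key to a downstream subtask: hash -> key group -> operator index. */
    static int subtaskFor(String key, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % MAX_PARALLELISM;
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    /** Counts how many distinct file ids land on each write subtask. */
    static int[] histogram(List<String> fileIds, int writeParallelism) {
        int[] counts = new int[writeParallelism];
        for (String id : fileIds) {
            counts[subtaskFor(id, writeParallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int assignParallelism = 4;
        int writeParallelism = 6;
        // Hypothetical: each bucket-assigner subtask currently has one open file id.
        List<String> fileIds = new ArrayList<>();
        for (int i = 0; i < assignParallelism; i++) {
            fileIds.add("file-" + i);
        }
        int[] counts = histogram(fileIds, writeParallelism);
        long idle = Arrays.stream(counts).filter(c -> c == 0).count();
        System.out.println("file ids per write subtask: " + Arrays.toString(counts));
        System.out.println("idle write subtasks: " + idle);
    }
}
```

With only four active file ids and six write subtasks, at least two writers necessarily receive no data. Hash collisions between key groups can leave even more of them idle. Skipping the shuffle only when the two parallelisms are equal does not address this kind of skew.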