beyond1920 commented on code in PR #9087:
URL: https://github.com/apache/hudi/pull/9087#discussion_r1249088825
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java:
##########
@@ -190,7 +202,18 @@ public static boolean needsAsyncClustering(Configuration conf) {
* @param conf The flink configuration.
*/
public static boolean needsScheduleClustering(Configuration conf) {
- return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);
+ if (!conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) {
+ return false;
+ }
+ WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+ if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
+ // Write pipelines for table with consistent bucket index would detect whether clustering service occurs,
+ // and automatically adjust the partitioner and write function if clustering service happens.
+ // So it could handle UPSERT and INSERT case.
Review Comment:
This updates the logic of `needsScheduleClustering`, because the previous
version only supported `insert` when scheduling clustering. I think the reason
for that limitation is that the mapping from key to file group changes after
clustering. For the consistent bucket index, however, both the partitioner and
the write function in the write pipeline can handle clustering taking place.
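
The rule described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in the PR: the hunk quoted above is cut off before the branch bodies, so the return values are an assumption based on the comment "So it could handle UPSERT and INSERT case". `ClusteringScheduleSketch`, `Operation`, and `IndexKind` are hypothetical stand-ins for Hudi's `Configuration`, `WriteOperationType`, and index-type checks.

```java
// Hypothetical sketch: none of these types are Hudi's real classes.
final class ClusteringScheduleSketch {

  // Stand-in for the write operation configured on the Flink job.
  enum Operation { INSERT, UPSERT, BULK_INSERT }

  // Stand-in for "is this table using the consistent hashing bucket index?".
  enum IndexKind { CONSISTENT_HASHING_BUCKET, OTHER }

  /**
   * Decides whether clustering should be scheduled.
   * Assumption: with the consistent hashing bucket index both INSERT and
   * UPSERT qualify; with any other index only INSERT qualifies.
   */
  static boolean needsScheduleClustering(boolean scheduleEnabled, IndexKind index, Operation op) {
    if (!scheduleEnabled) {
      // Scheduling is switched off, so nothing else matters.
      return false;
    }
    if (index == IndexKind.CONSISTENT_HASHING_BUCKET) {
      // The write pipeline can adapt to clustering, so UPSERT is allowed too.
      return op == Operation.INSERT || op == Operation.UPSERT;
    }
    // Previous behaviour: the key-to-file-group mapping changes after
    // clustering, so only INSERT is considered safe.
    return op == Operation.INSERT;
  }

  public static void main(String[] args) {
    // Print the decision for every combination with scheduling enabled.
    for (IndexKind index : IndexKind.values()) {
      for (Operation op : Operation.values()) {
        System.out.println(index + " + " + op + " -> "
            + needsScheduleClustering(true, index, op));
      }
    }
  }
}
```

Checking the enabled flag first keeps the common "clustering disabled" path from parsing the operation type at all, which matches the early return in the diff.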