alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r971286665


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> 
getPartitioner(Map<Str
       switch (layoutOptStrategy) {
         case ZORDER:
         case HILBERT:
-          return new RDDSpatialCurveSortPartitioner(
+          return isRowPartitioner
+              ? new RowSpatialCurveSortPartitioner(getWriteConfig())
+              : new RDDSpatialCurveSortPartitioner(
               (HoodieSparkEngineContext) getEngineContext(),
               orderByColumns,
               layoutOptStrategy,
               getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
               HoodieAvroUtils.addMetadataFields(schema));
         case LINEAR:
-          return new RDDCustomColumnsSortPartitioner(orderByColumns, 
HoodieAvroUtils.addMetadataFields(schema),
+          return isRowPartitioner
+              ? new RowCustomColumnsSortPartitioner(orderByColumns)
+              : new RDDCustomColumnsSortPartitioner(orderByColumns, 
HoodieAvroUtils.addMetadataFields(schema),
               getWriteConfig().isConsistentLogicalTimestampEnabled());
         default:
           throw new UnsupportedOperationException(String.format("Layout 
optimization strategy '%s' is not supported", layoutOptStrategy));
       }
-    
}).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
+    }).orElse(isRowPartitioner ? 
BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig().getBulkInsertSortMode())
 :
+        
BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
   }
 
   /**
-   * Submit job to execute clustering for the group.
+   * Submit job to execute clustering for the group with RDD APIs.
    */
-  private CompletableFuture<HoodieData<WriteStatus>> 
runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, 
String> strategyParams,
-                                                                             
boolean preserveHoodieMetadata, String instantTime) {
+  private CompletableFuture<HoodieData<WriteStatus>> 
runClusteringForGroupAsyncWithRDD(HoodieClusteringGroup clusteringGroup, 
Map<String, String> strategyParams,

Review Comment:
   RDD suffix is misleading though (Row based ones are also relying on RDD 
internally)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to