YuweiXiao commented on code in PR #7834:
URL: https://github.com/apache/hudi/pull/7834#discussion_r1096918955


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java:
##########
@@ -18,82 +18,45 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.FlatLists;
-import org.apache.hudi.common.util.collection.FlatLists.ComparableList;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
 import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
-import org.apache.hudi.io.AppendHandleFactory;
-import org.apache.hudi.io.SingleFileHandleCreateFactory;
-import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
 import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 
 /**
  * A partitioner for (consistent hashing) bucket index used in bulk_insert
  */
 public class RDDConsistentBucketPartitioner<T> extends 
RDDBucketIndexPartitioner<T> {
 
-  private static final Logger LOG = 
LogManager.getLogger(RDDConsistentBucketPartitioner.class);
-
-  private final HoodieTable table;
-  private final List<String> indexKeyFields;
   private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
-  private final String[] sortColumnNames;
-  private final boolean preserveHoodieMetadata;
-  private final boolean consistentLogicalTimestampEnabled;
 
-  private List<Boolean> doAppend;
-  private List<String> fileIdPfxList;
-
-  public RDDConsistentBucketPartitioner(HoodieTable table, Map<String, String> 
strategyParams, boolean preserveHoodieMetadata) {
-    this.table = table;
-    this.indexKeyFields = 
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
+  public RDDConsistentBucketPartitioner(HoodieTable table,
+                                        Map<String, String> strategyParams,
+                                        boolean preserveHoodieMetadata) {
+    super(table,
+        strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(), null),
+        preserveHoodieMetadata);
     this.hashingChildrenNodes = new HashMap<>();
-    this.consistentLogicalTimestampEnabled = 
table.getConfig().isConsistentLogicalTimestampEnabled();
-    this.preserveHoodieMetadata = preserveHoodieMetadata;
-
-    if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
-      sortColumnNames = 
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(",");
-    } else {
-      sortColumnNames = null;
-    }
   }
 
   public RDDConsistentBucketPartitioner(HoodieTable table) {
     this(table, Collections.emptyMap(), false);
     ValidationUtils.checkArgument(table.getIndex() instanceof 
HoodieSparkConsistentBucketIndex,
         "RDDConsistentBucketPartitioner can only be used together with 
consistent hashing bucket index");
-    
ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),

Review Comment:
   Not sure whether the SIMPLE bucket index also has this constraint. 
   
   The reason why we constrain CONSISTENT HASHING to MOR tables only is that 
some mechanisms (e.g., concurrency control) rely on the log feature of MOR tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to