YuweiXiao commented on code in PR #7834:
URL: https://github.com/apache/hudi/pull/7834#discussion_r1096475783
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java:
##########
@@ -44,7 +44,7 @@ public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
super(config);
}
- private Map<Integer, HoodieRecordLocation>
loadPartitionBucketIdFileIdMapping(
+ public Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
Review Comment:
Maybe keep it private.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java:
##########
@@ -18,82 +18,45 @@
package org.apache.hudi.execution.bulkinsert;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.FlatLists;
-import org.apache.hudi.common.util.collection.FlatLists.ComparableList;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
-import org.apache.hudi.io.AppendHandleFactory;
-import org.apache.hudi.io.SingleFileHandleCreateFactory;
-import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import scala.Tuple2;
-
import static
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
/**
* A partitioner for (consistent hashing) bucket index used in bulk_insert
*/
public class RDDConsistentBucketPartitioner<T> extends
RDDBucketIndexPartitioner<T> {
- private static final Logger LOG =
LogManager.getLogger(RDDConsistentBucketPartitioner.class);
-
- private final HoodieTable table;
- private final List<String> indexKeyFields;
private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
- private final String[] sortColumnNames;
- private final boolean preserveHoodieMetadata;
- private final boolean consistentLogicalTimestampEnabled;
- private List<Boolean> doAppend;
- private List<String> fileIdPfxList;
-
- public RDDConsistentBucketPartitioner(HoodieTable table, Map<String, String>
strategyParams, boolean preserveHoodieMetadata) {
- this.table = table;
- this.indexKeyFields =
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
+ public RDDConsistentBucketPartitioner(HoodieTable table,
+ Map<String, String> strategyParams,
+ boolean preserveHoodieMetadata) {
+ super(table,
+ strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(), null),
+ preserveHoodieMetadata);
this.hashingChildrenNodes = new HashMap<>();
- this.consistentLogicalTimestampEnabled =
table.getConfig().isConsistentLogicalTimestampEnabled();
- this.preserveHoodieMetadata = preserveHoodieMetadata;
-
- if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
- sortColumnNames =
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(",");
- } else {
- sortColumnNames = null;
- }
}
public RDDConsistentBucketPartitioner(HoodieTable table) {
this(table, Collections.emptyMap(), false);
ValidationUtils.checkArgument(table.getIndex() instanceof
HoodieSparkConsistentBucketIndex,
"RDDConsistentBucketPartitioner can only be used together with
consistent hashing bucket index");
-
ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
Review Comment:
We could keep this validation, as consistent hashing only supports MOR tables.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]