codope commented on code in PR #4958:
URL: https://github.com/apache/hudi/pull/4958#discussion_r917486731


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
+import org.apache.hudi.io.AppendHandleFactory;
+import org.apache.hudi.io.SingleFileHandleCreateFactory;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * A partitioner for (consistent hashing) bucket index used in bulk_insert
+ */
+public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> extends RDDBucketIndexPartitioner<T> {
+
+  private static final Logger LOG = LogManager.getLogger(RDDConsistentBucketPartitioner.class);
+
+  private final HoodieTable table;
+  private final HoodieWriteConfig config;
+  private final List<String> indexKeyFields;
+  private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
+  private final String[] sortColumnNames;
+  private final boolean preserveHoodieMetadata;
+  private final boolean consistentLogicalTimestampEnabled;
+
+  private List<Boolean> doAppend;
+  private List<String> fileIdPfxList;
+
+  public RDDConsistentBucketPartitioner(HoodieTable table, HoodieWriteConfig config, Map<String, String> strategyParams, boolean preserveHoodieMetadata) {
+    this.table = table;
+    this.config = config;
+    this.indexKeyFields = Arrays.asList(config.getBucketIndexHashField().split(","));
+    this.hashingChildrenNodes = new HashMap<>();
+    this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
+    this.preserveHoodieMetadata = preserveHoodieMetadata;
+
+    if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
+      sortColumnNames = strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(",");
+    } else {
+      sortColumnNames = null;
+    }
+  }
+
+  public RDDConsistentBucketPartitioner(HoodieTable table, HoodieWriteConfig config) {
+    this(table, config, Collections.emptyMap(), false);
+    ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSparkConsistentBucketIndex,
+        "RDDConsistentBucketPartitioner can only be used together with consistent hashing bucket index");
+    ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
+        "CoW table with bucket index doesn't support bulk_insert");
+  }
+
+  /**
+   * Repartition the records to conform to the bucket index storage layout constraints.
+   * Specifically, partition the records based on the consistent bucket index, which is computed
+   * using the hashing metadata and the records' keys.
+   *
+   * @param records               Input Hoodie records
+   * @param outputSparkPartitions Not used
+   * @return partitioned records, where each partition of data corresponds to a bucket (i.e., file group)
+   */
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
+    Map<String, ConsistentBucketIdentifier> partitionToIdentifier = initializeBucketIdentifier(records);
+    Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap = generateFileIdPfx(partitionToIdentifier, outputSparkPartitions);
+    return doPartition(records, new Partitioner() {
+      @Override
+      public int numPartitions() {
+        return fileIdPfxList.size();
+      }
+
+      @Override
+      public int getPartition(Object key) {
+        HoodieKey hoodieKey = (HoodieKey) key;
+        String partition = hoodieKey.getPartitionPath();
+        ConsistentHashingNode node = partitionToIdentifier.get(partition).getBucket(hoodieKey, indexKeyFields);
+        return partitionToFileIdPfxIdxMap.get(partition).get(node.getFileIdPrefix());
+      }
+    });
+  }
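+  // Editor's note: an illustrative sketch (not part of this PR) of how the partitioner above routes
+  // a single record, assuming a hypothetical record in partition "2022/01/01" whose key hashes to a
+  // bucket with fileIdPrefix "0003":
+  //   HoodieKey key = record.getKey();
+  //   ConsistentHashingNode node = partitionToIdentifier.get("2022/01/01").getBucket(key, indexKeyFields);
+  //   int sparkPartition = partitionToFileIdPfxIdxMap.get("2022/01/01").get(node.getFileIdPrefix());
+  // Every bucket (file group) therefore maps to exactly one Spark partition.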
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return (sortColumnNames != null && sortColumnNames.length > 0)
+        || table.requireSortedRecords() || config.getBulkInsertSortMode() != BulkInsertSortMode.NONE;
+  }
+
+  @Override
+  public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
+    return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
+        Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+  }
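+  // Editor's note (illustrative, not part of this PR): per the doAppend flags built in generateFileIdPfx,
+  // a NORMAL bucket of a partition whose hashing metadata already existed appends log files to its
+  // current file group (AppendHandleFactory), while a first write to a partition, or a resized (child)
+  // bucket, creates a brand-new base file whose file id is derived from the bucket's fileIdPfx via
+  // FSUtils.createNewFileId(getFileIdPfx(idx), 0).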
+
+  @Override
+  public String getFileIdPfx(int partitionId) {
+    return fileIdPfxList.get(partitionId);
+  }
+
+  public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode> nodes) {
+    ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL), "children nodes should not be tagged as NORMAL");
+    hashingChildrenNodes.put(partition, nodes);
+  }
+
+  /**
+   * Get (construct) the bucket identifier of the given partition
+   */
+  private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
+    HoodieSparkConsistentBucketIndex index = (HoodieSparkConsistentBucketIndex) table.getIndex();
+    HoodieConsistentHashingMetadata metadata = index.loadOrCreateMetadata(this.table, partition);
+    if (hashingChildrenNodes.containsKey(partition)) {
+      metadata.setChildrenNodes(hashingChildrenNodes.get(partition));
+    }
+    return new ConsistentBucketIdentifier(metadata);
+  }
+
+  /**
+   * Initialize the hashing metadata of the input records. The metadata of all related partitions will be loaded,
+   * and the mapping from each partition to its bucket identifier is constructed.
+   */
+  private Map<String, ConsistentBucketIdentifier> initializeBucketIdentifier(JavaRDD<HoodieRecord<T>> records) {
+    return records.map(HoodieRecord::getPartitionPath).distinct().collect().stream()
+        .collect(Collectors.toMap(p -> p, p -> getBucketIdentifier(p)));
+  }
+
+  /**
+   * Initialize the fileIdPfx for each data partition. Specifically, the following fields are constructed:
+   * - fileIdPfx: the Nth element corresponds to the Nth data partition, indicating its fileIdPfx
+   * - doAppend: represents whether the Nth data partition should use the append handle
+   * - partitionToFileIdPfxIdxMap (return value): (table partition) -> (fileIdPfx -> idx) mapping
+   *
+   * @param parallelism Not used, the actual parallelism is determined by the bucket number
+   */
+  private Map<String, Map<String, Integer>> generateFileIdPfx(Map<String, ConsistentBucketIdentifier> partitionToIdentifier, int parallelism) {
+    Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap = new HashMap<>(partitionToIdentifier.size() * 2);
+    doAppend = new ArrayList<>();
+    fileIdPfxList = new ArrayList<>();
+    int count = 0;
+    for (ConsistentBucketIdentifier identifier : partitionToIdentifier.values()) {
+      Map<String, Integer> fileIdPfxToIdx = new HashMap<>();
+      for (ConsistentHashingNode node : identifier.getNodes()) {
+        fileIdPfxToIdx.put(node.getFileIdPrefix(), count++);
+      }
+      fileIdPfxList.addAll(identifier.getNodes().stream().map(ConsistentHashingNode::getFileIdPrefix).collect(Collectors.toList()));
+      if (identifier.getMetadata().isFirstCreated()) {
+        // Create new file group when the hashing metadata is new (i.e., first write to the partition)
+        doAppend.addAll(Collections.nCopies(identifier.getNodes().size(), false));
+      } else {
+        // Child nodes require generating a fresh new base file, rather than a log file
+        doAppend.addAll(identifier.getNodes().stream().map(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL).collect(Collectors.toList()));
+      }
+      partitionToFileIdPfxIdxMap.put(identifier.getMetadata().getPartitionPath(), fileIdPfxToIdx);
+    }
+
+    ValidationUtils.checkState(fileIdPfxList.size() == partitionToIdentifier.values().stream().mapToInt(ConsistentBucketIdentifier::getNumBuckets).sum(),
+        "Error state after constructing fileId & idx mapping");
+    return partitionToFileIdPfxIdxMap;
+  }
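+  // Editor's note: an illustrative sketch (not part of this PR) of the state built above, assuming two
+  // hypothetical partitions "p1" (existing metadata, two NORMAL buckets "a1", "a2") and "p2" (first
+  // write, two buckets "b1", "b2"):
+  //   fileIdPfxList              = [a1, a2, b1, b2]
+  //   doAppend                   = [true, true, false, false]
+  //   partitionToFileIdPfxIdxMap = {p1 -> {a1: 0, a2: 1}, p2 -> {b1: 2, b2: 3}}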
+
+  /**
+   * Execute the partitioning using the given partitioner.
+   * If sorting is required, it is done within each data partition:
+   * - if sortColumnNames is specified, sort by the given columns (the behaviour is the same as `RDDCustomColumnsSortPartitioner`)
+   * - if the table requires sorted records or BulkInsertSortMode is not NONE, sort by record key within each partition.
+   * By default, do partitioning only.
+   *
+   * @param records     Input Hoodie records
+   * @param partitioner a partitioner that accepts `HoodieKey` as the partition key
+   * @return partitioned (and possibly sorted) records
+   */
+  private JavaRDD<HoodieRecord<T>> doPartition(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) {
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return doPartitionAndCustomColumnSort(records, partitioner);
+    } else if (table.requireSortedRecords() || config.getBulkInsertSortMode() != BulkInsertSortMode.NONE) {
+      return doPartitionAndSortByRecordKey(records, partitioner);
+    } else {
+      // By default, do partition only
+      return records.mapToPair(record -> new Tuple2<>(record.getKey(), record))
+          .partitionBy(partitioner).map(Tuple2::_2);
+    }
+  }
+
+  /**
+   * Sort by the specified column values. The behaviour is the same as `RDDCustomColumnsSortPartitioner`.
+   *
+   * @param records     Input Hoodie records
+   * @param partitioner the partitioner used to distribute records to buckets
+   * @return partitioned records, sorted by the configured sort columns within each partition
+   */
+  private JavaRDD<HoodieRecord<T>> doPartitionAndCustomColumnSort(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) {
+    final String[] sortColumns = sortColumnNames;
+    final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(config.getSchema()))));
+    Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
+      Object obj1 = HoodieAvroUtils.getRecordColumnValues(t1, sortColumns, schema, consistentLogicalTimestampEnabled);
+      Object obj2 = HoodieAvroUtils.getRecordColumnValues(t2, sortColumns, schema, consistentLogicalTimestampEnabled);
+      return ((Comparable) obj1).compareTo(obj2);
+    };
+
+    return records.mapToPair(record -> new Tuple2<>(record, record))
+        .repartitionAndSortWithinPartitions(new Partitioner() {
+          @Override
+          public int numPartitions() {
+            return partitioner.numPartitions();
+          }
+
+          @Override
+          public int getPartition(Object key) {
+            return partitioner.getPartition(((HoodieRecord) key).getKey());
+          }
+        }, comparator).map(Tuple2::_2);
+  }
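+  // Editor's note (not part of this PR): the (Comparator & Serializable) intersection cast above is what
+  // allows the comparator lambda to be shipped to Spark executors; repartitionAndSortWithinPartitions
+  // then sorts only within each Spark partition, i.e. within a single bucket, not globally.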
+
+  /**
+   * Sort by record key within each partition. The behaviour is the same as BulkInsertSortMode.PARTITION_SORT.
+   *
+   * @param records     Input Hoodie records
+   * @param partitioner the partitioner used to distribute records to buckets
+   * @return partitioned records, sorted by record key within each partition
+   */
+  private JavaRDD<HoodieRecord<T>> doPartitionAndSortByRecordKey(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) {
+    if (config.getBulkInsertSortMode() == BulkInsertSortMode.GLOBAL_SORT) {
+      LOG.warn("Consistent bucket does not support global sort mode, the sort 
will only be done within each data partition");
+    }
+
+    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> {
+      return t1.getRecordKey().compareTo(t2.getRecordKey());
+    };

Review Comment:
   OK, let's keep the original version then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
