This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d24220a4804 [HUDI-7111] Fix performance regression of tag when written 
into simple bucket index table (#10130)
d24220a4804 is described below

commit d24220a4804ee6e04346a03a4ddbf2d2711ae301
Author: Jing Zhang <[email protected]>
AuthorDate: Tue Nov 21 09:56:07 2023 +0800

    [HUDI-7111] Fix performance regression of tag when written into simple 
bucket index table (#10130)
---
 .../index/bucket/BucketIndexLocationMapper.java    | 35 --------------
 .../hudi/index/bucket/HoodieBucketIndex.java       | 35 --------------
 .../index/bucket/HoodieConsistentBucketIndex.java  | 29 ++++++++++--
 .../hudi/index/bucket/HoodieSimpleBucketIndex.java | 54 ++++++++++------------
 4 files changed, 50 insertions(+), 103 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java
deleted file mode 100644
index 1ce68ef97bf..00000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.index.bucket;
-
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.Option;
-
-import java.io.Serializable;
-
-public interface BucketIndexLocationMapper extends Serializable {
-
-  /**
-   * Get record location given hoodie key
-   */
-  Option<HoodieRecordLocation> getRecordLocation(HoodieKey key);
-
-}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index a41aa82a3e8..3ca75d3e264 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -19,13 +19,9 @@
 package org.apache.hudi.index.bucket;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
@@ -37,8 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
-
 /**
  * Hash indexing mechanism.
  */
@@ -65,30 +59,6 @@ public abstract class HoodieBucketIndex extends 
HoodieIndex<Object, Object> {
     return writeStatuses;
   }
 
-  @Override
-  public <R> HoodieData<HoodieRecord<R>> tagLocation(
-      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
-      HoodieTable hoodieTable)
-      throws HoodieIndexException {
-    // Get bucket location mapper for the given partitions
-    List<String> partitions = 
records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
-    LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
-    BucketIndexLocationMapper mapper = getLocationMapper(hoodieTable, 
partitions);
-
-    return records.mapPartitions(iterator ->
-        new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
-          @Override
-          protected HoodieRecord<R> computeNext() {
-            // TODO maybe batch the operation to improve performance
-            HoodieRecord record = inputItr.next();
-            Option<HoodieRecordLocation> loc = 
mapper.getRecordLocation(record.getKey());
-            return tagAsNewRecordIfNeeded(record, loc);
-          }
-        },
-        false
-    );
-  }
-
   @Override
   public boolean requiresTagging(WriteOperationType operationType) {
     switch (operationType) {
@@ -127,9 +97,4 @@ public abstract class HoodieBucketIndex extends 
HoodieIndex<Object, Object> {
   public int getNumBuckets() {
     return numBuckets;
   }
-
-  /**
-   * Get a location mapper for the given table & partitionPath
-   */
-  protected abstract BucketIndexLocationMapper getLocationMapper(HoodieTable 
table, List<String> partitionPath);
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
index 156d14b7cf5..125bc970d65 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
@@ -19,12 +19,14 @@
 package org.apache.hudi.index.bucket;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -35,10 +37,13 @@ import org.apache.hudi.table.HoodieTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
+
 /**
  * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
  * NOTE: bucket resizing is triggered by clustering.
@@ -71,11 +76,28 @@ public class HoodieConsistentBucketIndex extends 
HoodieBucketIndex {
   }
 
   @Override
-  protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, 
List<String> partitionPath) {
-    return new ConsistentBucketIndexLocationMapper(table, partitionPath);
+  public <R> HoodieData<HoodieRecord<R>> tagLocation(
+      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
+      HoodieTable hoodieTable)
+      throws HoodieIndexException {
+    // Get bucket location mapper for the given partitions
+    List<String> partitions = 
records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
+    LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
+    ConsistentBucketIndexLocationMapper mapper = new 
ConsistentBucketIndexLocationMapper(hoodieTable, partitions);
+
+    return records.mapPartitions(iterator ->
+            new LazyIterableIterator<HoodieRecord<R>, 
HoodieRecord<R>>(iterator) {
+      @Override
+      protected HoodieRecord<R> computeNext() {
+        // TODO maybe batch the operation to improve performance
+        HoodieRecord record = inputItr.next();
+        Option<HoodieRecordLocation> loc = 
mapper.getRecordLocation(record.getKey());
+        return tagAsNewRecordIfNeeded(record, loc);
+      }
+      }, false);
   }
 
-  public class ConsistentBucketIndexLocationMapper implements 
BucketIndexLocationMapper {
+  public class ConsistentBucketIndexLocationMapper implements Serializable {
 
     /**
      * Mapping from partitionPath -> bucket identifier
@@ -90,7 +112,6 @@ public class HoodieConsistentBucketIndex extends 
HoodieBucketIndex {
       }));
     }
 
-    @Override
     public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
       String partitionPath = key.getPartitionPath();
       ConsistentHashingNode node = 
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index fa2289ed87e..a38fa489a2a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -18,29 +18,29 @@
 
 package org.apache.hudi.index.bucket;
 
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
 
 /**
  * Simple bucket index implementation, with fixed bucket number.
  */
 public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSimpleBucketIndex.class);
-
   public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
     super(config);
   }
@@ -79,27 +79,23 @@ public class HoodieSimpleBucketIndex extends 
HoodieBucketIndex {
   }
 
   @Override
-  protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, 
List<String> partitionPath) {
-    return new SimpleBucketIndexLocationMapper(table, partitionPath);
-  }
-
-  public class SimpleBucketIndexLocationMapper implements 
BucketIndexLocationMapper {
-
-    /**
-     * Mapping from partitionPath -> bucketId -> fileInfo
-     */
-    private final Map<String, Map<Integer, HoodieRecordLocation>> 
partitionPathFileIDList;
-
-    public SimpleBucketIndexLocationMapper(HoodieTable table, List<String> 
partitions) {
-      partitionPathFileIDList = partitions.stream()
-          .collect(Collectors.toMap(p -> p, p -> 
loadBucketIdToFileIdMappingForPartition(table, p)));
-    }
-
-    @Override
-    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
-      int bucketId = getBucketID(key);
-      Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = 
partitionPathFileIDList.get(key.getPartitionPath());
-      return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId, 
null));
-    }
+  public <R> HoodieData<HoodieRecord<R>> tagLocation(
+      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
+      HoodieTable hoodieTable)
+      throws HoodieIndexException {
+    Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList = 
new HashMap<>();
+    return records.mapPartitions(iterator -> new 
LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
+      @Override
+      protected HoodieRecord<R> computeNext() {
+        HoodieRecord record = inputItr.next();
+        int bucketId = getBucketID(record.getKey());
+        String partitionPath = record.getPartitionPath();
+        if (!partitionPathFileIDList.containsKey(partitionPath)) {
+          partitionPathFileIDList.put(partitionPath, 
loadBucketIdToFileIdMappingForPartition(hoodieTable, partitionPath));
+        }
+        HoodieRecordLocation loc = 
partitionPathFileIDList.get(partitionPath).getOrDefault(bucketId, null);
+        return tagAsNewRecordIfNeeded(record, Option.ofNullable(loc));
+      }
+      }, false);
   }
 }

Reply via email to