This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d24220a4804 [HUDI-7111] Fix performance regression of tag when written
into simple bucket index table (#10130)
d24220a4804 is described below
commit d24220a4804ee6e04346a03a4ddbf2d2711ae301
Author: Jing Zhang <[email protected]>
AuthorDate: Tue Nov 21 09:56:07 2023 +0800
[HUDI-7111] Fix performance regression of tag when written into simple
bucket index table (#10130)
---
.../index/bucket/BucketIndexLocationMapper.java | 35 --------------
.../hudi/index/bucket/HoodieBucketIndex.java | 35 --------------
.../index/bucket/HoodieConsistentBucketIndex.java | 29 ++++++++++--
.../hudi/index/bucket/HoodieSimpleBucketIndex.java | 54 ++++++++++------------
4 files changed, 50 insertions(+), 103 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java
deleted file mode 100644
index 1ce68ef97bf..00000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.index.bucket;
-
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.Option;
-
-import java.io.Serializable;
-
-public interface BucketIndexLocationMapper extends Serializable {
-
- /**
- * Get record location given hoodie key
- */
- Option<HoodieRecordLocation> getRecordLocation(HoodieKey key);
-
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index a41aa82a3e8..3ca75d3e264 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -19,13 +19,9 @@
package org.apache.hudi.index.bucket;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
@@ -37,8 +33,6 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
-import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
-
/**
* Hash indexing mechanism.
*/
@@ -65,30 +59,6 @@ public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> {
return writeStatuses;
}
- @Override
- public <R> HoodieData<HoodieRecord<R>> tagLocation(
- HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
- HoodieTable hoodieTable)
- throws HoodieIndexException {
- // Get bucket location mapper for the given partitions
- List<String> partitions =
records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
- LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
- BucketIndexLocationMapper mapper = getLocationMapper(hoodieTable,
partitions);
-
- return records.mapPartitions(iterator ->
- new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
- @Override
- protected HoodieRecord<R> computeNext() {
- // TODO maybe batch the operation to improve performance
- HoodieRecord record = inputItr.next();
- Option<HoodieRecordLocation> loc =
mapper.getRecordLocation(record.getKey());
- return tagAsNewRecordIfNeeded(record, loc);
- }
- },
- false
- );
- }
-
@Override
public boolean requiresTagging(WriteOperationType operationType) {
switch (operationType) {
@@ -127,9 +97,4 @@ public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> {
public int getNumBuckets() {
return numBuckets;
}
-
- /**
- * Get a location mapper for the given table & partitionPath
- */
- protected abstract BucketIndexLocationMapper getLocationMapper(HoodieTable
table, List<String> partitionPath);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
index 156d14b7cf5..125bc970d65 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
@@ -19,12 +19,14 @@
package org.apache.hudi.index.bucket;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -35,10 +37,13 @@ import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
+
/**
* Consistent hashing bucket index implementation, with auto-adjust bucket
number.
* NOTE: bucket resizing is triggered by clustering.
@@ -71,11 +76,28 @@ public class HoodieConsistentBucketIndex extends HoodieBucketIndex {
}
@Override
- protected BucketIndexLocationMapper getLocationMapper(HoodieTable table,
List<String> partitionPath) {
- return new ConsistentBucketIndexLocationMapper(table, partitionPath);
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
+ HoodieTable hoodieTable)
+ throws HoodieIndexException {
+ // Get bucket location mapper for the given partitions
+ List<String> partitions =
records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
+ LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
+ ConsistentBucketIndexLocationMapper mapper = new
ConsistentBucketIndexLocationMapper(hoodieTable, partitions);
+
+ return records.mapPartitions(iterator ->
+ new LazyIterableIterator<HoodieRecord<R>,
HoodieRecord<R>>(iterator) {
+ @Override
+ protected HoodieRecord<R> computeNext() {
+ // TODO maybe batch the operation to improve performance
+ HoodieRecord record = inputItr.next();
+ Option<HoodieRecordLocation> loc =
mapper.getRecordLocation(record.getKey());
+ return tagAsNewRecordIfNeeded(record, loc);
+ }
+ }, false);
}
- public class ConsistentBucketIndexLocationMapper implements
BucketIndexLocationMapper {
+ public class ConsistentBucketIndexLocationMapper implements Serializable {
/**
* Mapping from partitionPath -> bucket identifier
@@ -90,7 +112,6 @@ public class HoodieConsistentBucketIndex extends HoodieBucketIndex {
}));
}
- @Override
public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
String partitionPath = key.getPartitionPath();
ConsistentHashingNode node =
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index fa2289ed87e..a38fa489a2a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -18,29 +18,29 @@
package org.apache.hudi.index.bucket;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
/**
* Simple bucket index implementation, with fixed bucket number.
*/
public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSimpleBucketIndex.class);
-
public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
super(config);
}
@@ -79,27 +79,23 @@ public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
}
@Override
- protected BucketIndexLocationMapper getLocationMapper(HoodieTable table,
List<String> partitionPath) {
- return new SimpleBucketIndexLocationMapper(table, partitionPath);
- }
-
- public class SimpleBucketIndexLocationMapper implements
BucketIndexLocationMapper {
-
- /**
- * Mapping from partitionPath -> bucketId -> fileInfo
- */
- private final Map<String, Map<Integer, HoodieRecordLocation>>
partitionPathFileIDList;
-
- public SimpleBucketIndexLocationMapper(HoodieTable table, List<String>
partitions) {
- partitionPathFileIDList = partitions.stream()
- .collect(Collectors.toMap(p -> p, p ->
loadBucketIdToFileIdMappingForPartition(table, p)));
- }
-
- @Override
- public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
- int bucketId = getBucketID(key);
- Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping =
partitionPathFileIDList.get(key.getPartitionPath());
- return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId,
null));
- }
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
+ HoodieTable hoodieTable)
+ throws HoodieIndexException {
+ Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList =
new HashMap<>();
+ return records.mapPartitions(iterator -> new
LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
+ @Override
+ protected HoodieRecord<R> computeNext() {
+ HoodieRecord record = inputItr.next();
+ int bucketId = getBucketID(record.getKey());
+ String partitionPath = record.getPartitionPath();
+ if (!partitionPathFileIDList.containsKey(partitionPath)) {
+ partitionPathFileIDList.put(partitionPath,
loadBucketIdToFileIdMappingForPartition(hoodieTable, partitionPath));
+ }
+ HoodieRecordLocation loc =
partitionPathFileIDList.get(partitionPath).getOrDefault(bucketId, null);
+ return tagAsNewRecordIfNeeded(record, Option.ofNullable(loc));
+ }
+ }, false);
}
}