This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 76dbdaa65e3 [HUDI-8622] Fix performance regression of tag when written
into consistent bucket index table (#12389)
76dbdaa65e3 is described below
commit 76dbdaa65e3e2865d250f862ff23ab0039679e87
Author: TheR1sing3un <[email protected]>
AuthorDate: Mon Dec 16 15:22:00 2024 +0800
[HUDI-8622] Fix performance regression of tag when written into consistent
bucket index table (#12389)
* fix: fix the performance regression of record tagging when writing into a
consistent bucket index table
1. Fix the performance regression of record tagging when writing into a
consistent bucket index table
2. Unify the tagging logic of the bucket indexes and lazily load the required
location-mapper information per partition
---------
Signed-off-by: TheR1sing3un <[email protected]>
Co-authored-by: danny0405 <[email protected]>
---
.../index/bucket/ConsistentBucketIndexUtils.java | 23 ++++++--
.../hudi/index/bucket/HoodieBucketIndex.java | 61 ++++++++++++++++++++++
.../index/bucket/HoodieConsistentBucketIndex.java | 57 ++++++--------------
.../hudi/index/bucket/HoodieSimpleBucketIndex.java | 41 ++++++---------
.../org/apache/hudi/storage/HoodieStorage.java | 21 +++++++-
5 files changed, 131 insertions(+), 72 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index e5014bb8f13..2f294a58c68 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -108,11 +108,11 @@ public class ConsistentBucketIndexUtils {
try {
Predicate<StoragePathInfo> hashingMetaCommitFilePredicate = pathInfo -> {
String filename = pathInfo.getPath().getName();
- return
filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
+ return
filename.endsWith(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
};
Predicate<StoragePathInfo> hashingMetadataFilePredicate = pathInfo -> {
String filename = pathInfo.getPath().getName();
- return filename.contains(HASHING_METADATA_FILE_SUFFIX);
+ return filename.endsWith(HASHING_METADATA_FILE_SUFFIX);
};
final List<StoragePathInfo> metaFiles =
metaClient.getStorage().listDirectEntries(metadataPath);
final TreeSet<String> commitMetaTss =
metaFiles.stream().filter(hashingMetaCommitFilePredicate)
@@ -185,10 +185,23 @@ public class ConsistentBucketIndexUtils {
table.getMetaClient().getHashingMetadataPath(),
metadata.getPartitionPath());
StoragePath fullPath = new StoragePath(dir, metadata.getFilename());
try {
- storage.createImmutableFileInPath(fullPath,
Option.of(metadata.toBytes()));
+ if (storage.exists(fullPath)) {
+ // the file has been created by other tasks
+ return true;
+ }
+ storage.createImmutableFileInPath(fullPath,
Option.of(metadata.toBytes()), true);
return true;
- } catch (IOException e) {
- LOG.warn("Failed to update bucket metadata: " + metadata, e);
+ } catch (IOException e1) {
+ // ignore the exception and check the file existence
+ try {
+ if (storage.exists(fullPath)) {
+ return true;
+ }
+ } catch (IOException e2) {
+ // ignore the exception and return false
+ LOG.warn("Failed to check the existence of bucket metadata file: " +
fullPath, e2);
+ }
+ LOG.warn("Failed to update bucket metadata: " + metadata, e1);
return false;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index 3ca75d3e264..b00770cfe4f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -19,9 +19,13 @@
package org.apache.hudi.index.bucket;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
@@ -30,8 +34,14 @@ import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
/**
* Hash indexing mechanism.
@@ -59,6 +69,57 @@ public abstract class HoodieBucketIndex extends
HoodieIndex<Object, Object> {
return writeStatuses;
}
+ @Override
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
+ HoodieTable hoodieTable)
+ throws HoodieIndexException {
+
+ // Get bucket location mapper
+ GlobalIndexLocationFunction locFunc = new
GlobalIndexLocationFunction(hoodieTable);
+
+ return records.mapPartitions(iterator ->
+ new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
+ @Override
+ protected HoodieRecord<R> computeNext() {
+ // TODO maybe batch the operation to improve performance
+ HoodieRecord record = inputItr.next();
+ Option<HoodieRecordLocation> loc = locFunc.apply(record);
+ return tagAsNewRecordIfNeeded(record, loc);
+ }
+ }, false
+ );
+ }
+
+ /**
+ * Global lazy-loading index location function. The index location function
can be applied to a hoodie record to get its location under the partition.
+ * The per-partition index location functions are cached for better
performance.
+ */
+ class GlobalIndexLocationFunction implements Function<HoodieRecord,
Option<HoodieRecordLocation>>, Serializable {
+
+ private final HoodieTable table;
+ private final Map<String/*partition path*/, Function<HoodieRecord,
Option<HoodieRecordLocation>>/*location func per partition*/>
partitionToIndexFunctionMap;
+
+ public GlobalIndexLocationFunction(HoodieTable table) {
+ this.table = table;
+ this.partitionToIndexFunctionMap = new HashMap<>();
+ }
+
+ @Override
+ public Option<HoodieRecordLocation> apply(HoodieRecord record) {
+ String partitionPath = record.getPartitionPath();
+ if (!partitionToIndexFunctionMap.containsKey(partitionPath)) {
+ partitionToIndexFunctionMap.put(partitionPath,
getIndexLocationFunctionForPartition(table, partitionPath));
+ }
+ return partitionToIndexFunctionMap.get(partitionPath).apply(record);
+ }
+ }
+
+ /**
+ * Returns the index location function for the given partition path {@code
partitionPath}.
+ */
+ protected abstract Function<HoodieRecord, Option<HoodieRecordLocation>>
getIndexLocationFunctionForPartition(HoodieTable table, String partitionPath);
+
@Override
public boolean requiresTagging(WriteOperationType operationType) {
switch (operationType) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
index 125bc970d65..f43f2ebb275 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
@@ -19,7 +19,6 @@
package org.apache.hudi.index.bucket;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -37,12 +36,7 @@ import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
+import java.util.function.Function;
/**
* Consistent hashing bucket index implementation, with auto-adjust bucket
number.
@@ -76,45 +70,24 @@ public class HoodieConsistentBucketIndex extends
HoodieBucketIndex {
}
@Override
- public <R> HoodieData<HoodieRecord<R>> tagLocation(
- HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
- HoodieTable hoodieTable)
- throws HoodieIndexException {
- // Get bucket location mapper for the given partitions
- List<String> partitions =
records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
- LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
- ConsistentBucketIndexLocationMapper mapper = new
ConsistentBucketIndexLocationMapper(hoodieTable, partitions);
-
- return records.mapPartitions(iterator ->
- new LazyIterableIterator<HoodieRecord<R>,
HoodieRecord<R>>(iterator) {
- @Override
- protected HoodieRecord<R> computeNext() {
- // TODO maybe batch the operation to improve performance
- HoodieRecord record = inputItr.next();
- Option<HoodieRecordLocation> loc =
mapper.getRecordLocation(record.getKey());
- return tagAsNewRecordIfNeeded(record, loc);
- }
- }, false);
+ protected Function<HoodieRecord, Option<HoodieRecordLocation>>
getIndexLocationFunctionForPartition(HoodieTable table, String partitionPath) {
+ return new ConsistentBucketIndexLocationFunction(table, partitionPath);
}
- public class ConsistentBucketIndexLocationMapper implements Serializable {
-
- /**
- * Mapping from partitionPath -> bucket identifier
- */
- private final Map<String, ConsistentBucketIdentifier>
partitionToIdentifier;
+ private class ConsistentBucketIndexLocationFunction implements
Function<HoodieRecord, Option<HoodieRecordLocation>> {
+ private final String partitionPath;
+ private final ConsistentBucketIdentifier identifier;
- public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String>
partitions) {
- // TODO maybe parallel
- partitionToIdentifier = partitions.stream().collect(Collectors.toMap(p
-> p, p -> {
- HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadOrCreateMetadata(table, p, getNumBuckets());
- return new ConsistentBucketIdentifier(metadata);
- }));
+ public ConsistentBucketIndexLocationFunction(HoodieTable table, String
partition) {
+ this.partitionPath = partition;
+ HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadOrCreateMetadata(table, partition,
getNumBuckets());
+ this.identifier = new ConsistentBucketIdentifier(metadata);
}
- public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
- String partitionPath = key.getPartitionPath();
- ConsistentHashingNode node =
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+ @Override
+ public Option<HoodieRecordLocation> apply(HoodieRecord record) {
+ HoodieKey recordKey = record.getKey();
+ ConsistentHashingNode node = identifier.getBucket(recordKey,
indexKeyFields);
if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) {
// Dynamic Bucket Index doesn't need the instant time of the latest
file group.
// We add suffix 0 here to the file uuid, following the naming
convention, i.e., fileId = [uuid]_[numWrites]
@@ -122,7 +95,7 @@ public class HoodieConsistentBucketIndex extends
HoodieBucketIndex {
}
LOG.error("Consistent hashing node has no file group, partition: {},
meta: {}, record_key: {}",
- partitionPath,
partitionToIdentifier.get(partitionPath).getMetadata().getFilename(),
key.toString());
+ partitionPath, identifier.getMetadata().getFilename(), recordKey);
throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index d72eca3c409..0f184b4a593 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -18,9 +18,6 @@
package org.apache.hudi.index.bucket;
-import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,7 +28,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
@@ -47,11 +43,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
-
/**
* Simple bucket index implementation, with fixed bucket number.
*/
@@ -151,23 +146,21 @@ public class HoodieSimpleBucketIndex extends
HoodieBucketIndex {
}
@Override
- public <R> HoodieData<HoodieRecord<R>> tagLocation(
- HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
- HoodieTable hoodieTable)
- throws HoodieIndexException {
- Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList =
new HashMap<>();
- return records.mapPartitions(iterator -> new
LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
- @Override
- protected HoodieRecord<R> computeNext() {
- HoodieRecord record = inputItr.next();
- int bucketId = getBucketID(record.getKey());
- String partitionPath = record.getPartitionPath();
- if (!partitionPathFileIDList.containsKey(partitionPath)) {
- partitionPathFileIDList.put(partitionPath,
loadBucketIdToFileIdMappingForPartition(hoodieTable, partitionPath));
- }
- HoodieRecordLocation loc =
partitionPathFileIDList.get(partitionPath).getOrDefault(bucketId, null);
- return tagAsNewRecordIfNeeded(record, Option.ofNullable(loc));
- }
- }, false);
+ protected Function<HoodieRecord, Option<HoodieRecordLocation>>
getIndexLocationFunctionForPartition(HoodieTable table, String partitionPath) {
+ return new SimpleBucketIndexLocationFunction(table, partitionPath);
+ }
+
+ private class SimpleBucketIndexLocationFunction implements
Function<HoodieRecord, Option<HoodieRecordLocation>> {
+ private final Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping;
+
+ public SimpleBucketIndexLocationFunction(HoodieTable table, String
partitionPath) {
+ this.bucketIdToFileIdMapping =
loadBucketIdToFileIdMappingForPartition(table, partitionPath);
+ }
+
+ @Override
+ public Option<HoodieRecordLocation> apply(HoodieRecord record) {
+ int bucketId = getBucketID(record.getKey());
+ return Option.ofNullable(bucketIdToFileIdMapping.get(bucketId));
+ }
}
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index 8d5cbb38025..12187941c58 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -309,10 +309,29 @@ public abstract class HoodieStorage implements Closeable {
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public final void createImmutableFileInPath(StoragePath path,
Option<byte[]> content) throws
HoodieIOException {
+ createImmutableFileInPath(path, content, needCreateTempFile());
+ }
+
+ /**
+ * Creates a new file with overwrite set to false. This ensures files are created
+ * only once and never rewritten. If the content is not empty and {@code needTempFile}
+ * is true, the content is first written to a temp file, which is then renamed to
+ * the target path after the content is written.
+ * true, and then rename it back after the content is written.
+ *
+ * <p>CAUTION: if this method may be invoked concurrently from multiple threads to
+ * write the same file, callers are advised to check whether the file already exists first.
+ *
+ * @param path File path.
+ * @param content Content to be stored.
+ * @param needTempFile Whether to create auxiliary temp file.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public final void createImmutableFileInPath(StoragePath path,
+ Option<byte[]> content,
+ boolean needTempFile) throws
HoodieIOException {
OutputStream fsout = null;
StoragePath tmpPath = null;
- boolean needTempFile = needCreateTempFile();
try {
if (!content.isPresent()) {
fsout = create(path, false);