This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ef5b9ca3777 Revert "[HUDI-8621] Revert single file slice optimisation for getRecordsByKeys in MDT table (#12643)" (#12700)
ef5b9ca3777 is described below
commit ef5b9ca3777775fae29c04c06cbb743183e9eef3
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jan 24 23:10:17 2025 +0530
Revert "[HUDI-8621] Revert single file slice optimisation for
getRecordsByKeys in MDT table (#12643)" (#12700)
This reverts commit 79e1730aea01dab80181fb10bd22c2d3f00204e7.
---
.../hudi/metadata/HoodieBackedTableMetadata.java | 58 +++++++++++++---------
.../hudi/functional/TestBootstrapReadBase.java | 2 +-
2 files changed, 35 insertions(+), 25 deletions(-)
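
This re-revert restores the fast path that #12643 had removed: when a metadata table partition has exactly one file slice, every key necessarily lives in that slice, so the key bucketing and the engine-context fan-out are pure overhead. A minimal self-contained sketch of the restored control flow follows (not the Hudi code itself: a hypothetical lookupFromSlice stands in for Hudi's file-slice readers, and String values stand in for HoodieRecord<HoodieMetadataPayload>):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SingleSliceFastPathSketch {

      // Hypothetical stand-in for lookupKeysFromFileSlice; the real method
      // reads the slice's base file and merges its log files.
      static Map<String, String> lookupFromSlice(List<String> keys, int sliceIndex) {
        Map<String, String> found = new HashMap<>();
        for (String key : keys) {
          found.put(key, "record-from-slice-" + sliceIndex);
        }
        return found;
      }

      static Map<String, String> getRecordsByKeys(List<String> keys, int numFileSlices) {
        if (numFileSlices == 1) {
          // Single-slice fast path: no bucketing, no parallel fan-out.
          return lookupFromSlice(keys, 0);
        }
        // Bucket each key to the slice that owns it. This sketch uses
        // hashCode mod numFileSlices; Hudi routes through
        // HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex.
        List<List<String>> buckets = new ArrayList<>(numFileSlices);
        for (int i = 0; i < numFileSlices; i++) {
          buckets.add(new ArrayList<>());
        }
        for (String key : keys) {
          buckets.get(Math.floorMod(key.hashCode(), numFileSlices)).add(key);
        }
        // The real code runs this loop in parallel via getEngineContext().map(...).
        Map<String, String> result = new HashMap<>(keys.size());
        for (int i = 0; i < numFileSlices; i++) {
          if (!buckets.get(i).isEmpty()) {
            result.putAll(lookupFromSlice(buckets.get(i), i));
          }
        }
        return result;
      }

      public static void main(String[] args) {
        System.out.println(getRecordsByKeys(List.of("key1", "key2", "key3"), 1));
        System.out.println(getRecordsByKeys(List.of("key1", "key2", "key3"), 4));
      }
    }

The diff below applies this shape to both lookup methods of HoodieBackedTableMetadata.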
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index ca76e0d28c2..83c2fd20560 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -263,18 +263,23 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0");
     // Lookup keys from each file slice
-    // Parallel lookup for large sized partitions with many file slices
-    // Partition the keys by the file slice which contains it
-    ArrayList<ArrayList<String>> partitionedKeys = partitionKeysByFileSlices(keys, numFileSlices);
-    result = new HashMap<>(keys.size());
-    getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading keys from metadata table partition " + partitionName);
-    getEngineContext().map(partitionedKeys, keysList -> {
-      if (keysList.isEmpty()) {
-        return Collections.<String, HoodieRecord<HoodieMetadataPayload>>emptyMap();
-      }
-      int shardIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0), numFileSlices);
-      return lookupKeysFromFileSlice(partitionName, keysList, partitionFileSlices.get(shardIndex));
-    }, partitionedKeys.size()).forEach(result::putAll);
+    if (numFileSlices == 1) {
+      // Optimization for a single slice for smaller metadata table partitions
+      result = lookupKeysFromFileSlice(partitionName, keys, partitionFileSlices.get(0));
+    } else {
+      // Parallel lookup for large sized partitions with many file slices
+      // Partition the keys by the file slice which contains it
+      ArrayList<ArrayList<String>> partitionedKeys = partitionKeysByFileSlices(keys, numFileSlices);
+      result = new HashMap<>(keys.size());
+      getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading keys from metadata table partition " + partitionName);
+      getEngineContext().map(partitionedKeys, keysList -> {
+        if (keysList.isEmpty()) {
+          return Collections.<String, HoodieRecord<HoodieMetadataPayload>>emptyMap();
+        }
+        int shardIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0), numFileSlices);
+        return lookupKeysFromFileSlice(partitionName, keysList, partitionFileSlices.get(shardIndex));
+      }, partitionedKeys.size()).forEach(result::putAll);
+    }
     return result;
   }
@@ -305,18 +310,23 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0");
     // Lookup keys from each file slice
-    // Parallel lookup for large sized partitions with many file slices
-    // Partition the keys by the file slice which contains it
-    ArrayList<ArrayList<String>> partitionedKeys = partitionKeysByFileSlices(keys, numFileSlices);
-    result = new HashMap<>(keys.size());
-    getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading keys from metadata table partition " + partitionName);
-    getEngineContext().map(partitionedKeys, keysList -> {
-      if (keysList.isEmpty()) {
-        return Collections.<String, HoodieRecord<HoodieMetadataPayload>>emptyMap();
-      }
-      int shardIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0), numFileSlices);
-      return lookupAllKeysFromFileSlice(partitionName, keysList, partitionFileSlices.get(shardIndex));
-    }, partitionedKeys.size()).forEach(map -> result.putAll((Map<String, List<HoodieRecord<HoodieMetadataPayload>>>) map));
+    if (numFileSlices == 1) {
+      // Optimization for a single slice for smaller metadata table partitions
+      result = lookupAllKeysFromFileSlice(partitionName, keys, partitionFileSlices.get(0));
+    } else {
+      // Parallel lookup for large sized partitions with many file slices
+      // Partition the keys by the file slice which contains it
+      ArrayList<ArrayList<String>> partitionedKeys = partitionKeysByFileSlices(keys, numFileSlices);
+      result = new HashMap<>(keys.size());
+      getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading keys from metadata table partition " + partitionName);
+      getEngineContext().map(partitionedKeys, keysList -> {
+        if (keysList.isEmpty()) {
+          return Collections.<String, HoodieRecord<HoodieMetadataPayload>>emptyMap();
+        }
+        int shardIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0), numFileSlices);
+        return lookupAllKeysFromFileSlice(partitionName, keysList, partitionFileSlices.get(shardIndex));
+      }, partitionedKeys.size()).forEach(map -> result.putAll((Map<String, List<HoodieRecord<HoodieMetadataPayload>>>) map));
+    }
     return result;
   }
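
Both hunks recover the shard index from keysList.get(0). That works because partitionKeysByFileSlices buckets keys with the same HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex function used in the lambda, so every key in a non-empty bucket shares one index and probing the first element locates the file slice for the whole batch. A simplified stand-in for that contract (the real hash differs; what matters for the sketch is a deterministic index in [0, numFileGroups) per key):

    import java.util.List;

    public class FileGroupIndexSketch {

      // Simplified stand-in for HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex:
      // any deterministic hash works here, as long as the same key always maps
      // to the same index in [0, numFileGroups).
      static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
        return Math.floorMod(recordKey.hashCode(), numFileGroups);
      }

      // Because the bucketing and the lookup use the same function, the first
      // key of a bucket suffices to locate the file slice for all of its keys.
      static int shardIndexOf(List<String> bucket, int numFileGroups) {
        return mapRecordKeyToFileGroupIndex(bucket.get(0), numFileGroups);
      }

      public static void main(String[] args) {
        System.out.println(mapRecordKeyToFileGroupIndex("20250124/uuid-42", 8));
      }
    }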
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
index 3250521d839..6103e0fd076 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
@@ -286,6 +286,6 @@ public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase {
     return df.withColumn("partpath" + n,
         functions.md5(functions.concat_ws("," + n + ",",
             df.col("partition_path"),
-            functions.hash(df.col("partition_path")).mod(n))));
+            functions.hash(df.col("_row_key")).mod(n))));
   }
 }
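
The test tweak changes which column feeds the hash when the helper synthesizes extra partition-path columns: hash(partition_path) is constant across all rows sharing a partition_path, so taking it mod n added no spread, whereas hash(_row_key) (the per-record key column in these tests) varies per record. A compilable sketch of the patched helper in isolation, assuming spark-sql on the classpath and a DataFrame carrying _row_key and partition_path columns:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.functions;

    public class PartPathColumnSketch {

      // Mirrors the patched test helper: synthesize a partition-path column
      // by md5-ing partition_path together with hash(_row_key) mod n.
      static Dataset<Row> addPartPathColumn(Dataset<Row> df, int n) {
        return df.withColumn("partpath" + n,
            functions.md5(functions.concat_ws("," + n + ",",
                df.col("partition_path"),
                functions.hash(df.col("_row_key")).mod(n))));
      }
    }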