This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 79e1730aea0 [HUDI-8621] Revert single file slice optimisation for getRecordsByKeys in MDT table (#12643)
79e1730aea0 is described below
commit 79e1730aea01dab80181fb10bd22c2d3f00204e7
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Jan 16 12:07:38 2025 +0530
[HUDI-8621] Revert single file slice optimisation for getRecordsByKeys in MDT table (#12643)
* Revert single file slice optimization
* Fix test failure
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/metadata/HoodieBackedTableMetadata.java | 58 +++++++++-------------
.../hudi/functional/TestBootstrapReadBase.java | 2 +-
2 files changed, 25 insertions(+), 35 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 83c2fd20560..ca76e0d28c2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -263,23 +263,18 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
// Lookup keys from each file slice
- if (numFileSlices == 1) {
- // Optimization for a single slice for smaller metadata table partitions
- result = lookupKeysFromFileSlice(partitionName, keys,
partitionFileSlices.get(0));
- } else {
- // Parallel lookup for large sized partitions with many file slices
- // Partition the keys by the file slice which contains it
- ArrayList<ArrayList<String>> partitionedKeys =
partitionKeysByFileSlices(keys, numFileSlices);
- result = new HashMap<>(keys.size());
- getEngineContext().setJobStatus(this.getClass().getSimpleName(),
"Reading keys from metadata table partition " + partitionName);
- getEngineContext().map(partitionedKeys, keysList -> {
- if (keysList.isEmpty()) {
- return Collections.<String,
HoodieRecord<HoodieMetadataPayload>>emptyMap();
- }
- int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0),
numFileSlices);
- return lookupKeysFromFileSlice(partitionName, keysList,
partitionFileSlices.get(shardIndex));
- }, partitionedKeys.size()).forEach(result::putAll);
- }
+ // Parallel lookup for large sized partitions with many file slices
+ // Partition the keys by the file slice which contains it
+ ArrayList<ArrayList<String>> partitionedKeys =
partitionKeysByFileSlices(keys, numFileSlices);
+ result = new HashMap<>(keys.size());
+ getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading
keys from metadata table partition " + partitionName);
+ getEngineContext().map(partitionedKeys, keysList -> {
+ if (keysList.isEmpty()) {
+ return Collections.<String,
HoodieRecord<HoodieMetadataPayload>>emptyMap();
+ }
+ int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0),
numFileSlices);
+ return lookupKeysFromFileSlice(partitionName, keysList,
partitionFileSlices.get(shardIndex));
+ }, partitionedKeys.size()).forEach(result::putAll);
return result;
}
@@ -310,23 +305,18 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
// Lookup keys from each file slice
- if (numFileSlices == 1) {
- // Optimization for a single slice for smaller metadata table partitions
- result = lookupAllKeysFromFileSlice(partitionName, keys,
partitionFileSlices.get(0));
- } else {
- // Parallel lookup for large sized partitions with many file slices
- // Partition the keys by the file slice which contains it
- ArrayList<ArrayList<String>> partitionedKeys =
partitionKeysByFileSlices(keys, numFileSlices);
- result = new HashMap<>(keys.size());
- getEngineContext().setJobStatus(this.getClass().getSimpleName(),
"Reading keys from metadata table partition " + partitionName);
- getEngineContext().map(partitionedKeys, keysList -> {
- if (keysList.isEmpty()) {
- return Collections.<String,
HoodieRecord<HoodieMetadataPayload>>emptyMap();
- }
- int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0),
numFileSlices);
- return lookupAllKeysFromFileSlice(partitionName, keysList,
partitionFileSlices.get(shardIndex));
- }, partitionedKeys.size()).forEach(map -> result.putAll((Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>) map));
- }
+ // Parallel lookup for large sized partitions with many file slices
+ // Partition the keys by the file slice which contains it
+ ArrayList<ArrayList<String>> partitionedKeys =
partitionKeysByFileSlices(keys, numFileSlices);
+ result = new HashMap<>(keys.size());
+ getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading
keys from metadata table partition " + partitionName);
+ getEngineContext().map(partitionedKeys, keysList -> {
+ if (keysList.isEmpty()) {
+ return Collections.<String,
HoodieRecord<HoodieMetadataPayload>>emptyMap();
+ }
+ int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0),
numFileSlices);
+ return lookupAllKeysFromFileSlice(partitionName, keysList,
partitionFileSlices.get(shardIndex));
+ }, partitionedKeys.size()).forEach(map -> result.putAll((Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>) map));
return result;
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
index 6103e0fd076..3250521d839 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
@@ -286,6 +286,6 @@ public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase {
return df.withColumn("partpath" + n,
functions.md5(functions.concat_ws("," + n + ",",
df.col("partition_path"),
- functions.hash(df.col("_row_key")).mod(n))));
+ functions.hash(df.col("partition_path")).mod(n))));
}
}