Davis-Zhang-Onehouse commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2178682534
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1692,9 +1710,14 @@ protected Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> tagRecordsWith
       hoodieFileGroupIdList.addAll(fileSlices.stream().map(fileSlice -> new HoodieFileGroupId(partitionName, fileSlice.getFileId())).collect(Collectors.toList()));
       List<FileSlice> finalFileSlices = fileSlices;
+      HoodieIndexVersion indexVersion = existingIndexVersionOrDefault(partitionName, metadataMetaClient);
+
+      // Determine key format once per partition to avoid repeated checks
+      boolean useSecondaryKeyForHashing = MetadataPartitionType.SECONDARY_INDEX.matchesPartitionPath(partitionName) && indexVersion.greaterThanOrEquals(HoodieIndexVersion.V2);
+      SerializableFunction<String, SerializableFunction<Integer, Integer>> mappingFunction =
Review Comment:
Check commits:
- c45c2534d2d processValuesOfTheSameShards key should be generic type
- 806a940f0ee processValuesOfTheSameShards value should be generic type
- c25eb092eeb index lookup APIs implementation 3
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -592,9 +915,14 @@ record -> {
             return null;
           })
           .filter(Objects::nonNull)
-          .collectAsList()
-          .stream()
-          .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
+          .collectAsList();
+
+      Map<String, Set<String>> res = new HashMap<>();
Review Comment:
done
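For readers of this thread: the change replaces a stream groupingBy collector with collectAsList() followed by an explicit loop. A minimal standalone sketch of the equivalence, using plain JDK types in place of Hudi's Pair (the data here is hypothetical):

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class GroupingSketch {
  public static void main(String[] args) {
    // Hypothetical (secondaryKey, recordKey) pairs collected from the engine
    List<Map.Entry<String, String>> pairs = List.of(
        new AbstractMap.SimpleEntry<>("sk1", "rk1"),
        new AbstractMap.SimpleEntry<>("sk1", "rk2"),
        new AbstractMap.SimpleEntry<>("sk2", "rk3"));

    // Old form: group with a stream collector
    Map<String, Set<String>> viaCollector = pairs.stream()
        .collect(Collectors.groupingBy(Map.Entry::getKey,
            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));

    // New form: build the map with a plain loop over the collected list,
    // avoiding a second stream pass over the driver-side result
    Map<String, Set<String>> res = new HashMap<>();
    for (Map.Entry<String, String> p : pairs) {
      res.computeIfAbsent(p.getKey(), k -> new HashSet<>()).add(p.getValue());
    }

    System.out.println(viaCollector.equals(res)); // prints "true"
  }
}
```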
##########
hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java:
##########
@@ -26,15 +26,13 @@ public class SecondaryIndexKeyUtils {
   public static String getRecordKeyFromSecondaryIndexKey(String key) {
     // the payload key is in the format of "secondaryKey$primaryKey"
     // we need to extract the primary key from the payload key
-    checkState(key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
Review Comment:
That would require all callers to do an if/else to detect "this is SI and it is of index v2".
I just removed this check because:
- getSecondaryIndexKeySeparatorPosition will throw if there is no unescaped $
- the checkState only checks for any $, not an unescaped $, which is misleading
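To illustrate the distinction, here is a simplified standalone sketch of an unescaped-separator scan. The method name and the backslash escape character are assumptions for illustration only; the real logic lives in getSecondaryIndexKeySeparatorPosition:

```java
public class SeparatorSketch {
  // Hypothetical simplified scan: returns the index of the first '$' that is
  // not escaped by a preceding '\', or -1 if none exists. The escape handling
  // also covers "\\", because the backslash itself marks the next char escaped.
  static int unescapedSeparatorPosition(String key) {
    boolean escaped = false;
    for (int i = 0; i < key.length(); i++) {
      char c = key.charAt(i);
      if (escaped) {
        escaped = false;        // current char is escaped, skip it
      } else if (c == '\\') {
        escaped = true;         // next char is escaped
      } else if (c == '$') {
        return i;               // first unescaped separator
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // In "a\$b$c" the first '$' is escaped, so the separator is at index 4
    System.out.println(unescapedSeparatorPosition("a\\$b$c")); // prints "4"
    // "a\$b" contains a '$', so checkState(key.contains("$")) would pass
    // even though there is no valid (unescaped) separator
    System.out.println(unescapedSeparatorPosition("a\\$b"));   // prints "-1"
  }
}
```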
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -466,14 +419,33 @@ private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, Stri
     }
   }
+  /**
+   * Retrieves a single record from the metadata table by its key.
+   *
+   * @param key The escaped/encoded key to look up in the metadata table
+   * @param partitionName The partition name where the record is stored
+   * @return Option containing the record if found, empty Option if not found
+   */
   protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
-  protected abstract Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> keys, String partitionName);
+  /**
+   * Retrieves a map of (key -> record) from the metadata table by their keys.
+   *
+   * @param keys The keys to look up in the metadata table
+   * @param partitionName The partition name where the records are stored
+   * @return A map of (key -> record)
+   */
+  public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeysWithMapping(
Review Comment:
Not sure if you are aware, but a new API with almost the same signature was introduced:
HoodieData<HoodieRecordGlobalLocation> readRecordIndexResult(HoodieData<String> recordKeys)
It has the same input yet a different output. We must have some differentiator in the name, otherwise the code will not compile.
I kept the old name and named the new API with a best guess.
We have APIs that take a key and return a file location, take a secondary key and return a record key, plus variants that return only the result or a mapping from key -> output.
Too-generic naming without enough differentiation makes the functionality confusing to users.
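For context on why the code would not compile: Java resolves overloads by parameter types only, never by return type, so two lookup APIs with identical inputs must differ in name. A minimal sketch with hypothetical simplified stand-in types (Location, Record, MetadataLookup are illustrations, not the real Hudi types):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NamingSketch {
  // Hypothetical simplified stand-ins for the real Hudi types
  record Location(String fileId) {}
  record Record(String payload) {}

  interface MetadataLookup {
    // takes keys, returns locations only
    List<Location> readRecordIndexResult(List<String> recordKeys);

    // same input, but returns a key -> record mapping; this cannot be an
    // overload of the method above because Java ignores return types during
    // overload resolution, hence the "...WithMapping" style differentiator
    Map<String, Record> getRecordsByKeysWithMapping(List<String> keys);
  }

  public static void main(String[] args) {
    MetadataLookup lookup = new MetadataLookup() {
      @Override
      public List<Location> readRecordIndexResult(List<String> recordKeys) {
        return recordKeys.stream().map(k -> new Location("file-" + k)).toList();
      }
      @Override
      public Map<String, Record> getRecordsByKeysWithMapping(List<String> keys) {
        return keys.stream().collect(Collectors.toMap(k -> k, k -> new Record("payload-" + k)));
      }
    };
    System.out.println(lookup.readRecordIndexResult(List.of("k1")).size()); // prints "1"
    System.out.println(lookup.getRecordsByKeysWithMapping(List.of("k1")).get("k1").payload());
  }
}
```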
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
+  /**
+   * WARNING: It is the caller's responsibility to ensure that it is of <Integer, String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:
Review Comment:
Check commits:
- c45c2534d2d processValuesOfTheSameShards key should be generic type
- 806a940f0ee processValuesOfTheSameShards value should be generic type
- c25eb092eeb index lookup APIs implementation 3
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java:
##########
@@ -187,4 +198,27 @@ public int deduceNumPartitions() {
     return pairRDDData.getNumPartitions();
   }
 }
+
+  @Override
+  public HoodiePairData<Integer, String> rangeBasedRepartitionForEachKey(
Review Comment:
Check commits:
- c45c2534d2d processValuesOfTheSameShards key should be generic type
- 806a940f0ee processValuesOfTheSameShards value should be generic type
- c25eb092eeb index lookup APIs implementation 3
Then you will read the design doc, run the toy example, and we will decide the next step.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -455,6 +455,47 @@ public final class HoodieMetadataConfig extends HoodieConfig {
           + "The index name either starts with or matches exactly can be one of the following: "
           + StringUtils.join(Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()), ", "));
+  // Range-based repartitioning configuration for metadata table lookups
Review Comment:
Check commits:
- c45c2534d2d processValuesOfTheSameShards key should be generic type
- 806a940f0ee processValuesOfTheSameShards value should be generic type
- c25eb092eeb index lookup APIs implementation 3
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
+  /**
+   * WARNING: It is the caller's responsibility to ensure that it is of <Integer, String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:
+   * 1. The keys are sorted within each partition.
+   * 2. There is at most 1 key per partition.
+   * 3. For partitions containing entries of the same key, the value ranges are not overlapping.
+   * 4. The number of keys per partition is probably at most maxKeyPerBucket.
+   *
+   * @param keyRange The range of keys to partition across (0 to keyRange inclusive). It must cover all possible keys
+   *                 in the RDD. Keys covered by the range but not present in the RDD are ignored.
+   * @param sampleFraction Fraction of data to sample for determining the per-bucket value range split points (between 0 and 1).
+   * @param maxKeyPerBucket Maximum number of keys allowed per partition bucket
+   * @param seed Random seed for sampling
+   * @return Repartitioned RDD
Review Comment:
Check commits:
- c45c2534d2d processValuesOfTheSameShards key should be generic type
- 806a940f0ee processValuesOfTheSameShards value should be generic type
- c25eb092eeb index lookup APIs implementation 3
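To make the contract above concrete, here is a plain in-memory sketch of the bucketing semantics. There is no Spark here, and the sampling-based split-point selection driven by sampleFraction and seed is replaced by a fixed values-per-bucket cap, so this is an illustration of the invariants rather than the actual implementation:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeRepartitionSketch {
  // Hypothetical model of the contract: assign each (key, value) pair to a
  // bucket such that (a) a bucket holds entries of a single key and
  // (b) buckets of the same key cover sorted, non-overlapping value ranges.
  static Map<Integer, List<Map.Entry<Integer, String>>> repartition(
      List<Map.Entry<Integer, String>> pairs, int valuesPerBucket) {
    // group values by key; TreeMap keeps the keys themselves in sorted order
    TreeMap<Integer, List<String>> byKey = new TreeMap<>();
    for (Map.Entry<Integer, String> p : pairs) {
      byKey.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
    }
    Map<Integer, List<Map.Entry<Integer, String>>> buckets = new LinkedHashMap<>();
    int bucketId = 0;
    for (Map.Entry<Integer, List<String>> e : byKey.entrySet()) {
      List<String> values = e.getValue();
      Collections.sort(values);
      // chop one key's sorted values into fixed-size, non-overlapping slices
      for (int i = 0; i < values.size(); i += valuesPerBucket) {
        List<Map.Entry<Integer, String>> bucket = new ArrayList<>();
        for (String v : values.subList(i, Math.min(i + valuesPerBucket, values.size()))) {
          bucket.add(new AbstractMap.SimpleEntry<>(e.getKey(), v));
        }
        buckets.put(bucketId++, bucket);
      }
    }
    return buckets;
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, String>> pairs = List.of(
        new AbstractMap.SimpleEntry<>(0, "b"),
        new AbstractMap.SimpleEntry<>(0, "a"),
        new AbstractMap.SimpleEntry<>(0, "c"),
        new AbstractMap.SimpleEntry<>(1, "x"));
    Map<Integer, List<Map.Entry<Integer, String>>> buckets = repartition(pairs, 2);
    // key 0 splits into two buckets ([a, b] and [c]); key 1 gets its own bucket
    System.out.println(buckets.size()); // prints "3"
  }
}
```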
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +162,25 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
+  /**
+   * WARNING: It is the caller's responsibility to ensure that it is of <Integer, String> type.
+   *
+   * Repartitions the RDD based on key ranges so that:
+   * 1. The keys are sorted within each partition.
+   * 2. There is at most 1 key per partition.
+   * 3. For partitions containing entries of the same key, the value ranges are not overlapping.
+   * 4. The number of keys per partition is probably at most maxKeyPerBucket.
+   *
+   * @param keyRange The range of keys to partition across (0 to keyRange inclusive). It must cover all possible keys
+   *                 in the RDD. Keys covered by the range but not present in the RDD are ignored.
+   * @param sampleFraction Fraction of data to sample for determining the per-bucket value range split points (between 0 and 1).
+   * @param maxKeyPerBucket Maximum number of keys allowed per partition bucket
+   * @param seed Random seed for sampling
+   * @return Repartitioned RDD
+   */
+  HoodiePairData<Integer, String> rangeBasedRepartitionForEachKey(
Review Comment:
Check commits:
- c45c2534d2d processValuesOfTheSameShards key should be generic type
- 806a940f0ee processValuesOfTheSameShards value should be generic type
- c25eb092eeb index lookup APIs implementation 3
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]