nsivabalan commented on code in PR #10578:
URL: https://github.com/apache/hudi/pull/10578#discussion_r1632183116
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java:
##########
@@ -143,19 +141,17 @@ protected <R> HoodieData<HoodieRecord<R>>
tagLocationInternal(
protected HoodiePairData<HoodieKey, HoodieRecordLocation>
fetchRecordLocationsForAffectedPartitions(
HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context,
HoodieTable hoodieTable,
int parallelism) {
- List<String> affectedPartitionPathList =
- hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collectAsList();
- List<Pair<String, HoodieBaseFile>> latestBaseFiles =
+ HoodieData<String> affectedPartitionPathList =
+ hoodieKeys.map(HoodieKey::getPartitionPath).distinct();
+ HoodieData<Pair<String, HoodieBaseFile>> latestBaseFiles =
getLatestBaseFilesForAllPartitions(affectedPartitionPathList, context,
hoodieTable);
- return fetchRecordLocations(context, hoodieTable, parallelism,
latestBaseFiles);
+ return fetchRecordLocations(hoodieTable, parallelism, latestBaseFiles);
}
protected HoodiePairData<HoodieKey, HoodieRecordLocation>
fetchRecordLocations(
- HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
- List<Pair<String, HoodieBaseFile>> baseFiles) {
- int fetchParallelism = Math.max(1, Math.min(baseFiles.size(),
parallelism));
-
- return context.parallelize(baseFiles, fetchParallelism)
+ HoodieTable hoodieTable, int parallelism,
+ HoodieData<Pair<String, HoodieBaseFile>> baseFiles) {
+ return baseFiles.repartition(Math.max(1,
Math.min(baseFiles.getNumPartitions(), parallelism)))
Review Comment:
We can introduce it on an as-needed basis. So far, we may simply not have had
a need for it.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -122,18 +121,14 @@ public static List<FileSlice>
getLatestFileSlicesForPartition(
* @param hoodieTable instance of {@link HoodieTable} of interest
* @return the list of Pairs of partition path and fileId
*/
- public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
- final HoodieEngineContext context,
- final HoodieTable hoodieTable) {
+ public static HoodieData<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final HoodieData<String> partitions,
Review Comment:
If I am not mistaken, the HoodieData returned here has the same number of
Spark partitions as `HoodieData<String> partitions`, which is likely the number
of Hudi partitions. However, this method returns base files. Prior to this
patch, the caller, or some code further down the line, could parallelize based
on the number of base files, since it was operating on a list. Now, I guess we
fall back to the number of Spark partitions of
`HoodieData<Pair<String, HoodieBaseFile>>`.
So, for partitions with a large number of base files, this might have a
performance impact. Can we ensure we use the bloom index shuffle parallelism or
`inputRecords.numPartitions` (passed in as an argument), so that we adaptively
choose a value relative to the incoming records?
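To make the concern concrete, here is a minimal sketch of the two bounds in the
diff plus one possible reading of the suggestion. Plain ints stand in for the
values the real code would read from `HoodieData` and the write config. The
class name, method names, sample numbers, and the fallback rule in
`relativeToInput` are all assumptions for illustration, not Hudi APIs.

```java
// Sketch only: all names and numbers here are hypothetical stand-ins.
final class FetchParallelismSketch {

  // Pre-patch bound: base files were a driver-side List, so the file count was known.
  static int boundedByFileCount(int numBaseFiles, int configuredParallelism) {
    return Math.max(1, Math.min(numBaseFiles, configuredParallelism));
  }

  // Post-patch bound: only the partition count of the distributed collection is known.
  static int boundedByDataPartitions(int numDataPartitions, int configuredParallelism) {
    return Math.max(1, Math.min(numDataPartitions, configuredParallelism));
  }

  // Assumed reading of the suggestion: use the configured parallelism when it is
  // positive, otherwise fall back to the partition count of the input records.
  static int relativeToInput(int configuredParallelism, int inputRecordPartitions) {
    int chosen = configuredParallelism > 0 ? configuredParallelism : inputRecordPartitions;
    return Math.max(1, chosen);
  }

  public static void main(String[] args) {
    int hudiPartitions = 10;    // assumed to equal getNumPartitions() after the patch
    int baseFiles = 5000;       // many base files spread over few Hudi partitions
    int configured = 200;       // configured index parallelism
    int inputPartitions = 400;  // partitions of the incoming records

    System.out.println(boundedByFileCount(baseFiles, configured));
    System.out.println(boundedByDataPartitions(hudiPartitions, configured));
    System.out.println(relativeToInput(configured, inputPartitions));
  }
}
```

With these sample numbers the pre-patch bound keeps the configured value, while
the post-patch bound collapses to the number of Hudi partitions.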
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java:
##########
@@ -68,21 +68,19 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(
protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
- List<Pair<String, HoodieBaseFile>> latestBaseFiles =
getAllBaseFilesInTable(context, hoodieTable);
+ HoodieData<Pair<String, HoodieBaseFile>> latestBaseFiles =
getAllBaseFilesInTable(context, hoodieTable);
HoodiePairData<String, HoodieRecordGlobalLocation> allKeysAndLocations =
- fetchRecordGlobalLocations(context, hoodieTable,
config.getGlobalSimpleIndexParallelism(), latestBaseFiles);
+ fetchRecordGlobalLocations(hoodieTable,
config.getGlobalSimpleIndexParallelism(), latestBaseFiles);
boolean mayContainDuplicateLookup =
hoodieTable.getMetaClient().getTableType() == MERGE_ON_READ;
boolean shouldUpdatePartitionPath =
config.getGlobalSimpleIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
return tagGlobalLocationBackToRecords(inputRecords, allKeysAndLocations,
mayContainDuplicateLookup, shouldUpdatePartitionPath, config,
hoodieTable);
}
private HoodiePairData<String, HoodieRecordGlobalLocation>
fetchRecordGlobalLocations(
- HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
- List<Pair<String, HoodieBaseFile>> baseFiles) {
- int fetchParallelism = Math.max(1, Math.min(baseFiles.size(),
parallelism));
-
- return context.parallelize(baseFiles, fetchParallelism)
+ HoodieTable hoodieTable, int parallelism,
+ HoodieData<Pair<String, HoodieBaseFile>> baseFiles) {
+ return baseFiles.repartition(Math.max(1,
Math.min(baseFiles.getNumPartitions(), parallelism)))
Review Comment:
`baseFiles.getNumPartitions()` is likely equal to the number of physical Hudi
partitions.
For partitions with a large number of file groups, this might have a
performance impact.
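A rough sketch of the per-task load this implies, assuming the number of data
partitions equals the number of Hudi partitions and that `repartition` spreads
the base files evenly across tasks (an idealization). The class, method names,
and file counts are hypothetical.

```java
// Sketch only: estimates how many base files each task reads under an idealized even spread.
final class FilesPerTaskSketch {

  // Same bound as in the diff: at least 1, at most the configured parallelism.
  static int tasks(int upperBound, int configuredParallelism) {
    return Math.max(1, Math.min(upperBound, configuredParallelism));
  }

  // Ceiling division: files handled by the busiest task if files are spread evenly.
  static int filesPerTask(int totalFiles, int numTasks) {
    return (totalFiles + numTasks - 1) / numTasks;
  }

  public static void main(String[] args) {
    int[] filesPerHudiPartition = {4000, 50, 50}; // one Hudi partition with many file groups
    int configured = 100;

    int totalFiles = 0;
    for (int count : filesPerHudiPartition) {
      totalFiles += count;
    }

    // Bound by number of Hudi partitions (post-patch, under the stated assumption).
    int byPartitions = filesPerTask(totalFiles, tasks(filesPerHudiPartition.length, configured));
    // Bound by number of base files (pre-patch).
    int byFiles = filesPerTask(totalFiles, tasks(totalFiles, configured));

    System.out.println(byPartitions);
    System.out.println(byFiles);
  }
}
```

Under these assumptions, bounding by the partition count leaves 3 tasks for
4100 files, whereas bounding by the file count allows all 100 configured tasks.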