nsivabalan commented on code in PR #10578:
URL: https://github.com/apache/hudi/pull/10578#discussion_r1590676144


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java:
##########
@@ -91,12 +89,12 @@ private HoodiePairData<String, HoodieRecordGlobalLocation> fetchRecordGlobalLoca
   /**
    * Load all files for all partitions as <Partition, filename> pair data.
    */
-  private List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
-      final HoodieEngineContext context, final HoodieTable hoodieTable) {
+  private HoodieData<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
+      final HoodieEngineContext context, final HoodieTable hoodieTable, int parallelism) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
     // Obtain the latest data files from all the partitions.
-    return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
+    return getLatestBaseFilesForAllPartitions(context.parallelize(allPartitionPaths, parallelism), context, hoodieTable);

Review Comment:
   should we use allPartitionPaths.size() as the parallelism instead of the configured parallelism?
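   A minimal, plain-Java sketch of the suggestion (class and method names here are illustrative, not Hudi API): derive the parallelism from the partition count, flooring at 1 so an empty partition list does not produce a zero-partition dataset.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   public class ParallelismSketch {
     // Hypothetical helper mirroring the size-based pattern discussed below
     // (cf. the Math.max(allPartitionPaths.size(), 1) snippet).
     static int partitionBasedParallelism(List<String> allPartitionPaths) {
       return Math.max(allPartitionPaths.size(), 1);
     }

     public static void main(String[] args) {
       // Two partitions -> parallelism 2
       System.out.println(partitionBasedParallelism(Arrays.asList("2016/01/21", "2016/04/01")));
       // Empty table still gets parallelism 1, avoiding a zero-partition RDD
       System.out.println(partitionBasedParallelism(Collections.emptyList()));
     }
   }
   ```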



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -197,14 +199,15 @@ private Map<HoodieFileGroupId, Long> computeComparisonsPerFileGroup(
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
       context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName());
-      fileToComparisons = fileComparisonsRDD.countByKey();
+      fileToComparisons = fileComparisonsRDD.countApproxDistinctByKey(0.05).collectAsMap();

Review Comment:
   I am afraid this might change the DAG on Spark task retries.
   Can we move this fix to a separate patch, do some benchmarking, and then open it up for review?
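   Beyond retry behavior, note that the two calls do not compute the same quantity: `countByKey` counts every element per key, while `countApproxDistinctByKey` estimates the number of *distinct values* per key. A plain-Java sketch (no Spark; names are illustrative) of the difference, using exact counts as a stand-in for what the approximate call estimates:

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class CountSemanticsSketch {
     // Exact per-key element count: what countByKey returns.
     static Map<String, Long> countByKey(List<Map.Entry<String, String>> pairs) {
       return pairs.stream().collect(
           Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
     }

     // Exact per-key distinct-value count: the quantity that
     // countApproxDistinctByKey only *estimates* via HyperLogLog.
     static Map<String, Long> distinctValuesByKey(List<Map.Entry<String, String>> pairs) {
       return pairs.stream().collect(Collectors.groupingBy(Map.Entry::getKey,
           Collectors.collectingAndThen(
               Collectors.mapping(Map.Entry::getValue, Collectors.toSet()),
               s -> (long) s.size())));
     }

     public static void main(String[] args) {
       // One file group ("f1") compared against records r1, r1, r2.
       List<Map.Entry<String, String>> pairs = List.of(
           Map.entry("f1", "r1"), Map.entry("f1", "r1"), Map.entry("f1", "r2"));
       System.out.println(countByKey(pairs).get("f1"));          // 3 comparisons
       System.out.println(distinctValuesByKey(pairs).get("f1")); // only 2 distinct records
     }
   }
   ```

   So the replacement can undercount comparisons whenever duplicate pairs exist, in addition to the approximation error bound passed as 0.05.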
   



##########
hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java:
##########
@@ -143,7 +147,7 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b
         .withInserts("2015/03/12", "4", record2, record3, record4);
     metaClient.reloadActiveTimeline();
 
-    filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable);
+    filesList = index.loadColumnRangesFromFiles(context.parallelize(partitions, 1), context, hoodieTable);

Review Comment:
   same here



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java:
##########
@@ -91,12 +89,12 @@ private HoodiePairData<String, HoodieRecordGlobalLocation> fetchRecordGlobalLoca
   /**
    * Load all files for all partitions as <Partition, filename> pair data.
    */
-  private List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
-      final HoodieEngineContext context, final HoodieTable hoodieTable) {
+  private HoodieData<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
+      final HoodieEngineContext context, final HoodieTable hoodieTable, int parallelism) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
     // Obtain the latest data files from all the partitions.
-    return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
+    return getLatestBaseFilesForAllPartitions(context.parallelize(allPartitionPaths, parallelism), context, hoodieTable);

Review Comment:
   ```
   Math.max(allPartitionPaths.size(), 1));
   ```
   
   For example, check out HoodieIndexUtils L137.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java:
##########
@@ -143,19 +141,17 @@ protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
   protected HoodiePairData<HoodieKey, HoodieRecordLocation> fetchRecordLocationsForAffectedPartitions(
       HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context, HoodieTable hoodieTable,
       int parallelism) {
-    List<String> affectedPartitionPathList =
-        hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collectAsList();
-    List<Pair<String, HoodieBaseFile>> latestBaseFiles =
+    HoodieData<String> affectedPartitionPathList =
+        hoodieKeys.map(HoodieKey::getPartitionPath).distinct();
+    HoodieData<Pair<String, HoodieBaseFile>> latestBaseFiles =
         getLatestBaseFilesForAllPartitions(affectedPartitionPathList, context, hoodieTable);
-    return fetchRecordLocations(context, hoodieTable, parallelism, latestBaseFiles);
+    return fetchRecordLocations(hoodieTable, parallelism, latestBaseFiles);
   }
 
   protected HoodiePairData<HoodieKey, HoodieRecordLocation> fetchRecordLocations(
-      HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
-      List<Pair<String, HoodieBaseFile>> baseFiles) {
-    int fetchParallelism = Math.max(1, Math.min(baseFiles.size(), parallelism));
-
-    return context.parallelize(baseFiles, fetchParallelism)
+      HoodieTable hoodieTable, int parallelism,
+      HoodieData<Pair<String, HoodieBaseFile>> baseFiles) {
+    return baseFiles.repartition(Math.max(1, Math.min(baseFiles.getNumPartitions(), parallelism)))

Review Comment:
   Should we consider doing coalesce or repartition based on the total number of partitions?
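   The distinction matters because, in Spark, coalesce can shrink the partition count without a full shuffle, while repartition always shuffles. A hedged, Spark-free sketch of the decision rule the comment suggests (the helper and its names are illustrative, not Hudi/Spark API):

   ```java
   public class RepartitionChoiceSketch {
     // Picks which operation a caller would invoke, given the current
     // partition count and the desired target. Illustration only.
     static String chooseOp(int currentPartitions, int target) {
       int clamped = Math.max(1, target);
       if (clamped >= currentPartitions) {
         return "repartition(" + clamped + ")"; // growing requires a shuffle
       }
       return "coalesce(" + clamped + ")"; // shrinking can skip the shuffle
     }

     public static void main(String[] args) {
       System.out.println(chooseOp(1000, 100)); // coalesce(100)
       System.out.println(chooseOp(10, 100));   // repartition(100)
     }
   }
   ```

   Note that coalesce can skew work onto fewer tasks since it only merges partitions without redistributing records, so the cheaper call is not always the better one.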



##########
hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java:
##########
@@ -133,7 +137,7 @@ public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, b
         new HoodieAvroRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
 
     List<String> partitions = asList("2016/01/21", "2016/04/01", "2015/03/12");
-    List<Pair<String, BloomIndexFileInfo>> filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable);
+    List<Pair<String, BloomIndexFileInfo>> filesList = index.loadColumnRangesFromFiles(context.parallelize(partitions, 1), context, hoodieTable);

Review Comment:
   why parallelize with just 1?
   what if we have 10k physical partitions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
