This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f5b0c7  [HUDI-724] Parallelize getSmallFiles for partitions (#1421)
1f5b0c7 is described below

commit 1f5b0c77d6c87a936f2d34287ec6a1df1cb18b33
Author: ffcchi <[email protected]>
AuthorDate: Mon Mar 30 01:14:38 2020 -0600

    [HUDI-724] Parallelize getSmallFiles for partitions (#1421)
    
    Co-authored-by: Feichi Feng <[email protected]>
---
 .../org/apache/hudi/client/HoodieWriteClient.java  |  4 +--
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  | 37 ++++++++++++++++------
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 12 +++----
 .../java/org/apache/hudi/table/HoodieTable.java    |  4 +--
 .../apache/hudi/table/TestCopyOnWriteTable.java    |  2 +-
 .../apache/hudi/table/TestMergeOnReadTable.java    |  2 +-
 6 files changed, 37 insertions(+), 24 deletions(-)
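
A quick orientation before the diff: the patch replaces the driver-side loop that called getSmallFiles() once per partition with a single Spark job that looks up small files for all partitions in parallel (the new getSmallFilesForPartitions in HoodieCopyOnWriteTable), and the results are now appended to the partitioner's global smallFiles list inside assignInserts() instead of inside getSmallFiles(). The snippet below is a minimal, self-contained sketch of that pattern, not the actual Hudi code; SmallFileLookupSketch and lookupSmallFiles are hypothetical stand-ins introduced only for illustration.

    // Sketch: fan out one Spark task per partition path and collect the
    // per-partition small-file lists back on the driver.
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class SmallFileLookupSketch {

      public static Map<String, List<String>> getSmallFilesForPartitions(
          List<String> partitionPaths, JavaSparkContext jsc) {
        // One RDD partition per table partition path, so the lookups run concurrently.
        JavaRDD<String> pathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
        return pathRdd
            .mapToPair((PairFunction<String, String, List<String>>)
                path -> new Tuple2<>(path, lookupSmallFiles(path)))
            .collectAsMap();
      }

      // Hypothetical stand-in for the file listing that the real getSmallFiles() performs.
      private static List<String> lookupSmallFiles(String partitionPath) {
        return Arrays.asList(partitionPath + "/file-1.parquet", partitionPath + "/file-2.parquet");
      }
    }

The upshot is that listing many partitions no longer serializes on the driver: each lookup becomes an independent Spark task, and only the small per-partition result lists are collected back.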

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 5d2ec76..a0d1867 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -489,9 +489,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
   private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
     if (isUpsert) {
-      return table.getUpsertPartitioner(profile);
+      return table.getUpsertPartitioner(profile, jsc);
     } else {
-      return table.getInsertPartitioner(profile);
+      return table.getInsertPartitioner(profile, jsc);
     }
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 874c3e8..de43900 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -81,6 +81,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 /**
@@ -142,16 +143,16 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    return new UpsertPartitioner(profile);
+    return new UpsertPartitioner(profile, jsc);
   }
 
   @Override
-  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
-    return getUpsertPartitioner(profile);
+  public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+    return getUpsertPartitioner(profile, jsc);
   }
 
   @Override
@@ -573,14 +574,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
      */
     protected HoodieRollingStatMetadata rollingStatMetadata;
 
-    UpsertPartitioner(WorkloadProfile profile) {
+    UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
       updateLocationToBucket = new HashMap<>();
       partitionPathToInsertBuckets = new HashMap<>();
       bucketInfoMap = new HashMap<>();
       globalStat = profile.getGlobalStat();
       rollingStatMetadata = getRollingStats();
       assignUpdates(profile);
-      assignInserts(profile);
+      assignInserts(profile, jsc);
 
       LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + 
bucketInfoMap + ", \n"
           + "Partition to insert buckets => " + partitionPathToInsertBuckets + 
", \n"
@@ -610,18 +611,24 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       return bucket;
     }
 
-    private void assignInserts(WorkloadProfile profile) {
+    private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
       // for new inserts, compute buckets depending on how many records we have for each partition
       Set<String> partitionPaths = profile.getPartitionPaths();
       long averageRecordSize =
           averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
               config.getCopyOnWriteRecordSizeEstimate());
       LOG.info("AvgRecordSize => " + averageRecordSize);
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap =
+              getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
+
       for (String partitionPath : partitionPaths) {
         WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
         if (pStat.getNumInserts() > 0) {
 
-          List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+          List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+          this.smallFiles.addAll(smallFiles);
+
           LOG.info("For partitionPath : " + partitionPath + " Small Files => " 
+ smallFiles);
 
           long totalUnassignedInserts = pStat.getNumInserts();
@@ -684,6 +691,18 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       }
     }
 
+    private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+      if (partitionPaths != null && partitionPaths.size() > 0) {
+        JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
+        partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
+            partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
+      }
+
+      return partitionSmallFilesMap;
+    }
+
     /**
      * Returns a list of small files in the given partition path.
      */
@@ -706,8 +725,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
             sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
             sf.sizeBytes = file.getFileSize();
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           }
         }
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 6cb604c..7e96eb6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -89,11 +89,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile);
+    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
     return mergeOnReadUpsertPartitioner;
   }
 
@@ -325,8 +325,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
    */
   class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
 
-    MergeOnReadUpsertPartitioner(WorkloadProfile profile) {
-      super(profile);
+    MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+      super(profile, jsc);
     }
 
     @Override
@@ -376,16 +376,12 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
            sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
             sf.sizeBytes = getTotalFileSize(smallFileSlice);
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           } else {
            HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
            sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
                 FSUtils.getFileIdFromLogPath(logFile.getPath()));
             sf.sizeBytes = getTotalFileSize(smallFileSlice);
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           }
         }
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 143086f..e3c134c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -118,12 +118,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   /**
    * Provides a partitioner to perform the upsert operation, based on the workload profile.
    */
-  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Provides a partitioner to perform the insert operation, based on the workload profile.
    */
-  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Return whether this HoodieTable implementation can benefit from workload profiling.
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index f02afdd..f670b86 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -415,7 +415,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     records.addAll(updateRecords);
     WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
     HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
-        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile);
+        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc);
     assertEquals("Update record should have gone to the 1 update partition", 
0, partitioner.getPartition(
         new Tuple2<>(updateRecords.get(0).getKey(), 
Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
     return partitioner;
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 966cc52..b48ad3f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -1272,7 +1272,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
 
       // initialize partitioner
-      hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD));
+      hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD), jsc);
       final List<List<WriteStatus>> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
         return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator());
       }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
