This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1f5b0c7 [HUDI-724] Parallelize getSmallFiles for partitions (#1421)
1f5b0c7 is described below
commit 1f5b0c77d6c87a936f2d34287ec6a1df1cb18b33
Author: ffcchi <[email protected]>
AuthorDate: Mon Mar 30 01:14:38 2020 -0600
[HUDI-724] Parallelize getSmallFiles for partitions (#1421)
Co-authored-by: Feichi Feng <[email protected]>
---
.../org/apache/hudi/client/HoodieWriteClient.java | 4 +--
.../apache/hudi/table/HoodieCopyOnWriteTable.java | 37 ++++++++++++++++------
.../apache/hudi/table/HoodieMergeOnReadTable.java | 12 +++----
.../java/org/apache/hudi/table/HoodieTable.java | 4 +--
.../apache/hudi/table/TestCopyOnWriteTable.java | 2 +-
.../apache/hudi/table/TestMergeOnReadTable.java | 2 +-
6 files changed, 37 insertions(+), 24 deletions(-)
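
The diff's core change is in HoodieCopyOnWriteTable.UpsertPartitioner: small-file lookup, previously a serial loop over partition paths on the driver, is fanned out as a Spark job via the new getSmallFilesForPartitions helper, and callers now thread a JavaSparkContext through getUpsertPartitioner/getInsertPartitioner. The sketch below shows the same parallelize/mapToPair/collectAsMap pattern in isolation; it is an illustrative standalone program, not Hudi code, and lookupSmallFiles is a hypothetical stand-in for the partitioner's getSmallFiles:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class ParallelSmallFileLookup {

      // Hypothetical stand-in for UpsertPartitioner#getSmallFiles(partitionPath):
      // any per-partition lookup that touches the filesystem or timeline.
      static List<String> lookupSmallFiles(String partitionPath) {
        return Collections.emptyList();
      }

      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("small-file-lookup").setMaster("local[*]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          List<String> partitionPaths = Arrays.asList("2020/03/28", "2020/03/29", "2020/03/30");
          // One Spark slice per partition path, so the lookups run concurrently
          // instead of back-to-back on the driver.
          Map<String, List<String>> smallFilesByPartition = jsc
              .parallelize(partitionPaths, partitionPaths.size())
              .mapToPair((PairFunction<String, String, List<String>>)
                  p -> new Tuple2<>(p, lookupSmallFiles(p)))
              .collectAsMap();
          smallFilesByPartition.forEach((p, files) ->
              System.out.println(p + " => " + files.size() + " small file(s)"));
        }
      }
    }

Passing partitionPaths.size() as the numSlices argument gives one task per partition path, which is what lets every listing call run in parallel.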
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 5d2ec76..a0d1867 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -489,9 +489,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
     if (isUpsert) {
-      return table.getUpsertPartitioner(profile);
+      return table.getUpsertPartitioner(profile, jsc);
     } else {
-      return table.getInsertPartitioner(profile);
+      return table.getInsertPartitioner(profile, jsc);
     }
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 874c3e8..de43900 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -81,6 +81,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 /**
@@ -142,16 +143,16 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    return new UpsertPartitioner(profile);
+    return new UpsertPartitioner(profile, jsc);
   }
 
   @Override
-  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
-    return getUpsertPartitioner(profile);
+  public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+    return getUpsertPartitioner(profile, jsc);
   }
 
   @Override
@@ -573,14 +574,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
      */
     protected HoodieRollingStatMetadata rollingStatMetadata;
 
-    UpsertPartitioner(WorkloadProfile profile) {
+    UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
       updateLocationToBucket = new HashMap<>();
       partitionPathToInsertBuckets = new HashMap<>();
       bucketInfoMap = new HashMap<>();
       globalStat = profile.getGlobalStat();
       rollingStatMetadata = getRollingStats();
       assignUpdates(profile);
-      assignInserts(profile);
+      assignInserts(profile, jsc);
       LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
           + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
@@ -610,18 +611,24 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       return bucket;
     }
 
-    private void assignInserts(WorkloadProfile profile) {
+    private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
       // for new inserts, compute buckets depending on how many records we have for each partition
       Set<String> partitionPaths = profile.getPartitionPaths();
       long averageRecordSize =
           averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
               config.getCopyOnWriteRecordSizeEstimate());
       LOG.info("AvgRecordSize => " + averageRecordSize);
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap =
+          getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
+
       for (String partitionPath : partitionPaths) {
         WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
         if (pStat.getNumInserts() > 0) {
-          List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+          List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+          this.smallFiles.addAll(smallFiles);
+
           LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
 
           long totalUnassignedInserts = pStat.getNumInserts();
@@ -684,6 +691,18 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       }
     }
 
+    private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+      if (partitionPaths != null && partitionPaths.size() > 0) {
+        JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
+        partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
+            partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
+      }
+
+      return partitionSmallFilesMap;
+    }
+
     /**
      * Returns a list of small files in the given partition path.
      */
@@ -706,8 +725,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
           sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
           sf.sizeBytes = file.getFileSize();
           smallFileLocations.add(sf);
-          // Update the global small files list
-          smallFiles.add(sf);
         }
       }
     }
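
A note on the removed smallFiles.add(sf) lines above: getSmallFiles now runs inside Spark tasks, where mutating the partitioner's shared smallFiles list would only update each executor's deserialized copy of the closure. The bookkeeping therefore moves to the driver, where assignInserts calls this.smallFiles.addAll(...) on the collected results. A minimal sketch of the pitfall being avoided (hypothetical names, assuming standard Spark closure serialization):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaSparkContext;

    class ClosureCopyPitfall {
      static void demo(JavaSparkContext jsc) {
        List<String> collected = new ArrayList<>();  // lives on the driver
        // Each task mutates its own deserialized copy of 'collected', not this object.
        jsc.parallelize(Arrays.asList("a", "b")).foreach(collected::add);
        // 'collected' is still empty here on the driver.
      }
    }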
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 6cb604c..7e96eb6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -89,11 +89,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile);
+    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
     return mergeOnReadUpsertPartitioner;
   }
@@ -325,8 +325,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
    */
   class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
 
-    MergeOnReadUpsertPartitioner(WorkloadProfile profile) {
-      super(profile);
+    MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+      super(profile, jsc);
     }
 
     @Override
@@ -376,16 +376,12 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
           sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
           sf.sizeBytes = getTotalFileSize(smallFileSlice);
           smallFileLocations.add(sf);
-          // Update the global small files list
-          smallFiles.add(sf);
         } else {
           HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
           sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
               FSUtils.getFileIdFromLogPath(logFile.getPath()));
           sf.sizeBytes = getTotalFileSize(smallFileSlice);
           smallFileLocations.add(sf);
-          // Update the global small files list
-          smallFiles.add(sf);
         }
       }
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 143086f..e3c134c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -118,12 +118,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   /**
    * Provides a partitioner to perform the upsert operation, based on the workload profile.
    */
-  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Provides a partitioner to perform the insert operation, based on the workload profile.
    */
-  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Return whether this HoodieTable implementation can benefit from workload profiling.
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index f02afdd..f670b86 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -415,7 +415,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     records.addAll(updateRecords);
     WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
     HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
-        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile);
+        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc);
     assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
         new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
     return partitioner;
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 966cc52..b48ad3f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -1272,7 +1272,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
 
     // initialize partitioner
-    hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD));
+    hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD), jsc);
     final List<List<WriteStatus>> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
       return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator());
     }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();