nsivabalan commented on a change in pull request #3873:
URL: https://github.com/apache/hudi/pull/3873#discussion_r745808254
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -647,4 +624,96 @@ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime
     // metadata table.
     writeClient.clean(instantTime + "002");
   }
+
+  /**
+   * This is invoked to bootstrap the metadata table for a dataset. The bootstrap
+   * commit has a special handling mechanism due to its scale compared to other
+   * regular commits.
+   */
+  protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {
Review comment:
Note to reviewer: I have added HoodieData abstractions to commit() in the metadata writer, and bootstrap is a single shared code path across all engines. I see some differences in the actual commit() implementations across Spark and Flink, so I did not try to generalize them in this patch.
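To make the shared-code-path idea concrete, here is a minimal sketch of what an engine-agnostic records abstraction could look like. The names (`EngineData`, `ListEngineData`) are hypothetical and chosen only for illustration; Hudi's actual `HoodieData` API may differ:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical engine-agnostic wrapper: shared logic (like bootstrapCommit)
// programs against this interface instead of a Spark RDD or Flink collection.
interface EngineData<T> {
  <R> EngineData<R> map(Function<T, R> fn);
  long count();
  List<T> collectAsList();
}

// A simple list-backed implementation, standing in for the Spark- or
// Flink-backed variants an engine module would provide.
class ListEngineData<T> implements EngineData<T> {
  private final List<T> data;

  ListEngineData(List<T> data) {
    this.data = data;
  }

  public <R> EngineData<R> map(Function<T, R> fn) {
    return new ListEngineData<>(data.stream().map(fn).collect(Collectors.toList()));
  }

  public long count() {
    return data.size();
  }

  public List<T> collectAsList() {
    return data;
  }
}
```

A Spark-backed implementation would wrap a `JavaRDD` and a Flink-backed one its own collection type, letting the shared bootstrap logic call `map()`/`count()` without knowing which engine it runs on.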
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -147,15 +154,39 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
    *
    * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
     List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
         String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    return jsc.parallelize(records, 1).map(r -> {
+    return recordsRDD.map(r -> {
       FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
     });
   }
+
+  @Override
+  protected void commit(List<DirectoryInfo> partitionInfoList, String createInstantTime, boolean canTriggerTableService) {
+    ValidationUtils.checkState(!partitionInfoList.isEmpty());
+
+    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
+    List<String> partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList());
+    final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();
+
+    // Record which saves the list of all partitions
+    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(partitions);
+    JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record), 1);
+    if (!partitionInfoList.isEmpty()) {
+      JavaRDD<HoodieRecord> fileListRecords = jsc.parallelize(partitionInfoList, partitionInfoList.size()).map(pinfo -> {
+        // Record which saves files within a partition
+        return HoodieMetadataPayload.createPartitionFilesRecord(
+            pinfo.getRelativePath(), Option.of(pinfo.getFileMap()), Option.empty());
+      });
+      recordRDD = recordRDD.union(fileListRecords);
+    }
+
+    LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
+    ValidationUtils.checkState(recordRDD.count() == (partitions.size() + 1));
Review comment:
Won't we be triggering an action on the recordRDD once here (the count) and then again when we do the follow-up actions? Wondering if we really need the size-check validation here. It is not that costly, since generating partitionInfoList is the expensive part, but I just wanted to check.
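To illustrate the concern: each Spark action (the `count()`, then the later write) re-evaluates an uncached RDD's lineage from scratch. A toy sketch of the effect (plain Java, no Spark; `tagged` is a hypothetical stand-in for the record-preparation step, with a counter showing the transformation running twice):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class DoubleActionDemo {
  // Counts how many times the "tagging" transformation actually runs.
  static final AtomicInteger MAP_CALLS = new AtomicInteger();

  // Stand-in for prepRecords(): re-runs the transformation on every call,
  // the way an uncached RDD lineage is recomputed on every action.
  static List<String> tagged(List<String> records) {
    return records.stream()
        .map(r -> {
          MAP_CALLS.incrementAndGet();
          return r.toUpperCase();
        })
        .collect(Collectors.toList());
  }

  static int workDoneForTwoActions(List<String> records) {
    MAP_CALLS.set(0);
    long validated = tagged(records).size(); // "action" 1: the size-check validation
    List<String> written = tagged(records);  // "action" 2: the actual commit/write
    return MAP_CALLS.get();                  // each record was transformed twice
  }
}
```

In Spark terms, the second pass can be avoided by persisting the RDD before the first action (e.g. via `persist(...)`) or by dropping the validation count entirely, which is what the comment above is suggesting.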
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]