yihua commented on code in PR #11634:
URL: https://github.com/apache/hudi/pull/11634#discussion_r1737510637
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -448,9 +448,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
// Perform the commit using bulkCommit
HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
- bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
- metadataMetaClient.reloadActiveTimeline();
String partitionPath = (partitionType == FUNCTIONAL_INDEX || partitionType == SECONDARY_INDEX) ? dataWriteConfig.getIndexingConfig().getIndexName() : partitionType.getPartitionPath();
+ bulkCommit(commitTimeForPartition, partitionPath, records, fileGroupCount);
Review Comment:
IMO, we should use `MetadataPartitionType#getPartitionPath`, as suggested in the comment on `HoodieIndexUtils`, to avoid leaking index-specific logic into this metadata writer. We can keep passing the `MetadataPartitionType` instance as before, call `#getPartitionPath` where needed, and remove the following:
```
String partitionPath = (partitionType == FUNCTIONAL_INDEX || partitionType == SECONDARY_INDEX)
    ? dataWriteConfig.getIndexingConfig().getIndexName()
    : partitionType.getPartitionPath();
```
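A minimal sketch of what the call site could look like under this suggestion. It assumes `getPartitionPath` takes only the index name (the real parameters are not spelled out in the review); `BulkCommitCallSiteSketch`, its nested `PartitionType` enum, and `committedPartitions` are hypothetical stand-ins for the writer and `MetadataPartitionType`, not Hudi code. Records are omitted; the point is that `bulkCommit` keeps its enum parameter and the index-specific branch disappears from the writer.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkCommitCallSiteSketch {

  // Hypothetical, trimmed-down stand-in for MetadataPartitionType.
  enum PartitionType {
    FILES {
      @Override
      String getPartitionPath(String indexName) {
        // Fixed partition path; illustrative value only.
        return "files";
      }
    },
    SECONDARY_INDEX {
      @Override
      String getPartitionPath(String indexName) {
        // Index-backed partitions are named after the index, as in the PR's ternary.
        return indexName;
      }
    };

    // Each constant decides how its own partition path is derived.
    abstract String getPartitionPath(String indexName);
  }

  // Stand-in for dataWriteConfig.getIndexingConfig().getIndexName().
  private final String configuredIndexName;

  // Records which partition each bulk commit targeted, so the sketch can be checked.
  final List<String> committedPartitions = new ArrayList<>();

  BulkCommitCallSiteSketch(String configuredIndexName) {
    this.configuredIndexName = configuredIndexName;
  }

  // Keeps the enum parameter, as before the PR; the path is resolved only where it is needed.
  void bulkCommit(String instantTime, PartitionType partitionType, int fileGroupCount) {
    String partitionPath = partitionType.getPartitionPath(configuredIndexName);
    committedPartitions.add(partitionPath);
  }
}
```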
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -395,7 +395,7 @@ public static <R> HoodieRecord<R> createNewTaggedHoodieRecord(HoodieRecord<R> ol
* The actual partition name is contained in the index definition.
*/
public static String getPartitionNameFromPartitionType(MetadataPartitionType partitionType, HoodieTableMetaClient metaClient, String indexName) {
- if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType)) {
+ if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType) || MetadataPartitionType.SECONDARY_INDEX.equals(partitionType)) {
Review Comment:
Can this conversion logic be moved into `MetadataPartitionType`, with each type providing its own implementation through a new API, e.g.:
```
public enum MetadataPartitionType {
  ...
  FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX, "func-index-", -1) {
    ...
    @Override
    public String getPartitionPath(...) {
      return metaClient.getIndexMetadata().get().getIndexDefinitions().get(indexName).getIndexName();
    }
    ...
  },
  ...
  public String getPartitionPath(...) {
    ...
  }
}
```
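A runnable sketch of the suggested API using Java's constant-specific class bodies, where individual enum constants override a method declared on the enum. Assumptions: the elided `getPartitionPath(...)` parameters are taken to be the index definitions plus the index name; `PartitionPathSketch`, `IndexDefinition`, `PartitionTypeSketch`, and `partitionPathFromDefinition` are hypothetical, and a plain `Map` stands in for `metaClient.getIndexMetadata().get().getIndexDefinitions()`. The path strings are illustrative, not Hudi's actual prefixes.

```java
import java.util.Map;

public class PartitionPathSketch {

  // Hypothetical stand-in for an index definition; only the name is modeled.
  static final class IndexDefinition {
    private final String indexName;

    IndexDefinition(String indexName) {
      this.indexName = indexName;
    }

    String getIndexName() {
      return indexName;
    }
  }

  // Hypothetical stand-in for MetadataPartitionType.
  enum PartitionTypeSketch {
    FILES("files"),
    FUNCTIONAL_INDEX("func_index_") {
      @Override
      String getPartitionPath(Map<String, IndexDefinition> indexDefinitions, String indexName) {
        return partitionPathFromDefinition(indexDefinitions, indexName);
      }
    },
    SECONDARY_INDEX("secondary_index_") {
      @Override
      String getPartitionPath(Map<String, IndexDefinition> indexDefinitions, String indexName) {
        return partitionPathFromDefinition(indexDefinitions, indexName);
      }
    };

    // Fixed path (or, for index types, an illustrative prefix that the overrides ignore).
    private final String partitionPath;

    PartitionTypeSketch(String partitionPath) {
      this.partitionPath = partitionPath;
    }

    // Default: partitions with a fixed path ignore the index arguments.
    String getPartitionPath(Map<String, IndexDefinition> indexDefinitions, String indexName) {
      return partitionPath;
    }

    // Index-backed partitions resolve their path from the matching index definition.
    static String partitionPathFromDefinition(Map<String, IndexDefinition> indexDefinitions, String indexName) {
      IndexDefinition definition = indexDefinitions.get(indexName);
      if (definition == null) {
        throw new IllegalArgumentException("No index definition found for " + indexName);
      }
      return definition.getIndexName();
    }
  }
}
```

With this shape, adding another index-backed partition type means overriding one method on its constant rather than extending `if` chains in `HoodieIndexUtils` and the metadata writer.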
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -395,7 +395,7 @@ public static <R> HoodieRecord<R> createNewTaggedHoodieRecord(HoodieRecord<R> ol
* The actual partition name is contained in the index definition.
*/
public static String getPartitionNameFromPartitionType(MetadataPartitionType partitionType, HoodieTableMetaClient metaClient, String indexName) {
- if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType)) {
+ if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType) || MetadataPartitionType.SECONDARY_INDEX.equals(partitionType)) {
Review Comment:
This way, consolidating the logic in this new API avoids confusion over which partition name or path to use.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1396,27 +1395,25 @@ protected void preWrite(String instantTime) {
* records and hence is more suited to bulkInsert for write performance.
*
* @param instantTime - Action instant time for this commit
- * @param partitionType - The MDT partition to which records are to be committed
+ * @param partitionName - The MDT partition to which records are to be committed
* @param records - records to be bulk inserted
* @param fileGroupCount - The maximum number of file groups to which the records will be written.
*/
protected abstract void bulkCommit(
- String instantTime, MetadataPartitionType partitionType, HoodieData<HoodieRecord> records,
+ String instantTime, String partitionName, HoodieData<HoodieRecord> records,
Review Comment:
Could you expand the Javadoc for `partitionName`? Will `partitionName` differ for secondary and functional indexes, e.g., does each secondary/functional index definition have its own partition path?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1672,13 +1669,14 @@ private HoodieData<HoodieRecord> getRecordIndexUpserts(HoodieData<WriteStatus> w
}
private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
- final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
- dataMetaClient.getActiveTimeline(), metadata);
- List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
- .getPartitionToReplaceFileIds()
- .keySet().stream().flatMap(partition
- -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
- .collect(Collectors.toList());
+ List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs;
+ try (HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata)) {
+ partitionBaseFilePairs = replaceCommitMetadata
+ .getPartitionToReplaceFileIds()
+ .keySet().stream()
+ .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
+ .collect(Collectors.toList());
Review Comment:
Move the `return` statement inside the try-with-resources block to avoid the separate `partitionBaseFilePairs` declaration.
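A self-contained sketch of the suggested shape. `ReturnInsideTrySketch` and `FakeFileSystemView` are hypothetical: the latter is an `AutoCloseable` stand-in for `HoodieMetadataFileSystemView`, `Map.Entry` replaces Hudi's `Pair`, and the view is passed in (rather than constructed inline as in the PR) only so the test can observe that it was closed. Returning from inside try-with-resources still runs `close()`, so no separate result variable is needed.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReturnInsideTrySketch {

  // Hypothetical AutoCloseable stand-in for HoodieMetadataFileSystemView.
  static final class FakeFileSystemView implements AutoCloseable {
    private final Map<String, List<String>> latestBaseFilesByPartition;
    boolean closed = false;

    FakeFileSystemView(Map<String, List<String>> latestBaseFilesByPartition) {
      this.latestBaseFilesByPartition = latestBaseFilesByPartition;
    }

    Stream<String> getLatestBaseFiles(String partition) {
      return latestBaseFilesByPartition.getOrDefault(partition, List.of()).stream();
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // Returns (partition, base file) pairs for the replaced partitions; the view is
  // closed when the try block exits, including through the return statement.
  static List<Map.Entry<String, String>> partitionBaseFilePairs(
      FakeFileSystemView view, Collection<String> replacedPartitions) {
    try (FakeFileSystemView fsView = view) {
      return replacedPartitions.stream()
          .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(f -> Map.entry(partition, f)))
          .collect(Collectors.toList());
    }
  }
}
```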
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -448,9 +448,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
// Perform the commit using bulkCommit
HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
- bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
- metadataMetaClient.reloadActiveTimeline();
String partitionPath = (partitionType == FUNCTIONAL_INDEX || partitionType == SECONDARY_INDEX) ? dataWriteConfig.getIndexingConfig().getIndexName() : partitionType.getPartitionPath();
+ bulkCommit(commitTimeForPartition, partitionPath, records, fileGroupCount);
Review Comment:
That way, we can avoid most of the type changes in this PR.
--
This is an automated message from the Apache Git Service.