yihua commented on code in PR #11634:
URL: https://github.com/apache/hudi/pull/11634#discussion_r1737510637


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -448,9 +448,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
 
       // Perform the commit using bulkCommit
       HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
-      bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
-      metadataMetaClient.reloadActiveTimeline();
       String partitionPath = (partitionType == FUNCTIONAL_INDEX || partitionType == SECONDARY_INDEX) ? dataWriteConfig.getIndexingConfig().getIndexName() : partitionType.getPartitionPath();
+      bulkCommit(commitTimeForPartition, partitionPath, records, fileGroupCount);

Review Comment:
   IMO, we should remove the following line and use `MetadataPartitionType#getPartitionPath`, as suggested above, to avoid leaking index-specific logic into this metadata writer. We can keep passing the `MetadataPartitionType` instance as before and call `#getPartitionPath` where needed.
   ```
   String partitionPath = (partitionType == FUNCTIONAL_INDEX || partitionType == SECONDARY_INDEX) ? dataWriteConfig.getIndexingConfig().getIndexName() : partitionType.getPartitionPath();
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -395,7 +395,7 @@ public static <R> HoodieRecord<R> createNewTaggedHoodieRecord(HoodieRecord<R> ol
    * The actual partition name is contained in the index definition.
    */
   public static String getPartitionNameFromPartitionType(MetadataPartitionType partitionType, HoodieTableMetaClient metaClient, String indexName) {
-    if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType)) {
+    if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType) || MetadataPartitionType.SECONDARY_INDEX.equals(partitionType)) {

Review Comment:
   Can this conversion logic be moved into `MetadataPartitionType`, with each type supplying its own implementation through a new API? For example:
   ```
   public enum MetadataPartitionType {
     ...
     FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX, "func-index-", -1) {
       ...
       @Override
       public String getPartitionPath(...) {
         return metaClient.getIndexMetadata().get().getIndexDefinitions().get(indexName).getIndexName();
       }
       ...
     },
     ...
     public String getPartitionPath(...) {
       ...
     }
   }
   ```
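
To illustrate the shape of this suggestion, here is a minimal, self-contained sketch. `PartitionTypeSketch`, the default path strings, the `Map<String, String>` standing in for the index definitions, and the `getPartitionPath(indexDefinitions, indexName)` signature are all assumptions made for illustration, not Hudi's actual API:

```java
import java.util.Map;

// Hypothetical stand-in for MetadataPartitionType; names and values are illustrative only.
enum PartitionTypeSketch {
  // Types with a fixed partition path rely on the default implementation below.
  FILES("files"),

  // Index-backed types override the method and resolve the path from the index definitions.
  FUNCTIONAL_INDEX("func_index") {
    @Override
    String getPartitionPath(Map<String, String> indexDefinitions, String indexName) {
      return resolveFromDefinitions(indexDefinitions, indexName);
    }
  },
  SECONDARY_INDEX("secondary_index") {
    @Override
    String getPartitionPath(Map<String, String> indexDefinitions, String indexName) {
      return resolveFromDefinitions(indexDefinitions, indexName);
    }
  };

  private final String defaultPath;

  PartitionTypeSketch(String defaultPath) {
    this.defaultPath = defaultPath;
  }

  // Default behavior: the path depends only on the partition type.
  String getPartitionPath(Map<String, String> indexDefinitions, String indexName) {
    return defaultPath;
  }

  // Assumed lookup: maps an index name to the partition path recorded in its definition.
  static String resolveFromDefinitions(Map<String, String> indexDefinitions, String indexName) {
    String path = indexDefinitions.get(indexName);
    if (path == null) {
      throw new IllegalArgumentException("No index definition found for " + indexName);
    }
    return path;
  }
}
```

With something along these lines, a call site would invoke `partitionType.getPartitionPath(...)` for every type and would not need its own `FUNCTIONAL_INDEX`/`SECONDARY_INDEX` check.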



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -395,7 +395,7 @@ public static <R> HoodieRecord<R> createNewTaggedHoodieRecord(HoodieRecord<R> ol
    * The actual partition name is contained in the index definition.
    */
   public static String getPartitionNameFromPartitionType(MetadataPartitionType partitionType, HoodieTableMetaClient metaClient, String indexName) {
-    if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType)) {
+    if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType) || MetadataPartitionType.SECONDARY_INDEX.equals(partitionType)) {

Review Comment:
   This way, we avoid confusion about which partition name or path to use, because the logic is consolidated in the new API.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1396,27 +1395,25 @@ protected void preWrite(String instantTime) {
    * records and hence is more suited to bulkInsert for write performance.
    *
    * @param instantTime    - Action instant time for this commit
-   * @param partitionType  - The MDT partition to which records are to be committed
+   * @param partitionName  - The MDT partition to which records are to be committed
    * @param records        - records to be bulk inserted
    * @param fileGroupCount - The maximum number of file groups to which the records will be written.
    */
   protected abstract void bulkCommit(
-      String instantTime, MetadataPartitionType partitionType, HoodieData<HoodieRecord> records,
+      String instantTime, String partitionName, HoodieData<HoodieRecord> records,

Review Comment:
   Could you add more detail to the Javadoc for `partitionName`? Will `partitionName` differ for secondary and functional indexes, e.g., does each secondary/functional index definition have its own partition path?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1672,13 +1669,14 @@ private HoodieData<HoodieRecord> getRecordIndexUpserts(HoodieData<WriteStatus> w
   }
 
   private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
-    final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
-        dataMetaClient.getActiveTimeline(), metadata);
-    List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
-        .getPartitionToReplaceFileIds()
-        .keySet().stream().flatMap(partition
-            -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
-        .collect(Collectors.toList());
+    List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs;
+    try (HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata)) {
+      partitionBaseFilePairs = replaceCommitMetadata
+          .getPartitionToReplaceFileIds()
+          .keySet().stream()
+          .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
+          .collect(Collectors.toList());

Review Comment:
   Move the `return` statement inside the `try` block to avoid the separate `partitionBaseFilePairs` declaration.
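
A rough sketch of the idea, assuming a hypothetical `ViewSketch` class in place of `HoodieMetadataFileSystemView` and plain strings in place of `Pair<String, HoodieBaseFile>`: returning from inside the try-with-resources block still closes the resource before the caller receives the result, provided the stream is fully collected before the `return` completes.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical closeable view; it only counts how many times close() was called.
class ViewSketch implements AutoCloseable {
  static int closeCount = 0;

  List<String> latestBaseFiles(String partition) {
    return List.of(partition + "/f1", partition + "/f2");
  }

  @Override
  public void close() {
    closeCount++;
  }
}

class ReplacedFilesSketch {
  static List<String> baseFilesFor(List<String> partitions) {
    try (ViewSketch view = new ViewSketch()) {
      // No separate local declaration is needed: collect() materializes the list
      // while the view is still open, and the view is closed as the method exits.
      return partitions.stream()
          .flatMap(partition -> view.latestBaseFiles(partition).stream())
          .collect(Collectors.toList());
    }
  }
}
```

If the real method does further work on the collected pairs, that work would also need to move inside the block, or at least not depend on the view after it has been closed.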



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -448,9 +448,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
 
       // Perform the commit using bulkCommit
       HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
-      bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
-      metadataMetaClient.reloadActiveTimeline();
       String partitionPath = (partitionType == FUNCTIONAL_INDEX || partitionType == SECONDARY_INDEX) ? dataWriteConfig.getIndexingConfig().getIndexName() : partitionType.getPartitionPath();
+      bulkCommit(commitTimeForPartition, partitionPath, records, fileGroupCount);

Review Comment:
   That way, we can avoid most of the type changes in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

