vamsikarnika commented on code in PR #14260:
URL: https://github.com/apache/hudi/pull/14260#discussion_r2545895576


##########
hudi-utilities/pom.xml:
##########
@@ -84,6 +84,12 @@
   </build>
 
   <dependencies>
+    <dependency>

Review Comment:
   Yes, I added it for development purposes.



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -246,6 +246,20 @@ public static String getRelativePartitionPath(Path basePath, Path fullPartitionP
         : fullPartitionPathStr.substring(partitionStartIndex + basePath.getName().length() + 1);
   }
 
+  public static String getRelativePartitionPath(Path fullPartitionPath, int numPartitionPathLevels) {

Review Comment:
   We still need this method for UV?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/MetadataSyncUtils.java:
##########
@@ -0,0 +1,77 @@
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MetadataSyncUtils {
+
+  public static List<String> getPendingInstants(
+      HoodieActiveTimeline activeTimeline,
+      HoodieInstant latestCommit) {
+    List<HoodieInstant> pendingHoodieInstants =
+        activeTimeline
+            .filterInflightsAndRequested()
+            .findInstantsBefore(latestCommit.getTimestamp())
+            .getInstants();
+    return pendingHoodieInstants.stream()
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+  }
+
+  public static SyncMetadata getTableSyncExtraMetadata(Option<HoodieInstant> 
targetTableLastInstant, HoodieTableMetaClient targetTableMetaClient, String 
sourceIdentifier,
+                                                       String 
sourceInstantSynced, List<String> pendingInstantsToSync) {
+    return targetTableLastInstant.map(instant -> {
+      SyncMetadata lastSyncMetadata = null;
+      try {
+        lastSyncMetadata = getTableSyncMetadataFromCommitMetadata(instant, 
targetTableMetaClient);
+      } catch (IOException e) {
+        throw new HoodieException("Failed to get sync metadata");
+      }
+
+      TableCheckpointInfo checkpointInfo = 
TableCheckpointInfo.of(sourceInstantSynced, pendingInstantsToSync, 
sourceIdentifier);
+      List<TableCheckpointInfo> updatedCheckpointInfos = 
lastSyncMetadata.getTableCheckpointInfos().stream()
+          .filter(metadata -> 
!metadata.getSourceIdentifier().equals(sourceIdentifier)).collect(Collectors.toList());
+      updatedCheckpointInfos.add(checkpointInfo);
+      return SyncMetadata.of(Instant.now(), updatedCheckpointInfos);
+    }).orElseGet(() -> {
+      List<TableCheckpointInfo> checkpointInfos = 
Collections.singletonList(TableCheckpointInfo.of(sourceInstantSynced, 
pendingInstantsToSync, sourceIdentifier));
+      return SyncMetadata.of(Instant.now(), checkpointInfos);
+    });
+  }
+
+  public static SyncMetadata 
getTableSyncMetadataFromCommitMetadata(HoodieInstant instant, 
HoodieTableMetaClient metaClient) throws IOException {
+    HoodieCommitMetadata commitMetadata = 
getHoodieCommitMetadata(instant.getTimestamp(), metaClient);
+    Option<String> tableSyncMetadataJson = 
Option.ofNullable(commitMetadata.getMetadata(SyncMetadata.TABLE_SYNC_METADATA));
+    if (!tableSyncMetadataJson.isPresent()) {
+      // if table sync metadata is not present, sync all commits from source table
+      throw new HoodieException("Table sync metadata is missing in the target table commit metadata");

Review Comment:
   We always expect the write commit on the target table to contain sync 
metadata, which is required to run the next sync.
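
   A minimal sketch of that fail-fast expectation, assuming the commit's extra 
metadata is available as a plain map. The class name, method name, and key 
value below are hypothetical and only illustrate the idea; they are not the 
PR's code.

```java
import java.util.Map;

// Hypothetical sketch: fail fast when the expected sync metadata entry is absent.
class SyncMetadataLookupSketch {

  // Assumed key name for illustration; the real key is SyncMetadata.TABLE_SYNC_METADATA in the PR.
  static final String SYNC_METADATA_KEY = "example.table.sync.metadata";

  // Returns the stored sync metadata JSON, or throws if the commit does not carry it.
  static String requireSyncMetadata(Map<String, String> extraMetadata) {
    String json = extraMetadata.get(SYNC_METADATA_KEY);
    if (json == null) {
      // Without the previous checkpoint the next sync cannot know where to resume.
      throw new IllegalStateException(
          "Table sync metadata is missing in the target table commit metadata");
    }
    return json;
  }
}
```

   Throwing here, rather than falling back to a full resync, keeps a missing 
checkpoint visible instead of silently re-syncing everything.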



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/TableCheckpointInfo.java:
##########
@@ -0,0 +1,39 @@
+package org.apache.hudi.utilities;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.List;
+
+@Getter
+@AllArgsConstructor
+public class TableCheckpointInfo {
+  String lastInstantSynced;
+  List<String> instantsToConsiderForNextSync;

Review Comment:
   There are a few scenarios where we might need to include instants that are 
already completed in the next sync run.
   
   For example:
   
   At t0, you have
   
   (c0, C) -> (c1, P) -> (c2, P) -> (c3, C)
   
   For the first sync, c1 and c2 are in the pending state. You've synced up to 
c3, and it becomes the last sync checkpoint for the next run.
   
   At t1, you have
   
   (c0, C) -> (c1, C) -> (c2, C) -> (c3, C)
   
   When c1 is being synced, we still need to include c2 (completed) in the 
instants for the next sync, with the last sync checkpoint still pointing to 
c3 (the checkpoint from the previous run).
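
   The scenario above can be sketched as follows. This is a simplified 
illustration, not the PR's implementation: the class and method names are 
hypothetical, and instants are reduced to plain strings. The point is that 
the carried-over list is filtered by whether an instant has been synced, not 
by whether it is still pending on the source timeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how the carried-over instant list evolves between sync runs.
class NextSyncSketch {

  // carriedOver: instants the previous sync recorded as still to be considered.
  // syncedThisRun: instants the current run actually synced.
  // Returns the instants that must still be considered by the next run,
  // regardless of whether they are completed on the source timeline by now.
  static List<String> remainingInstants(List<String> carriedOver, List<String> syncedThisRun) {
    List<String> remaining = new ArrayList<>();
    for (String instant : carriedOver) {
      if (!syncedThisRun.contains(instant)) {
        remaining.add(instant);
      }
    }
    return remaining;
  }
}
```

   With the timeline from the example, the first run records c1 and c2 as 
carried over with checkpoint c3. If the run at t1 syncs only c1, 
`remainingInstants` keeps c2 even though c2 is completed, and the checkpoint 
stays at c3.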



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/MetadataSyncUtils.java:
##########
@@ -0,0 +1,77 @@
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MetadataSyncUtils {
+
+  public static List<String> getPendingInstants(
+      HoodieActiveTimeline activeTimeline,
+      HoodieInstant latestCommit) {
+    List<HoodieInstant> pendingHoodieInstants =
+        activeTimeline
+            .filterInflightsAndRequested()
+            .findInstantsBefore(latestCommit.getTimestamp())
+            .getInstants();
+    return pendingHoodieInstants.stream()
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+  }
+
+  public static SyncMetadata getTableSyncExtraMetadata(Option<HoodieInstant> 
targetTableLastInstant, HoodieTableMetaClient targetTableMetaClient, String 
sourceIdentifier,
+                                                       String 
sourceInstantSynced, List<String> pendingInstantsToSync) {
+    return targetTableLastInstant.map(instant -> {
+      SyncMetadata lastSyncMetadata = null;
+      try {
+        lastSyncMetadata = getTableSyncMetadataFromCommitMetadata(instant, 
targetTableMetaClient);
+      } catch (IOException e) {
+        throw new HoodieException("Failed to get sync metadata");
+      }
+
+      TableCheckpointInfo checkpointInfo = 
TableCheckpointInfo.of(sourceInstantSynced, pendingInstantsToSync, 
sourceIdentifier);
+      List<TableCheckpointInfo> updatedCheckpointInfos = 
lastSyncMetadata.getTableCheckpointInfos().stream()
+          .filter(metadata -> 
!metadata.getSourceIdentifier().equals(sourceIdentifier)).collect(Collectors.toList());
+      updatedCheckpointInfos.add(checkpointInfo);
+      return SyncMetadata.of(Instant.now(), updatedCheckpointInfos);
+    }).orElseGet(() -> {
+      List<TableCheckpointInfo> checkpointInfos = 
Collections.singletonList(TableCheckpointInfo.of(sourceInstantSynced, 
pendingInstantsToSync, sourceIdentifier));
+      return SyncMetadata.of(Instant.now(), checkpointInfos);
+    });
+  }
+
+  public static SyncMetadata 
getTableSyncMetadataFromCommitMetadata(HoodieInstant instant, 
HoodieTableMetaClient metaClient) throws IOException {
+    HoodieCommitMetadata commitMetadata = 
getHoodieCommitMetadata(instant.getTimestamp(), metaClient);
+    Option<String> tableSyncMetadataJson = 
Option.ofNullable(commitMetadata.getMetadata(SyncMetadata.TABLE_SYNC_METADATA));
+    if (!tableSyncMetadataJson.isPresent()) {
+      // if table sync metadata is not present, sync all commits from source 
table
+      throw new HoodieException("Table sync metadata is missing in the target 
table commit metadata");
+    }
+    Option<SyncMetadata> tableSyncMetadataListOpt = 
SyncMetadata.fromJson(tableSyncMetadataJson.get());
+    if (!tableSyncMetadataListOpt.isPresent()) {
+      throw new HoodieException("Table Sync metadata is empty in the target 
table commit metadata");
+    }
+
+    return tableSyncMetadataListOpt.get();
+  }
+
+  public static HoodieCommitMetadata getHoodieCommitMetadata(String 
instantTime, HoodieTableMetaClient metaClient) throws IOException {
+    Option<HoodieInstant> instantOpt = 
metaClient.getActiveTimeline().filterCompletedInstants().filter(instant -> 
instant.getTimestamp().equalsIgnoreCase(instantTime)).firstInstant();
+    if (instantOpt.isPresent()) {
+      HoodieInstant instant = instantOpt.get();
+      return HoodieCommitMetadata.fromBytes(
+          metaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);

Review Comment:
   Actually, ReplaceCommitMetadata is a subclass of HoodieCommitMetadata, so 
this call can deserialize replace commit metadata as well. Currently these two 
are the only commit types we deserialize.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/MetadataSyncUtils.java:
##########
@@ -0,0 +1,77 @@
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MetadataSyncUtils {
+
+  public static List<String> getPendingInstants(
+      HoodieActiveTimeline activeTimeline,
+      HoodieInstant latestCommit) {
+    List<HoodieInstant> pendingHoodieInstants =
+        activeTimeline
+            .filterInflightsAndRequested()
+            .findInstantsBefore(latestCommit.getTimestamp())
+            .getInstants();
+    return pendingHoodieInstants.stream()
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+  }
+
+  public static SyncMetadata getTableSyncExtraMetadata(Option<HoodieInstant> 
targetTableLastInstant, HoodieTableMetaClient targetTableMetaClient, String 
sourceIdentifier,
+                                                       String 
sourceInstantSynced, List<String> pendingInstantsToSync) {
+    return targetTableLastInstant.map(instant -> {
+      SyncMetadata lastSyncMetadata = null;
+      try {
+        lastSyncMetadata = getTableSyncMetadataFromCommitMetadata(instant, 
targetTableMetaClient);
+      } catch (IOException e) {
+        throw new HoodieException("Failed to get sync metadata");
+      }
+
+      TableCheckpointInfo checkpointInfo = 
TableCheckpointInfo.of(sourceInstantSynced, pendingInstantsToSync, 
sourceIdentifier);
+      List<TableCheckpointInfo> updatedCheckpointInfos = 
lastSyncMetadata.getTableCheckpointInfos().stream()
+          .filter(metadata -> 
!metadata.getSourceIdentifier().equals(sourceIdentifier)).collect(Collectors.toList());
+      updatedCheckpointInfos.add(checkpointInfo);
+      return SyncMetadata.of(Instant.now(), updatedCheckpointInfos);
+    }).orElseGet(() -> {
+      List<TableCheckpointInfo> checkpointInfos = Collections.singletonList(TableCheckpointInfo.of(sourceInstantSynced, pendingInstantsToSync, sourceIdentifier));
+      return SyncMetadata.of(Instant.now(), checkpointInfos);

Review Comment:
   It's only useful for debugging. It isn't actually used in application 
logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
