codope commented on code in PR #8388:
URL: https://github.com/apache/hudi/pull/8388#discussion_r1186990025


##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -112,44 +113,101 @@ public MessageType getStorageSchema(boolean includeMetadataField) {
     }
   }
 
+  /**
+   * Gets all relative partition paths in the Hudi table on storage.
+   * <p>
+   * The listing is delegated to {@code FSUtils.getAllPartitionPaths}, run on a
+   * {@code HoodieLocalEngineContext} built from the meta client's Hadoop conf, and
+   * driven by the sync config's {@code META_SYNC_BASE_PATH},
+   * {@code META_SYNC_USE_FILE_LISTING_FROM_METADATA} and
+   * {@code META_SYNC_ASSUME_DATE_PARTITION} settings.
+   *
+   * @return All relative partition paths.
+   */
+  public List<String> getAllPartitionPathsOnStorage() {
+    HoodieLocalEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
+    return FSUtils.getAllPartitionPaths(engineContext,
+        config.getString(META_SYNC_BASE_PATH),
+        config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
+        config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+  }
+
+  /**
+   * Gets the partitions written since the given commit time.
+   * <p>
+   * If {@code lastCommitTimeSynced} is absent, this falls back to listing all
+   * partitions of the table on storage. Otherwise it returns the partitions written
+   * by the commits on the timeline after {@code lastCommitTimeSynced}.
+   *
+   * @param lastCommitTimeSynced The last synced commit time, if known.
+   * @return All partitions on storage, or the partitions written after the given commit time.
+   */
   public List<String> getWrittenPartitionsSince(Option<String> 
lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
       LOG.info("Last commit time synced is not known, listing all partitions 
in "
           + config.getString(META_SYNC_BASE_PATH)
           + ",FS :" + config.getHadoopFileSystem());
-      HoodieLocalEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
-      return FSUtils.getAllPartitionPaths(engineContext,
-          config.getString(META_SYNC_BASE_PATH),
-          config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
-          config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+      return getAllPartitionPathsOnStorage();
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", 
Getting commits since then");
       return TimelineUtils.getWrittenPartitions(
           TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get()));
     }
   }
 
+  /**
+   * Gets the partition events for changed partitions.
+   * <p>
+   * This compares the list of all partitions of a table stored in the 
metastore and
+   * on the storage:
+   * (1) Partitions exist in the metastore, but NOT the storage: drops them in 
the metastore;
+   * (2) Partitions exist on the storage, but NOT the metastore: adds them to 
the metastore;
+   * (3) Partitions exist in both, but the partition path is different: update 
them in the metastore.
+   *
+   * @param allPartitionsInMetastore All partitions of a table stored in the 
metastore.
+   * @param allPartitionsOnStorage   All partitions of a table stored on the 
storage.
+   * @return partition events for changed partitions.
+   */
+  public List<PartitionEvent> getPartitionEvents(List<Partition> 
allPartitionsInMetastore,
+                                                 List<String> 
allPartitionsOnStorage) {
+    Map<String, String> paths = 
getPartitionValuesToPathMapping(allPartitionsInMetastore);
+    Set<String> partitionsToDrop = new HashSet<>(paths.keySet());
+
+    List<PartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : allPartitionsOnStorage) {
+      Path storagePartitionPath = 
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), 
storagePartition);
+      String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      // Check if the partition values or if hdfs path is the same
+      List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+
+      if (!storagePartitionValues.isEmpty()) {
+        String storageValue = String.join(", ", storagePartitionValues);
+        // Remove partitions that exist on storage from the `partitionsToDrop` 
set,
+        // so the remaining partitions that exist in the metastore should be 
dropped
+        partitionsToDrop.remove(storageValue);
+        if (!paths.containsKey(storageValue)) {
+          events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+        } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+          events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+        }
+      }
+    }
+
+    partitionsToDrop.forEach(storageValue -> {
+      String storagePath = paths.get(storageValue);
+      try {
+        String relativePath = FSUtils.getRelativePartitionPath(
+            metaClient.getBasePathV2(), new CachingPath(storagePath));
+        events.add(PartitionEvent.newPartitionDropEvent(relativePath));
+      } catch (IllegalArgumentException e) {
+        LOG.error("Cannot parse the path stored in the metastore, ignoring it 
for "
+            + "generating DROP partition event: \"" + storagePath + "\".", e);

Review Comment:
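   Makes sense not to drop the partition in this case. Just curious, though: what could
cause this scenario, i.e., a path stored in the metastore for which `FSUtils.getRelativePartitionPath` throws an `IllegalArgumentException`?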



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to