codope commented on code in PR #8388:
URL: https://github.com/apache/hudi/pull/8388#discussion_r1186990025
##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -112,44 +113,101 @@ public MessageType getStorageSchema(boolean
includeMetadataField) {
}
}
+ /**
+ * Gets all relative partition paths in the Hudi table on storage.
+ * <p>
+ * The listing uses a {@code HoodieLocalEngineContext} built from the meta
+ * client's Hadoop configuration. It runs against the base path configured
+ * under {@code META_SYNC_BASE_PATH}. The values of
+ * {@code META_SYNC_USE_FILE_LISTING_FROM_METADATA} and
+ * {@code META_SYNC_ASSUME_DATE_PARTITION} are passed as-is to
+ * {@code FSUtils.getAllPartitionPaths}.
+ *
+ * @return All relative partition paths.
+ */
+ public List<String> getAllPartitionPathsOnStorage() {
+ HoodieLocalEngineContext engineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
+ return FSUtils.getAllPartitionPaths(engineContext,
+ config.getString(META_SYNC_BASE_PATH),
+ config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
+ config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+ }
+
public List<String> getWrittenPartitionsSince(Option<String>
lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions
in "
+ config.getString(META_SYNC_BASE_PATH)
+ ",FS :" + config.getHadoopFileSystem());
- HoodieLocalEngineContext engineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
- return FSUtils.getAllPartitionPaths(engineContext,
- config.getString(META_SYNC_BASE_PATH),
- config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
- config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+ return getAllPartitionPathsOnStorage(); // no last-synced commit time is known, so fall back to listing every partition on storage
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ",
Getting commits since then");
return TimelineUtils.getWrittenPartitions(
TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get()));
}
}
+ /**
+ * Gets the partition events for changed partitions.
+ * <p>
+ * This compares the list of all partitions of a table stored in the
metastore and
+ * on the storage:
+ * (1) Partitions exist in the metastore, but NOT the storage: drops them in
the metastore;
+ * (2) Partitions exist on the storage, but NOT the metastore: adds them to
the metastore;
+ * (3) Partitions exist in both, but the partition path is different: update
them in the metastore.
+ *
+ * @param allPartitionsInMetastore All partitions of a table stored in the
metastore.
+ * @param allPartitionsOnStorage All partitions of a table stored on the
storage.
+ * @return partition events for changed partitions.
+ */
+ public List<PartitionEvent> getPartitionEvents(List<Partition>
allPartitionsInMetastore,
+ List<String>
allPartitionsOnStorage) {
+ Map<String, String> paths =
getPartitionValuesToPathMapping(allPartitionsInMetastore);
+ Set<String> partitionsToDrop = new HashSet<>(paths.keySet());
+
+ List<PartitionEvent> events = new ArrayList<>();
+ for (String storagePartition : allPartitionsOnStorage) {
+ Path storagePartitionPath =
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH),
storagePartition);
+ String fullStoragePartitionPath =
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ // Check if the partition values or if hdfs path is the same
+ List<String> storagePartitionValues =
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ // Remove partitions that exist on storage from the `partitionsToDrop`
set,
+ // so the remaining partitions that exist in the metastore should be
dropped
+ partitionsToDrop.remove(storageValue);
+ if (!paths.containsKey(storageValue)) {
+ events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+ } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+ events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+ }
+ }
+ }
+
+ partitionsToDrop.forEach(storageValue -> {
+ String storagePath = paths.get(storageValue);
+ try {
+ String relativePath = FSUtils.getRelativePartitionPath(
+ metaClient.getBasePathV2(), new CachingPath(storagePath));
+ events.add(PartitionEvent.newPartitionDropEvent(relativePath));
+ } catch (IllegalArgumentException e) {
+ LOG.error("Cannot parse the path stored in the metastore, ignoring it
for "
+ + "generating DROP partition event: \"" + storagePath + "\".", e);
Review Comment:
It makes sense not to drop the partition in this case. Just curious, though:
what can cause this scenario?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]