This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new c507cba8bd3d [HUDI-9770] During hive/glue sync, ensure drop partition events are generated when partition is present in the metastore (#13794) (#17857)
c507cba8bd3d is described below
commit c507cba8bd3db3fca7866cb8c769b2a59e1bbad8
Author: Lin Liu <[email protected]>
AuthorDate: Thu Feb 5 01:50:19 2026 -0800
[HUDI-9770] During hive/glue sync, ensure drop partition events are
generated when partition is present in the metastore (#13794) (#17857)
Co-authored-by: Roushan Kumar <[email protected]>
---
.../java/org/apache/hudi/hive/HiveSyncTool.java | 1 +
.../org/apache/hudi/hive/TestHiveSyncTool.java | 123 +++++++++++++++++++++
.../apache/hudi/sync/common/HoodieSyncClient.java | 13 ++-
3 files changed, 132 insertions(+), 5 deletions(-)
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index ddc6da22d91b..1fd4e16baa43 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -287,6 +287,7 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
// Sync the partitions if needed
// find dropped partitions, if any, in the latest commit
Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced,
lastCommitCompletionTimeSynced);
+ LOG.info("Partitions dropped since last sync: {}",
droppedPartitions.size());
partitionsChanged = syncPartitions(tableName, writtenPartitionsSince,
droppedPartitions);
}
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index af4baaae4a3b..e696bdd9e93a 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -1281,6 +1281,129 @@ public class TestHiveSyncTool {
"The last commit that was synced should be updated in the
TBLPROPERTIES");
}
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+ void testGetPartitionEvents_droppedStoragePartitionNotPresentInMetastore(
+ String syncMode, String enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ // Create a table with 1 partition
+ String instantTime1 = "100";
+ HiveTestUtil.createCOWTable(instantTime1, 1, true);
+
+ reInitHiveSyncClient();
+ // Sync the table to metastore
+ reSyncHiveTable();
+
+ List<Partition> partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in
metastore");
+
+ // Add a partition to storage but don't sync it to metastore
+ String instantTime2 = "101";
+ String newPartition = "2010/02/01";
+ HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+
+ // Verify the partition is not in metastore yet
+ partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in
metastore");
+
+ // Delete the partition that was never synced to metastore
+ String instantTime3 = "102";
+ HiveTestUtil.createReplaceCommit(instantTime3, newPartition,
WriteOperationType.DELETE_PARTITION, true, true);
+
+ // Add another partition to storage but don't sync to metastore
+ String instantTime4 = "103";
+ String addPartition = "2010/04/01";
+ HiveTestUtil.addCOWPartition(addPartition, true, true, instantTime4);
+
+ reInitHiveSyncClient();
+
+ Set<String> droppedPartitionsOnStorage =
hiveClient.getDroppedPartitionsSince(Option.of(instantTime1),
Option.of(instantTime1));
+ List<String> writtenPartitionsOnStorage =
hiveClient.getWrittenPartitionsSince(Option.of(instantTime1),
Option.of(instantTime1));
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(
+ partitionsInMetastore, writtenPartitionsOnStorage,
droppedPartitionsOnStorage);
+
+ // Verify no DROP event is generated for partition that was never in
metastore
+ long dropEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.DROP)
+ .count();
+ assertEquals(0, dropEvents,
+ "No DROP partition event should be generated for partition that was
never in metastore");
+
+ // Verify ADD event is generated for the new partition that was added to
storage
+ List<PartitionEvent> addEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.ADD)
+ .collect(Collectors.toList());
+ assertEquals(1, addEvents.size(),
+ "ADD partition event should be generated for new partition added to
storage");
+ assertEquals(addPartition, addEvents.get(0).storagePartition);
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+ void testGetPartitionEvents_droppedStoragePartitionPresentInMetastore(
+ String syncMode, String enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ // Create a table with 1 partition
+ String instantTime1 = "100";
+ HiveTestUtil.createCOWTable(instantTime1, 1, true);
+
+ reInitHiveSyncClient();
+ // Sync the table to metastore
+ reSyncHiveTable();
+
+ List<Partition> partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in
metastore");
+
+ // Add a partition and sync it to metastore
+ String instantTime2 = "101";
+ String newPartition = "2010/02/01";
+ HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+
+ reInitHiveSyncClient();
+ // Sync the table to metastore
+ reSyncHiveTable();
+
+ partitionsInMetastore =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(2, partitionsInMetastore.size(), "Should have 2 partitions in
metastore");
+
+ // Now delete the partition that exists in metastore
+ String instantTime3 = "102";
+ HiveTestUtil.createReplaceCommit(instantTime3, newPartition,
WriteOperationType.DELETE_PARTITION, true, true);
+
+ // Add another partition to storage but don't sync to metastore
+ String instantTime4 = "103";
+ String addPartition = "2010/04/01";
+ HiveTestUtil.addCOWPartition(addPartition, true, true, instantTime4);
+
+ reInitHiveSyncClient();
+
+ // Get partition events
+ Set<String> droppedPartitionsOnStorage =
hiveClient.getDroppedPartitionsSince(Option.of(instantTime2),
Option.of(instantTime2));
+ List<String> writtenPartitionsOnStorage =
hiveClient.getWrittenPartitionsSince(Option.of(instantTime2),
Option.of(instantTime2));
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(
+ partitionsInMetastore, writtenPartitionsOnStorage,
droppedPartitionsOnStorage);
+
+ // Verify DROP event is generated for partition that exists in metastore
+ List<PartitionEvent> dropEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.DROP)
+ .collect(Collectors.toList());
+ assertEquals(1, dropEvents.size(),
+ "DROP partition event should be generated for partition that exists in
metastore");
+ assertEquals(newPartition, dropEvents.get(0).storagePartition);
+
+ // Verify ADD event is generated for the new partition that was added to
storage
+ List<PartitionEvent> addEvents = partitionEvents.stream()
+ .filter(e -> e.eventType == PartitionEventType.ADD)
+ .collect(Collectors.toList());
+ assertEquals(1, addEvents.size(),
+ "ADD partition event should be generated for new partition added to
storage");
+ assertEquals(addPartition, addEvents.get(0).storagePartition);
+ }
+
@ParameterizedTest
@MethodSource("syncModeAndEnablePushDown")
public void testNonPartitionedSync(String syncMode, String enablePushDown)
throws Exception {
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index d7239e0a4b68..b96f2ad76121 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -212,11 +212,14 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues =
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- if (droppedPartitionsOnStorage.contains(storagePartition)) {
- events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
- } else {
- if (!storagePartitionValues.isEmpty()) {
- String storageValue = String.join(", ", storagePartitionValues);
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ if (droppedPartitionsOnStorage.contains(storagePartition)) {
+ if (paths.containsKey(storageValue)) {
+ // Add partition drop event only if it exists in the metastore
+ events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+ }
+ } else {
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if
(!paths.get(storageValue).equals(fullStoragePartitionPath)) {