This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 990c0c565b7 [HUDI-7933] Sync table in Glue/HMS if table base path is 
updated (#11529)
990c0c565b7 is described below

commit 990c0c565b7ccee2c2cda690267cb7567d47f5e1
Author: vamsikarnika <[email protected]>
AuthorDate: Tue Jul 2 06:54:02 2024 +0530

    [HUDI-7933] Sync table in Glue/HMS if table base path is updated (#11529)
    
    Co-authored-by: Vamsi <[email protected]>
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    |  10 +++
 .../hudi/aws/sync/TestAWSGlueSyncClient.java       |  24 +++++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |   6 ++
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java |  10 +++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 100 +++++++++++++++++++++
 .../hudi/sync/common/HoodieMetaSyncOperations.java |   8 ++
 6 files changed, 158 insertions(+)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 2a293007062..60d6a1e708c 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -994,6 +994,16 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     }
   }
 
+  @Override
+  public String getTableLocation(String tableName) {
+    try {
+      Table table = getTable(awsGlue, databaseName, tableName);
+      return table.storageDescriptor().location();
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get base path for the table " 
+ tableId(databaseName, tableName), e);
+    }
+  }
+
   private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
     List<Column> cols = new ArrayList<>();
     for (String key : mapSchema.keySet()) {
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java 
b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
index 335c2fa720a..540b1d75141 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
@@ -50,6 +50,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.hudi.aws.testutils.GlueTestUtil.glueSyncProps;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.any;
@@ -203,6 +205,27 @@ class TestAWSGlueSyncClient {
     assertThrows(HoodieGlueSyncException.class, () -> 
awsGlueSyncClient.getMetastoreFieldSchemas(tableName));
   }
 
+  @Test
+  void testGetTableLocation() {
+    String tableName = "testTable";
+    List<Column> columns = 
Arrays.asList(Column.builder().name("name").type("string").comment("person's 
name").build(),
+        Column.builder().name("age").type("int").comment("person's 
age").build());
+    CompletableFuture<GetTableResponse> tableResponse = 
getTableWithDefaultProps(tableName, columns, Collections.emptyList());
+    // mock aws glue get table call
+    
Mockito.when(mockAwsGlue.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+    String basePath = awsGlueSyncClient.getTableLocation(tableName);
+    // verify if table base path is correct
+    assertEquals(glueSyncProps.get(META_SYNC_BASE_PATH.key()), basePath, 
"table base path should match");
+  }
+
+  @Test
+  void testGetTableLocation_ThrowsException() {
+    String tableName = "testTable";
+    // mock aws glue get table call to throw an exception
+    
Mockito.when(mockAwsGlue.getTable(any(GetTableRequest.class))).thenThrow(EntityNotFoundException.class);
+    assertThrows(HoodieGlueSyncException.class, () -> 
awsGlueSyncClient.getTableLocation(tableName));
+  }
+
   private CompletableFuture<GetTableResponse> getTableWithDefaultProps(String 
tableName, List<Column> columns, List<Column> partitionColumns) {
     String databaseName = "testdb";
     String inputFormatClass = "inputFormat";
@@ -213,6 +236,7 @@ class TestAWSGlueSyncClient {
     software.amazon.awssdk.services.glue.model.StorageDescriptor 
storageDescriptor = 
software.amazon.awssdk.services.glue.model.StorageDescriptor.builder()
         
.serdeInfo(SerDeInfo.builder().serializationLibrary(serdeClass).parameters(serdeProperties).build())
         .inputFormat(inputFormatClass)
+        .location(glueSyncProps.getString(META_SYNC_BASE_PATH.key()))
         .columns(columns)
         .outputFormat(outputFormatClass)
         .build();
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 1c2056785b7..0dae64bdc29 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -232,6 +232,12 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
     final boolean tableExists = syncClient.tableExists(tableName);
     // Get the parquet schema for this table looking at the latest commit
     MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+    // if table exists and location of the metastore table doesn't match the 
hoodie base path, recreate the table
+    if (tableExists && 
!syncClient.getTableLocation(tableName).equals(syncClient.getBasePath())) {
+      recreateAndSyncHiveTable(tableName, useRealtimeInputFormat, 
readAsOptimized);
+      return;
+    }
+
     boolean schemaChanged;
     boolean propertiesChanged;
     try {
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 0ad52e7a189..ebf5dc7368e 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -487,4 +487,14 @@ public class HoodieHiveSyncClient extends HoodieSyncClient 
{
       throw new HoodieHiveSyncException("Failed to delete the table " + 
tableId(databaseName, tableName), e);
     }
   }
+
+  @Override
+  public String getTableLocation(String tableName) {
+    try {
+      Table table = client.getTable(databaseName, tableName);
+      return table.getSd().getLocation();
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get the base path of the 
table " + tableId(databaseName, tableName), e);
+    }
+  }
 }
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 34dea62db1f..9c85fab18cc 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -826,6 +826,47 @@ public class TestHiveSyncTool {
         "The last commit that was synced should be 101");
   }
 
+  @ParameterizedTest
+  @MethodSource("syncModeAndEnablePushDown")
+  public void testRecreateCOWTableOnBasePathChange(String syncMode, String 
enablePushDown) throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
+
+    String commitTime1 = "100";
+    HiveTestUtil.createCOWTable(commitTime1, 5, true);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+
+    String commitTime2 = "105";
+    // let's update the basepath
+    basePath = Files.createTempDirectory("hivesynctest_new" + 
Instant.now().toEpochMilli()).toUri().toString();
+    hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+    // let's create new table in new basepath
+    HiveTestUtil.createCOWTable(commitTime2, 2, true);
+    // Now let's create more partitions and these are the only ones which need 
to be synced
+    ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+    String commitTime3 = "110";
+    // let's add 2 more partitions to the new basepath
+    HiveTestUtil.addCOWPartitions(2, false, true, dateTime, commitTime3);
+
+    // reinitialize hive client
+    reInitHiveSyncClient();
+    // after reinitializing hive client, table location shouldn't match hoodie 
base path
+    assertNotEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "table location 
should not match hoodie basepath before sync");
+
+    // Lets do the sync
+    reSyncHiveTable();
+    // verify partition count should be 4 from new basepath, not 5 from old
+    assertEquals(4, 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "the 4 partitions from new base path should be present for hive");
+    // verify last commit time synced
+    assertEquals(commitTime3, 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be 110");
+    // table location now should be updated to latest hoodie basepath
+    assertEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "new table location 
should match hoodie basepath");
+  }
+
   @ParameterizedTest
   @MethodSource("syncModeAndEnablePushDown")
   void testRecreateCOWTableWithSchemaEvolution(String syncMode, String 
enablePushDown) throws Exception {
@@ -1028,6 +1069,65 @@ public class TestHiveSyncTool {
         "The last commit that was synced should be 103");
   }
 
+  @ParameterizedTest
+  @MethodSource("syncModeAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnReadWithBasePathChange(boolean 
useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws 
Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
+
+    String instantTime = "100";
+    String deltaCommitTime = "101";
+    HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+        useSchemaFromCommitMetadata);
+
+    String roTableName = HiveTestUtil.TABLE_NAME + 
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
+    String rtTableName = HiveTestUtil.TABLE_NAME + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+    reInitHiveSyncClient();
+    assertFalse(hiveClient.tableExists(roTableName), "Table " + 
HiveTestUtil.TABLE_NAME + " should not exist initially");
+    assertFalse(hiveClient.tableExists(rtTableName), "Table " + 
HiveTestUtil.TABLE_NAME + " should not exist initially");
+    // Lets do the sync
+    reSyncHiveTable();
+
+    // change the hoodie base path
+    basePath = Files.createTempDirectory("hivesynctest_new" + 
Instant.now().toEpochMilli()).toUri().toString();
+    hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+    String instantTime2 = "102";
+    String deltaCommitTime2 = "103";
+    // let's create MOR table in the new basepath
+    HiveTestUtil.createMORTable(instantTime2, deltaCommitTime2, 2, true,
+        useSchemaFromCommitMetadata);
+
+    // let's add more partitions in the new basepath
+    ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+    String commitTime3 = "104";
+    String deltaCommitTime3 = "105";
+    HiveTestUtil.addMORPartitions(2, true, false,
+        useSchemaFromCommitMetadata, dateTime, commitTime3, deltaCommitTime3);
+
+    // reinitialize hive client
+    reInitHiveSyncClient();
+    // verify table location is different from hoodie basepath
+    assertNotEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(roTableName), "ro table location should not match 
hoodie base path before sync");
+    assertNotEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(rtTableName), "rt table location should not match 
hoodie base path before sync");
+    // Lets do the sync
+    reSyncHiveTable();
+
+    // verify partition count should be 4, not 5 from old basepath
+    assertEquals(4, hiveClient.getAllPartitions(roTableName).size(),
+        "the 4 partitions from new base path should be present for ro table");
+    assertEquals(4, hiveClient.getAllPartitions(rtTableName).size(),
+        "the 4 partitions from new base path should be present for rt table");
+    // verify last synced commit time
+    assertEquals(deltaCommitTime3, 
hiveClient.getLastCommitTimeSynced(roTableName).get(),
+        "The last commit that was synced should be 105");
+    assertEquals(deltaCommitTime3, 
hiveClient.getLastCommitTimeSynced(rtTableName).get(),
+        "The last commit that was synced should be 105");
+    // verify table location is updated to the new hoodie basepath
+    assertEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(roTableName), "ro table location should match 
hoodie base path after sync");
+    assertEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(rtTableName), "rt table location should match 
hoodie base path after sync");
+  }
+
   @ParameterizedTest
   @MethodSource("syncModeAndSchemaFromCommitMetadata")
   public void testSyncMergeOnReadRT(boolean useSchemaFromCommitMetadata, 
String syncMode, String enablePushDown) throws Exception {
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index d377d321e67..f0772f2b548 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.sync.common.model.FieldSchema;
 import org.apache.hudi.sync.common.model.Partition;
 
@@ -184,6 +185,13 @@ public interface HoodieMetaSyncOperations {
     return Collections.emptyList();
   }
 
+  /**
+   * Get the base path of the table from the metastore.
+   */
+  default String getTableLocation(String tableName) {
+    return StringUtils.EMPTY_STRING;
+  }
+
   /**
    * Update the field comments for table in metastore, by using the ones from 
storage.
    *

Reply via email to