This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 990c0c565b7 [HUDI-7933] Sync table in Glue/HMS if table base path is
updated (#11529)
990c0c565b7 is described below
commit 990c0c565b7ccee2c2cda690267cb7567d47f5e1
Author: vamsikarnika <[email protected]>
AuthorDate: Tue Jul 2 06:54:02 2024 +0530
[HUDI-7933] Sync table in Glue/HMS if table base path is updated (#11529)
Co-authored-by: Vamsi <[email protected]>
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 10 +++
.../hudi/aws/sync/TestAWSGlueSyncClient.java | 24 +++++
.../java/org/apache/hudi/hive/HiveSyncTool.java | 6 ++
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 10 +++
.../org/apache/hudi/hive/TestHiveSyncTool.java | 100 +++++++++++++++++++++
.../hudi/sync/common/HoodieMetaSyncOperations.java | 8 ++
6 files changed, 158 insertions(+)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 2a293007062..60d6a1e708c 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -994,6 +994,16 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
}
}
+ @Override
+ public String getTableLocation(String tableName) {
+ try {
+ Table table = getTable(awsGlue, databaseName, tableName);
+ return table.storageDescriptor().location();
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Failed to get base path for the table "
+ tableId(databaseName, tableName), e);
+ }
+ }
+
private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {
diff --git
a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
index 335c2fa720a..540b1d75141 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
@@ -50,6 +50,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import static org.apache.hudi.aws.testutils.GlueTestUtil.glueSyncProps;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.any;
@@ -203,6 +205,27 @@ class TestAWSGlueSyncClient {
assertThrows(HoodieGlueSyncException.class, () ->
awsGlueSyncClient.getMetastoreFieldSchemas(tableName));
}
+ @Test
+ void testGetTableLocation() {
+ String tableName = "testTable";
+ List<Column> columns =
Arrays.asList(Column.builder().name("name").type("string").comment("person's
name").build(),
+ Column.builder().name("age").type("int").comment("person's
age").build());
+ CompletableFuture<GetTableResponse> tableResponse =
getTableWithDefaultProps(tableName, columns, Collections.emptyList());
+ // mock aws glue get table call
+
Mockito.when(mockAwsGlue.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+ String basePath = awsGlueSyncClient.getTableLocation(tableName);
+ // verify if table base path is correct
+ assertEquals(glueSyncProps.get(META_SYNC_BASE_PATH.key()), basePath,
"table base path should match");
+ }
+
+ @Test
+ void testGetTableLocation_ThrowsException() {
+ String tableName = "testTable";
+ // mock aws glue get table call to throw an exception
+
Mockito.when(mockAwsGlue.getTable(any(GetTableRequest.class))).thenThrow(EntityNotFoundException.class);
+ assertThrows(HoodieGlueSyncException.class, () ->
awsGlueSyncClient.getTableLocation(tableName));
+ }
+
private CompletableFuture<GetTableResponse> getTableWithDefaultProps(String
tableName, List<Column> columns, List<Column> partitionColumns) {
String databaseName = "testdb";
String inputFormatClass = "inputFormat";
@@ -213,6 +236,7 @@ class TestAWSGlueSyncClient {
software.amazon.awssdk.services.glue.model.StorageDescriptor
storageDescriptor =
software.amazon.awssdk.services.glue.model.StorageDescriptor.builder()
.serdeInfo(SerDeInfo.builder().serializationLibrary(serdeClass).parameters(serdeProperties).build())
.inputFormat(inputFormatClass)
+ .location(glueSyncProps.getString(META_SYNC_BASE_PATH.key()))
.columns(columns)
.outputFormat(outputFormatClass)
.build();
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 1c2056785b7..0dae64bdc29 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -232,6 +232,12 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
final boolean tableExists = syncClient.tableExists(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+ // if table exists and location of the metastore table doesn't match the
hoodie base path, recreate the table
+ if (tableExists &&
!syncClient.getTableLocation(tableName).equals(syncClient.getBasePath())) {
+ recreateAndSyncHiveTable(tableName, useRealtimeInputFormat,
readAsOptimized);
+ return;
+ }
+
boolean schemaChanged;
boolean propertiesChanged;
try {
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 0ad52e7a189..ebf5dc7368e 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -487,4 +487,14 @@ public class HoodieHiveSyncClient extends HoodieSyncClient
{
throw new HoodieHiveSyncException("Failed to delete the table " +
tableId(databaseName, tableName), e);
}
}
+
+ @Override
+ public String getTableLocation(String tableName) {
+ try {
+ Table table = client.getTable(databaseName, tableName);
+ return table.getSd().getLocation();
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to get the basepath of the
table " + tableId(databaseName, tableName), e);
+ }
+ }
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 34dea62db1f..9c85fab18cc 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -826,6 +826,47 @@ public class TestHiveSyncTool {
"The last commit that was synced should be 101");
}
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+ public void testRecreateCOWTableOnBasePathChange(String syncMode, String
enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ String commitTime1 = "100";
+ HiveTestUtil.createCOWTable(commitTime1, 5, true);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ String commitTime2 = "105";
+ // let's update the basepath
+ basePath = Files.createTempDirectory("hivesynctest_new" +
Instant.now().toEpochMilli()).toUri().toString();
+ hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+ // let's create new table in new basepath
+ HiveTestUtil.createCOWTable(commitTime2, 2, true);
+ // Now let's create more partitions and these are the only ones which need
to be synced
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+ String commitTime3 = "110";
+ // let's add 2 more partitions to the new basepath
+ HiveTestUtil.addCOWPartitions(2, false, true, dateTime, commitTime3);
+
+ // reinitialize hive client
+ reInitHiveSyncClient();
+ // after reinitializing hive client, table location shouldn't match hoodie
base path
+ assertNotEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "table location
should not match hoodie basepath before sync");
+
+ // Lets do the sync
+ reSyncHiveTable();
+ // verify partition count should be 4 from new basepath, not 5 from old
+ assertEquals(4,
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+ "the 4 partitions from new base path should be present for hive");
+ // verify last commit time synced
+ assertEquals(commitTime3,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+ "The last commit that was synced should be 110");
+ // table location now should be updated to latest hoodie basepath
+ assertEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "new table location
should match hoodie basepath");
+ }
+
@ParameterizedTest
@MethodSource("syncModeAndEnablePushDown")
void testRecreateCOWTableWithSchemaEvolution(String syncMode, String
enablePushDown) throws Exception {
@@ -1028,6 +1069,65 @@ public class TestHiveSyncTool {
"The last commit that was synced should be 103");
}
+ @ParameterizedTest
+ @MethodSource("syncModeAndSchemaFromCommitMetadata")
+ public void testSyncMergeOnReadWithBasePathChange(boolean
useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws
Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ String instantTime = "100";
+ String deltaCommitTime = "101";
+ HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+ useSchemaFromCommitMetadata);
+
+ String roTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
+ String rtTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+ reInitHiveSyncClient();
+ assertFalse(hiveClient.tableExists(roTableName), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
+ assertFalse(hiveClient.tableExists(rtTableName), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
+ // Lets do the sync
+ reSyncHiveTable();
+
+ // change the hoodie base path
+ basePath = Files.createTempDirectory("hivesynctest_new" +
Instant.now().toEpochMilli()).toUri().toString();
+ hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+ String instantTime2 = "102";
+ String deltaCommitTime2 = "103";
+ // let's create MOR table in the new basepath
+ HiveTestUtil.createMORTable(instantTime2, deltaCommitTime2, 2, true,
+ useSchemaFromCommitMetadata);
+
+ // let's add more partitions in the new basepath
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+ String commitTime3 = "104";
+ String deltaCommitTime3 = "105";
+ HiveTestUtil.addMORPartitions(2, true, false,
+ useSchemaFromCommitMetadata, dateTime, commitTime3, deltaCommitTime3);
+
+ // reinitialize hive client
+ reInitHiveSyncClient();
+ // verify table location is different from hoodie basepath
+ assertNotEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(roTableName), "ro table location should not match
hoodie base path before sync");
+ assertNotEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(rtTableName), "rt table location should not match
hoodie base path before sync");
+ // Lets do the sync
+ reSyncHiveTable();
+
+ // verify partition count should be 4, not 5 from old basepath
+ assertEquals(4, hiveClient.getAllPartitions(roTableName).size(),
+ "the 4 partitions from new base path should be present for ro table");
+ assertEquals(4, hiveClient.getAllPartitions(rtTableName).size(),
+ "the 4 partitions from new base path should be present for rt table");
+ // verify last synced commit time
+ assertEquals(deltaCommitTime3,
hiveClient.getLastCommitTimeSynced(roTableName).get(),
+ "The last commit that was synced should be 105");
+ assertEquals(deltaCommitTime3,
hiveClient.getLastCommitTimeSynced(rtTableName).get(),
+ "The last commit that was synced should be 105");
+ // verify table location is updated to the new hoodie basepath
+ assertEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(roTableName), "ro table location should match
hoodie base path after sync");
+ assertEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(rtTableName), "rt table location should match
hoodie base path after sync");
+ }
+
@ParameterizedTest
@MethodSource("syncModeAndSchemaFromCommitMetadata")
public void testSyncMergeOnReadRT(boolean useSchemaFromCommitMetadata,
String syncMode, String enablePushDown) throws Exception {
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index d377d321e67..f0772f2b548 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -20,6 +20,7 @@
package org.apache.hudi.sync.common;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
@@ -184,6 +185,13 @@ public interface HoodieMetaSyncOperations {
return Collections.emptyList();
}
+ /**
+ * Get the base path of the table from metastore
+ */
+ default String getTableLocation(String tableName) {
+ return StringUtils.EMPTY_STRING;
+ }
+
/**
* Update the field comments for table in metastore, by using the ones from
storage.
*