This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 36242ff516d [HUDI-8015] Fix Glue Meta Sync Failure with recreating the 
table (#11677)
36242ff516d is described below

commit 36242ff516dbd92fa6ef16bbcc150dfc6488d815
Author: vamsikarnika <[email protected]>
AuthorDate: Thu Aug 8 21:13:20 2024 +0530

    [HUDI-8015] Fix Glue Meta Sync Failure with recreating the table (#11677)
    
    Updated the AWSGlueCatalogSyncClient createOrReplaceTable method to drop and
    recreate the table instead of updating it. Updated the check to compare
    the base path of the Hudi table and the metastore table location while
    ignoring the URI scheme (e.g. s3 vs. s3a).
    
    ---------
    
    Co-authored-by: Vamsi <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    | 48 ++++++++++---------
 .../hudi/aws/sync/TestAWSGlueSyncClient.java       | 56 +++++++++++++++++-----
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 12 +++++
 .../org/apache/hudi/common/fs/TestFSUtils.java     | 51 ++++++++++++++++++++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  8 +++-
 5 files changed, 138 insertions(+), 37 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 65bdbb1c04e..1e8c63d5667 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -579,34 +579,36 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     }
 
     try {
-      // Create a temp table will validate the schema before dropping and 
recreating the table
-      String tempTableName = generateTempTableName(tableName);
-      createTable(tempTableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
-
-      Table tempTable = getTable(awsGlue, databaseName, tempTableName);
-      final Instant now = Instant.now();
-      TableInput updatedTableInput = TableInput.builder()
-          .name(tableName)
-          .tableType(tempTable.tableType())
-          .parameters(tempTable.parameters())
-          .partitionKeys(tempTable.partitionKeys())
-          .storageDescriptor(tempTable.storageDescriptor())
-          .lastAccessTime(now)
-          .lastAnalyzedTime(now)
-          .build();
-
-      UpdateTableRequest request = UpdateTableRequest.builder()
-          .databaseName(databaseName)
-          .skipArchive(skipTableArchive)
-          .tableInput(updatedTableInput)
-          .build();
-      awsGlue.updateTable(request).get();
-      dropTable(tempTableName);
+      // validate before dropping the table
+      validateSchemaAndProperties(tableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+      // drop and recreate the actual table
+      dropTable(tableName);
+      createTable(tableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Fail to recreate the table" + 
tableId(databaseName, tableName), e);
     }
   }
 
+  /**
+   * creates a temp table with the given schema and properties to ensure
+   * table creation succeeds before dropping the table and recreating it.
+   * This ensures that actual table is not dropped in case there are any
+   * issues with table creation because of provided schema or properties
+   */
+  private void validateSchemaAndProperties(String tableName,
+                                           MessageType storageSchema,
+                                           String inputFormatClass,
+                                           String outputFormatClass,
+                                           String serdeClass,
+                                           Map<String, String> serdeProperties,
+                                           Map<String, String> 
tableProperties) {
+    // Create a temp table to validate the schema and properties
+    String tempTableName = generateTempTableName(tableName);
+    createTable(tempTableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+    // drop the temp table
+    dropTable(tempTableName);
+  }
+
   @Override
   public void createTable(String tableName,
                           MessageType storageSchema,
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java 
b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
index 8723db418c0..035f31c4351 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/TestAWSGlueSyncClient.java
@@ -42,6 +42,7 @@ import 
software.amazon.awssdk.services.glue.model.GetTableResponse;
 import software.amazon.awssdk.services.glue.model.SerDeInfo;
 import software.amazon.awssdk.services.glue.model.Table;
 import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -49,11 +50,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.hudi.aws.testutils.GlueTestUtil.glueSyncProps;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -83,7 +86,7 @@ class TestAWSGlueSyncClient {
   }
 
   @Test
-  void testCreateOrReplaceTable_TableExists() {
+  void testCreateOrReplaceTable_TableExists() throws ExecutionException, 
InterruptedException {
     String tableName = "testTable";
     String databaseName = "testdb";
     String inputFormatClass = "inputFormat";
@@ -97,31 +100,37 @@ class TestAWSGlueSyncClient {
         .inputFormat(inputFormatClass)
         .outputFormat(outputFormatClass)
         .build();
-    Table tempTable = Table.builder()
-        .name("tempTable")
+    Table table = Table.builder()
+        .name(tableName)
         .tableType("COPY_ON_WRITE")
         .parameters(new HashMap<>())
         .storageDescriptor(storageDescriptor)
         .databaseName(databaseName)
         .build();
-    GetTableResponse response = GetTableResponse.builder()
-        .table(tempTable)
+
+    GetTableResponse tableResponse = GetTableResponse.builder()
+        .table(table)
         .build();
 
+    GetTableRequest getTableRequestForTable = 
GetTableRequest.builder().databaseName(databaseName).name(tableName).build();
     // Mock methods
-    CompletableFuture<GetTableResponse> tableResponse = 
CompletableFuture.completedFuture(response);
-    
Mockito.when(mockAwsGlue.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+    CompletableFuture<GetTableResponse> tableResponseFuture = 
CompletableFuture.completedFuture(tableResponse);
+    CompletableFuture<GetTableResponse> mockTableNotFoundResponse = 
Mockito.mock(CompletableFuture.class);
+    ExecutionException executionException = new ExecutionException("failed to 
get table", EntityNotFoundException.builder().build());
+    
Mockito.when(mockTableNotFoundResponse.get()).thenThrow(executionException);
+
+    
Mockito.when(mockAwsGlue.getTable(any(GetTableRequest.class))).thenReturn(mockTableNotFoundResponse);
+    
Mockito.when(mockAwsGlue.getTable(getTableRequestForTable)).thenReturn(tableResponseFuture).thenReturn(mockTableNotFoundResponse);
+    
Mockito.when(mockAwsGlue.createTable(any(CreateTableRequest.class))).thenReturn(CompletableFuture.completedFuture(CreateTableResponse.builder().build()));
 
     CompletableFuture<DeleteTableResponse> deleteTableResponse = 
CompletableFuture.completedFuture(DeleteTableResponse.builder().build());
     
Mockito.when(mockAwsGlue.deleteTable(any(DeleteTableRequest.class))).thenReturn(deleteTableResponse);
 
-    
Mockito.when(mockAwsGlue.updateTable(any(UpdateTableRequest.class))).thenReturn(CompletableFuture.completedFuture(null));
     awsGlueSyncClient.createOrReplaceTable(tableName, storageSchema, 
inputFormatClass, outputFormatClass, serdeClass, serdeProperties, 
tableProperties);
 
-    // Verify that awsGlue.updateTable() is called exactly once
-    verify(mockAwsGlue, times(1)).updateTable(any(UpdateTableRequest.class));
-    verify(mockAwsGlue, times(0)).createTable(any(CreateTableRequest.class));
-    verify(mockAwsGlue, times(1)).deleteTable(any(DeleteTableRequest.class));
+    verify(mockAwsGlue, times(2)).deleteTable(any(DeleteTableRequest.class));
+    verify(mockAwsGlue, times(3)).getTable(any(GetTableRequest.class));
+    verify(mockAwsGlue, times(2)).createTable(any(CreateTableRequest.class));
   }
 
   @Test
@@ -227,6 +236,29 @@ class TestAWSGlueSyncClient {
     assertThrows(HoodieGlueSyncException.class, () -> 
awsGlueSyncClient.getTableLocation(tableName));
   }
 
+  @Test
+  void testUpdateTableProperties() throws ExecutionException, 
InterruptedException {
+    String tableName = "test";
+    GlueTestUtil.getColumn("name", "string", "person's name");
+    List<Column> columns = Arrays.asList(GlueTestUtil.getColumn("name", 
"string", "person's name"),
+        GlueTestUtil.getColumn("age", "int", "person's age"));
+    List<Column> partitionKeys = 
Collections.singletonList(GlueTestUtil.getColumn("city", "string", "person's 
city"));
+    CompletableFuture<GetTableResponse> tableResponseFuture = 
getTableWithDefaultProps(tableName, columns, partitionKeys);
+    HashMap<String, String> newTableProperties = new HashMap<>();
+    newTableProperties.put("last_commit_time_sync", "100");
+
+    CompletableFuture<UpdateTableResponse> mockUpdateTableResponse = 
Mockito.mock(CompletableFuture.class);
+    
Mockito.when(mockUpdateTableResponse.get()).thenReturn(UpdateTableResponse.builder().build());
+    
Mockito.when(mockAwsGlue.getTable(any(GetTableRequest.class))).thenReturn(tableResponseFuture);
+    
Mockito.when(mockAwsGlue.updateTable(any(UpdateTableRequest.class))).thenReturn(mockUpdateTableResponse);
+    boolean updated = awsGlueSyncClient.updateTableProperties(tableName, 
newTableProperties);
+    assertTrue(updated, "should return true when new parameters is not empty");
+    verify(mockAwsGlue, times(1)).updateTable(any(UpdateTableRequest.class));
+
+    Mockito.when(mockUpdateTableResponse.get()).thenThrow(new 
InterruptedException());
+    assertThrows(HoodieGlueSyncException.class, () -> 
awsGlueSyncClient.updateTableProperties(tableName, newTableProperties));
+  }
+
   private CompletableFuture<GetTableResponse> getTableWithDefaultProps(String 
tableName, List<Column> columns, List<Column> partitionColumns) {
     String databaseName = "testdb";
     String inputFormatClass = "inputFormat";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 4fb8149ed56..f4d29a138fe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -44,6 +44,7 @@ import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.inline.InLineFSUtils;
 
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -690,6 +691,17 @@ public class FSUtils {
     return pathInfoList;
   }
 
+  public static boolean comparePathsWithoutScheme(String pathStr1, String 
pathStr2) {
+    Path pathWithoutScheme1 = getPathWithoutScheme(new Path(pathStr1));
+    Path pathWithoutScheme2 = getPathWithoutScheme(new Path(pathStr2));
+    return pathWithoutScheme1.equals(pathWithoutScheme2);
+  }
+
+  public static Path getPathWithoutScheme(Path path) {
+    return path.isUriPathAbsolute()
+        ? new Path(null, path.toUri().getAuthority(), path.toUri().getPath()) 
: path;
+  }
+
   /**
    * Serializable function interface.
    *
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index df7e41688a3..7621aca7c7f 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -590,6 +590,57 @@ public class TestFSUtils extends HoodieCommonTestHarness {
         FSUtils.makeQualified(wrapperStorage, new StoragePath("s3://x/y")));
   }
 
+  @Test
+  void testComparePathsWithoutScheme() {
+    String path1 = "s3://test_bucket_one/table/base/path";
+    String path2 = "s3a://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return false since bucket names dont match");
+
+    path1 = "s3a://test_bucket_one/table/new_base/path";
+    path2 = "s3a://test_bucket_one/table/old_base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return false since paths don't match");
+
+    path1 = "s3://test_bucket_one/table/base/path";
+    path2 = "s3a://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
false since bucket names match without file shema");
+
+    path1 = "s3a://test_bucket_one/table/base/path";
+    path2 = "s3a://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since bucket names and path matches");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    path2 = "gs://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return true since bucket names and path matches");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    path2 = "gs://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since bucket names and path matches");
+
+    path1 = "file:/var/table/base/path";
+    path2 = "/var/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since path matches");
+
+    path1 = "file:/var/table/base/path";
+    path2 = "file:/var/table/old_base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return false since path doesn't matches");
+
+    path1 = "table/base/path";
+    path2 = "table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since relative path doesn't matches");
+  }
+
+  @Test
+  void testGetPathWithoutScheme() {
+    String path1 = "s3://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new 
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should 
return false since bucket names dont match");
+
+    path1 = "s3a://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new 
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should 
return false since bucket names dont match");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new 
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should 
return false since bucket names dont match");
+  }
+
   private StoragePath getHoodieTempDir() {
     return new StoragePath(baseUri.toString(), ".hoodie/.temp");
   }
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 6a7b94e43aa..30d55ddcec8 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -233,7 +234,8 @@ public class HiveSyncTool extends HoodieSyncTool implements 
AutoCloseable {
     // Get the parquet schema for this table looking at the latest commit
     MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
     // if table exists and location of the metastore table doesn't match the 
hoodie base path, recreate the table
-    if (tableExists && 
!syncClient.getTableLocation(tableName).equals(syncClient.getBasePath())) {
+    if (tableExists && 
!FSUtils.comparePathsWithoutScheme(syncClient.getBasePath(), 
syncClient.getTableLocation(tableName))) {
+      LOG.info("basepath is updated for the table {}", tableName);
       recreateAndSyncHiveTable(tableName, useRealtimeInputFormat, 
readAsOptimized);
       return;
     }
@@ -257,9 +259,11 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
       }
       LOG.info("Sync complete for {}", tableName);
     } catch (HoodieHiveSyncException ex) {
-      LOG.error("failed to sync the table {}", tableName, ex);
       if (shouldRecreateAndSyncTable()) {
+        LOG.warn("failed to sync the table {}, trying to recreate", tableName, 
ex);
         recreateAndSyncHiveTable(tableName, useRealtimeInputFormat, 
readAsOptimized);
+      } else {
+        throw new HoodieHiveSyncException("failed to sync the table " + 
tableName, ex);
       }
     }
   }

Reply via email to