This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a56065c854 [HUDI-7853] Fix missing serDe properties post migration from hiveSync to glueSync (#11404)
2a56065c854 is described below

commit 2a56065c854cedaf506521c6a39ae6e927c7e9c9
Author: Prathit malik <[email protected]>
AuthorDate: Tue Jun 11 06:24:27 2024 +0530

    [HUDI-7853] Fix missing serDe properties post migration from hiveSync to glueSync (#11404)
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    | 64 ++++++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index b39d47132c5..7187e44a2a9 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -21,11 +21,14 @@ package org.apache.hudi.aws.sync;
 import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
 import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.MapUtils;
 import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.config.GlueCatalogSyncClientConfig;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.sync.common.HoodieSyncClient;
@@ -109,6 +112,7 @@ import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
 import static 
org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
 import static org.apache.hudi.sync.common.util.TableUtils.tableId;
 
 /**
@@ -844,6 +848,66 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
     return new 
GluePartitionFilterGenerator().generatePushDownFilter(writtenPartitions, 
partitionFields, (HiveSyncConfig) config);
   }
 
+  /**
+   * Points the Glue table's SerDe info at the configured base file format's SerDe class with the given
+   * parameters ("serialization.format" defaults to "1"); skips the update if the parameters already match.
+   *
+   * @param tableName         Glue table name within {@code databaseName}
+   * @param serdeProperties   SerDe parameters to apply; the caller's map is not modified
+   * @param useRealtimeFormat not read by this implementation
+   * @return true if the Glue table was updated, false if the input was empty or already up to date
+   * @throws HoodieGlueSyncException if any step of reading, comparing or updating the table fails
+   */
+  @Override
+  public boolean updateSerdeProperties(String tableName, Map<String, String> serdeProperties, boolean useRealtimeFormat) {
+    if (MapUtils.isNullOrEmpty(serdeProperties)) {
+      return false;
+    }
+
+    try {
+      // Work on a copy so the caller's (possibly immutable) map is never mutated.
+      Map<String, String> newSerdeProperties = new java.util.HashMap<>(serdeProperties);
+      newSerdeProperties.putIfAbsent("serialization.format", "1");
+      Table table = getTable(awsGlue, databaseName, tableName);
+      StorageDescriptor existingTableStorageDescriptor = table.storageDescriptor();
+      if (existingTableStorageDescriptor != null && existingTableStorageDescriptor.serdeInfo() != null
+          && existingTableStorageDescriptor.serdeInfo().parameters().size() == newSerdeProperties.size()) {
+        Map<String, String> existingSerdeProperties = existingTableStorageDescriptor.serdeInfo().parameters();
+        // Null-safe value comparison: a null stored value must not throw.
+        boolean different = newSerdeProperties.entrySet().stream().anyMatch(e -> !existingSerdeProperties.containsKey(e.getKey())
+            || !java.util.Objects.equals(existingSerdeProperties.get(e.getKey()), e.getValue()));
+        if (!different) {
+          LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update serde properties.");
+          return false;
+        }
+      }
+
+      // Locale.ROOT keeps the enum lookup independent of the JVM default locale (e.g. Turkish dotted/dotless i).
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(
+          config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase(java.util.Locale.ROOT));
+      String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+      SerDeInfo newSerdeInfo = SerDeInfo.builder().serializationLibrary(serDeClassName).parameters(newSerdeProperties).build();
+      // The table may have no storage descriptor; start from an empty one instead of dereferencing null.
+      StorageDescriptor storageDescriptor = (existingTableStorageDescriptor == null
+          ? StorageDescriptor.builder() : existingTableStorageDescriptor.toBuilder())
+          .serdeInfo(newSerdeInfo)
+          .build();
+      TableInput updatedTableInput = TableInput.builder()
+          .name(tableName)
+          .tableType(table.tableType())
+          .parameters(table.parameters())
+          .partitionKeys(table.partitionKeys())
+          .storageDescriptor(storageDescriptor)
+          .lastAccessTime(table.lastAccessTime())
+          .lastAnalyzedTime(table.lastAnalyzedTime())
+          .build();
+      awsGlue.updateTable(UpdateTableRequest.builder().databaseName(databaseName).tableInput(updatedTableInput).build());
+      return true;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to update table serde info for table: " + tableName, e);
+    }
+  }
+
   private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
     List<Column> cols = new ArrayList<>();
     for (String key : mapSchema.keySet()) {

Reply via email to