This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2a56065c854 [HUDI-7853] Fix missing serDe properties post migration
from hiveSync to glueSync (#11404)
2a56065c854 is described below
commit 2a56065c854cedaf506521c6a39ae6e927c7e9c9
Author: Prathit malik <[email protected]>
AuthorDate: Tue Jun 11 06:24:27 2024 +0530
[HUDI-7853] Fix missing serDe properties post migration from hiveSync to
glueSync (#11404)
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 64 ++++++++++++++++++++++
1 file changed, 64 insertions(+)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index b39d47132c5..7187e44a2a9 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -21,11 +21,14 @@ package org.apache.hudi.aws.sync;
import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.MapUtils;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.config.GlueCatalogSyncClientConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncClient;
@@ -109,6 +112,7 @@ import static
org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static
org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
/**
@@ -844,6 +848,66 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
return new
GluePartitionFilterGenerator().generatePushDownFilter(writtenPartitions,
partitionFields, (HiveSyncConfig) config);
}
+  /**
+   * Updates the SerDe info of a Glue table with the given SerDe properties.
+   *
+   * <p>The update is skipped when the table's existing SerDe parameters already
+   * have the same size and contain every requested key/value pair. Otherwise the
+   * table is updated with a new {@code SerDeInfo} whose serialization library is
+   * derived from the configured base file format; the table type, parameters,
+   * partition keys and access/analyzed timestamps are carried over from the
+   * existing table.
+   *
+   * @param tableName         name of the table to update
+   * @param serdeProperties   SerDe properties to set; {@code "serialization.format"}
+   *                          defaults to {@code "1"} when absent. The passed map is
+   *                          not modified.
+   * @param useRealtimeFormat not used by this implementation
+   * @return {@code true} if an update request was sent to Glue, {@code false} if
+   *         {@code serdeProperties} is null/empty or the table is already up to date
+   * @throws HoodieGlueSyncException if reading or updating the table fails
+   */
+  @Override
+  public boolean updateSerdeProperties(String tableName, Map<String, String> serdeProperties, boolean useRealtimeFormat) {
+    if (MapUtils.isNullOrEmpty(serdeProperties)) {
+      return false;
+    }
+
+    try {
+      // Work on a copy so the caller's map is never mutated (it may also be unmodifiable).
+      Map<String, String> newSerdeProperties = new java.util.HashMap<>(serdeProperties);
+      newSerdeProperties.putIfAbsent("serialization.format", "1");
+      Table table = getTable(awsGlue, databaseName, tableName);
+      StorageDescriptor existingTableStorageDescriptor = table.storageDescriptor();
+
+      if (existingTableStorageDescriptor != null && existingTableStorageDescriptor.serdeInfo() != null
+          && existingTableStorageDescriptor.serdeInfo().parameters().size() == newSerdeProperties.size()) {
+        Map<String, String> existingSerdeProperties = existingTableStorageDescriptor.serdeInfo().parameters();
+        boolean different = newSerdeProperties.entrySet().stream().anyMatch(e ->
+            !existingSerdeProperties.containsKey(e.getKey()) || !existingSerdeProperties.get(e.getKey()).equals(e.getValue()));
+        if (!different) {
+          LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update serde properties.");
+          return false;
+        }
+      }
+
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+      String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+
+      SerDeInfo newSerdeInfo = SerDeInfo
+          .builder()
+          .serializationLibrary(serDeClassName)
+          .parameters(newSerdeProperties)
+          .build();
+
+      // Keep everything in the existing storage descriptor except the SerDe info.
+      StorageDescriptor storageDescriptor = existingTableStorageDescriptor
+          .toBuilder()
+          .serdeInfo(newSerdeInfo)
+          .build();
+
+      TableInput updatedTableInput = TableInput.builder()
+          .name(tableName)
+          .tableType(table.tableType())
+          .parameters(table.parameters())
+          .partitionKeys(table.partitionKeys())
+          .storageDescriptor(storageDescriptor)
+          .lastAccessTime(table.lastAccessTime())
+          // Carry over the analyzed timestamp into its own field rather than
+          // overwriting lastAccessTime with it.
+          .lastAnalyzedTime(table.lastAnalyzedTime())
+          .build();
+
+      UpdateTableRequest updateTableRequest = UpdateTableRequest.builder()
+          .databaseName(databaseName)
+          .tableInput(updatedTableInput)
+          .build();
+
+      awsGlue.updateTable(updateTableRequest);
+      return true;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to update table serde info for table: "
+          + tableName, e);
+    }
+  }
+
private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {