This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2024d17704e [HUDI-6453] Cascade Glue schema changes to partitions 
(#11670)
2024d17704e is described below

commit 2024d17704e6856174fc6b56ba51cd2d1b93b49d
Author: Nicolas Paris <[email protected]>
AuthorDate: Tue Jul 30 03:52:13 2024 +0200

    [HUDI-6453] Cascade Glue schema changes to partitions (#11670)
    
    Co-authored-by: Shawn Chang <[email protected]>
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    | 32 ++++++++++++++++++----
 .../hudi/sync/datahub/DataHubSyncClient.java       |  3 +-
 .../apache/hudi/sync/datahub/DataHubSyncTool.java  |  2 +-
 .../hudi/sync/datahub/TestDataHubSyncClient.java   |  2 +-
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  2 +-
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java |  2 +-
 .../org/apache/hudi/hive/SchemaDifference.java     |  0
 .../hudi/sync/common/HoodieMetaSyncOperations.java |  3 +-
 8 files changed, 35 insertions(+), 11 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 733ed3c7d40..65bdbb1c04e 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.GlueCatalogSyncClientConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.model.FieldSchema;
 import org.apache.hudi.sync.common.model.Partition;
@@ -186,6 +187,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
         GetPartitionsResponse result = 
awsGlue.getPartitions(GetPartitionsRequest.builder()
             .databaseName(databaseName)
             .tableName(tableName)
+            .excludeColumnSchema(true)
             .segment(segment)
             .nextToken(nextToken)
             .build()).get();
@@ -424,6 +426,9 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     }
   }
 
+  /**
+   * Updates the properties of the given table.
+   */
   @Override
   public boolean updateTableProperties(String tableName, Map<String, String> 
tableProperties) {
     try {
@@ -509,9 +514,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
   }
 
   @Override
-  public void updateTableSchema(String tableName, MessageType newSchema) {
-    // ToDo Cascade is set in Hive meta sync, but need to investigate how to 
configure it for Glue meta
-    boolean cascade = 
config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
+  public void updateTableSchema(String tableName, MessageType newSchema, 
SchemaDifference schemaDiff) {
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
       Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, 
config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
@@ -536,11 +539,30 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
           .build();
 
       awsGlue.updateTable(request).get();
+      // Glue needs partition schema cascading only when columns are updated
+      // TODO: skip cascading when new fields in structs are added to the 
schema in the last position
+      boolean cascade = 
config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0 && 
!schemaDiff.getUpdateColumnTypes().isEmpty();
+      if (cascade) {
+        LOG.info("Cascading column changes to partitions");
+        List<String> allPartitions = getAllPartitions(tableName).stream()
+            .map(partition -> getStringFromPartition(table.partitionKeys(), 
partition.getValues()))
+            .collect(Collectors.toList());
+        updatePartitionsToTable(tableName, allPartitions);
+      }
+      awsGlue.updateTable(request).get();
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Fail to update definition for table " 
+ tableId(databaseName, tableName), e);
     }
   }
 
+  private String getStringFromPartition(List<Column> partitionKeys, 
List<String> values) {
+    ArrayList<String> partitionValues = new ArrayList<>();
+    for (int i = 0; i < partitionKeys.size(); i++) {
+      partitionValues.add(String.format("%s=%s", partitionKeys.get(i).name(), 
values.get(i)));
+    }
+    return partitionValues.stream().collect(Collectors.joining("/"));
+  }
+
   @Override
   public void createOrReplaceTable(String tableName,
                                    MessageType storageSchema,
@@ -793,7 +815,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
       return Objects.nonNull(awsGlue.getTable(request).get().table());
     } catch (ExecutionException e) {
       if (e.getCause() instanceof EntityNotFoundException) {
-        LOG.info("Table not found: " + tableId(databaseName, tableName), e);
+        LOG.warn("Table not found: " + tableId(databaseName, tableName), e);
         return false;
       } else {
         throw new HoodieGlueSyncException("Fail to get table: " + 
tableId(databaseName, tableName), e);
@@ -810,7 +832,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
       return Objects.nonNull(awsGlue.getDatabase(request).get().database());
     } catch (ExecutionException e) {
       if (e.getCause() instanceof EntityNotFoundException) {
-        LOG.info("Database not found: " + databaseName, e);
+        LOG.warn("Database not found: " + databaseName, e);
         return false;
       } else {
         throw new HoodieGlueSyncException("Fail to check if database exists " 
+ databaseName, e);
diff --git 
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index cf99bfdcd95..fc8737ad2db 100644
--- 
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -22,6 +22,7 @@ package org.apache.hudi.sync.datahub;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
@@ -101,7 +102,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
   }
 
   @Override
-  public void updateTableSchema(String tableName, MessageType schema) {
+  public void updateTableSchema(String tableName, MessageType schema, 
SchemaDifference schemaDifference) {
     try (RestEmitter emitter = config.getRestEmitter()) {
       DatahubResponseLogger responseLogger = new DatahubResponseLogger();
       MetadataChangeProposalWrapper schemaChange = 
createSchemaMetadataUpdate(tableName);
diff --git 
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 567f547a817..ea61267676b 100644
--- 
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++ 
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -52,7 +52,7 @@ public class DataHubSyncTool extends HoodieSyncTool {
   @Override
   public void syncHoodieTable() {
     try (DataHubSyncClient syncClient = new DataHubSyncClient(config)) {
-      syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME), 
null);
+      syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME), 
null, null);
       
syncClient.updateLastCommitTimeSynced(config.getString(META_SYNC_TABLE_NAME));
     }
   }
diff --git 
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
 
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
index 7029f38a963..ae23e1c78d7 100644
--- 
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
+++ 
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -98,7 +98,7 @@ public class TestDataHubSyncClient {
     DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, 
restEmitterMock);
     DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
 
-    dhClient.updateTableSchema("some_table", null);
+    dhClient.updateTableSchema("some_table", null, null);
     verify(restEmitterMock, 
times(2)).emit(any(MetadataChangeProposalWrapper.class),
             Mockito.any());
   }
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 0dae64bdc29..6a7b94e43aa 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -401,7 +401,7 @@ public class HiveSyncTool extends HoodieSyncTool implements 
AutoCloseable {
       LOG.info("No Schema difference for {}.", tableName);
     } else {
       LOG.info("Schema difference found for {}. Updated schema: {}", 
tableName, schema);
-      syncClient.updateTableSchema(tableName, schema);
+      syncClient.updateTableSchema(tableName, schema, schemaDiff);
       schemaChanged = true;
     }
 
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index ad96e511af6..fe93fe6fdf1 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -201,7 +201,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   }
 
   @Override
-  public void updateTableSchema(String tableName, MessageType newSchema) {
+  public void updateTableSchema(String tableName, MessageType newSchema, 
SchemaDifference schemaDiff) {
     ddlExecutor.updateTableDefinition(tableName, newSchema);
   }
 
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
similarity index 100%
rename from 
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
rename to 
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index f0772f2b548..76b719c8dd0 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.sync.common.model.FieldSchema;
 import org.apache.hudi.sync.common.model.Partition;
@@ -167,7 +168,7 @@ public interface HoodieMetaSyncOperations {
   /**
    * Update schema for the table in the metastore.
    */
-  default void updateTableSchema(String tableName, MessageType newSchema) {
+  default void updateTableSchema(String tableName, MessageType newSchema, 
SchemaDifference schemaDiff) {
 
   }
 

Reply via email to