This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2024d17704e [HUDI-6453] Cascade Glue schema changes to partitions
(#11670)
2024d17704e is described below
commit 2024d17704e6856174fc6b56ba51cd2d1b93b49d
Author: Nicolas Paris <[email protected]>
AuthorDate: Tue Jul 30 03:52:13 2024 +0200
[HUDI-6453] Cascade Glue schema changes to partitions (#11670)
Co-authored-by: Shawn Chang <[email protected]>
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 32 ++++++++++++++++++----
.../hudi/sync/datahub/DataHubSyncClient.java | 3 +-
.../apache/hudi/sync/datahub/DataHubSyncTool.java | 2 +-
.../hudi/sync/datahub/TestDataHubSyncClient.java | 2 +-
.../java/org/apache/hudi/hive/HiveSyncTool.java | 2 +-
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 2 +-
.../org/apache/hudi/hive/SchemaDifference.java | 0
.../hudi/sync/common/HoodieMetaSyncOperations.java | 3 +-
8 files changed, 35 insertions(+), 11 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 733ed3c7d40..65bdbb1c04e 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.GlueCatalogSyncClientConfig;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
@@ -186,6 +187,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
GetPartitionsResponse result =
awsGlue.getPartitions(GetPartitionsRequest.builder()
.databaseName(databaseName)
.tableName(tableName)
+ .excludeColumnSchema(true)
.segment(segment)
.nextToken(nextToken)
.build()).get();
@@ -424,6 +426,9 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
}
}
+ /**
+ * Update the given properties on the table.
+ */
@Override
public boolean updateTableProperties(String tableName, Map<String, String>
tableProperties) {
try {
@@ -509,9 +514,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
}
@Override
- public void updateTableSchema(String tableName, MessageType newSchema) {
- // ToDo Cascade is set in Hive meta sync, but need to investigate how to
configure it for Glue meta
- boolean cascade =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
+ public void updateTableSchema(String tableName, MessageType newSchema,
SchemaDifference schemaDiff) {
try {
Table table = getTable(awsGlue, databaseName, tableName);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema,
config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
@@ -536,11 +539,30 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
.build();
awsGlue.updateTable(request).get();
+ // Glue needs partition schema cascading only when columns get updated
+ // TODO: skip cascading when new fields in structs are added to the
schema in the last position
+ boolean cascade =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0 &&
!schemaDiff.getUpdateColumnTypes().isEmpty();
+ if (cascade) {
+ LOG.info("Cascading column changes to partitions");
+ List<String> allPartitions = getAllPartitions(tableName).stream()
+ .map(partition -> getStringFromPartition(table.partitionKeys(),
partition.getValues()))
+ .collect(Collectors.toList());
+ updatePartitionsToTable(tableName, allPartitions);
+ }
+ awsGlue.updateTable(request).get();
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to update definition for table "
+ tableId(databaseName, tableName), e);
}
}
+ private String getStringFromPartition(List<Column> partitionKeys,
List<String> values) {
+ ArrayList<String> partitionValues = new ArrayList<>();
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ partitionValues.add(String.format("%s=%s", partitionKeys.get(i).name(),
values.get(i)));
+ }
+ return partitionValues.stream().collect(Collectors.joining("/"));
+ }
+
@Override
public void createOrReplaceTable(String tableName,
MessageType storageSchema,
@@ -793,7 +815,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
return Objects.nonNull(awsGlue.getTable(request).get().table());
} catch (ExecutionException e) {
if (e.getCause() instanceof EntityNotFoundException) {
- LOG.info("Table not found: " + tableId(databaseName, tableName), e);
+ LOG.warn("Table not found: " + tableId(databaseName, tableName), e);
return false;
} else {
throw new HoodieGlueSyncException("Fail to get table: " +
tableId(databaseName, tableName), e);
@@ -810,7 +832,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
return Objects.nonNull(awsGlue.getDatabase(request).get().database());
} catch (ExecutionException e) {
if (e.getCause() instanceof EntityNotFoundException) {
- LOG.info("Database not found: " + databaseName, e);
+ LOG.warn("Database not found: " + databaseName, e);
return false;
} else {
throw new HoodieGlueSyncException("Fail to check if database exists "
+ databaseName, e);
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index cf99bfdcd95..fc8737ad2db 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -22,6 +22,7 @@ package org.apache.hudi.sync.datahub;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.HoodieSyncException;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
@@ -101,7 +102,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
}
@Override
- public void updateTableSchema(String tableName, MessageType schema) {
+ public void updateTableSchema(String tableName, MessageType schema,
SchemaDifference schemaDifference) {
try (RestEmitter emitter = config.getRestEmitter()) {
DatahubResponseLogger responseLogger = new DatahubResponseLogger();
MetadataChangeProposalWrapper schemaChange =
createSchemaMetadataUpdate(tableName);
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 567f547a817..ea61267676b 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -52,7 +52,7 @@ public class DataHubSyncTool extends HoodieSyncTool {
@Override
public void syncHoodieTable() {
try (DataHubSyncClient syncClient = new DataHubSyncClient(config)) {
- syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME),
null);
+ syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME),
null, null);
syncClient.updateLastCommitTimeSynced(config.getString(META_SYNC_TABLE_NAME));
}
}
diff --git
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
index 7029f38a963..ae23e1c78d7 100644
---
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
+++
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -98,7 +98,7 @@ public class TestDataHubSyncClient {
DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props,
restEmitterMock);
DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
- dhClient.updateTableSchema("some_table", null);
+ dhClient.updateTableSchema("some_table", null, null);
verify(restEmitterMock,
times(2)).emit(any(MetadataChangeProposalWrapper.class),
Mockito.any());
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 0dae64bdc29..6a7b94e43aa 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -401,7 +401,7 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
LOG.info("No Schema difference for {}.", tableName);
} else {
LOG.info("Schema difference found for {}. Updated schema: {}",
tableName, schema);
- syncClient.updateTableSchema(tableName, schema);
+ syncClient.updateTableSchema(tableName, schema, schemaDiff);
schemaChanged = true;
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index ad96e511af6..fe93fe6fdf1 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -201,7 +201,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
}
@Override
- public void updateTableSchema(String tableName, MessageType newSchema) {
+ public void updateTableSchema(String tableName, MessageType newSchema,
SchemaDifference schemaDiff) {
ddlExecutor.updateTableDefinition(tableName, newSchema);
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
similarity index 100%
rename from
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
rename to
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index f0772f2b548..76b719c8dd0 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -20,6 +20,7 @@
package org.apache.hudi.sync.common;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
@@ -167,7 +168,7 @@ public interface HoodieMetaSyncOperations {
/**
* Update schema for the table in the metastore.
*/
- default void updateTableSchema(String tableName, MessageType newSchema) {
+ default void updateTableSchema(String tableName, MessageType newSchema,
SchemaDifference schemaDiff) {
}