CTTY commented on code in PR #9071:
URL: https://github.com/apache/hudi/pull/9071#discussion_r1244396918
##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -569,6 +587,52 @@ private static Table getTable(AWSGlue awsGlue, String
databaseName, String table
}
}
+ // TODO: make this faster with Glue Segment API
+ private static List<com.amazonaws.services.glue.model.Partition>
getAllGluePartitions(
+ AWSGlue awsGlue,
+ String databaseName,
+ String tableName) {
+ try {
+ List<com.amazonaws.services.glue.model.Partition> partitions = new
ArrayList<>();
+ String nextToken = null;
+ do {
+ GetPartitionsResult result = awsGlue.getPartitions(new
GetPartitionsRequest()
+ .withDatabaseName(databaseName)
+ .withTableName(tableName)
+ .withNextToken(nextToken));
+ partitions.addAll(result.getPartitions());
+ nextToken = result.getNextToken();
+ } while (nextToken != null);
+ return partitions;
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Failed to get all glue partitions for
table " + tableId(databaseName, tableName), e);
+ }
+ }
+
+ private static void batchUpdateGluePartitions(AWSGlueAsync awsGlue,
+ String databaseName,
+ String tableName,
+
List<BatchUpdatePartitionRequestEntry> updatePartitionRequestEntries) {
+ try {
+ List<Future<BatchUpdatePartitionResult>> futures = new ArrayList<>();
+ for (List<BatchUpdatePartitionRequestEntry> batch :
CollectionUtils.batches(updatePartitionRequestEntries,
MAX_PARTITIONS_PER_REQUEST)) {
+ BatchUpdatePartitionRequest request = new
BatchUpdatePartitionRequest();
+
request.withDatabaseName(databaseName).withTableName(tableName).withEntries(batch);
+ futures.add(awsGlue.batchUpdatePartitionAsync(request));
+ }
+
+ for (Future<BatchUpdatePartitionResult> future : futures) {
+ BatchUpdatePartitionResult result = future.get();
+ if (CollectionUtils.nonEmpty(result.getErrors())) {
+ throw new HoodieGlueSyncException("Fail to update partitions to " +
tableId(databaseName, tableName)
+ + " with error(s): " + result.getErrors());
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Fail to update partitions to " + tableId(databaseName,
tableName), e);
Review Comment:
This needs to throw the exception instead of just catching it here; I will
revise this later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]