mansipp commented on code in PR #9347:
URL: https://github.com/apache/hudi/pull/9347#discussion_r1283879950
##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -143,31 +142,32 @@ public void addPartitionsToTable(String tableName,
List<String> partitionsToAdd)
LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " +
tableId(databaseName, tableName));
try {
Table table = getTable(awsGlue, databaseName, tableName);
- StorageDescriptor sd = table.getStorageDescriptor();
+ StorageDescriptor sd = table.storageDescriptor();
List<PartitionInput> partitionInputs =
partitionsToAdd.stream().map(partition -> {
- StorageDescriptor partitionSd = sd.clone();
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(),
partition).toString();
List<String> partitionValues =
partitionValueExtractor.extractPartitionValuesInPath(partition);
- partitionSd.setLocation(fullPartitionPath);
- return new
PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
+ StorageDescriptor partitionSD = sd.copy(copySd ->
copySd.location(fullPartitionPath));
+ return
PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
}).collect(Collectors.toList());
- List<Future<BatchCreatePartitionResult>> futures = new ArrayList<>();
+ List<CompletableFuture<BatchCreatePartitionResponse>> futures = new
ArrayList<>();
for (List<PartitionInput> batch :
CollectionUtils.batches(partitionInputs, MAX_PARTITIONS_PER_REQUEST)) {
- BatchCreatePartitionRequest request = new
BatchCreatePartitionRequest();
-
request.withDatabaseName(databaseName).withTableName(tableName).withPartitionInputList(batch);
- futures.add(awsGlue.batchCreatePartitionAsync(request));
+ BatchCreatePartitionRequest request =
BatchCreatePartitionRequest.builder()
+
.databaseName(databaseName).tableName(tableName).partitionInputList(batch).build();
+ futures.add(awsGlue.batchCreatePartition(request));
}
- for (Future<BatchCreatePartitionResult> future : futures) {
- BatchCreatePartitionResult result = future.get();
- if (CollectionUtils.nonEmpty(result.getErrors())) {
- if (result.getErrors().stream().allMatch((error) ->
"AlreadyExistsException".equals(error.getErrorDetail().getErrorCode()))) {
- LOG.warn("Partitions already exist in glue: " +
result.getErrors());
+ for (CompletableFuture<BatchCreatePartitionResponse> future : futures) {
+ BatchCreatePartitionResponse response = future.get();
+ if (CollectionUtils.nonEmpty(response.errors())) {
+ if (response.errors().stream()
+ .allMatch(
+ (error) ->
"AlreadyExistsException".equals(error.errorDetail().errorCode()))) {
Review Comment:
We haven't changed anything here other than making it compatible with sdk
v2. Haven't run any test to specifically test this error case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]