CTTY commented on code in PR #10460:
URL: https://github.com/apache/hudi/pull/10460#discussion_r1519125449


##########
hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java:
##########
@@ -40,6 +42,28 @@ public class GlueCatalogSyncClientConfig extends 
HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("Glue catalog sync based client will skip archiving 
the table version if this config is set to true");
 
+  public static final ConfigProperty<Integer> ALL_PARTITIONS_READ_PARALLELISM 
= ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "all_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .withValidValues(IntStream.rangeClosed(1, 
10).mapToObj(Integer::toString).toArray(String[]::new))
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing all partitions(first time 
sync). Should be in interval [1, 10].");

Review Comment:
   Have the same question here: why do we need an upper bound for 
parallelism?



##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -138,42 +156,141 @@ public List<Partition> getAllPartitions(String 
tableName) {
     }
   }
 
+  @Override
+  public List<Partition> getAllPartitions(String tableName) {
+    ExecutorService executorService = 
Executors.newFixedThreadPool(this.allPartitionsReadParallelism);
+    try {
+      List<Segment> segments = new ArrayList<>();
+      for (int i = 0; i < allPartitionsReadParallelism; i++) {
+        segments.add(Segment.builder()
+            .segmentNumber(i)
+            .totalSegments(allPartitionsReadParallelism).build());
+      }
+      List<Future<List<Partition>>> futures = segments.stream()
+          .map(segment -> executorService.submit(() -> 
this.getPartitionsSegment(segment, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for 
table " + tableId(databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Override
+  public List<Partition> getPartitionsFromList(String tableName, List<String> 
partitionList) {
+    if (partitionList.isEmpty()) {
+      LOG.info("No partitions to read for " + tableId(this.databaseName, 
tableName));
+      return Collections.emptyList();
+    }
+    HoodieTimer timer = HoodieTimer.start();
+    List<List<String>> batches = CollectionUtils.batches(partitionList, 
MAX_PARTITIONS_PER_READ_REQUEST);
+    ExecutorService executorService = 
Executors.newFixedThreadPool(Math.min(this.changedPartitionsReadParallelism, 
batches.size()));
+    try {
+      List<Future<List<Partition>>> futures = batches
+          .stream()
+          .map(batch -> executorService.submit(() -> 
this.getChangedPartitions(batch, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+      LOG.info(
+          "Requested {} partitions, found existing {} partitions, new {} 
partitions, took {} ms to perform.",
+          partitionList.size(),
+          partitions.size(),
+          partitionList.size() - partitions.size(),
+          timer.endTimer()
+      );
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for 
table " + tableId(this.databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private List<Partition> getChangedPartitions(List<String> changedPartitions, 
String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionValueList> partitionValueList = 
changedPartitions.stream().map(str -> {
+      PartitionValueList individualPartition = 
PartitionValueList.builder().values(partitionValueExtractor.extractPartitionValuesInPath(str)).build();
+      return individualPartition;
+    }).collect(Collectors.toList());
+    BatchGetPartitionRequest request = BatchGetPartitionRequest.builder()
+        .databaseName(this.databaseName)
+        .tableName(tableName)
+        .partitionsToGet(partitionValueList)
+        .build();
+    BatchGetPartitionResponse callResult = 
awsGlue.batchGetPartition(request).get();
+    List<Partition> result = callResult
+        .partitions()
+        .stream()
+        .map(p -> new Partition(p.values(), p.storageDescriptor().location()))
+        .collect(Collectors.toList());
+
+    return result;
+  }
+
   @Override
   public void addPartitionsToTable(String tableName, List<String> 
partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + tableId(databaseName, tableName));
+      LOG.info("No partitions to add for " + tableId(this.databaseName, 
tableName));
       return;
     }
-    LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + 
tableId(databaseName, tableName));
+    HoodieTimer timer = HoodieTimer.start();
+    parallelizeChange(partitionsToAdd, this.changeParallelism, partitions -> 
this.addPartitionsToTableInternal(tableName, partitions), 
MAX_PARTITIONS_PER_CHANGE_REQUEST);
+    LOG.info("Added {} partitions to table {} in {} ms", 
partitionsToAdd.size(), tableId(this.databaseName, tableName), 
timer.endTimer());
+  }
+
+  private <T> void parallelizeChange(List<T> items, int parallelism, 
Consumer<List<T>> consumer, int sliceSize) {
+    List<List<T>> batches = CollectionUtils.batches(items, sliceSize);
+    ExecutorService executorService = 
Executors.newFixedThreadPool(Math.min(parallelism, batches.size()));
+    try {
+      List<Future<?>> futures = batches.stream()
+          .map(item -> executorService.submit(() -> {
+            consumer.accept(item);
+          }))
+          .collect(Collectors.toList());
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to parallelize operation", e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private void addPartitionsToTableInternal(String tableName, List<String> 
partitionsToAdd) {
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
       StorageDescriptor sd = table.storageDescriptor();
-      List<PartitionInput> partitionInputs = 
partitionsToAdd.stream().map(partition -> {
+      List<PartitionInput> partitionInput = 
partitionsToAdd.stream().map(partition -> {
         String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), 
partition).toString();
         List<String> partitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(partition);
         StorageDescriptor partitionSD = sd.copy(copySd -> 
copySd.location(fullPartitionPath));
         return 
PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
       }).collect(Collectors.toList());
 
-      List<CompletableFuture<BatchCreatePartitionResponse>> futures = new 
ArrayList<>();
-
-      for (List<PartitionInput> batch : 
CollectionUtils.batches(partitionInputs, MAX_PARTITIONS_PER_REQUEST)) {
-        BatchCreatePartitionRequest request = 
BatchCreatePartitionRequest.builder()
-                
.databaseName(databaseName).tableName(tableName).partitionInputList(batch).build();
-        futures.add(awsGlue.batchCreatePartition(request));
-      }
-
-      for (CompletableFuture<BatchCreatePartitionResponse> future : futures) {
-        BatchCreatePartitionResponse response = future.get();
-        if (CollectionUtils.nonEmpty(response.errors())) {
-          if (response.errors().stream()
-              .allMatch(
-                  (error) -> 
"AlreadyExistsException".equals(error.errorDetail().errorCode()))) {
-            LOG.warn("Partitions already exist in glue: " + response.errors());
-          } else {
-            throw new HoodieGlueSyncException("Fail to add partitions to " + 
tableId(databaseName, tableName)
+      BatchCreatePartitionRequest request = 
BatchCreatePartitionRequest.builder()
+          
.databaseName(databaseName).tableName(tableName).partitionInputList(partitionInput).build();

Review Comment:
   ```suggestion
             .databaseName(databaseName)
             .tableName(tableName)
             .partitionInputList(partitionInput)
             .build();
   ```
   nit



##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -184,10 +301,15 @@ public void addPartitionsToTable(String tableName, 
List<String> partitionsToAdd)
   @Override
   public void updatePartitionsToTable(String tableName, List<String> 
changedPartitions) {
     if (changedPartitions.isEmpty()) {
-      LOG.info("No partitions to change for " + tableName);
+      LOG.info("No partitions to add for " + tableId(this.databaseName, 
tableName));

Review Comment:
   ```suggestion
         LOG.info("No partitions to update for " + tableId(this.databaseName, 
tableName));
   ```
   nit, we are still using this method to update partitions



##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -138,42 +156,141 @@ public List<Partition> getAllPartitions(String 
tableName) {
     }
   }
 
+  @Override
+  public List<Partition> getAllPartitions(String tableName) {
+    ExecutorService executorService = 
Executors.newFixedThreadPool(this.allPartitionsReadParallelism);

Review Comment:
   Can we use newly added `parallelizeChange` here to shorten this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to