VitoMakarevich commented on code in PR #10460:
URL: https://github.com/apache/hudi/pull/10460#discussion_r1524885452


##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -138,42 +156,141 @@ public List<Partition> getAllPartitions(String 
tableName) {
     }
   }
 
+  @Override
+  public List<Partition> getAllPartitions(String tableName) {
+    ExecutorService executorService = 
Executors.newFixedThreadPool(this.allPartitionsReadParallelism);
+    try {
+      List<Segment> segments = new ArrayList<>();
+      for (int i = 0; i < allPartitionsReadParallelism; i++) {
+        segments.add(Segment.builder()
+            .segmentNumber(i)
+            .totalSegments(allPartitionsReadParallelism).build());
+      }
+      List<Future<List<Partition>>> futures = segments.stream()
+          .map(segment -> executorService.submit(() -> 
this.getPartitionsSegment(segment, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for 
table " + tableId(databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Override
+  public List<Partition> getPartitionsFromList(String tableName, List<String> 
partitionList) {
+    if (partitionList.isEmpty()) {
+      LOG.info("No partitions to read for " + tableId(this.databaseName, 
tableName));
+      return Collections.emptyList();
+    }
+    HoodieTimer timer = HoodieTimer.start();
+    List<List<String>> batches = CollectionUtils.batches(partitionList, 
MAX_PARTITIONS_PER_READ_REQUEST);
+    ExecutorService executorService = 
Executors.newFixedThreadPool(Math.min(this.changedPartitionsReadParallelism, 
batches.size()));
+    try {
+      List<Future<List<Partition>>> futures = batches
+          .stream()
+          .map(batch -> executorService.submit(() -> 
this.getChangedPartitions(batch, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+      LOG.info(
+          "Requested {} partitions, found existing {} partitions, new {} 
partitions, took {} ms to perform.",
+          partitionList.size(),
+          partitions.size(),
+          partitionList.size() - partitions.size(),
+          timer.endTimer()
+      );
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for 
table " + tableId(this.databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private List<Partition> getChangedPartitions(List<String> changedPartitions, 
String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionValueList> partitionValueList = 
changedPartitions.stream().map(str -> {
+      PartitionValueList individualPartition = 
PartitionValueList.builder().values(partitionValueExtractor.extractPartitionValuesInPath(str)).build();
+      return individualPartition;
+    }).collect(Collectors.toList());
+    BatchGetPartitionRequest request = BatchGetPartitionRequest.builder()
+        .databaseName(this.databaseName)
+        .tableName(tableName)
+        .partitionsToGet(partitionValueList)
+        .build();
+    BatchGetPartitionResponse callResult = 
awsGlue.batchGetPartition(request).get();
+    List<Partition> result = callResult
+        .partitions()
+        .stream()
+        .map(p -> new Partition(p.values(), p.storageDescriptor().location()))
+        .collect(Collectors.toList());
+
+    return result;
+  }
+
   @Override
   public void addPartitionsToTable(String tableName, List<String> 
partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + tableId(databaseName, tableName));
+      LOG.info("No partitions to add for " + tableId(this.databaseName, 
tableName));
       return;
     }
-    LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + 
tableId(databaseName, tableName));
+    HoodieTimer timer = HoodieTimer.start();
+    parallelizeChange(partitionsToAdd, this.changeParallelism, partitions -> 
this.addPartitionsToTableInternal(tableName, partitions), 
MAX_PARTITIONS_PER_CHANGE_REQUEST);
+    LOG.info("Added {} partitions to table {} in {} ms", 
partitionsToAdd.size(), tableId(this.databaseName, tableName), 
timer.endTimer());
+  }
+
+  private <T> void parallelizeChange(List<T> items, int parallelism, 
Consumer<List<T>> consumer, int sliceSize) {

Review Comment:
   Maybe. I see only one method there right now, so I can move it there. My
intent was to create something specific to this class, and the code inside
didn't look sophisticated, so I kept it simple and put it here; moving it into
a shared utility might make it grow and require more generalization, and so on.
I will move it if you think it's worth it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to