danny0405 commented on code in PR #10460:
URL: https://github.com/apache/hudi/pull/10460#discussion_r1525654522


##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -141,105 +156,208 @@ public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
     this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
     this.skipTableArchive = 
config.getBooleanOrDefault(GlueCatalogSyncClientConfig.GLUE_SKIP_TABLE_ARCHIVE);
     this.enableMetadataTable = 
Boolean.toString(config.getBoolean(GLUE_METADATA_FILE_LISTING)).toUpperCase();
+    this.allPartitionsReadParallelism = 
config.getIntOrDefault(ALL_PARTITIONS_READ_PARALLELISM);
+    this.changedPartitionsReadParallelism = 
config.getIntOrDefault(CHANGED_PARTITIONS_READ_PARALLELISM);
+    this.changeParallelism = 
config.getIntOrDefault(PARTITION_CHANGE_PARALLELISM);
   }
 
-  @Override
-  public List<Partition> getAllPartitions(String tableName) {
+  private List<Partition> getPartitionsSegment(Segment segment, String 
tableName) {
     try {
-      return getPartitions(GetPartitionsRequest.builder()
-              .databaseName(databaseName)
-              .tableName(tableName));
+      List<Partition> partitions = new ArrayList<>();
+      String nextToken = null;
+      do {
+        GetPartitionsResponse result = 
awsGlue.getPartitions(GetPartitionsRequest.builder()
+            .databaseName(databaseName)
+            .tableName(tableName)
+            .segment(segment)
+            .nextToken(nextToken)
+            .build()).get();
+        partitions.addAll(result.partitions().stream()
+            .map(p -> new Partition(p.values(), 
p.storageDescriptor().location()))
+            .collect(Collectors.toList()));
+        nextToken = result.nextToken();
+      } while (nextToken != null);
+      return partitions;
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Failed to get all partitions for 
table " + tableId(databaseName, tableName), e);
     }
   }
 
-  @Override
-  public List<Partition> getPartitionsByFilter(String tableName, String 
filter) {
-    try {
-      if (filter.length() <= GLUE_EXPRESSION_MAX_CHARS) {
-        LOG.info("Pushdown filters: {}", filter);
-        return getPartitions(GetPartitionsRequest.builder()
-              .databaseName(databaseName)
-              .tableName(tableName)
-              .expression(filter));
-      } else {
-        LOG.warn("Falling back to listing all partition since expression 
filter length > {}", GLUE_EXPRESSION_MAX_CHARS);
-        return getAllPartitions(tableName);
-      }
-    } catch (Exception e) {
-      throw new HoodieGlueSyncException("Failed to get partitions for table " 
+ tableId(databaseName, tableName) + " from expression: " + filter, e);
-    }
-  }
-
   private List<Partition> getPartitions(GetPartitionsRequest.Builder 
partitionRequestBuilder) throws InterruptedException, ExecutionException {
     List<Partition> partitions = new ArrayList<>();
     String nextToken = null;
     do {
       GetPartitionsResponse result = 
awsGlue.getPartitions(partitionRequestBuilder
-              .excludeColumnSchema(true)
-              .nextToken(nextToken)
-              .build()).get();
+          .excludeColumnSchema(true)
+          .nextToken(nextToken)
+          .build()).get();
       partitions.addAll(result.partitions().stream()
-              .map(p -> new Partition(p.values(), 
p.storageDescriptor().location()))
-              .collect(Collectors.toList()));
+          .map(p -> new Partition(p.values(), 
p.storageDescriptor().location()))
+          .collect(Collectors.toList()));
       nextToken = result.nextToken();
     } while (nextToken != null);
     return partitions;
   }
 
+  @Override
+  public List<Partition> getAllPartitions(String tableName) {
+    ExecutorService executorService = 
Executors.newFixedThreadPool(this.allPartitionsReadParallelism, new 
CustomizedThreadFactory("glue-sync-all-partitions", true));
+    try {
+      List<Segment> segments = new ArrayList<>();
+      for (int i = 0; i < allPartitionsReadParallelism; i++) {
+        segments.add(Segment.builder()
+            .segmentNumber(i)
+            .totalSegments(allPartitionsReadParallelism).build());
+      }
+      List<Future<List<Partition>>> futures = segments.stream()
+          .map(segment -> executorService.submit(() -> 
this.getPartitionsSegment(segment, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for 
table " + tableId(databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Override
+  public List<Partition> getPartitionsFromList(String tableName, List<String> 
partitionList) {
+    if (partitionList.isEmpty()) {
+      LOG.info("No partitions to read for " + tableId(this.databaseName, 
tableName));
+      return Collections.emptyList();
+    }
+    HoodieTimer timer = HoodieTimer.start();
+    List<List<String>> batches = CollectionUtils.batches(partitionList, 
MAX_PARTITIONS_PER_READ_REQUEST);
+    ExecutorService executorService = Executors.newFixedThreadPool(
+        Math.min(this.changedPartitionsReadParallelism, batches.size()),
+        new CustomizedThreadFactory("glue-sync-get-partitions-" + tableName, 
true)
+    );
+    try {
+      List<Future<List<Partition>>> futures = batches
+          .stream()
+          .map(batch -> executorService.submit(() -> 
this.getChangedPartitions(batch, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+      LOG.info(
+          "Requested {} partitions, found existing {} partitions, new {} 
partitions, took {} ms to perform.",
+          partitionList.size(),
+          partitions.size(),
+          partitionList.size() - partitions.size(),
+          timer.endTimer()
+      );
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for 
table " + tableId(this.databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private List<Partition> getChangedPartitions(List<String> changedPartitions, 
String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionValueList> partitionValueList = 
changedPartitions.stream().map(str -> {
+      PartitionValueList individualPartition = 
PartitionValueList.builder().values(partitionValueExtractor.extractPartitionValuesInPath(str)).build();
+      return individualPartition;
+    }).collect(Collectors.toList());
+    BatchGetPartitionRequest request = BatchGetPartitionRequest.builder()
+        .databaseName(this.databaseName)
+        .tableName(tableName)
+        .partitionsToGet(partitionValueList)
+        .build();
+    BatchGetPartitionResponse callResult = 
awsGlue.batchGetPartition(request).get();
+    List<Partition> result = callResult
+        .partitions()
+        .stream()
+        .map(p -> new Partition(p.values(), p.storageDescriptor().location()))
+        .collect(Collectors.toList());
+
+    return result;
+  }
+
   @Override
   public void addPartitionsToTable(String tableName, List<String> 
partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + tableId(databaseName, tableName));
+      LOG.info("No partitions to add for " + tableId(this.databaseName, 
tableName));
       return;
     }
-    LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + 
tableId(databaseName, tableName));
+    HoodieTimer timer = HoodieTimer.start();
+    Table table = getTable(awsGlue, databaseName, tableName);
+    parallelizeChange(partitionsToAdd, this.changeParallelism, partitions -> 
this.addPartitionsToTableInternal(table, partitions), 
MAX_PARTITIONS_PER_CHANGE_REQUEST);
+    LOG.info("Added {} partitions to table {} in {} ms", 
partitionsToAdd.size(), tableId(this.databaseName, tableName), 
timer.endTimer());
+  }
+
+  private <T> void parallelizeChange(List<T> items, int parallelism, 
Consumer<List<T>> consumer, int sliceSize) {
+    List<List<T>> batches = CollectionUtils.batches(items, sliceSize);
+    ExecutorService executorService = 
Executors.newFixedThreadPool(Math.min(parallelism, batches.size()), new 
CustomizedThreadFactory("glue-sync", true));
+    try {
+      List<Future<?>> futures = batches.stream()
+          .map(item -> executorService.submit(() -> {
+            consumer.accept(item);
+          }))
+          .collect(Collectors.toList());
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to parallelize operation", e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private void addPartitionsToTableInternal(Table table, List<String> 
partitionsToAdd) {
     try {
-      Table table = getTable(awsGlue, databaseName, tableName);
       StorageDescriptor sd = table.storageDescriptor();
-      List<PartitionInput> partitionInputs = 
partitionsToAdd.stream().map(partition -> {
+      List<PartitionInput> partitionInputList = 
partitionsToAdd.stream().map(partition -> {
         String fullPartitionPath = 
FSUtils.getPartitionPath(s3aToS3(getBasePath()), partition).toString();
         List<String> partitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(partition);
         StorageDescriptor partitionSD = sd.copy(copySd -> 
copySd.location(fullPartitionPath));
         return 
PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
       }).collect(Collectors.toList());
 
-      List<CompletableFuture<BatchCreatePartitionResponse>> futures = new 
ArrayList<>();
-
-      for (List<PartitionInput> batch : 
CollectionUtils.batches(partitionInputs, MAX_PARTITIONS_PER_REQUEST)) {
-        BatchCreatePartitionRequest request = 
BatchCreatePartitionRequest.builder()
-                
.databaseName(databaseName).tableName(tableName).partitionInputList(batch).build();
-        futures.add(awsGlue.batchCreatePartition(request));
-      }
-
-      for (CompletableFuture<BatchCreatePartitionResponse> future : futures) {
-        BatchCreatePartitionResponse response = future.get();
-        if (CollectionUtils.nonEmpty(response.errors())) {
-          if (response.errors().stream()
-              .allMatch(
-                  (error) -> 
"AlreadyExistsException".equals(error.errorDetail().errorCode()))) {
-            LOG.warn("Partitions already exist in glue: " + response.errors());
-          } else {
-            throw new HoodieGlueSyncException("Fail to add partitions to " + 
tableId(databaseName, tableName)
+      BatchCreatePartitionRequest request = 
BatchCreatePartitionRequest.builder()
+          
.databaseName(databaseName).tableName(table.name()).partitionInputList(partitionInputList).build();
+      CompletableFuture<BatchCreatePartitionResponse> future = 
awsGlue.batchCreatePartition(request);
+      BatchCreatePartitionResponse response = future.get();
+      if (CollectionUtils.nonEmpty(response.errors())) {
+        if (response.errors().stream()
+            .allMatch(
+                (error) -> 
"AlreadyExistsException".equals(error.errorDetail().errorCode()))) {
+          LOG.warn("Partitions already exist in glue: " + response.errors());
+        } else {
+          throw new HoodieGlueSyncException("Fail to add partitions to " + 
tableId(databaseName, table.name())
               + " with error(s): " + response.errors());
-          }
         }
       }
     } catch (Exception e) {
-      throw new HoodieGlueSyncException("Fail to add partitions to " + 
tableId(databaseName, tableName), e);
+      throw new HoodieGlueSyncException("Fail to add partitions to " + 
tableId(databaseName, table.name()), e);
     }
   }
 
   @Override
   public void updatePartitionsToTable(String tableName, List<String> 
changedPartitions) {
     if (changedPartitions.isEmpty()) {
-      LOG.info("No partitions to change for " + tableName);
+      LOG.info("No partitions to update for " + tableId(this.databaseName, 
tableName));
       return;
     }
-    LOG.info("Updating " + changedPartitions.size() + "partition(s) in table " 
+ tableId(databaseName, tableName));
+    HoodieTimer timer = HoodieTimer.start();
+    Table table = getTable(awsGlue, databaseName, tableName);

Review Comment:
   Not sure whether the timer is guaranteed to always be stopped — if an exception is thrown before `endTimer()` is reached, the timer is never ended. Consider ending it in a `finally` block.



##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -249,57 +367,51 @@ public void updatePartitionsToTable(String tableName, 
List<String> changedPartit
         return 
BatchUpdatePartitionRequestEntry.builder().partitionInput(partitionInput).partitionValueList(partitionValues).build();
       }).collect(Collectors.toList());
 
-      List<CompletableFuture<BatchUpdatePartitionResponse>> futures = new 
ArrayList<>();
-      for (List<BatchUpdatePartitionRequestEntry> batch : 
CollectionUtils.batches(updatePartitionEntries, MAX_PARTITIONS_PER_REQUEST)) {
-        BatchUpdatePartitionRequest request = 
BatchUpdatePartitionRequest.builder()
-                
.databaseName(databaseName).tableName(tableName).entries(batch).build();
-        futures.add(awsGlue.batchUpdatePartition(request));
-      }
+      BatchUpdatePartitionRequest request = 
BatchUpdatePartitionRequest.builder()
+              
.databaseName(databaseName).tableName(table.name()).entries(updatePartitionEntries).build();
+      CompletableFuture<BatchUpdatePartitionResponse> future = 
awsGlue.batchUpdatePartition(request);
 
-      for (CompletableFuture<BatchUpdatePartitionResponse> future : futures) {
-        BatchUpdatePartitionResponse response = future.get();
-        if (CollectionUtils.nonEmpty(response.errors())) {
-          throw new HoodieGlueSyncException("Fail to update partitions to " + 
tableId(databaseName, tableName)
-              + " with error(s): " + response.errors());
-        }
+      BatchUpdatePartitionResponse response = future.get();
+      if (CollectionUtils.nonEmpty(response.errors())) {
+        throw new HoodieGlueSyncException("Fail to update partitions to " + 
tableId(databaseName, table.name())
+            + " with error(s): " + response.errors());
       }
     } catch (Exception e) {
-      throw new HoodieGlueSyncException("Fail to update partitions to " + 
tableId(databaseName, tableName), e);
+      throw new HoodieGlueSyncException("Fail to update partitions to " + 
tableId(databaseName, table.name()), e);
     }
   }
 
   @Override
   public void dropPartitions(String tableName, List<String> partitionsToDrop) {
-    if (CollectionUtils.isNullOrEmpty(partitionsToDrop)) {
-      LOG.info("No partitions to drop for " + tableName);
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableId(this.databaseName, 
tableName));
       return;
     }
-    LOG.info("Drop " + partitionsToDrop.size() + "partition(s) in table " + 
tableId(databaseName, tableName));
-    try {
-      List<CompletableFuture<BatchDeletePartitionResponse>> futures = new 
ArrayList<>();
-      for (List<String> batch : CollectionUtils.batches(partitionsToDrop, 
MAX_DELETE_PARTITIONS_PER_REQUEST)) {
-
-        List<PartitionValueList> partitionValueLists = 
batch.stream().map(partition -> {
-          PartitionValueList partitionValueList = PartitionValueList.builder()
-                  
.values(partitionValueExtractor.extractPartitionValuesInPath(partition))
-                  .build();
-          return partitionValueList;
-        }).collect(Collectors.toList());
+    HoodieTimer timer = HoodieTimer.start();
+    parallelizeChange(partitionsToDrop, this.changeParallelism, partitions -> 
this.dropPartitionsInternal(tableName, partitions), 
MAX_DELETE_PARTITIONS_PER_REQUEST);
+    LOG.info("Deleted {} partitions to table {} in {} ms", 
partitionsToDrop.size(), tableId(this.databaseName, tableName), 
timer.endTimer());

Review Comment:
   Not sure whether the timer is guaranteed to always be stopped — `endTimer()` is skipped if `parallelizeChange` throws. Consider ending it in a `finally` block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to