VitoMakarevich commented on code in PR #10460:
URL: https://github.com/apache/hudi/pull/10460#discussion_r1526062021
##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -141,105 +156,208 @@ public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.skipTableArchive =
config.getBooleanOrDefault(GlueCatalogSyncClientConfig.GLUE_SKIP_TABLE_ARCHIVE);
this.enableMetadataTable =
Boolean.toString(config.getBoolean(GLUE_METADATA_FILE_LISTING)).toUpperCase();
+ this.allPartitionsReadParallelism =
config.getIntOrDefault(ALL_PARTITIONS_READ_PARALLELISM);
+ this.changedPartitionsReadParallelism =
config.getIntOrDefault(CHANGED_PARTITIONS_READ_PARALLELISM);
+ this.changeParallelism =
config.getIntOrDefault(PARTITION_CHANGE_PARALLELISM);
}
- @Override
- public List<Partition> getAllPartitions(String tableName) {
+ private List<Partition> getPartitionsSegment(Segment segment, String
tableName) {
try {
- return getPartitions(GetPartitionsRequest.builder()
- .databaseName(databaseName)
- .tableName(tableName));
+ List<Partition> partitions = new ArrayList<>();
+ String nextToken = null;
+ do {
+ GetPartitionsResponse result =
awsGlue.getPartitions(GetPartitionsRequest.builder()
+ .databaseName(databaseName)
+ .tableName(tableName)
+ .segment(segment)
+ .nextToken(nextToken)
+ .build()).get();
+ partitions.addAll(result.partitions().stream()
+ .map(p -> new Partition(p.values(),
p.storageDescriptor().location()))
+ .collect(Collectors.toList()));
+ nextToken = result.nextToken();
+ } while (nextToken != null);
+ return partitions;
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get all partitions for
table " + tableId(databaseName, tableName), e);
}
}
- @Override
- public List<Partition> getPartitionsByFilter(String tableName, String
filter) {
- try {
- if (filter.length() <= GLUE_EXPRESSION_MAX_CHARS) {
- LOG.info("Pushdown filters: {}", filter);
- return getPartitions(GetPartitionsRequest.builder()
- .databaseName(databaseName)
- .tableName(tableName)
- .expression(filter));
- } else {
- LOG.warn("Falling back to listing all partition since expression
filter length > {}", GLUE_EXPRESSION_MAX_CHARS);
- return getAllPartitions(tableName);
- }
- } catch (Exception e) {
- throw new HoodieGlueSyncException("Failed to get partitions for table "
+ tableId(databaseName, tableName) + " from expression: " + filter, e);
- }
- }
-
private List<Partition> getPartitions(GetPartitionsRequest.Builder
partitionRequestBuilder) throws InterruptedException, ExecutionException {
List<Partition> partitions = new ArrayList<>();
String nextToken = null;
do {
GetPartitionsResponse result =
awsGlue.getPartitions(partitionRequestBuilder
- .excludeColumnSchema(true)
- .nextToken(nextToken)
- .build()).get();
+ .excludeColumnSchema(true)
+ .nextToken(nextToken)
+ .build()).get();
partitions.addAll(result.partitions().stream()
- .map(p -> new Partition(p.values(),
p.storageDescriptor().location()))
- .collect(Collectors.toList()));
+ .map(p -> new Partition(p.values(),
p.storageDescriptor().location()))
+ .collect(Collectors.toList()));
nextToken = result.nextToken();
} while (nextToken != null);
return partitions;
}
+ @Override
+ public List<Partition> getAllPartitions(String tableName) {
+ ExecutorService executorService =
Executors.newFixedThreadPool(this.allPartitionsReadParallelism, new
CustomizedThreadFactory("glue-sync-all-partitions", true));
+ try {
+ List<Segment> segments = new ArrayList<>();
+ for (int i = 0; i < allPartitionsReadParallelism; i++) {
+ segments.add(Segment.builder()
+ .segmentNumber(i)
+ .totalSegments(allPartitionsReadParallelism).build());
+ }
+ List<Future<List<Partition>>> futures = segments.stream()
+ .map(segment -> executorService.submit(() ->
this.getPartitionsSegment(segment, tableName)))
+ .collect(Collectors.toList());
+
+ List<Partition> partitions = new ArrayList<>();
+ for (Future<List<Partition>> future : futures) {
+ partitions.addAll(future.get());
+ }
+
+ return partitions;
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Failed to get all partitions for
table " + tableId(databaseName, tableName), e);
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ @Override
+ public List<Partition> getPartitionsFromList(String tableName, List<String>
partitionList) {
+ if (partitionList.isEmpty()) {
+ LOG.info("No partitions to read for " + tableId(this.databaseName,
tableName));
+ return Collections.emptyList();
+ }
+ HoodieTimer timer = HoodieTimer.start();
+ List<List<String>> batches = CollectionUtils.batches(partitionList,
MAX_PARTITIONS_PER_READ_REQUEST);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ Math.min(this.changedPartitionsReadParallelism, batches.size()),
+ new CustomizedThreadFactory("glue-sync-get-partitions-" + tableName,
true)
+ );
+ try {
+ List<Future<List<Partition>>> futures = batches
+ .stream()
+ .map(batch -> executorService.submit(() ->
this.getChangedPartitions(batch, tableName)))
+ .collect(Collectors.toList());
+
+ List<Partition> partitions = new ArrayList<>();
+ for (Future<List<Partition>> future : futures) {
+ partitions.addAll(future.get());
+ }
+ LOG.info(
+ "Requested {} partitions, found existing {} partitions, new {}
partitions, took {} ms to perform.",
+ partitionList.size(),
+ partitions.size(),
+ partitionList.size() - partitions.size(),
+ timer.endTimer()
+ );
+
+ return partitions;
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Failed to get all partitions for
table " + tableId(this.databaseName, tableName), e);
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ private List<Partition> getChangedPartitions(List<String> changedPartitions,
String tableName) throws ExecutionException, InterruptedException {
+ List<PartitionValueList> partitionValueList =
changedPartitions.stream().map(str -> {
+ PartitionValueList individualPartition =
PartitionValueList.builder().values(partitionValueExtractor.extractPartitionValuesInPath(str)).build();
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org