yihua commented on code in PR #10460:
URL: https://github.com/apache/hudi/pull/10460#discussion_r1519068094
##########
hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java:
##########
@@ -40,6 +42,28 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig {
      .sinceVersion("0.14.0")
      .withDocumentation("Glue catalog sync based client will skip archiving the table version if this config is set to true");
+  public static final ConfigProperty<Integer> ALL_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "all_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .withValidValues(IntStream.rangeClosed(1, 10).mapToObj(Integer::toString).toArray(String[]::new))
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing all partitions(first time sync). Should be in interval [1, 10].");
+
+  public static final ConfigProperty<Integer> CHANGED_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "changed_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing changed partitions(second and subsequent syncs).");
+
+  public static final ConfigProperty<Integer> CHANGE_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "change_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for change operations - such as create/update/delete.");
Review Comment:
Similar here on value range.
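A sketch of what that could look like, mirroring the `withValidValues` call already used on `ALL_PARTITIONS_READ_PARALLELISM` above (the `[1, 10]` bound here is an assumption; pick whatever limit matches Glue's request quotas):

```java
// Hypothetical: constrain change_parallelism the same way as the
// all-partitions read parallelism. The [1, 10] bound is assumed, not confirmed.
public static final ConfigProperty<Integer> CHANGE_PARALLELISM = ConfigProperty
    .key(GLUE_CLIENT_PROPERTY_PREFIX + "change_parallelism")
    .defaultValue(1)
    .markAdvanced()
    .withValidValues(IntStream.rangeClosed(1, 10).mapToObj(Integer::toString).toArray(String[]::new))
    .sinceVersion("1.0.0")
    .withDocumentation("Parallelism for change operations - such as create/update/delete. Should be in the range of [1, 10].");
```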
##########
hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java:
##########
@@ -40,6 +42,28 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig {
      .sinceVersion("0.14.0")
      .withDocumentation("Glue catalog sync based client will skip archiving the table version if this config is set to true");
+  public static final ConfigProperty<Integer> ALL_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "all_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .withValidValues(IntStream.rangeClosed(1, 10).mapToObj(Integer::toString).toArray(String[]::new))
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing all partitions(first time sync). Should be in interval [1, 10].");
Review Comment:
```suggestion
      .withDocumentation("Parallelism for listing all partitions(first time sync). Should be in the range of [1, 10].");
```
##########
hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java:
##########
@@ -40,6 +42,28 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig {
      .sinceVersion("0.14.0")
      .withDocumentation("Glue catalog sync based client will skip archiving the table version if this config is set to true");
+  public static final ConfigProperty<Integer> ALL_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "all_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .withValidValues(IntStream.rangeClosed(1, 10).mapToObj(Integer::toString).toArray(String[]::new))
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing all partitions(first time sync). Should be in interval [1, 10].");
+
+  public static final ConfigProperty<Integer> CHANGED_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "changed_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing changed partitions(second and subsequent syncs).");
+
+  public static final ConfigProperty<Integer> CHANGE_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "change_parallelism")
Review Comment:
nit: rename to something like `PARTITION_CHANGE_PARALLELISM`/`partition_change_parallelism`
##########
hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java:
##########
@@ -40,6 +42,28 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig {
      .sinceVersion("0.14.0")
      .withDocumentation("Glue catalog sync based client will skip archiving the table version if this config is set to true");
+  public static final ConfigProperty<Integer> ALL_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "all_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .withValidValues(IntStream.rangeClosed(1, 10).mapToObj(Integer::toString).toArray(String[]::new))
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing all partitions(first time sync). Should be in interval [1, 10].");
+
+  public static final ConfigProperty<Integer> CHANGED_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "changed_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing changed partitions(second and subsequent syncs).");
Review Comment:
Also, is there a reason to separate `ALL_PARTITIONS_READ_PARALLELISM`/`CHANGED_PARTITIONS_READ_PARALLELISM` instead of having one `PARTITION_READ_PARALLELISM`?
##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -138,42 +156,141 @@ public List<Partition> getAllPartitions(String tableName) {
     }
   }
+  @Override
+  public List<Partition> getAllPartitions(String tableName) {
+    ExecutorService executorService = Executors.newFixedThreadPool(this.allPartitionsReadParallelism);
+    try {
+      List<Segment> segments = new ArrayList<>();
+      for (int i = 0; i < allPartitionsReadParallelism; i++) {
+        segments.add(Segment.builder()
+            .segmentNumber(i)
+            .totalSegments(allPartitionsReadParallelism).build());
+      }
+      List<Future<List<Partition>>> futures = segments.stream()
+          .map(segment -> executorService.submit(() -> this.getPartitionsSegment(segment, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Override
+  public List<Partition> getPartitionsFromList(String tableName, List<String> partitionList) {
+    if (partitionList.isEmpty()) {
+      LOG.info("No partitions to read for " + tableId(this.databaseName, tableName));
+      return Collections.emptyList();
+    }
+    HoodieTimer timer = HoodieTimer.start();
+    List<List<String>> batches = CollectionUtils.batches(partitionList, MAX_PARTITIONS_PER_READ_REQUEST);
+    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(this.changedPartitionsReadParallelism, batches.size()));
+    try {
+      List<Future<List<Partition>>> futures = batches
+          .stream()
+          .map(batch -> executorService.submit(() -> this.getChangedPartitions(batch, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+      LOG.info(
+          "Requested {} partitions, found existing {} partitions, new {} partitions, took {} ms to perform.",
+          partitionList.size(),
+          partitions.size(),
+          partitionList.size() - partitions.size(),
+          timer.endTimer()
+      );
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(this.databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private List<Partition> getChangedPartitions(List<String> changedPartitions, String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionValueList> partitionValueList = changedPartitions.stream().map(str -> {
+      PartitionValueList individualPartition = PartitionValueList.builder().values(partitionValueExtractor.extractPartitionValuesInPath(str)).build();
+      return individualPartition;
+    }).collect(Collectors.toList());
+    BatchGetPartitionRequest request = BatchGetPartitionRequest.builder()
+        .databaseName(this.databaseName)
+        .tableName(tableName)
+        .partitionsToGet(partitionValueList)
+        .build();
+    BatchGetPartitionResponse callResult = awsGlue.batchGetPartition(request).get();
+    List<Partition> result = callResult
+        .partitions()
+        .stream()
+        .map(p -> new Partition(p.values(), p.storageDescriptor().location()))
+        .collect(Collectors.toList());
+
+    return result;
+  }
+
   @Override
   public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + tableId(databaseName, tableName));
+      LOG.info("No partitions to add for " + tableId(this.databaseName, tableName));
       return;
     }
-    LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + tableId(databaseName, tableName));
+    HoodieTimer timer = HoodieTimer.start();
+    parallelizeChange(partitionsToAdd, this.changeParallelism, partitions -> this.addPartitionsToTableInternal(tableName, partitions), MAX_PARTITIONS_PER_CHANGE_REQUEST);
+    LOG.info("Added {} partitions to table {} in {} ms", partitionsToAdd.size(), tableId(this.databaseName, tableName), timer.endTimer());
+  }
+
+  private <T> void parallelizeChange(List<T> items, int parallelism, Consumer<List<T>> consumer, int sliceSize) {
+    List<List<T>> batches = CollectionUtils.batches(items, sliceSize);
+    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(parallelism, batches.size()));
+    try {
+      List<Future<?>> futures = batches.stream()
+          .map(item -> executorService.submit(() -> {
+            consumer.accept(item);
+          }))
+          .collect(Collectors.toList());
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to parallelize operation", e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private void addPartitionsToTableInternal(String tableName, List<String> partitionsToAdd) {
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
       StorageDescriptor sd = table.storageDescriptor();
-      List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
+      List<PartitionInput> partitionInput = partitionsToAdd.stream().map(partition -> {
Review Comment:
```suggestion
      List<PartitionInput> partitionInputList = partitionsToAdd.stream().map(partition -> {
```
##########
hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java:
##########
@@ -40,6 +42,28 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig {
      .sinceVersion("0.14.0")
      .withDocumentation("Glue catalog sync based client will skip archiving the table version if this config is set to true");
+  public static final ConfigProperty<Integer> ALL_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "all_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .withValidValues(IntStream.rangeClosed(1, 10).mapToObj(Integer::toString).toArray(String[]::new))
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing all partitions(first time sync). Should be in interval [1, 10].");
+
+  public static final ConfigProperty<Integer> CHANGED_PARTITIONS_READ_PARALLELISM = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "changed_partitions_read_parallelism")
+      .defaultValue(1)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Parallelism for listing changed partitions(second and subsequent syncs).");
Review Comment:
Does this parallelism have to be in [1, 10]?
##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -138,42 +156,141 @@ public List<Partition> getAllPartitions(String tableName) {
     }
   }
+  @Override
+  public List<Partition> getAllPartitions(String tableName) {
+    ExecutorService executorService = Executors.newFixedThreadPool(this.allPartitionsReadParallelism);
+    try {
+      List<Segment> segments = new ArrayList<>();
+      for (int i = 0; i < allPartitionsReadParallelism; i++) {
+        segments.add(Segment.builder()
+            .segmentNumber(i)
+            .totalSegments(allPartitionsReadParallelism).build());
+      }
+      List<Future<List<Partition>>> futures = segments.stream()
+          .map(segment -> executorService.submit(() -> this.getPartitionsSegment(segment, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Override
+  public List<Partition> getPartitionsFromList(String tableName, List<String> partitionList) {
+    if (partitionList.isEmpty()) {
+      LOG.info("No partitions to read for " + tableId(this.databaseName, tableName));
+      return Collections.emptyList();
+    }
+    HoodieTimer timer = HoodieTimer.start();
+    List<List<String>> batches = CollectionUtils.batches(partitionList, MAX_PARTITIONS_PER_READ_REQUEST);
+    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(this.changedPartitionsReadParallelism, batches.size()));
+    try {
+      List<Future<List<Partition>>> futures = batches
+          .stream()
+          .map(batch -> executorService.submit(() -> this.getChangedPartitions(batch, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+      LOG.info(
+          "Requested {} partitions, found existing {} partitions, new {} partitions, took {} ms to perform.",
+          partitionList.size(),
+          partitions.size(),
+          partitionList.size() - partitions.size(),
+          timer.endTimer()
+      );
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(this.databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private List<Partition> getChangedPartitions(List<String> changedPartitions, String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionValueList> partitionValueList = changedPartitions.stream().map(str -> {
+      PartitionValueList individualPartition = PartitionValueList.builder().values(partitionValueExtractor.extractPartitionValuesInPath(str)).build();
+      return individualPartition;
+    }).collect(Collectors.toList());
+    BatchGetPartitionRequest request = BatchGetPartitionRequest.builder()
+        .databaseName(this.databaseName)
+        .tableName(tableName)
+        .partitionsToGet(partitionValueList)
+        .build();
+    BatchGetPartitionResponse callResult = awsGlue.batchGetPartition(request).get();
+    List<Partition> result = callResult
+        .partitions()
+        .stream()
+        .map(p -> new Partition(p.values(), p.storageDescriptor().location()))
+        .collect(Collectors.toList());
+
+    return result;
+  }
+
   @Override
   public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + tableId(databaseName, tableName));
+      LOG.info("No partitions to add for " + tableId(this.databaseName, tableName));
       return;
     }
-    LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + tableId(databaseName, tableName));
+    HoodieTimer timer = HoodieTimer.start();
+    parallelizeChange(partitionsToAdd, this.changeParallelism, partitions -> this.addPartitionsToTableInternal(tableName, partitions), MAX_PARTITIONS_PER_CHANGE_REQUEST);
+    LOG.info("Added {} partitions to table {} in {} ms", partitionsToAdd.size(), tableId(this.databaseName, tableName), timer.endTimer());
+  }
+
+  private <T> void parallelizeChange(List<T> items, int parallelism, Consumer<List<T>> consumer, int sliceSize) {
Review Comment:
Should this util method be moved to `ThreadUtils`?
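If it is extracted, a self-contained sketch of what such a helper could look like (the class and method names here are hypothetical, and the batching is done inline so the sketch does not depend on `CollectionUtils`; note it also guards the empty-input case, since `Executors.newFixedThreadPool(0)` would throw):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical ThreadUtils-style helper, sketched from parallelizeChange in this PR.
public final class ThreadUtilsSketch {

  private ThreadUtilsSketch() {
  }

  // Slices items into batches of at most sliceSize, applies the consumer to each
  // batch on a bounded thread pool, and propagates the first failure.
  public static <T> void parallelizeBatches(List<T> items, int parallelism, int sliceSize, Consumer<List<T>> consumer) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += sliceSize) {
      batches.add(new ArrayList<>(items.subList(i, Math.min(i + sliceSize, items.size()))));
    }
    if (batches.isEmpty()) {
      return; // guard: newFixedThreadPool(0) would throw IllegalArgumentException
    }
    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(parallelism, batches.size()));
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (List<T> batch : batches) {
        futures.add(executorService.submit(() -> consumer.accept(batch)));
      }
      for (Future<?> future : futures) {
        future.get(); // rethrows any batch failure wrapped in ExecutionException
      }
    } catch (Exception e) {
      throw new RuntimeException("Failed to parallelize operation", e);
    } finally {
      executorService.shutdownNow();
    }
  }
}
```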
##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java:
##########
@@ -138,42 +156,141 @@ public List<Partition> getAllPartitions(String tableName) {
     }
   }
+  @Override
+  public List<Partition> getAllPartitions(String tableName) {
+    ExecutorService executorService = Executors.newFixedThreadPool(this.allPartitionsReadParallelism);
+    try {
+      List<Segment> segments = new ArrayList<>();
+      for (int i = 0; i < allPartitionsReadParallelism; i++) {
+        segments.add(Segment.builder()
+            .segmentNumber(i)
+            .totalSegments(allPartitionsReadParallelism).build());
+      }
+      List<Future<List<Partition>>> futures = segments.stream()
+          .map(segment -> executorService.submit(() -> this.getPartitionsSegment(segment, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Override
+  public List<Partition> getPartitionsFromList(String tableName, List<String> partitionList) {
+    if (partitionList.isEmpty()) {
+      LOG.info("No partitions to read for " + tableId(this.databaseName, tableName));
+      return Collections.emptyList();
+    }
+    HoodieTimer timer = HoodieTimer.start();
+    List<List<String>> batches = CollectionUtils.batches(partitionList, MAX_PARTITIONS_PER_READ_REQUEST);
+    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(this.changedPartitionsReadParallelism, batches.size()));
+    try {
+      List<Future<List<Partition>>> futures = batches
+          .stream()
+          .map(batch -> executorService.submit(() -> this.getChangedPartitions(batch, tableName)))
+          .collect(Collectors.toList());
+
+      List<Partition> partitions = new ArrayList<>();
+      for (Future<List<Partition>> future : futures) {
+        partitions.addAll(future.get());
+      }
+      LOG.info(
+          "Requested {} partitions, found existing {} partitions, new {} partitions, took {} ms to perform.",
+          partitionList.size(),
+          partitions.size(),
+          partitionList.size() - partitions.size(),
+          timer.endTimer()
+      );
+
+      return partitions;
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(this.databaseName, tableName), e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private List<Partition> getChangedPartitions(List<String> changedPartitions, String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionValueList> partitionValueList = changedPartitions.stream().map(str -> {
+      PartitionValueList individualPartition = PartitionValueList.builder().values(partitionValueExtractor.extractPartitionValuesInPath(str)).build();
+      return individualPartition;
+    }).collect(Collectors.toList());
+    BatchGetPartitionRequest request = BatchGetPartitionRequest.builder()
+        .databaseName(this.databaseName)
+        .tableName(tableName)
+        .partitionsToGet(partitionValueList)
+        .build();
+    BatchGetPartitionResponse callResult = awsGlue.batchGetPartition(request).get();
+    List<Partition> result = callResult
+        .partitions()
+        .stream()
+        .map(p -> new Partition(p.values(), p.storageDescriptor().location()))
+        .collect(Collectors.toList());
+
+    return result;
+  }
+
   @Override
   public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + tableId(databaseName, tableName));
+      LOG.info("No partitions to add for " + tableId(this.databaseName, tableName));
       return;
     }
-    LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + tableId(databaseName, tableName));
+    HoodieTimer timer = HoodieTimer.start();
+    parallelizeChange(partitionsToAdd, this.changeParallelism, partitions -> this.addPartitionsToTableInternal(tableName, partitions), MAX_PARTITIONS_PER_CHANGE_REQUEST);
+    LOG.info("Added {} partitions to table {} in {} ms", partitionsToAdd.size(), tableId(this.databaseName, tableName), timer.endTimer());
+  }
+
+  private <T> void parallelizeChange(List<T> items, int parallelism, Consumer<List<T>> consumer, int sliceSize) {
+    List<List<T>> batches = CollectionUtils.batches(items, sliceSize);
+    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(parallelism, batches.size()));
+    try {
+      List<Future<?>> futures = batches.stream()
+          .map(item -> executorService.submit(() -> {
+            consumer.accept(item);
+          }))
+          .collect(Collectors.toList());
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to parallelize operation", e);
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
+
+  private void addPartitionsToTableInternal(String tableName, List<String> partitionsToAdd) {
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
Review Comment:
This incurs a `GetTableRequest`. Previously there was one such request in `addPartitionsToTable`. After the changes, each partition batch sends its own table request, which can be avoided. Similar for `updatePartitionsToTableInternal`. Could you keep it to just one request overall instead of one request per thread?
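One possible shape (a sketch, not the final implementation; the three-argument `addPartitionsToTableInternal` overload is hypothetical): resolve the table once before fanning out, and pass its `StorageDescriptor` into each batch.

```java
// Sketch: one GetTableRequest total, shared across all parallel batches.
StorageDescriptor sd = getTable(awsGlue, databaseName, tableName).storageDescriptor();
parallelizeChange(partitionsToAdd, this.changeParallelism,
    partitions -> this.addPartitionsToTableInternal(tableName, sd, partitions),
    MAX_PARTITIONS_PER_CHANGE_REQUEST);

// ...with addPartitionsToTableInternal(String tableName, StorageDescriptor sd,
// List<String> partitionsToAdd) building its PartitionInputs from the shared sd
// instead of calling getTable per batch.
```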
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]