This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b1fc2fbb988 [HUDI-6230] Handle aws glue partition index (#8743)
b1fc2fbb988 is described below
commit b1fc2fbb98833099d81c2820b0efb4e235661d97
Author: Nicolas Paris <[email protected]>
AuthorDate: Fri Jan 26 03:01:18 2024 +0100
[HUDI-6230] Handle aws glue partition index (#8743)
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 137 ++++++++++++++++++++-
.../hudi/config/GlueCatalogSyncClientConfig.java | 19 +++
2 files changed, 155 insertions(+), 1 deletion(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 4a846e4970f..f0a15916dc0 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -40,14 +40,20 @@ import
software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionIndexRequest;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.CreateTableResponse;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeletePartitionIndexRequest;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionIndexesRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionIndexesResponse;
import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.PartitionIndex;
+import software.amazon.awssdk.services.glue.model.PartitionIndexDescriptor;
import software.amazon.awssdk.services.glue.model.PartitionInput;
import software.amazon.awssdk.services.glue.model.PartitionValueList;
import software.amazon.awssdk.services.glue.model.SerDeInfo;
@@ -55,12 +61,14 @@ import
software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -74,6 +82,8 @@ import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.containsAll;
import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_METADATA_FILE_LISTING;
+import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS;
+import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS_ENABLE;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
@@ -95,7 +105,8 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
private static final int MAX_DELETE_PARTITIONS_PER_REQUEST = 25;
private final GlueAsyncClient awsGlue;
- private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
+ /** Glue table parameter this client sets to "true"/"false" to switch partition-index based filtering on or off. */
+ private static final String GLUE_PARTITION_INDEX_ENABLE =
"partition_filtering.enabled";
+ /** Maximum number of index definitions read from the config; definitions beyond this count are ignored (with a warning). */
+ private static final int PARTITION_INDEX_MAX_NUMBER = 3;
/**
* athena v2/v3 table property
* see https://docs.aws.amazon.com/athena/latest/ug/querying-hudi.html
@@ -432,6 +443,120 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
}
}
+  /**
+   * Prefix of the names Hudi gives to the Glue partition indexes it creates. It is what tells
+   * Hudi-managed indexes apart from indexes created outside Hudi.
+   */
+  private static final String HUDI_MANAGED_INDEX_PREFIX = "hudi_managed_";
+
+  /**
+   * Reconciles the AWS Glue partition indexes of a table with the Hudi configuration, so users can
+   * activate/deactivate partition indexing on existing tables.
+   * <p>
+   * When {@code META_SYNC_PARTITION_INDEX_FIELDS_ENABLE} is false, only the indexes previously created
+   * by Hudi (named with the {@code hudi_managed_} prefix) are dropped, and the
+   * {@code partition_filtering.enabled} table parameter is switched off only when such indexes were
+   * found. This branch runs for every synced table, including tables that never opted in, so indexes
+   * and settings created outside Hudi must be left alone.
+   * <p>
+   * When enabled, partition filtering is switched on, each existing index whose keys are not in the
+   * configuration is dropped (removing an index definition drops the index), and each configured
+   * key list without a matching index is created.
+   * <p>
+   * reference doc for partition indexes:
+   * https://docs.aws.amazon.com/glue/latest/dg/partition-indexes.html#partition-index-getpartitions
+   *
+   * @param tableName name of the table in {@code databaseName} whose indexes are reconciled
+   * @throws ExecutionException   if a Glue request completes exceptionally
+   * @throws InterruptedException if interrupted while waiting for a Glue response
+   */
+  public void managePartitionIndexes(String tableName) throws ExecutionException, InterruptedException {
+    if (config.getBooleanOrDefault(META_SYNC_PARTITION_INDEX_FIELDS_ENABLE)) {
+      enablePartitionIndexes(tableName);
+    } else {
+      disableHudiManagedPartitionIndexes(tableName);
+    }
+  }
+
+  /** Switches partition filtering on and aligns the table's indexes with the configured key lists. */
+  private void enablePartitionIndexes(String tableName) throws ExecutionException, InterruptedException {
+    // activate indexing usage if disabled
+    if (!getPartitionIndexEnable(tableName)) {
+      LOG.warn("Activating partition indexing");
+      updatePartitionIndexEnable(tableName, true);
+    }
+
+    List<List<String>> partitionsIndexNeeded = parsePartitionsIndexConfig();
+
+    // Drop every existing index that is no longer configured and remember the ones that are.
+    // Dropped indexes can never match a needed definition, so no re-listing is required afterwards.
+    List<List<String>> existingKeys = new ArrayList<>();
+    for (PartitionIndexDescriptor existingIdx : listPartitionIndexes(tableName)) {
+      List<String> idxColumns = indexKeys(existingIdx);
+      if (partitionsIndexNeeded.contains(idxColumns)) {
+        existingKeys.add(idxColumns);
+      } else {
+        LOG.warn("Dropping irrelevant index: " + existingIdx.indexName());
+        dropPartitionIndex(tableName, existingIdx.indexName());
+      }
+    }
+
+    // Create each configured index that does not exist yet. Recording it in existingKeys also
+    // prevents a second create request (and its failure) for a duplicated definition.
+    for (List<String> neededIdx : partitionsIndexNeeded) {
+      if (existingKeys.contains(neededIdx)) {
+        continue;
+      }
+      existingKeys.add(neededIdx);
+      // same naming as before: prefix + List.toString() of the keys, e.g. "hudi_managed_[col1, col2]"
+      String newIdxName = HUDI_MANAGED_INDEX_PREFIX + neededIdx;
+      PartitionIndex newIdx = PartitionIndex.builder()
+          .indexName(newIdxName)
+          .keys(neededIdx)
+          .build();
+      LOG.warn("Creating new partition index: " + newIdxName);
+      CreatePartitionIndexRequest creationRequest = CreatePartitionIndexRequest.builder()
+          .databaseName(databaseName).tableName(tableName).partitionIndex(newIdx).build();
+      awsGlue.createPartitionIndex(creationRequest).get();
+    }
+  }
+
+  /**
+   * Drops the partition indexes created by Hudi and, if any were found, switches partition filtering
+   * off. Indexes whose name does not start with {@link #HUDI_MANAGED_INDEX_PREFIX} are kept.
+   */
+  private void disableHudiManagedPartitionIndexes(String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionIndexDescriptor> hudiManagedIdxs = listPartitionIndexes(tableName).stream()
+        .filter(idx -> idx.indexName() != null && idx.indexName().startsWith(HUDI_MANAGED_INDEX_PREFIX))
+        .collect(Collectors.toList());
+    if (hudiManagedIdxs.isEmpty()) {
+      // Hudi never indexed this table (or already cleaned up): leave user-managed state untouched.
+      return;
+    }
+    // deactivate indexing if enabled
+    if (getPartitionIndexEnable(tableName)) {
+      LOG.warn("Deactivating partition indexing");
+      updatePartitionIndexEnable(tableName, false);
+    }
+    // NOTE(review): an index still being deleted by Glue may be listed again on the next sync;
+    // confirm whether a repeated delete request is rejected by Glue.
+    for (PartitionIndexDescriptor idxToDelete : hudiManagedIdxs) {
+      LOG.warn("Dropping partition index: " + idxToDelete.indexName());
+      dropPartitionIndex(tableName, idxToDelete.indexName());
+    }
+  }
+
+  /** Returns the partition index descriptors Glue reports for the table. */
+  private List<PartitionIndexDescriptor> listPartitionIndexes(String tableName) throws ExecutionException, InterruptedException {
+    GetPartitionIndexesRequest indexesRequest = GetPartitionIndexesRequest.builder()
+        .databaseName(databaseName).tableName(tableName).build();
+    GetPartitionIndexesResponse existingIdxsResp = awsGlue.getPartitionIndexes(indexesRequest).get();
+    return existingIdxsResp.partitionIndexDescriptorList();
+  }
+
+  /** Returns the key (column) names of a partition index, in index order. */
+  private static List<String> indexKeys(PartitionIndexDescriptor index) {
+    return index.keys().stream().map(key -> key.name()).collect(Collectors.toList());
+  }
+
+  /** Sends a DeletePartitionIndex request for the given index and waits for the request to complete. */
+  private void dropPartitionIndex(String tableName, String indexName) throws ExecutionException, InterruptedException {
+    DeletePartitionIndexRequest idxToDelete = DeletePartitionIndexRequest.builder()
+        .databaseName(databaseName).tableName(tableName).indexName(indexName).build();
+    awsGlue.deletePartitionIndex(idxToDelete).get();
+  }
+
+  /**
+   * Parses {@code META_SYNC_PARTITION_INDEX_FIELDS} into the partition indexes to maintain.
+   * Indexes are separated by commas and the fields of one index by semicolons, e.g.
+   * {@code "col1;col2,col3"} yields {@code [[col1, col2], [col3]]}. Fields are trimmed, blank fields
+   * and blank index definitions are ignored, duplicate definitions are collapsed, and only the first
+   * {@code PARTITION_INDEX_MAX_NUMBER} definitions are kept.
+   *
+   * @return the index key lists; empty when no index fields are configured or can be inferred
+   */
+  protected List<List<String>> parsePartitionsIndexConfig() {
+    // resolves the value through the config property's infer function when it is not set explicitly
+    config.setDefaultValue(META_SYNC_PARTITION_INDEX_FIELDS);
+    String rawPartitionIndex = config.getString(META_SYNC_PARTITION_INDEX_FIELDS);
+    if (rawPartitionIndex == null || rawPartitionIndex.trim().isEmpty()) {
+      // nothing configured nor inferable (e.g. no partition fields): no index is needed
+      return Collections.emptyList();
+    }
+    List<List<String>> indexes = Arrays.stream(rawPartitionIndex.split(","))
+        .map(idx -> Arrays.stream(idx.split(";"))
+            .map(String::trim)
+            .filter(field -> !field.isEmpty())
+            .collect(Collectors.toList()))
+        .filter(fields -> !fields.isEmpty())
+        .distinct()
+        .collect(Collectors.toList());
+    if (indexes.size() > PARTITION_INDEX_MAX_NUMBER) {
+      LOG.warn(String.format("Only considering first %s indexes", PARTITION_INDEX_MAX_NUMBER));
+      return indexes.subList(0, PARTITION_INDEX_MAX_NUMBER);
+    }
+    return indexes;
+  }
+
+  /**
+   * Reads the {@code partition_filtering.enabled} parameter of a Glue table.
+   *
+   * @param tableName name of the table in {@code databaseName}
+   * @return {@code true} only when the parameter is present and equals "true" ignoring case
+   *         (semantics of {@link Boolean#valueOf(String)})
+   * @throws HoodieGlueSyncException if the table or its parameters cannot be read
+   */
+  public Boolean getPartitionIndexEnable(String tableName) {
+    try {
+      Table table = getTable(awsGlue, databaseName, tableName);
+      return Boolean.valueOf(table.parameters().get(GLUE_PARTITION_INDEX_ENABLE));
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Fail to get parameter " + GLUE_PARTITION_INDEX_ENABLE + " for " + tableId(databaseName, tableName), e);
+    }
+  }
+
+  /**
+   * Writes the {@code partition_filtering.enabled} parameter of a Glue table.
+   *
+   * @param tableName name of the table in {@code databaseName}
+   * @param enable    value stored as its string form ("true"/"false")
+   * @throws HoodieGlueSyncException if the table parameters cannot be updated
+   */
+  public void updatePartitionIndexEnable(String tableName, Boolean enable) {
+    try {
+      updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(GLUE_PARTITION_INDEX_ENABLE, enable.toString()), false);
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Fail to update parameter " + GLUE_PARTITION_INDEX_ENABLE + " for " + tableId(databaseName, tableName), e);
+    }
+  }
+
@Override
public Map<String, String> getMetastoreSchema(String tableName) {
try {
@@ -540,6 +665,16 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to update last sync commit time
for " + tableId(databaseName, tableName), e);
}
+ try {
+ // as a side effect, we also refresh the partition indexes if needed
+ // people may wan't to add indexes, without re-creating the table
+ // therefore we call this at each commit as a workaround
+ managePartitionIndexes(tableName);
+ } catch (ExecutionException e) {
+ LOG.warn("An indexation process is currently running.", e);
+ } catch (Exception e) {
+ LOG.warn("Something went wrong with partition index", e);
+ }
}
@Override
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
index efffae5bd89..21244e65154 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
/**
* Hoodie Configs for Glue.
@@ -46,4 +49,20 @@ public class GlueCatalogSyncClientConfig extends
HoodieConfig {
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Makes athena use the metadata table to list
partitions and files. Currently it won't benefit from other features such stats
indexes");
+
+  /** Turns the Glue partition index management of the Glue sync client on or off. */
+  public static final ConfigProperty<Boolean> META_SYNC_PARTITION_INDEX_FIELDS_ENABLE = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields.enable")
+      .defaultValue(false)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Enable aws glue partition index feature, to speedup partition based query pattern");
+
+  /**
+   * Partition indexes to maintain: a comma separates indexes, a semicolon separates the fields of one
+   * index. When unset, the value is inferred from the table partition fields (or the partition path
+   * field option) as a single index over all of them.
+   */
+  public static final ConfigProperty<String> META_SYNC_PARTITION_INDEX_FIELDS = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields")
+      .noDefaultValue()
+      // Partition fields are comma separated, but in this property a comma starts a new index.
+      // Rewrite the separators so the inferred value is ONE index covering every partition field
+      // instead of one single-column index per field (truncated to the 3-index limit).
+      .withInferFunction(cfg -> Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+          .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)))
+          .map(fields -> fields.replace(',', ';')))
+      .sinceVersion("1.0.0")
+      .withDocumentation(String.join(" ", "Specify the partition fields to index on aws glue. Separate the fields by semicolon.",
+          "By default, when the feature is enabled, a single index over all the partition fields is created.",
+          "You can create up to three indexes, separate them by comma. Eg: col1;col2;col3,col2,col3"));
}