This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b1fc2fbb988 [HUDI-6230] Handle aws glue partition index (#8743)
b1fc2fbb988 is described below

commit b1fc2fbb98833099d81c2820b0efb4e235661d97
Author: Nicolas Paris <[email protected]>
AuthorDate: Fri Jan 26 03:01:18 2024 +0100

    [HUDI-6230] Handle aws glue partition index (#8743)
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    | 137 ++++++++++++++++++++-
 .../hudi/config/GlueCatalogSyncClientConfig.java   |  19 +++
 2 files changed, 155 insertions(+), 1 deletion(-)

diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 4a846e4970f..f0a15916dc0 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -40,14 +40,20 @@ import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse;
 import software.amazon.awssdk.services.glue.model.Column;
 import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
 import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionIndexRequest;
 import software.amazon.awssdk.services.glue.model.CreateTableRequest;
 import software.amazon.awssdk.services.glue.model.CreateTableResponse;
 import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeletePartitionIndexRequest;
 import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
 import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionIndexesRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionIndexesResponse;
 import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
 import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
 import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.PartitionIndex;
+import software.amazon.awssdk.services.glue.model.PartitionIndexDescriptor;
 import software.amazon.awssdk.services.glue.model.PartitionInput;
 import software.amazon.awssdk.services.glue.model.PartitionValueList;
 import software.amazon.awssdk.services.glue.model.SerDeInfo;
@@ -55,12 +61,14 @@ import software.amazon.awssdk.services.glue.model.StorageDescriptor;
 import software.amazon.awssdk.services.glue.model.Table;
 import software.amazon.awssdk.services.glue.model.TableInput;
 import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+
 import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -74,6 +82,8 @@ import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
 import static org.apache.hudi.common.util.MapUtils.containsAll;
 import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
 import static 
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_METADATA_FILE_LISTING;
+import static 
org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS;
+import static 
org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS_ENABLE;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
 import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
@@ -95,7 +105,8 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
   private static final int MAX_PARTITIONS_PER_REQUEST = 100;
   private static final int MAX_DELETE_PARTITIONS_PER_REQUEST = 25;
   private final GlueAsyncClient awsGlue;
-  private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
+  // Glue table parameter read by getPartitionIndexEnable and written by updatePartitionIndexEnable
+  // ("true"/"false") when partition indexing is switched on or off for the table.
+  // NOTE(review): presumably the property query engines (e.g. Athena) check before using partition indexes — confirm.
+  private static final String GLUE_PARTITION_INDEX_ENABLE = 
"partition_filtering.enabled";
+  // Maximum number of index definitions honoured from META_SYNC_PARTITION_INDEX_FIELDS;
+  // parsePartitionsIndexConfig keeps only the first PARTITION_INDEX_MAX_NUMBER entries.
+  private static final int PARTITION_INDEX_MAX_NUMBER = 3;
   /**
    * athena v2/v3 table property
    * see https://docs.aws.amazon.com/athena/latest/ug/querying-hudi.html
@@ -432,6 +443,120 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
     }
   }
 
+  /**
+   * Reconciles the Glue partition indexes of {@code tableName} with the sync configuration.
+   *
+   * <p>When {@code META_SYNC_PARTITION_INDEX_FIELDS_ENABLE} is false, the
+   * {@code partition_filtering.enabled} table parameter is switched off (if it is on) and every
+   * existing partition index of the table is dropped. When it is true, the parameter is switched on
+   * (if it is off), existing indexes whose key list is no longer configured are dropped, and
+   * configured indexes that do not exist yet are created. Users can therefore activate or
+   * deactivate indexes on existing tables; removing an index definition drops the index.
+   *
+   * <p>Existing and configured indexes are matched by their ordered list of key columns, not by name.
+   *
+   * <p>Reference doc for partition indexes:
+   * https://docs.aws.amazon.com/glue/latest/dg/partition-indexes.html#partition-index-getpartitions
+   *
+   * @param tableName name of the Glue table inside {@code databaseName}
+   * @throws ExecutionException   if one of the asynchronous Glue calls fails
+   * @throws InterruptedException if the thread is interrupted while waiting for a Glue call
+   */
+  public void managePartitionIndexes(String tableName) throws ExecutionException, InterruptedException {
+    if (!config.getBooleanOrDefault(META_SYNC_PARTITION_INDEX_FIELDS_ENABLE)) {
+      // deactivate indexing usage if enabled
+      if (getPartitionIndexEnable(tableName)) {
+        LOG.warn("Deactivating partition indexing");
+        updatePartitionIndexEnable(tableName, false);
+      }
+      // also drop all existing indexes
+      for (PartitionIndexDescriptor existingIdx : fetchPartitionIndexes(tableName)) {
+        LOG.warn("Dropping partition index: {}", existingIdx.indexName());
+        dropPartitionIndex(tableName, existingIdx.indexName());
+      }
+      return;
+    }
+
+    // activate indexing usage if disabled
+    if (!getPartitionIndexEnable(tableName)) {
+      LOG.warn("Activating partition indexing");
+      updatePartitionIndexEnable(tableName, true);
+    }
+
+    // distinct(): a definition repeated in the config must not trigger two create requests
+    List<List<String>> neededIndexes = parsePartitionsIndexConfig().stream()
+        .distinct()
+        .collect(Collectors.toList());
+
+    // drop every existing index that is not configured anymore and remember the ones kept;
+    // the kept list replaces a second GetPartitionIndexes round trip
+    List<List<String>> keptIndexes = new ArrayList<>();
+    for (PartitionIndexDescriptor existingIdx : fetchPartitionIndexes(tableName)) {
+      List<String> idxColumns = partitionIndexKeyNames(existingIdx);
+      if (neededIndexes.contains(idxColumns)) {
+        keptIndexes.add(idxColumns);
+      } else {
+        LOG.warn("Dropping irrelevant index: {}", existingIdx.indexName());
+        dropPartitionIndex(tableName, existingIdx.indexName());
+      }
+    }
+
+    // create every configured index that does not exist yet
+    for (List<String> neededIdx : neededIndexes) {
+      if (keptIndexes.contains(neededIdx)) {
+        continue;
+      }
+      String newIdxName = String.format("hudi_managed_%s", neededIdx.toString());
+      PartitionIndex newIdx = PartitionIndex.builder()
+          .indexName(newIdxName)
+          .keys(neededIdx)
+          .build();
+      LOG.warn("Creating new partition index: {}", newIdxName);
+      CreatePartitionIndexRequest creationRequest = CreatePartitionIndexRequest.builder()
+          .databaseName(databaseName).tableName(tableName).partitionIndex(newIdx).build();
+      awsGlue.createPartitionIndex(creationRequest).get();
+    }
+  }
+
+  /**
+   * Lists every partition index of {@code tableName}, following Glue's {@code nextToken} pagination.
+   */
+  private List<PartitionIndexDescriptor> fetchPartitionIndexes(String tableName) throws ExecutionException, InterruptedException {
+    List<PartitionIndexDescriptor> descriptors = new ArrayList<>();
+    String nextToken = null;
+    do {
+      GetPartitionIndexesRequest request = GetPartitionIndexesRequest.builder()
+          .databaseName(databaseName).tableName(tableName).nextToken(nextToken).build();
+      GetPartitionIndexesResponse response = awsGlue.getPartitionIndexes(request).get();
+      descriptors.addAll(response.partitionIndexDescriptorList());
+      nextToken = response.nextToken();
+    } while (nextToken != null && !nextToken.isEmpty());
+    return descriptors;
+  }
+
+  /** Returns the key column names of an existing partition index, in index order. */
+  private static List<String> partitionIndexKeyNames(PartitionIndexDescriptor descriptor) {
+    return descriptor.keys().stream().map(key -> key.name()).collect(Collectors.toList());
+  }
+
+  /** Drops the partition index {@code indexName} of {@code tableName}, waiting for the call to complete. */
+  private void dropPartitionIndex(String tableName, String indexName) throws ExecutionException, InterruptedException {
+    DeletePartitionIndexRequest request = DeletePartitionIndexRequest.builder()
+        .databaseName(databaseName).tableName(tableName).indexName(indexName).build();
+    awsGlue.deletePartitionIndex(request).get();
+  }
+
+  /**
+   * Parses {@code META_SYNC_PARTITION_INDEX_FIELDS} into partition index definitions.
+   *
+   * <p>Index definitions are separated by {@code ','} and the key fields of one index by {@code ';'},
+   * e.g. {@code col1;col2;col3,col2,col3}. When the property is unset, {@code setDefaultValue} is
+   * called first so the config can fill it in (presumably from the property's infer function — confirm
+   * against HoodieConfig). Fields are trimmed; blank fields and blank definitions are ignored. Only the
+   * first {@code PARTITION_INDEX_MAX_NUMBER} definitions are returned.
+   *
+   * @return the index definitions, each an ordered list of key fields; empty when nothing is configured
+   */
+  protected List<List<String>> parsePartitionsIndexConfig() {
+    config.setDefaultValue(META_SYNC_PARTITION_INDEX_FIELDS);
+    String rawPartitionIndex = config.getString(META_SYNC_PARTITION_INDEX_FIELDS);
+    // nothing configured or inferable (e.g. non-partitioned table): previously an NPE on split()
+    if (rawPartitionIndex == null || rawPartitionIndex.trim().isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<List<String>> indexes = Arrays.stream(rawPartitionIndex.split(","))
+        .map(idx -> Arrays.stream(idx.split(";"))
+            .map(String::trim)
+            .filter(field -> !field.isEmpty())
+            .collect(Collectors.toList()))
+        .filter(fields -> !fields.isEmpty())
+        .collect(Collectors.toList());
+    if (indexes.size() > PARTITION_INDEX_MAX_NUMBER) {
+      LOG.warn("Only considering first {} indexes", PARTITION_INDEX_MAX_NUMBER);
+      return indexes.subList(0, PARTITION_INDEX_MAX_NUMBER);
+    }
+    return indexes;
+  }
+
+  /**
+   * Reads the {@code partition_filtering.enabled} parameter of the Glue table.
+   *
+   * @param tableName name of the Glue table inside {@code databaseName}
+   * @return {@code true} only when the parameter is present and equals "true" (case-insensitive)
+   * @throws HoodieGlueSyncException if fetching the table or reading its parameters fails
+   */
+  public Boolean getPartitionIndexEnable(String tableName) {
+    try {
+      Table table = getTable(awsGlue, databaseName, tableName);
+      return Boolean.valueOf(table.parameters().get(GLUE_PARTITION_INDEX_ENABLE));
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Fail to get parameter " + GLUE_PARTITION_INDEX_ENABLE + " for " + tableId(databaseName, tableName), e);
+    }
+  }
+
+  /**
+   * Writes the {@code partition_filtering.enabled} parameter of the Glue table.
+   *
+   * @param tableName name of the Glue table inside {@code databaseName}
+   * @param enable    value stored, as its string form, in the table parameter
+   * @throws HoodieGlueSyncException if updating the table parameters fails
+   */
+  public void updatePartitionIndexEnable(String tableName, Boolean enable) {
+    try {
+      updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(GLUE_PARTITION_INDEX_ENABLE, enable.toString()), false);
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Fail to update parameter " + GLUE_PARTITION_INDEX_ENABLE + " for " + tableId(databaseName, tableName), e);
+    }
+  }
+
   @Override
   public Map<String, String> getMetastoreSchema(String tableName) {
     try {
@@ -540,6 +665,16 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Fail to update last sync commit time 
for " + tableId(databaseName, tableName), e);
     }
+    try {
+      // as a side effect, we also refresh the partition indexes if needed
+      // people may wan't to add indexes, without re-creating the table
+      // therefore we call this at each commit as a workaround
+      managePartitionIndexes(tableName);
+    } catch (ExecutionException e) {
+      LOG.warn("An indexation process is currently running.", e);
+    } catch (Exception e) {
+      LOG.warn("Something went wrong with partition index", e);
+    }
   }
 
   @Override
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
index efffae5bd89..21244e65154 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 /**
  * Hoodie Configs for Glue.
@@ -46,4 +49,20 @@ public class GlueCatalogSyncClientConfig extends HoodieConfig {
       .markAdvanced()
       .sinceVersion("0.14.0")
       .withDocumentation("Makes athena use the metadata table to list 
partitions and files. Currently it won't benefit from other features such stats 
indexes");
+
+  /**
+   * Switches Glue partition index management on or off during meta sync (default {@code false}).
+   * In AWSGlueCatalogSyncClient#managePartitionIndexes, a false value turns off the table's
+   * {@code partition_filtering.enabled} parameter and drops the table's existing partition indexes;
+   * a true value turns the parameter on and reconciles indexes with
+   * {@code META_SYNC_PARTITION_INDEX_FIELDS}.
+   */
+  public static final ConfigProperty<Boolean> 
META_SYNC_PARTITION_INDEX_FIELDS_ENABLE = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields.enable")
+      .defaultValue(false)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Enable aws glue partition index feature, to speedup 
partition based query pattern");
+
+  /**
+   * Partition index definitions to maintain on the Glue table: index definitions separated by ',',
+   * key fields of one index separated by ';' (e.g. {@code col1;col2;col3,col2,col3}).
+   * When unset, the value is inferred from {@code HoodieTableConfig.PARTITION_FIELDS}, falling back
+   * to {@code KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME}.
+   * NOTE(review): those partition-field configs are usually comma-separated, so the inferred default
+   * would be parsed as one single-field index per partition field (capped at three) rather than one
+   * composite index; confirm this is intended.
+   */
+  public static final ConfigProperty<String> META_SYNC_PARTITION_INDEX_FIELDS 
= ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields")
+      .noDefaultValue()
+      .withInferFunction(cfg -> 
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+          .or(() -> 
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
+      .sinceVersion("1.0.0")
+      .withDocumentation(String.join(" ", "Specify the partitions fields to 
index on aws glue. Separate the fields by semicolon.",
+          "By default, when the feature is enabled, all the partition will be 
indexed.",
+          "You can create up to three indexes, separate them by comma. Eg: 
col1;col2;col3,col2,col3"));
 }

Reply via email to