This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21b903fddb [HUDI-4197] Fix Async indexer to support building FILES partition (#5766)
21b903fddb is described below

commit 21b903fddbc0522909ab924e0d774b75902826f6
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Jun 6 15:47:11 2022 -0400

    [HUDI-4197] Fix Async indexer to support building FILES partition (#5766)
    
    - When the async indexer is invoked with only the "FILES" partition, it fails;
      this change fixes that. Also, if the metadata table itself is not initialized
      and someone wants to build indexes via the async indexer, they are expected
      to index the "FILES" partition first, followed by the other partitions. In
      general, the async indexer is limited to building one index at a time, so
      guards have been added to ensure these conditions are met.
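
    For illustration, a minimal sketch of the two-step flow this implies, modeled
    on the new TestHoodieIndexer cases below; the Spark context, base path, table
    name, props file, and the "scheduleandexecute" mode string are placeholder
    assumptions, not part of this commit:

        import org.apache.hudi.utilities.HoodieIndexer;
        import org.apache.spark.api.java.JavaSparkContext;

        public class FilesFirstIndexingExample {
          public static void main(String[] args) {
            JavaSparkContext jsc = new JavaSparkContext("local[2]", "indexer-example"); // placeholder context

            // Step 1: build the FILES partition first. With this fix, scheduling FILES
            // alone no longer fails, and it must come first when metadata is uninitialized.
            HoodieIndexer.Config filesCfg = new HoodieIndexer.Config();
            filesCfg.basePath = "/tmp/hudi_table";                  // placeholder table path
            filesCfg.tableName = "indexer_test";
            filesCfg.indexTypes = "FILES";
            filesCfg.runningMode = "scheduleandexecute";            // assumed value of SCHEDULE_AND_EXECUTE
            filesCfg.propsFilePath = "/path/to/indexer.properties"; // placeholder props file
            int filesResult = new HoodieIndexer(jsc, filesCfg).start(0); // 0 on success, -1 on failure

            // Step 2: once FILES is complete, build the next index, one type per run.
            HoodieIndexer.Config statsCfg = new HoodieIndexer.Config();
            statsCfg.basePath = filesCfg.basePath;
            statsCfg.tableName = filesCfg.tableName;
            statsCfg.indexTypes = "COLUMN_STATS";
            statsCfg.runningMode = "scheduleandexecute";
            statsCfg.propsFilePath = filesCfg.propsFilePath;
            int statsResult = new HoodieIndexer(jsc, statsCfg).start(0);

            jsc.stop();
          }
        }

    Running a non-FILES index type before FILES on an uninitialized metadata table
    now fails fast instead of leaving a half-initialized metadata table.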
---
 .../table/action/index/RunIndexActionExecutor.java | 78 ++++++++++-------
 .../action/index/ScheduleIndexActionExecutor.java  |  5 +-
 .../org/apache/hudi/utilities/HoodieIndexer.java   | 11 ++-
 .../apache/hudi/utilities/TestHoodieIndexer.java   | 99 ++++++++++++++++++----
 4 files changed, 144 insertions(+), 49 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 339e95b9e0..182cf94504 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -51,6 +51,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
@@ -120,45 +121,64 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte
       if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
         throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
       }
+      boolean firstTimeInitializingMetadataTable = false;
+      HoodieIndexPartitionInfo fileIndexPartitionInfo = null;
+      if (indexPartitionInfos.size() == 1 && indexPartitionInfos.get(0).getMetadataPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
+        firstTimeInitializingMetadataTable = true;
+        fileIndexPartitionInfo = indexPartitionInfos.get(0);
+      }
       // ensure the metadata partitions for the requested indexes are not already available (or inflight)
       Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
       Set<String> requestedPartitions = indexPartitionInfos.stream()
           .map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
       requestedPartitions.retainAll(indexesInflightOrCompleted);
-      if (!requestedPartitions.isEmpty()) {
+      if (!firstTimeInitializingMetadataTable && !requestedPartitions.isEmpty()) {
         throw new HoodieIndexException(String.format("Following partitions 
already exist or inflight: %s", requestedPartitions));
       }
 
       // transition requested indexInstant to inflight
       table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
-      // start indexing for each partition
-      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
-          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
-      // this will only build index upto base instant as generated by the plan, we will be doing catchup later
-      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
-      LOG.info("Starting Index Building with base instant: " + 
indexUptoInstant);
-      metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
-
-      // get remaining instants to catchup
-      List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
-      LOG.info("Total remaining instants to index: " + 
instantsToCatchup.size());
-
-      // reconcile with metadata table timeline
-      String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
-      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
-      Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
-          .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
-      // index catchup for all remaining instants with a timeout
-      currentCaughtupInstant = indexUptoInstant;
-      catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
-      // save index commit metadata and update table config
-      List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream()
-          .map(info -> new HoodieIndexPartitionInfo(
-              info.getVersion(),
-              info.getMetadataPartitionPath(),
-              currentCaughtupInstant))
-          .collect(Collectors.toList());
+      List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = null;
+      if (!firstTimeInitializingMetadataTable) {
+        // start indexing for each partition
+        HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+            .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+        // this will only build index upto base instant as generated by the plan, we will be doing catchup later
+        String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+        LOG.info("Starting Index Building with base instant: " + 
indexUptoInstant);
+        metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+
+        // get remaining instants to catchup
+        List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
+        LOG.info("Total remaining instants to index: " + 
instantsToCatchup.size());
+
+        // reconcile with metadata table timeline
+        String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
+        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+        Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
+            .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+        // index catchup for all remaining instants with a timeout
+        currentCaughtupInstant = indexUptoInstant;
+        catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
+        // save index commit metadata and update table config
+        finalIndexPartitionInfos = indexPartitionInfos.stream()
+            .map(info -> new HoodieIndexPartitionInfo(
+                info.getVersion(),
+                info.getMetadataPartitionPath(),
+                currentCaughtupInstant))
+            .collect(Collectors.toList());
+      } else {
+        String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
+        // save index commit metadata and update table config
+        finalIndexPartitionInfos = Collections.singletonList(fileIndexPartitionInfo).stream()
+            .map(info -> new HoodieIndexPartitionInfo(
+                info.getVersion(),
+                info.getMetadataPartitionPath(),
+                indexUptoInstant))
+            .collect(Collectors.toList());
+      }
+
       HoodieIndexCommitMetadata indexCommitMetadata = HoodieIndexCommitMetadata.newBuilder()
           .setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
       updateTableConfigAndTimeline(indexInstant, finalIndexPartitionInfos, indexCommitMetadata);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index cfd975c509..c306f4f6f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -105,7 +105,10 @@ public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K, O>
         // in case FILES partition itself was not initialized before (i.e. metadata was never enabled), this will initialize synchronously
         HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
             .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to initialize filegroups for indexing for instant: %s", instantTime)));
-        metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
+        if (!finalPartitionsToIndex.get(0).getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
+          // initialize the metadata partition only if it is not the FILES partition.
+          metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
+        }
 
         // for each partitionToIndex add that time to the plan
         List<HoodieIndexPartitionInfo> indexPartitionInfos = finalPartitionsToIndex.stream()
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index 96f6ce38cd..6f78487cce 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.metadata.MetadataPartitionType;
 
@@ -228,6 +229,9 @@ public class HoodieIndexer {
   private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
     List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes);
     checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
+    if (!isMetadataInitialized() && !partitionTypes.contains(MetadataPartitionType.FILES)) {
+      throw new HoodieException("Metadata table is not yet initialized. Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray()));
+    }
     if (indexExists(partitionTypes)) {
       return Option.empty();
     }
@@ -249,6 +253,11 @@ public class HoodieIndexer {
     return false;
   }
 
+  private boolean isMetadataInitialized() {
+    Set<String> indexedMetadataPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    return !indexedMetadataPartitions.isEmpty();
+  }
+
   private int runIndexing(JavaSparkContext jsc) throws Exception {
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
@@ -318,8 +327,6 @@ public class HoodieIndexer {
     List<String> requestedIndexTypes = Arrays.asList(indexTypes.split(","));
     return requestedIndexTypes.stream()
         .map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)))
-        // FILES partition is initialized synchronously while getting metadata writer
-        .filter(p -> !MetadataPartitionType.FILES.equals(p))
         .collect(Collectors.toList());
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 9312a26b4f..0f57e3b1b8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -49,8 +49,10 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -106,7 +108,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes);
-    assertFalse(partitionTypes.contains(FILES));
+    assertTrue(partitionTypes.contains(FILES));
     assertTrue(partitionTypes.contains(BLOOM_FILTERS));
     assertTrue(partitionTypes.contains(COLUMN_STATS));
   }
@@ -134,11 +136,75 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
   @Test
   public void testIndexerWithNotAllIndexesEnabled() {
     initTestDataGenerator();
-    String tableName = "indexer_test";
-    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
     HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
-    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    initializeWriteClient(metadataConfigBuilder.build());
+
+    // validate table config
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+
+    // build indexer config which has only column_stats enabled (files and bloom_filters are already enabled)
+    indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[]{FILES, BLOOM_FILTERS}), Collections.emptyList());
+  }
+
+  @Test
+  public void testIndexerWithFilesPartition() {
+    initTestDataGenerator();
+    tableName = "indexer_test";
+    // metadata table is not enabled on the regular write client, so FILES is not initialized
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true);
+    initializeWriteClient(metadataConfigBuilder.build());
+
+    // validate table config
+    assertFalse(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+
+    // build indexer config which has only files enabled
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+  }
+
+  /**
+   * If the first round of indexing targets any partition other than FILES, an exception is thrown,
+   * since the metadata table has not been initialized in the synchronous code path with regular writers.
+   */
+  @Test
+  public void testIndexerForExceptionWithNonFilesPartition() {
+    initTestDataGenerator();
+    tableName = "indexer_test";
+    // metadata table is not enabled on the regular write client, so FILES is not initialized
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false);
+    initializeWriteClient(metadataConfigBuilder.build());
+    // validate table config
+    assertFalse(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+
+    // build indexer config which has only column_stats enabled; expected to throw an exception.
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE_AND_EXECUTE;
+    config.propsFilePath = propsPath;
+    // start the indexer and validate index building fails
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(-1, indexer.start(0));
+
+    // validate table config
+    metaClient = reload(metaClient);
+    assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(FILES.getPartitionPath()));
+    assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    assertFalse(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    // validate metadata partitions actually exist
+    assertFalse(metadataPartitionExists(basePath, context, FILES));
+
+    // trigger indexing of the FILES partition; it should succeed.
+    indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[]{COLUMN_STATS, BLOOM_FILTERS}));
+  }
+
+  private void initializeWriteClient(HoodieMetadataConfig metadataConfig) {
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfig).build();
     // do one upsert with synchronous metadata update
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
     String instant = "0001";
@@ -147,31 +213,30 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
+  }
 
-    // validate table config
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-
-    // build indexer config which has only column_stats enabled (files is enabled by default)
+  private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List<MetadataPartitionType> alreadyCompletedPartitions, List<MetadataPartitionType> nonExistentPartitions) {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
     config.basePath = basePath;
     config.tableName = tableName;
-    config.indexTypes = COLUMN_STATS.name();
+    config.indexTypes = partitionTypeToIndex.name();
     config.runningMode = SCHEDULE_AND_EXECUTE;
     config.propsFilePath = propsPath;
-    // start the indexer and validate column_stats index is also complete
+    // start the indexer and validate the requested index is completely built out
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     assertEquals(0, indexer.start(0));
 
     // validate table config
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    metaClient = reload(metaClient);
+    Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    assertTrue(completedPartitions.contains(partitionTypeToIndex.getPartitionPath()));
+    alreadyCompletedPartitions.forEach(entry -> assertTrue(completedPartitions.contains(entry.getPartitionPath())));
+    nonExistentPartitions.forEach(entry -> assertFalse(completedPartitions.contains(entry.getPartitionPath())));
+
     // validate metadata partitions actually exist
-    assertTrue(metadataPartitionExists(basePath, context, FILES));
-    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
-    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath, context, partitionTypeToIndex));
+    alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath, context, entry)));
   }
 
   @Test
