This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd5da87344 [HUDI-6332] Fixing async indexer tests (#9044)
1cd5da87344 is described below

commit 1cd5da8734496f24f3d7ef7707bb595ab2769f38
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jun 28 02:05:28 2023 -0400

    [HUDI-6332] Fixing async indexer tests (#9044)
---
 .../hudi/metadata/HoodieBackedTableMetadataWriter.java   |  5 +++++
 .../hudi/table/action/index/RunIndexActionExecutor.java  |  4 ++++
 .../org/apache/hudi/utilities/TestHoodieIndexer.java     | 10 +++-------
 .../utilities/deltastreamer/TestHoodieDeltaStreamer.java | 16 +++++++++++-----
 4 files changed, 23 insertions(+), 12 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f9d4514c7e3..f85fce263b1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -89,6 +89,7 @@ import java.util.stream.IntStream;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
@@ -423,6 +424,10 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
    * @return a unique timestamp for MDT
    */
   private String generateUniqueCommitInstantTime(String initializationTime) {
+    // if its initialized via Async indexer, we don't need to alter the init 
time
+    if (initializationTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()) {
+      return initializationTime;
+    }
     // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
     // This function would be called multiple times in a single application if 
multiple indexes are being
     // initialized one after the other.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 1e1a53942e4..7ccfe163a21 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -167,6 +167,10 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
       } else {
         String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
         // save index commit metadata and update table config
+        // instantiation of metadata writer will automatically instantiate the 
partitions.
+        table.getIndexingMetadataWriter(instantTime)
+            .orElseThrow(() -> new HoodieIndexException(String.format(
+                "Could not get metadata writer to run index action for 
instant: %s", instantTime)));
         finalIndexPartitionInfos = 
Collections.singletonList(fileIndexPartitionInfo).stream()
             .map(info -> new HoodieIndexPartitionInfo(
                 info.getVersion(),
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index c46383a1c88..751a6ddf1ee 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -48,7 +48,6 @@ import org.apache.hudi.testutils.providers.SparkProvider;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -145,7 +144,6 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
   }
 
   @Test
-  @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index 
initialization
   public void testIndexerWithFilesPartition() {
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
@@ -160,7 +158,6 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
   }
 
   @Test
-  @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index 
initialization
   public void testIndexerWithWriterFinishingFirst() throws IOException {
     // Test the case where the indexer is running, i.e., the delta commit in 
the metadata table
     // is inflight, while the regular writer is updating metadata table.
@@ -208,7 +205,8 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
         CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
     heartbeatClient.start(mdtCommitTime);
 
-    upsertToTable(metadataConfig, tableName);
+    HoodieMetadataConfig metadataConfigColStats = 
getMetadataConfigBuilder(true, 
false).withMetadataIndexBloomFilter(true).withMetadataIndexColumnStats(true).build();
+    upsertToTable(metadataConfigColStats, tableName);
     metaClient = reload(metaClient);
     metadataMetaClient = reload(metadataMetaClient);
     // The delta commit from async indexer in metadata table should not be 
rolled back
@@ -217,7 +215,7 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
 
     // Simulate heartbeat timeout
     heartbeatClient.stop(mdtCommitTime);
-    upsertToTable(metadataConfig, tableName);
+    upsertToTable(metadataConfigColStats, tableName);
     metaClient = reload(metaClient);
     metadataMetaClient = reload(metadataMetaClient);
     // The delta commit from async indexer in metadata table should be rolled 
back now
@@ -303,7 +301,6 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
 
   @ParameterizedTest
   @MethodSource("colStatsFileGroupCountParams")
-  @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index 
initialization
   public void testColStatsFileGroupCount(int colStatsFileGroupCount) {
     TestHoodieIndexer.colStatsFileGroupCount = colStatsFileGroupCount;
     String tableName = "indexer_test";
@@ -332,7 +329,6 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
    * with regular writers.
    */
   @Test
-  @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index 
initialization
   public void testIndexerForExceptionWithNonFilesPartition() {
     String tableName = "indexer_test";
     // enable files and bloom_filters on the regular write client
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 3b7ecf69510..c365aea0049 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -185,13 +185,19 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String 
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType 
recordType,
-                                                             
WriteOperationType writeOperationType) throws IOException {
+                                                           WriteOperationType 
writeOperationType) throws IOException {
+    return initialHoodieDeltaStreamer(tableBasePath, totalRecords, 
asyncCluster, recordType, writeOperationType, Collections.emptySet());
+  }
+
+  protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String 
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType 
recordType,
+                                                             
WriteOperationType writeOperationType, Set<String> customConfigs) throws 
IOException {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
writeOperationType);
     TestHelpers.addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
asyncCluster, ""));
     cfg.configs.addAll(getAllMultiWriterConfigs());
+    customConfigs.forEach(config -> cfg.configs.add(config));
     return new HoodieDeltaStreamer(cfg, jsc);
   }
 
@@ -1257,12 +1263,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     return config;
   }
 
-  //@ParameterizedTest
-  //@EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  @Disabled("HUDI-6332")
+  @ParameterizedTest
+  @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
   public void testHoodieIndexer(HoodieRecordType recordType) throws Exception {
     String tableBasePath = basePath + "/asyncindexer";
-    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000, 
"false", recordType);
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000, 
"false", recordType, WriteOperationType.INSERT,
+        
Collections.singleton(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()
 + "=true"));
 
     deltaStreamerTestRunner(ds, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);

Reply via email to