This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd5da87344 [HUDI-6332] Fixing async indexer tests (#9044)
1cd5da87344 is described below
commit 1cd5da8734496f24f3d7ef7707bb595ab2769f38
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jun 28 02:05:28 2023 -0400
[HUDI-6332] Fixing async indexer tests (#9044)
---
.../hudi/metadata/HoodieBackedTableMetadataWriter.java | 5 +++++
.../hudi/table/action/index/RunIndexActionExecutor.java | 4 ++++
.../org/apache/hudi/utilities/TestHoodieIndexer.java | 10 +++-------
.../utilities/deltastreamer/TestHoodieDeltaStreamer.java | 16 +++++++++++-----
4 files changed, 23 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f9d4514c7e3..f85fce263b1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -89,6 +89,7 @@ import java.util.stream.IntStream;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
@@ -423,6 +424,10 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
* @return a unique timestamp for MDT
*/
private String generateUniqueCommitInstantTime(String initializationTime) {
+ // if its initialized via Async indexer, we don't need to alter the init time
+ if (initializationTime.length() == MILLIS_INSTANT_ID_LENGTH +
METADATA_INDEXER_TIME_SUFFIX.length()) {
+ return initializationTime;
+ }
// Add suffix to initializationTime to find an unused instant time for the next index initialization.
// This function would be called multiple times in a single application if multiple indexes are being
// initialized one after the other.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 1e1a53942e4..7ccfe163a21 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -167,6 +167,10 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
} else {
String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
// save index commit metadata and update table config
+ // instantiation of metadata writer will automatically instantiate the partitions.
+ table.getIndexingMetadataWriter(instantTime)
+ .orElseThrow(() -> new HoodieIndexException(String.format(
+ "Could not get metadata writer to run index action for
instant: %s", instantTime)));
finalIndexPartitionInfos =
Collections.singletonList(fileIndexPartitionInfo).stream()
.map(info -> new HoodieIndexPartitionInfo(
info.getVersion(),
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index c46383a1c88..751a6ddf1ee 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -48,7 +48,6 @@ import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -145,7 +144,6 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
}
@Test
- @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index
initialization
public void testIndexerWithFilesPartition() {
String tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
@@ -160,7 +158,6 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
}
@Test
- @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index
initialization
public void testIndexerWithWriterFinishingFirst() throws IOException {
// Test the case where the indexer is running, i.e., the delta commit in the metadata table
// is inflight, while the regular writer is updating metadata table.
@@ -208,7 +205,8 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
heartbeatClient.start(mdtCommitTime);
- upsertToTable(metadataConfig, tableName);
+ HoodieMetadataConfig metadataConfigColStats =
getMetadataConfigBuilder(true,
false).withMetadataIndexBloomFilter(true).withMetadataIndexColumnStats(true).build();
+ upsertToTable(metadataConfigColStats, tableName);
metaClient = reload(metaClient);
metadataMetaClient = reload(metadataMetaClient);
// The delta commit from async indexer in metadata table should not be rolled back
@@ -217,7 +215,7 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
// Simulate heartbeat timeout
heartbeatClient.stop(mdtCommitTime);
- upsertToTable(metadataConfig, tableName);
+ upsertToTable(metadataConfigColStats, tableName);
metaClient = reload(metaClient);
metadataMetaClient = reload(metadataMetaClient);
// The delta commit from async indexer in metadata table should be rolled back now
@@ -303,7 +301,6 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
@ParameterizedTest
@MethodSource("colStatsFileGroupCountParams")
- @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index
initialization
public void testColStatsFileGroupCount(int colStatsFileGroupCount) {
TestHoodieIndexer.colStatsFileGroupCount = colStatsFileGroupCount;
String tableName = "indexer_test";
@@ -332,7 +329,6 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
* with regular writers.
*/
@Test
- @Disabled("HUDI-6332") // Investigate and fix async indexer colstats index
initialization
public void testIndexerForExceptionWithNonFilesPartition() {
String tableName = "indexer_test";
// enable files and bloom_filters on the regular write client
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 3b7ecf69510..c365aea0049 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -185,13 +185,19 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType
recordType,
-
WriteOperationType writeOperationType) throws IOException {
+ WriteOperationType
writeOperationType) throws IOException {
+ return initialHoodieDeltaStreamer(tableBasePath, totalRecords,
asyncCluster, recordType, writeOperationType, Collections.emptySet());
+ }
+
+ protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType
recordType,
+
WriteOperationType writeOperationType, Set<String> customConfigs) throws
IOException {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
writeOperationType);
TestHelpers.addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
asyncCluster, ""));
cfg.configs.addAll(getAllMultiWriterConfigs());
+ customConfigs.forEach(config -> cfg.configs.add(config));
return new HoodieDeltaStreamer(cfg, jsc);
}
@@ -1257,12 +1263,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
return config;
}
- //@ParameterizedTest
- //@EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
- @Disabled("HUDI-6332")
+ @ParameterizedTest
+ @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
public void testHoodieIndexer(HoodieRecordType recordType) throws Exception {
String tableBasePath = basePath + "/asyncindexer";
- HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000,
"false", recordType);
+ HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000,
"false", recordType, WriteOperationType.INSERT,
+
Collections.singleton(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()
+ "=true"));
deltaStreamerTestRunner(ds, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);