This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 60ac41489a0 [HUDI-6454] Remove redundant indexing testcases (#9077)
60ac41489a0 is described below

commit 60ac41489a00cf3e560a9cd15f68501c2b807a3d
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jun 28 10:32:42 2023 -0700

    [HUDI-6454] Remove redundant indexing testcases (#9077)
---
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  86 ------
 .../hudi/client/functional/TestHoodieIndex.java    | 308 ++++-----------------
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 104 +------
 3 files changed, 60 insertions(+), 438 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 8183fa4d830..014bb070436 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -89,7 +89,6 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLockConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
@@ -134,7 +133,6 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
@@ -168,7 +166,6 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
-import static 
org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
@@ -1069,89 +1066,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         populateMetaFields);
   }
 
-  /**
-   * Test update of a record to different partition with Global Index.
-   */
-  @ParameterizedTest
-  @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", 
"GLOBAL_SIMPLE"})
-  public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) 
throws Exception {
-    testUpsertsUpdatePartitionPath(indexType, getConfig(), 
SparkRDDWriteClient::upsert);
-  }
-
-  /**
-   * This test ensures in a global bloom when update partition path is set to 
true in config, if an incoming record has mismatched partition
-   * compared to what is in storage, then appropriate actions are taken. i.e. 
old record is deleted in old partition and new one is inserted
-   * in the new partition.
-   * test structure:
-   * 1. insert 1 batch
-   * 2. insert 2nd batch with larger no of records so that a new file group is 
created for partitions
-   * 3. issue upserts to records from batch 1 with different partition path. 
This should ensure records from batch 1 are deleted and new
-   * records are upserted to the new partition
-   *
-   * @param indexType index type to be tested for
-   * @param config instance of {@link HoodieWriteConfig} to use
-   * @param writeFn write function to be used for testing
-   */
-  private void testUpsertsUpdatePartitionPath(IndexType indexType, 
HoodieWriteConfig config,
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn)
-      throws Exception {
-    HoodieTableMetaClient.withPropertyBuilder()
-        .fromMetaClient(metaClient)
-        .setPartitionFields("time")
-        .setTimelineLayoutVersion(VERSION_0)
-        .initTable(metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString());
-    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
-        .withProps(config.getProps())
-        .withSchema(RawTripTestPayload.JSON_DATA_SCHEMA_STR)
-        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
-            .withPayloadClass(RawTripTestPayload.class.getName()).build())
-        .withCompactionConfig(
-            
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
-        
.withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).build())
-        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
-            .build()).withTimelineLayoutVersion(VERSION_0).build();
-    SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
-
-    // Write 1
-    RawTripTestPayload payload1 = new 
RawTripTestPayload("{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":1}");
-    RawTripTestPayload payload2 = new 
RawTripTestPayload("{\"_row_key\":\"002\",\"time\":\"2017-01-31T00:00:01.000Z\",\"number\":1}");
-    JavaRDD<HoodieRecord> writeRecords1 = 
jsc.parallelize(Arrays.asList(payload1.toHoodieRecord(), 
payload2.toHoodieRecord()), 1);
-    client.startCommitWithTime("001");
-    writeFn.apply(client, writeRecords1, "001").collect();
-
-    // Write 2
-    RawTripTestPayload payload3 = new 
RawTripTestPayload("{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":1}");
-    RawTripTestPayload payload4 = new 
RawTripTestPayload("{\"_row_key\":\"004\",\"time\":\"2017-01-31T00:00:01.000Z\",\"number\":1}");
-    JavaRDD<HoodieRecord> writeRecords2 = 
jsc.parallelize(Arrays.asList(payload3.toHoodieRecord(), 
payload4.toHoodieRecord()), 1);
-    client.startCommitWithTime("002");
-    writeFn.apply(client, writeRecords2, "002").collect();
-
-    // Write 3
-    // update record 001 from partition 2016/01/31 to 2017/01/31
-    // update record 004 from partition 2017/01/31 to 2018/01/31
-    RawTripTestPayload payload1Updated = new 
RawTripTestPayload("{\"_row_key\":\"001\",\"time\":\"2017-01-31T00:00:01.000Z\",\"number\":2}");
-    RawTripTestPayload payload3Updated = new 
RawTripTestPayload("{\"_row_key\":\"004\",\"time\":\"2018-01-31T00:00:01.000Z\",\"number\":2}");
-    JavaRDD<HoodieRecord> writeRecords3 = 
jsc.parallelize(Arrays.asList(payload1Updated.toHoodieRecord(), 
payload3Updated.toHoodieRecord()), 1);
-    client.startCommitWithTime("003");
-    writeFn.apply(client, writeRecords3, "003").collect();
-
-    client.close();
-
-    // Check the entire dataset has all records
-    assertPartitionPathRecordKeys(Arrays.asList(
-            Pair.of("2016/01/31", "003"),
-            Pair.of("2017/01/31", "001"),
-            Pair.of("2017/01/31", "002"),
-            Pair.of("2018/01/31", "004")),
-        getFullPartitionPaths("2016/01/31", "2017/01/31", "2018/01/31"));
-
-    // verify base file counts
-    Map<String, Long> baseFileCounts = getBaseFileCountsForPaths(basePath, fs, 
getFullPartitionPaths("2016/01/31", "2017/01/31", "2018/01/31"));
-    assertEquals(2, baseFileCounts.get(getFullPartitionPath("2016/01/31")));
-    assertEquals(3, baseFileCounts.get(getFullPartitionPath("2017/01/31")));
-    assertEquals(1, baseFileCounts.get(getFullPartitionPath("2018/01/31")));
-  }
-
   private void assertPartitionPathRecordKeys(List<Pair<String, String>> 
expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
     Dataset<Row> rows = getAllRows(fullPartitionPaths);
     List<Pair<String, String>> actualPartitionPathRecKeyPairs = 
getActualPartitionPathAndRecordKeys(rows);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 963274d4a29..edf4327bfaf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -51,14 +50,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.RawTripTestPayloadKeyGenerator;
 import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
-import org.apache.hudi.testutils.Assertions;
 import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
@@ -70,9 +67,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -82,9 +77,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.UUID;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import scala.Tuple2;
@@ -94,10 +87,10 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPar
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 @Tag("functional")
 public class TestHoodieIndex extends TestHoodieMetadataBase {
@@ -111,23 +104,17 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
         {IndexType.GLOBAL_BLOOM, true, false},
         {IndexType.SIMPLE, true, true},
         {IndexType.SIMPLE, true, false},
-        {IndexType.SIMPLE, false, true},
-        {IndexType.SIMPLE, false, false},
         {IndexType.GLOBAL_SIMPLE, true, true},
         {IndexType.GLOBAL_SIMPLE, true, false},
-        {IndexType.GLOBAL_SIMPLE, false, true},
-        {IndexType.GLOBAL_SIMPLE, false, false},
-        {IndexType.BUCKET, false, true},
+        {IndexType.BUCKET, true, false},
         {IndexType.BUCKET, false, false},
-        {IndexType.RECORD_INDEX, true, true},
-        {IndexType.RECORD_INDEX, true, false}
+        {IndexType.RECORD_INDEX, true, true}
     };
     return Stream.of(data).map(Arguments::of);
   }
 
   private HoodieIndex index;
   private HoodieWriteConfig config;
-  private final Random random = new Random();
 
   private void setUp(IndexType indexType, boolean populateMetaFields, boolean 
enableMetadataIndex) throws Exception {
     setUp(indexType, populateMetaFields, enableMetadataIndex, true);
@@ -137,10 +124,11 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
     initPath();
     initSparkContexts();
     initFileSystem();
-    metaClient = HoodieTestUtils.init(hadoopConf, basePath, 
HoodieTableType.COPY_ON_WRITE, populateMetaFields ? new Properties()
-        : getPropertiesForKeyGen());
+
+    Properties keyGenProps = getPropsForKeyGen(indexType, populateMetaFields);
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, 
HoodieTableType.COPY_ON_WRITE, keyGenProps);
     HoodieIndexConfig.Builder indexBuilder = 
HoodieIndexConfig.newBuilder().withIndexType(indexType)
-        .fromProperties(populateMetaFields ? new Properties() : 
getPropertiesForKeyGen())
+        .fromProperties(keyGenProps)
         .withIndexType(indexType);
 
     HoodieMetadataConfig.Builder metadataConfigBuilder = 
HoodieMetadataConfig.newBuilder()
@@ -151,7 +139,7 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
     }
 
     config = getConfigBuilder()
-        .withProperties(populateMetaFields ? new Properties() : 
getPropertiesForKeyGen())
+        .withProperties(keyGenProps)
         .withSchema(RawTripTestPayload.JSON_DATA_SCHEMA_STR)
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             .withPayloadClass(RawTripTestPayload.class.getName())
@@ -167,16 +155,25 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
     this.index = writeClient.getIndex();
   }
 
-  private Properties getPropertiesForKeyGen() {
+  /**
+   * For {@link KeyGenerator}'s use based on {@link 
HoodieTableConfig#POPULATE_META_FIELDS}.
+   */
+  private Properties getPropsForKeyGen(IndexType indexType, boolean 
populateMetaFields) {
     Properties properties = new Properties();
-    properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
-    properties.put("hoodie.datasource.write.keygenerator.class", 
RawTripTestPayloadKeyGenerator.class.getName());
-    properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
-    properties.put("hoodie.datasource.write.partitionpath.field", "time");
-    properties.put("hoodie.datasource.write.precombine.field", "number");
-    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
-    properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "time");
-    properties.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "number");
+    properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), 
String.valueOf(populateMetaFields));
+    if (indexType == IndexType.BUCKET) {
+      properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
+      properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
+    }
+    if (!populateMetaFields) {
+      properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
+      properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
+      properties.put("hoodie.datasource.write.keygenerator.class", 
RawTripTestPayloadKeyGenerator.class.getName());
+      properties.put("hoodie.datasource.write.partitionpath.field", "time");
+      properties.put("hoodie.datasource.write.precombine.field", "number");
+      properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "time");
+      properties.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "number");
+    }
     return properties;
   }
 
@@ -223,38 +220,44 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
 
   @ParameterizedTest
   @MethodSource("indexTypeParams")
-  public void testSimpleTagLocationAndUpdate(IndexType indexType, boolean 
populateMetaFields, boolean enableMetadataIndex) throws Exception {
+  public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, 
boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
     setUp(indexType, populateMetaFields, enableMetadataIndex);
     String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     final int totalRecords = 4;
     List<HoodieRecord> records = getInserts();
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writtenRecords = jsc.parallelize(records, 1);
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
 
     // Test tagLocation without any entries in index
-    JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writeRecords, 
hoodieTable);
+    JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writtenRecords, 
hoodieTable);
     assertTrue(javaRDD.filter(record -> 
record.isCurrentLocationKnown()).collect().isEmpty());
 
     // Insert totalRecords records
     writeClient.startCommitWithTime(newCommitTime);
-    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, 
newCommitTime);
-    assertNoWriteErrors(writeStatues.collect());
+    JavaRDD<WriteStatus> writeStatusRdd = writeClient.upsert(writtenRecords, 
newCommitTime);
+    List<WriteStatus> writeStatuses = writeStatusRdd.collect();
+    assertNoWriteErrors(writeStatuses);
+    String[] fileIdsFromWriteStatuses = 
writeStatuses.stream().map(WriteStatus::getFileId)
+        .sorted().toArray(String[]::new);
 
     // Now tagLocation for these records, index should not tag them since it 
was a failed
     // commit
-    javaRDD = tagLocation(index, writeRecords, hoodieTable);
+    javaRDD = tagLocation(index, writtenRecords, hoodieTable);
     assertTrue(javaRDD.filter(record -> 
record.isCurrentLocationKnown()).collect().isEmpty());
     // Now commit this & update location of records inserted and validate no 
errors
-    writeClient.commit(newCommitTime, writeStatues);
+    writeClient.commit(newCommitTime, writeStatusRdd);
     // Now tagLocation for these records, index should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-    javaRDD = tagLocation(index, writeRecords, hoodieTable);
+    javaRDD = tagLocation(index, writtenRecords, hoodieTable);
     Map<String, String> recordKeyToPartitionPathMap = new HashMap();
-    List<HoodieRecord> hoodieRecords = writeRecords.collect();
+    List<HoodieRecord> hoodieRecords = writtenRecords.collect();
     hoodieRecords.forEach(entry -> 
recordKeyToPartitionPathMap.put(entry.getRecordKey(), 
entry.getPartitionPath()));
+    String[] taggedFileIds = javaRDD.map(record -> 
record.getCurrentLocation().getFileId()).distinct().collect()
+        .stream().sorted().toArray(String[]::new);
+    assertArrayEquals(taggedFileIds, fileIdsFromWriteStatuses);
 
     assertEquals(totalRecords, javaRDD.filter(record -> 
record.isCurrentLocationKnown()).collect().size());
     assertEquals(totalRecords, javaRDD.map(record -> 
record.getKey().getRecordKey()).distinct().count());
@@ -262,17 +265,27 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
         && 
record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
     javaRDD.foreach(entry -> 
assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), 
entry.getPartitionPath(), "PartitionPath mismatch"));
 
-    JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> 
entry.getKey());
-    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = 
getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
-    List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
+    JavaRDD<HoodieKey> keysRdd = writtenRecords.map(entry -> entry.getKey());
+    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = 
getRecordLocations(keysRdd, hoodieTable);
+    List<HoodieKey> keys = keysRdd.collect();
     assertEquals(totalRecords, recordLocations.collect().size());
     assertEquals(totalRecords, recordLocations.map(record -> 
record._1).distinct().count());
-    recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), 
"Missing HoodieKey"));
+    recordLocations.foreach(entry -> assertTrue(keys.contains(entry._1), 
"Missing HoodieKey"));
     recordLocations.foreach(entry -> 
assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), 
entry._1.getPartitionPath(), "PartitionPath mismatch"));
+
+    // Rollback the last commit
+    writeClient.rollback(newCommitTime);
+
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    // Now tagLocation for these records, hbaseIndex should not tag them since 
it was a rolled
+    // back commit
+    javaRDD = tagLocation(index, writtenRecords, hoodieTable);
+    
assertTrue(javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().isEmpty());
+    assertTrue(javaRDD.filter(record -> record.getCurrentLocation() != 
null).collect().isEmpty());
   }
 
   @Test
-  public void testLookupIndexWithOrWithoutColumnStats() throws Exception {
+  public void testLookupIndexWithAndWithoutColumnStats() throws Exception {
     setUp(IndexType.BLOOM, true, true);
     String newCommitTime = "001";
     final int totalRecords = 4;
@@ -377,61 +390,6 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
     recordLocations.foreach(entry -> 
assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), 
entry._1.getPartitionPath(), "PartitionPath mismatch"));
   }
 
-  @ParameterizedTest
-  @MethodSource("indexTypeParams")
-  public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, 
boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
-    setUp(indexType, populateMetaFields, enableMetadataIndex, false);
-    String newCommitTime = writeClient.startCommit();
-    final int totalRecords = 4;
-    List<HoodieRecord> records = getInserts();
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-
-    // Insert 200 records
-    JavaRDD<WriteStatus> writeStatusesRDD = writeClient.upsert(writeRecords, 
newCommitTime);
-    // NOTE: This will trigger an actual write
-    List<WriteStatus> writeStatuses = writeStatusesRDD.collect();
-    assertNoWriteErrors(writeStatuses);
-    // Commit
-    writeClient.commit(newCommitTime, jsc.parallelize(writeStatuses));
-
-    List<String> fileIds = 
writeStatuses.stream().map(WriteStatus::getFileId).collect(Collectors.toList());
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
-
-    // Now tagLocation for these records, hbaseIndex should tag them
-    JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writeRecords, 
hoodieTable);
-    assertEquals(totalRecords, 
javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
-
-    // check tagged records are tagged with correct fileIds
-    assertEquals(0, javaRDD.filter(record -> 
record.getCurrentLocation().getFileId() == null).collect().size());
-    List<String> taggedFileIds = javaRDD.map(record -> 
record.getCurrentLocation().getFileId()).distinct().collect();
-
-    Map<String, String> recordKeyToPartitionPathMap = new HashMap();
-    List<HoodieRecord> hoodieRecords = writeRecords.collect();
-    hoodieRecords.forEach(entry -> 
recordKeyToPartitionPathMap.put(entry.getRecordKey(), 
entry.getPartitionPath()));
-
-    JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> 
entry.getKey());
-    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = 
getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
-    List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
-    assertEquals(totalRecords, recordLocations.collect().size());
-    assertEquals(totalRecords, recordLocations.map(record -> 
record._1).distinct().count());
-    recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), 
"Missing HoodieKey"));
-    recordLocations.foreach(entry -> 
assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), 
entry._1.getPartitionPath(), "PartitionPath mismatch"));
-
-    // both lists should match
-    assertTrue(taggedFileIds.containsAll(fileIds) && 
fileIds.containsAll(taggedFileIds));
-    // Rollback the last commit
-    writeClient.rollback(newCommitTime);
-
-    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-    // Now tagLocation for these records, hbaseIndex should not tag them since 
it was a rolled
-    // back commit
-    javaRDD = tagLocation(index, writeRecords, hoodieTable);
-    
assertTrue(javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().isEmpty());
-    assertTrue(javaRDD.filter(record -> record.getCurrentLocation() != 
null).collect().isEmpty());
-  }
-
   private static Stream<Arguments> regularIndexTypeParams() {
     // IndexType, populateMetaFields, enableMetadataIndex
     Object[][] data = new Object[][] {
@@ -543,89 +501,6 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
     }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", 
"GLOBAL_SIMPLE"})
-  public void 
testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath(IndexType 
indexType) throws Exception {
-    setUp(indexType, true, true);
-    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
-    HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(
-        writeClient.getEngineContext().getHadoopConf().get(), config, 
writeClient.getEngineContext());
-    HoodieSparkWriteableTestTable testTable = 
HoodieSparkWriteableTestTable.of(hoodieTable.getMetaClient(),
-        addMetadataFields(RawTripTestPayload.JSON_DATA_SCHEMA), 
metadataWriter);
-
-    final String p1 = "2016/01/31";
-    final String p2 = "2016/02/28";
-
-    // Create the original partition, and put a record, along with the meta 
file
-    // "2016/01/31": 1 file (1_0_20160131101010.parquet)
-    // this record will be saved in table and will be tagged to an empty record
-    RawTripTestPayload originalPayload =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
-    HoodieRecord originalRecord = originalPayload.toHoodieRecord();
-
-    /*
-    This record has the same record key as originalRecord but different time 
so different partition
-    Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
-    globalBloomIndex should
-    - tag the original partition of the originalRecord to an empty record for 
deletion, and
-    - tag the new partition of the incomingRecord
-    */
-    RawTripTestPayload incomingPayload =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
-    HoodieRecord incomingRecord = incomingPayload.toHoodieRecord();
-    /*
-    This record has the same record key as originalRecord and the same 
partition
-    Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
-    globalBloomIndex should just tag the original partition
-    */
-    RawTripTestPayload incomingPayloadSamePartition =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
-    HoodieRecord incomingRecordSamePartition = 
incomingPayloadSamePartition.toHoodieRecord();
-
-    final String file1P1C0 = UUID.randomUUID().toString();
-    Map<String, List<Pair<String, Integer>>> c1PartitionToFilesNameLengthMap = 
new HashMap<>();
-    // We have some records to be tagged (two different partitions)
-    Path baseFilePath = testTable.forCommit("1000").withInserts(p1, file1P1C0, 
Collections.singletonList(originalRecord));
-    long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
-    c1PartitionToFilesNameLengthMap.put(p1, 
Collections.singletonList(Pair.of(file1P1C0, Integer.valueOf((int) 
baseFileLength))));
-    testTable.doWriteOperation("1000", WriteOperationType.INSERT, 
Arrays.asList(p1),
-        c1PartitionToFilesNameLengthMap, false, false);
-
-    // We have some records to be tagged (two different partitions)
-    testTable.withInserts(p1, file1P1C0, originalRecord);
-
-    // test against incoming record with a different partition
-    JavaRDD<HoodieRecord> recordRDD = 
jsc.parallelize(Collections.singletonList(incomingRecord));
-    JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(index, recordRDD, 
hoodieTable);
-
-    assertEquals(2, taggedRecordRDD.count());
-    for (HoodieRecord record : taggedRecordRDD.collect()) {
-      switch (record.getPartitionPath()) {
-        case p1:
-          assertEquals("000", record.getRecordKey());
-          assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
-          break;
-        case p2:
-          assertEquals("000", record.getRecordKey());
-          assertEquals(incomingPayload.getJsonDataAsMap(), 
((RawTripTestPayload) record.getData()).getJsonDataAsMap());
-          break;
-        default:
-          fail(String.format("Should not get partition path: %s", 
record.getPartitionPath()));
-      }
-    }
-
-    // test against incoming record with the same partition
-    JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
-        .parallelize(Collections.singletonList(incomingRecordSamePartition));
-    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = tagLocation(index, 
recordRDDSamePartition, hoodieTable);
-
-    assertEquals(1, taggedRecordRDDSamePartition.count());
-    HoodieRecord record = taggedRecordRDDSamePartition.first();
-    assertEquals("000", record.getRecordKey());
-    assertEquals(p1, record.getPartitionPath());
-    assertEquals(incomingPayloadSamePartition.getJsonDataAsMap(), 
((RawTripTestPayload) record.getData()).getJsonDataAsMap());
-  }
-
   @Test
   public void testCheckIfValidCommit() throws Exception {
     setUp(IndexType.BLOOM, true, false);
@@ -680,75 +555,6 @@ public class TestHoodieIndex extends 
TestHoodieMetadataBase {
     assertFalse(HoodieIndexUtils.checkIfValidCommit(timeline, 
instantTimestampSec));
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true})
-  public void testRecordIndexTagLocationAndUpdate(boolean populateMetaFields) 
throws Exception {
-    setUp(IndexType.RECORD_INDEX, populateMetaFields, false);
-    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-    //    int initialRecords = 10 + random.nextInt(20);
-    List<HoodieRecord> records = getInserts();
-    int initialRecords = records.size();
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
-
-    // Test tagLocation without any entries in index
-    JavaRDD<HoodieRecord> javaRDD = tagLocation(hoodieTable.getIndex(), 
writeRecords, hoodieTable);
-    assert (javaRDD.filter(record -> 
record.isCurrentLocationKnown()).collect().size() == 0);
-
-    // Insert totalRecords records
-    writeClient.startCommitWithTime(newCommitTime);
-    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, 
newCommitTime);
-    Assertions.assertNoWriteErrors(writeStatues.collect());
-
-    // Now tagLocation for these records, index should not tag them since it 
was a failed
-    // commit
-    javaRDD = tagLocation(hoodieTable.getIndex(), writeRecords, hoodieTable);
-    assert (javaRDD.filter(record -> 
record.isCurrentLocationKnown()).collect().size() == 0);
-    // Now commit this & update location of records inserted and validate no 
errors
-    writeClient.commit(newCommitTime, writeStatues);
-
-    // Create new commit time.
-    String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
-    // Now tagLocation for these records, index should tag them correctly
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-
-    // Generate updates for all existing records.
-    List<HoodieRecord> newRecords = getUpdates();
-    // int newInsertsCount = 10;
-    List<HoodieRecord> newInserts = getInsertsBatch2();
-    int newInsertsCount = newInserts.size();
-    newRecords.addAll(newInserts);
-    // Update partitionPath information.
-    String newPartitionPath = "2022/11/04";
-    newRecords = newRecords.stream()
-        .map(rec -> new HoodieAvroRecord(new HoodieKey(rec.getRecordKey(), 
newPartitionPath), (HoodieRecordPayload) rec.getData()))
-        .collect(Collectors.toList());
-    JavaRDD<HoodieRecord> newWriteRecords = jsc.parallelize(newRecords, 1);
-
-    javaRDD = tagLocation(hoodieTable.getIndex(), newWriteRecords, 
hoodieTable);
-    Map<String, String> recordKeyToPartitionPathMap = new HashMap();
-    List<HoodieRecord> hoodieRecords = newWriteRecords.collect();
-    hoodieRecords.forEach(entry -> 
recordKeyToPartitionPathMap.put(entry.getRecordKey(), 
entry.getPartitionPath()));
-
-    assertEquals(initialRecords, javaRDD.filter(record -> 
record.isCurrentLocationKnown()).collect().size());
-    assertEquals(initialRecords + newInsertsCount, javaRDD.map(record -> 
record.getKey().getRecordKey()).distinct().count());
-    assertEquals(initialRecords, javaRDD.filter(record -> 
(record.getCurrentLocation() != null
-        && 
record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
-    assertEquals(newInsertsCount, javaRDD.filter(record -> 
record.getKey().getPartitionPath().equalsIgnoreCase(newPartitionPath))
-        .count(), "PartitionPath mismatch");
-
-    JavaRDD<HoodieKey> hoodieKeyJavaRDD = newWriteRecords.map(entry -> 
entry.getKey());
-    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = 
getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
-    List<String> hoodieRecordKeys = hoodieKeyJavaRDD.map(key -> 
key.getRecordKey()).collect();
-    assertEquals(initialRecords + newInsertsCount, 
recordLocations.collect().size());
-    assertEquals(initialRecords + newInsertsCount, recordLocations.map(record 
-> record._1).distinct().count());
-    recordLocations.foreach(entry -> 
assertTrue(hoodieRecordKeys.contains(entry._1.getRecordKey()), "Missing 
HoodieRecordKey"));
-    assertEquals(newInsertsCount, recordLocations.filter(entry -> 
newPartitionPath.equalsIgnoreCase(entry._1.getPartitionPath())).count());
-  }
-
   private HoodieWriteConfig.Builder getConfigBuilder() {
     return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 8597c023f13..aa96020fa93 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.client.functional.TestHoodieMetadataBase;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
@@ -28,7 +27,6 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.index.HoodieIndex;
@@ -63,7 +61,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestHoodieGlobalBloomIndex extends TestHoodieMetadataBase {
 
@@ -346,104 +343,9 @@ public class TestHoodieGlobalBloomIndex extends 
TestHoodieMetadataBase {
     }
   }
 
-  @Test
-  public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception {
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
-        .withPath(basePath)
-        .withSchema(RawTripTestPayload.JSON_DATA_SCHEMA_STR)
-        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
-            .withPayloadClass(RawTripTestPayload.class.getName())
-            .withPayloadOrderingField("number").build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder()
-            .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
-            .withBloomIndexUpdatePartitionPath(true)
-            .build())
-        .build();
-    HoodieGlobalBloomIndex index =
-        new HoodieGlobalBloomIndex(config, 
SparkHoodieBloomIndexHelper.getInstance());
-    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
-    HoodieSparkWriteableTestTable testTable = 
HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
-    final String p1 = "2016/01/31";
-    final String p2 = "2016/02/28";
-
-    // Create the original partition, and put a record, along with the meta 
file
-    // "2016/01/31": 1 file (1_0_20160131101010.parquet)
-    // this record will be saved in table and will be tagged to an empty record
-    RawTripTestPayload originalPayload =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
-    HoodieRecord originalRecord =
-        new HoodieAvroRecord(new HoodieKey(originalPayload.getRowKey(), 
originalPayload.getPartitionPath()),
-            originalPayload);
-
-    /*
-    This record has the same record key as originalRecord but different time 
so different partition
-    Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
-    globalBloomIndex should
-     - tag the original partition of the originalRecord to an empty record for 
deletion, and
-     - tag the new partition of the incomingRecord
-    */
-    RawTripTestPayload incomingPayload =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
-    HoodieRecord incomingRecord =
-        new HoodieAvroRecord(new HoodieKey(incomingPayload.getRowKey(), 
incomingPayload.getPartitionPath()),
-            incomingPayload);
-
-    /*
-    This record has the same record key as originalRecord and the same 
partition
-    Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
-    globalBloomIndex should just tag the original partition
-    */
-    RawTripTestPayload incomingPayloadSamePartition =
-        new 
RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
-    HoodieRecord incomingRecordSamePartition =
-        new HoodieAvroRecord(
-            new HoodieKey(incomingPayloadSamePartition.getRowKey(), 
incomingPayloadSamePartition.getPartitionPath()),
-            incomingPayloadSamePartition);
-
-    final String fileId1 = UUID.randomUUID().toString();
-    final Map<String, List<Pair<String, Integer>>> 
partitionToFilesNameLengthMap = new HashMap<>();
-
-    final String commitTime = "0000001";
-    Path baseFilePath = testTable.forCommit(commitTime).withInserts(p1, 
fileId1, Collections.singletonList(originalRecord));
-    long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
-    partitionToFilesNameLengthMap.computeIfAbsent(p1,
-        k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) 
baseFileLength)));
-    testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, 
Arrays.asList(p1),
-        partitionToFilesNameLengthMap, false, false);
-
-    // test against incoming record with a different partition
-    JavaRDD<HoodieRecord> recordRDD = 
jsc.parallelize(Collections.singletonList(incomingRecord));
-    JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(index, recordRDD, 
hoodieTable);
-
-    assertEquals(2, taggedRecordRDD.count());
-    for (HoodieRecord record : taggedRecordRDD.collect()) {
-      switch (record.getPartitionPath()) {
-        case p1:
-          assertEquals("000", record.getRecordKey());
-          assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
-          break;
-        case p2:
-          assertEquals("000", record.getRecordKey());
-          assertEquals(incomingPayload.getJsonDataAsMap(), 
((RawTripTestPayload) record.getData()).getJsonDataAsMap());
-          break;
-        default:
-          fail(String.format("Should not get partition path: %s", 
record.getPartitionPath()));
-      }
-    }
-
-    // test against incoming record with the same partition
-    JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
-        .parallelize(Collections.singletonList(incomingRecordSamePartition));
-    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = tagLocation(index, 
recordRDDSamePartition, hoodieTable);
-
-    assertEquals(1, taggedRecordRDDSamePartition.count());
-    HoodieRecord record = taggedRecordRDDSamePartition.first();
-    assertEquals("000", record.getRecordKey());
-    assertEquals(p1, record.getPartitionPath());
-    assertEquals(incomingPayloadSamePartition.getJsonDataAsMap(), 
((RawTripTestPayload) record.getData()).getJsonDataAsMap());
-  }
-
-  // convert list to map to avoid sorting order dependencies
+  /**
+   * convert list to map to avoid sorting order dependencies
+   */
   private static Map<String, BloomIndexFileInfo> toFileMap(List<Pair<String, 
BloomIndexFileInfo>> filesList) {
     Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();
     for (Pair<String, BloomIndexFileInfo> t : filesList) {


Reply via email to