This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 60ac41489a0 [HUDI-6454] Remove redundant indexing testcases (#9077)
60ac41489a0 is described below
commit 60ac41489a00cf3e560a9cd15f68501c2b807a3d
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jun 28 10:32:42 2023 -0700
[HUDI-6454] Remove redundant indexing testcases (#9077)
---
.../TestHoodieClientOnCopyOnWriteStorage.java | 86 ------
.../hudi/client/functional/TestHoodieIndex.java | 308 ++++-----------------
.../index/bloom/TestHoodieGlobalBloomIndex.java | 104 +------
3 files changed, 60 insertions(+), 438 deletions(-)
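The gist of the consolidation in TestHoodieIndex.java: the deleted standalone rollback test is folded into the tag-location test, which now verifies tagging before commit, after commit, and after rollback in one pass. A minimal sketch of that merged flow, assuming the test class's own helpers (getInserts, tagLocation, index, writeClient) exactly as they appear in the diff below:

// Sketch only -- condensed from the merged test in this commit, not part of the patch.
JavaRDD<HoodieRecord> writtenRecords = jsc.parallelize(getInserts(), 1);
String commitTime = HoodieActiveTimeline.createNewInstantTime();
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);

// 1. No commits yet: tagLocation finds no known locations.
assertTrue(tagLocation(index, writtenRecords, table)
    .filter(HoodieRecord::isCurrentLocationKnown).collect().isEmpty());

// 2. Write without committing: records stay untagged (inflight data is invisible).
writeClient.startCommitWithTime(commitTime);
JavaRDD<WriteStatus> statuses = writeClient.upsert(writtenRecords, commitTime);

// 3. Commit, reload, and tag again: every record now carries a file location.
writeClient.commit(commitTime, statuses);
table = HoodieSparkTable.create(config, context, HoodieTableMetaClient.reload(metaClient));
assertEquals(4, tagLocation(index, writtenRecords, table)
    .filter(HoodieRecord::isCurrentLocationKnown).collect().size());

// 4. Roll the commit back: tagging finds nothing once more.
writeClient.rollback(commitTime);
table = HoodieSparkTable.create(config, context, metaClient);
assertTrue(tagLocation(index, writtenRecords, table)
    .filter(HoodieRecord::isCurrentLocationKnown).collect().isEmpty());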
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 8183fa4d830..014bb070436 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -89,7 +89,6 @@ import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
@@ -134,7 +133,6 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
@@ -168,7 +166,6 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
-import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
@@ -1069,89 +1066,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
populateMetaFields);
}
- /**
- * Test update of a record to different partition with Global Index.
- */
- @ParameterizedTest
- @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
- public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
- testUpsertsUpdatePartitionPath(indexType, getConfig(), SparkRDDWriteClient::upsert);
- }
-
- /**
- * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
- * compared to what is in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
- * in the new partition.
- * test structure:
- * 1. insert 1 batch
- * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
- * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
- * records are upserted to the new partition
- *
- * @param indexType index type to be tested for
- * @param config instance of {@link HoodieWriteConfig} to use
- * @param writeFn write function to be used for testing
- */
- private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConfig config,
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
- throws Exception {
- HoodieTableMetaClient.withPropertyBuilder()
- .fromMetaClient(metaClient)
- .setPartitionFields("time")
- .setTimelineLayoutVersion(VERSION_0)
- .initTable(metaClient.getHadoopConf(), metaClient.getBasePathV2().toString());
- HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
- .withProps(config.getProps())
- .withSchema(RawTripTestPayload.JSON_DATA_SCHEMA_STR)
- .withPayloadConfig(HoodiePayloadConfig.newBuilder()
- .withPayloadClass(RawTripTestPayload.class.getName()).build())
- .withCompactionConfig(
- HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
- .withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).build())
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
- .build()).withTimelineLayoutVersion(VERSION_0).build();
- SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
-
- // Write 1
- RawTripTestPayload payload1 = new RawTripTestPayload("{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":1}");
- RawTripTestPayload payload2 = new RawTripTestPayload("{\"_row_key\":\"002\",\"time\":\"2017-01-31T00:00:01.000Z\",\"number\":1}");
- JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(Arrays.asList(payload1.toHoodieRecord(), payload2.toHoodieRecord()), 1);
- client.startCommitWithTime("001");
- writeFn.apply(client, writeRecords1, "001").collect();
-
- // Write 2
- RawTripTestPayload payload3 = new RawTripTestPayload("{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":1}");
- RawTripTestPayload payload4 = new RawTripTestPayload("{\"_row_key\":\"004\",\"time\":\"2017-01-31T00:00:01.000Z\",\"number\":1}");
- JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(Arrays.asList(payload3.toHoodieRecord(), payload4.toHoodieRecord()), 1);
- client.startCommitWithTime("002");
- writeFn.apply(client, writeRecords2, "002").collect();
-
- // Write 3
- // update record 001 from partition 2016/01/31 to 2017/01/31
- // update record 004 from partition 2017/01/31 to 2018/01/31
- RawTripTestPayload payload1Updated = new RawTripTestPayload("{\"_row_key\":\"001\",\"time\":\"2017-01-31T00:00:01.000Z\",\"number\":2}");
- RawTripTestPayload payload3Updated = new RawTripTestPayload("{\"_row_key\":\"004\",\"time\":\"2018-01-31T00:00:01.000Z\",\"number\":2}");
- JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(Arrays.asList(payload1Updated.toHoodieRecord(), payload3Updated.toHoodieRecord()), 1);
- client.startCommitWithTime("003");
- writeFn.apply(client, writeRecords3, "003").collect();
-
- client.close();
-
- // Check the entire dataset has all records
- assertPartitionPathRecordKeys(Arrays.asList(
- Pair.of("2016/01/31", "003"),
- Pair.of("2017/01/31", "001"),
- Pair.of("2017/01/31", "002"),
- Pair.of("2018/01/31", "004")),
- getFullPartitionPaths("2016/01/31", "2017/01/31", "2018/01/31"));
-
- // verify base file counts
- Map<String, Long> baseFileCounts = getBaseFileCountsForPaths(basePath, fs, getFullPartitionPaths("2016/01/31", "2017/01/31", "2018/01/31"));
- assertEquals(2, baseFileCounts.get(getFullPartitionPath("2016/01/31")));
- assertEquals(3, baseFileCounts.get(getFullPartitionPath("2017/01/31")));
- assertEquals(1, baseFileCounts.get(getFullPartitionPath("2018/01/31")));
- }
-
private void assertPartitionPathRecordKeys(List<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
Dataset<Row> rows = getAllRows(fullPartitionPaths);
List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
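The test deleted above exercised record movement across partitions under a global index. For orientation, a condensed sketch of the scenario it asserted, with payloads and instant times copied from the deleted body (client and writeFn as in the deleted signature):

// Sketch of the deleted scenario: record "001" was inserted under 2016/01/31,
// then upserted with a new "time" value that maps it to 2017/01/31.
RawTripTestPayload updated = new RawTripTestPayload(
    "{\"_row_key\":\"001\",\"time\":\"2017-01-31T00:00:01.000Z\",\"number\":2}");
client.startCommitWithTime("003");
writeFn.apply(client, jsc.parallelize(
    Collections.singletonList(updated.toHoodieRecord()), 1), "003").collect();
// With GLOBAL_BLOOM or GLOBAL_SIMPLE, the deleted assertions expected "001"
// to disappear from 2016/01/31 and reappear under 2017/01/31.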
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 963274d4a29..edf4327bfaf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -51,14 +50,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.RawTripTestPayloadKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
-import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -70,9 +67,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -82,9 +77,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.UUID;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.Tuple2;
@@ -94,10 +87,10 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPar
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
@Tag("functional")
public class TestHoodieIndex extends TestHoodieMetadataBase {
@@ -111,23 +104,17 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
{IndexType.GLOBAL_BLOOM, true, false},
{IndexType.SIMPLE, true, true},
{IndexType.SIMPLE, true, false},
- {IndexType.SIMPLE, false, true},
- {IndexType.SIMPLE, false, false},
{IndexType.GLOBAL_SIMPLE, true, true},
{IndexType.GLOBAL_SIMPLE, true, false},
- {IndexType.GLOBAL_SIMPLE, false, true},
- {IndexType.GLOBAL_SIMPLE, false, false},
- {IndexType.BUCKET, false, true},
+ {IndexType.BUCKET, true, false},
{IndexType.BUCKET, false, false},
- {IndexType.RECORD_INDEX, true, true},
- {IndexType.RECORD_INDEX, true, false}
+ {IndexType.RECORD_INDEX, true, true}
};
return Stream.of(data).map(Arguments::of);
}
private HoodieIndex index;
private HoodieWriteConfig config;
- private final Random random = new Random();
private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
setUp(indexType, populateMetaFields, enableMetadataIndex, true);
@@ -137,10 +124,11 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
initPath();
initSparkContexts();
initFileSystem();
- metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, populateMetaFields ? new Properties()
- : getPropertiesForKeyGen());
+
+ Properties keyGenProps = getPropsForKeyGen(indexType, populateMetaFields);
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, keyGenProps);
HoodieIndexConfig.Builder indexBuilder = HoodieIndexConfig.newBuilder().withIndexType(indexType)
- .fromProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
+ .fromProperties(keyGenProps)
.withIndexType(indexType);
HoodieMetadataConfig.Builder metadataConfigBuilder = HoodieMetadataConfig.newBuilder()
@@ -151,7 +139,7 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
}
config = getConfigBuilder()
- .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
+ .withProperties(keyGenProps)
.withSchema(RawTripTestPayload.JSON_DATA_SCHEMA_STR)
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
.withPayloadClass(RawTripTestPayload.class.getName())
@@ -167,16 +155,25 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
this.index = writeClient.getIndex();
}
- private Properties getPropertiesForKeyGen() {
+ /**
+ * For {@link KeyGenerator}'s use based on {@link HoodieTableConfig#POPULATE_META_FIELDS}.
+ */
+ private Properties getPropsForKeyGen(IndexType indexType, boolean populateMetaFields) {
Properties properties = new Properties();
- properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
- properties.put("hoodie.datasource.write.keygenerator.class",
RawTripTestPayloadKeyGenerator.class.getName());
- properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
- properties.put("hoodie.datasource.write.partitionpath.field", "time");
- properties.put("hoodie.datasource.write.precombine.field", "number");
- properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
- properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "time");
- properties.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "number");
+ properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
+ if (indexType == IndexType.BUCKET) {
+ properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
+ properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
+ }
+ if (!populateMetaFields) {
+ properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
+ properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
+ properties.put("hoodie.datasource.write.keygenerator.class",
RawTripTestPayloadKeyGenerator.class.getName());
+ properties.put("hoodie.datasource.write.partitionpath.field", "time");
+ properties.put("hoodie.datasource.write.precombine.field", "number");
+ properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "time");
+ properties.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "number");
+ }
return properties;
}
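One design note on the new helper, worth a sketch because it is easy to miss: the BUCKET index hashes on the record key, so the key fields are configured even when meta fields are populated, while the full key-generator setup is only needed for virtual-key (no meta fields) tables. The calls below are illustrative only, showing what the hunk above returns:

// Illustration only: the two property sets produced by getPropsForKeyGen above.
Properties bucketProps = getPropsForKeyGen(IndexType.BUCKET, true);
// -> POPULATE_META_FIELDS=true plus "_row_key" as record key (bucket hashing needs it)

Properties virtualKeyProps = getPropsForKeyGen(IndexType.BLOOM, false);
// -> POPULATE_META_FIELDS=false plus RawTripTestPayloadKeyGenerator, record key
//    "_row_key", partition path "time", and precombine field "number"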
@@ -223,38 +220,44 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
@ParameterizedTest
@MethodSource("indexTypeParams")
- public void testSimpleTagLocationAndUpdate(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
+ public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
setUp(indexType, populateMetaFields, enableMetadataIndex);
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
final int totalRecords = 4;
List<HoodieRecord> records = getInserts();
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<HoodieRecord> writtenRecords = jsc.parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
// Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writeRecords, hoodieTable);
+ JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writtenRecords, hoodieTable);
assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().isEmpty());
// Insert totalRecords records
writeClient.startCommitWithTime(newCommitTime);
- JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
- assertNoWriteErrors(writeStatues.collect());
+ JavaRDD<WriteStatus> writeStatusRdd = writeClient.upsert(writtenRecords, newCommitTime);
+ List<WriteStatus> writeStatuses = writeStatusRdd.collect();
+ assertNoWriteErrors(writeStatuses);
+ String[] fileIdsFromWriteStatuses = writeStatuses.stream().map(WriteStatus::getFileId)
+ .sorted().toArray(String[]::new);
// Now tagLocation for these records, index should not tag them since it was a failed
// commit
- javaRDD = tagLocation(index, writeRecords, hoodieTable);
+ javaRDD = tagLocation(index, writtenRecords, hoodieTable);
assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().isEmpty());
// Now commit this & update location of records inserted and validate no errors
- writeClient.commit(newCommitTime, writeStatues);
+ writeClient.commit(newCommitTime, writeStatusRdd);
// Now tagLocation for these records, index should tag them correctly
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(config, context, metaClient);
- javaRDD = tagLocation(index, writeRecords, hoodieTable);
+ javaRDD = tagLocation(index, writtenRecords, hoodieTable);
Map<String, String> recordKeyToPartitionPathMap = new HashMap();
- List<HoodieRecord> hoodieRecords = writeRecords.collect();
+ List<HoodieRecord> hoodieRecords = writtenRecords.collect();
hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
+ String[] taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect()
+ .stream().sorted().toArray(String[]::new);
+ assertArrayEquals(taggedFileIds, fileIdsFromWriteStatuses);
assertEquals(totalRecords, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -262,17 +265,27 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
- JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
- JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
- List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
+ JavaRDD<HoodieKey> keysRdd = writtenRecords.map(entry -> entry.getKey());
+ JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = getRecordLocations(keysRdd, hoodieTable);
+ List<HoodieKey> keys = keysRdd.collect();
assertEquals(totalRecords, recordLocations.collect().size());
assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
- recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
+ recordLocations.foreach(entry -> assertTrue(keys.contains(entry._1), "Missing HoodieKey"));
recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
+
+ // Rollback the last commit
+ writeClient.rollback(newCommitTime);
+
+ hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+ // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
+ // back commit
+ javaRDD = tagLocation(index, writtenRecords, hoodieTable);
+ assertTrue(javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().isEmpty());
+ assertTrue(javaRDD.filter(record -> record.getCurrentLocation() != null).collect().isEmpty());
}
@Test
- public void testLookupIndexWithOrWithoutColumnStats() throws Exception {
+ public void testLookupIndexWithAndWithoutColumnStats() throws Exception {
setUp(IndexType.BLOOM, true, true);
String newCommitTime = "001";
final int totalRecords = 4;
@@ -377,61 +390,6 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
}
- @ParameterizedTest
- @MethodSource("indexTypeParams")
- public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
- setUp(indexType, populateMetaFields, enableMetadataIndex, false);
- String newCommitTime = writeClient.startCommit();
- final int totalRecords = 4;
- List<HoodieRecord> records = getInserts();
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- // Insert 200 records
- JavaRDD<WriteStatus> writeStatusesRDD = writeClient.upsert(writeRecords, newCommitTime);
- // NOTE: This will trigger an actual write
- List<WriteStatus> writeStatuses = writeStatusesRDD.collect();
- assertNoWriteErrors(writeStatuses);
- // Commit
- writeClient.commit(newCommitTime, jsc.parallelize(writeStatuses));
-
- List<String> fileIds = writeStatuses.stream().map(WriteStatus::getFileId).collect(Collectors.toList());
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-
- // Now tagLocation for these records, hbaseIndex should tag them
- JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writeRecords, hoodieTable);
- assertEquals(totalRecords, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
-
- // check tagged records are tagged with correct fileIds
- assertEquals(0, javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size());
- List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
-
- Map<String, String> recordKeyToPartitionPathMap = new HashMap();
- List<HoodieRecord> hoodieRecords = writeRecords.collect();
- hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
-
- JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
- JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
- List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
- assertEquals(totalRecords, recordLocations.collect().size());
- assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
- recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
- recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
-
- // both lists should match
- assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
- // Rollback the last commit
- writeClient.rollback(newCommitTime);
-
- hoodieTable = HoodieSparkTable.create(config, context, metaClient);
- // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
- // back commit
- javaRDD = tagLocation(index, writeRecords, hoodieTable);
- assertTrue(javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().isEmpty());
- assertTrue(javaRDD.filter(record -> record.getCurrentLocation() != null).collect().isEmpty());
- }
-
private static Stream<Arguments> regularIndexTypeParams() {
// IndexType, populateMetaFields, enableMetadataIndex
Object[][] data = new Object[][] {
@@ -543,89 +501,6 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
}
}
- @ParameterizedTest
- @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM",
"GLOBAL_SIMPLE"})
- public void
testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath(IndexType
indexType) throws Exception {
- setUp(indexType, true, true);
- HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
- HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
- writeClient.getEngineContext().getHadoopConf().get(), config, writeClient.getEngineContext());
- HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable.getMetaClient(),
- addMetadataFields(RawTripTestPayload.JSON_DATA_SCHEMA), metadataWriter);
-
- final String p1 = "2016/01/31";
- final String p2 = "2016/02/28";
-
- // Create the original partition, and put a record, along with the meta file
- // "2016/01/31": 1 file (1_0_20160131101010.parquet)
- // this record will be saved in table and will be tagged to an empty record
- RawTripTestPayload originalPayload =
- new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
- HoodieRecord originalRecord = originalPayload.toHoodieRecord();
-
- /*
- This record has the same record key as originalRecord but different time so different partition
- Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
- globalBloomIndex should
- - tag the original partition of the originalRecord to an empty record for deletion, and
- - tag the new partition of the incomingRecord
- */
- RawTripTestPayload incomingPayload =
- new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
- HoodieRecord incomingRecord = incomingPayload.toHoodieRecord();
- /*
- This record has the same record key as originalRecord and the same partition
- Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
- globalBloomIndex should just tag the original partition
- */
- RawTripTestPayload incomingPayloadSamePartition =
- new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
- HoodieRecord incomingRecordSamePartition = incomingPayloadSamePartition.toHoodieRecord();
-
- final String file1P1C0 = UUID.randomUUID().toString();
- Map<String, List<Pair<String, Integer>>> c1PartitionToFilesNameLengthMap = new HashMap<>();
- // We have some records to be tagged (two different partitions)
- Path baseFilePath = testTable.forCommit("1000").withInserts(p1, file1P1C0, Collections.singletonList(originalRecord));
- long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
- c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, Integer.valueOf((int) baseFileLength))));
- testTable.doWriteOperation("1000", WriteOperationType.INSERT, Arrays.asList(p1),
- c1PartitionToFilesNameLengthMap, false, false);
-
- // We have some records to be tagged (two different partitions)
- testTable.withInserts(p1, file1P1C0, originalRecord);
-
- // test against incoming record with a different partition
- JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
- JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable);
-
- assertEquals(2, taggedRecordRDD.count());
- for (HoodieRecord record : taggedRecordRDD.collect()) {
- switch (record.getPartitionPath()) {
- case p1:
- assertEquals("000", record.getRecordKey());
- assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
- break;
- case p2:
- assertEquals("000", record.getRecordKey());
- assertEquals(incomingPayload.getJsonDataAsMap(), ((RawTripTestPayload) record.getData()).getJsonDataAsMap());
- break;
- default:
- fail(String.format("Should not get partition path: %s",
record.getPartitionPath()));
- }
- }
-
- // test against incoming record with the same partition
- JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
- .parallelize(Collections.singletonList(incomingRecordSamePartition));
- JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = tagLocation(index, recordRDDSamePartition, hoodieTable);
-
- assertEquals(1, taggedRecordRDDSamePartition.count());
- HoodieRecord record = taggedRecordRDDSamePartition.first();
- assertEquals("000", record.getRecordKey());
- assertEquals(p1, record.getPartitionPath());
- assertEquals(incomingPayloadSamePartition.getJsonDataAsMap(), ((RawTripTestPayload) record.getData()).getJsonDataAsMap());
- }
-
@Test
public void testCheckIfValidCommit() throws Exception {
setUp(IndexType.BLOOM, true, false);
@@ -680,75 +555,6 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
assertFalse(HoodieIndexUtils.checkIfValidCommit(timeline, instantTimestampSec));
}
- @ParameterizedTest
- @ValueSource(booleans = {true})
- public void testRecordIndexTagLocationAndUpdate(boolean populateMetaFields) throws Exception {
- setUp(IndexType.RECORD_INDEX, populateMetaFields, false);
- String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- // int initialRecords = 10 + random.nextInt(20);
- List<HoodieRecord> records = getInserts();
- int initialRecords = records.size();
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-
- // Test tagLocation without any entries in index
- JavaRDD<HoodieRecord> javaRDD = tagLocation(hoodieTable.getIndex(), writeRecords, hoodieTable);
- assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
-
- // Insert totalRecords records
- writeClient.startCommitWithTime(newCommitTime);
- JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
- Assertions.assertNoWriteErrors(writeStatues.collect());
-
- // Now tagLocation for these records, index should not tag them since it was a failed
- // commit
- javaRDD = tagLocation(hoodieTable.getIndex(), writeRecords, hoodieTable);
- assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
- // Now commit this & update location of records inserted and validate no errors
- writeClient.commit(newCommitTime, writeStatues);
-
- // Create new commit time.
- String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
- // Now tagLocation for these records, index should tag them correctly
- metaClient = HoodieTableMetaClient.reload(metaClient);
- hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-
- // Generate updates for all existing records.
- List<HoodieRecord> newRecords = getUpdates();
- // int newInsertsCount = 10;
- List<HoodieRecord> newInserts = getInsertsBatch2();
- int newInsertsCount = newInserts.size();
- newRecords.addAll(newInserts);
- // Update partitionPath information.
- String newPartitionPath = "2022/11/04";
- newRecords = newRecords.stream()
- .map(rec -> new HoodieAvroRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath), (HoodieRecordPayload) rec.getData()))
- .collect(Collectors.toList());
- JavaRDD<HoodieRecord> newWriteRecords = jsc.parallelize(newRecords, 1);
-
- javaRDD = tagLocation(hoodieTable.getIndex(), newWriteRecords, hoodieTable);
- Map<String, String> recordKeyToPartitionPathMap = new HashMap();
- List<HoodieRecord> hoodieRecords = newWriteRecords.collect();
- hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
-
- assertEquals(initialRecords, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
- assertEquals(initialRecords + newInsertsCount, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
- assertEquals(initialRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null
- && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
- assertEquals(newInsertsCount, javaRDD.filter(record -> record.getKey().getPartitionPath().equalsIgnoreCase(newPartitionPath))
- .count(), "PartitionPath mismatch");
-
- JavaRDD<HoodieKey> hoodieKeyJavaRDD = newWriteRecords.map(entry -> entry.getKey());
- JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
- List<String> hoodieRecordKeys = hoodieKeyJavaRDD.map(key -> key.getRecordKey()).collect();
- assertEquals(initialRecords + newInsertsCount, recordLocations.collect().size());
- assertEquals(initialRecords + newInsertsCount, recordLocations.map(record -> record._1).distinct().count());
- recordLocations.foreach(entry -> assertTrue(hoodieRecordKeys.contains(entry._1.getRecordKey()), "Missing HoodieRecordKey"));
- assertEquals(newInsertsCount, recordLocations.filter(entry -> newPartitionPath.equalsIgnoreCase(entry._1.getPartitionPath())).count());
- }
-
private HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
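The TestHoodieGlobalBloomIndex change below removes testTagLocationWhenShouldUpdatePartitionPath. The configuration it exercised is visible in the deleted body; as a reference point, a minimal sketch mirroring those builder calls (not part of the patch):

// A global bloom index configured to relocate records whose partition path changed:
// the old-partition copy is tagged for deletion and the record is re-inserted.
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
        .withBloomIndexUpdatePartitionPath(true)
        .build())
    .build();
HoodieGlobalBloomIndex index =
    new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());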
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 8597c023f13..aa96020fa93 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -19,7 +19,6 @@
package org.apache.hudi.index.bloom;
import org.apache.hudi.client.functional.TestHoodieMetadataBase;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
@@ -28,7 +27,6 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.index.HoodieIndex;
@@ -63,7 +61,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class TestHoodieGlobalBloomIndex extends TestHoodieMetadataBase {
@@ -346,104 +343,9 @@ public class TestHoodieGlobalBloomIndex extends TestHoodieMetadataBase {
}
}
- @Test
- public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception {
- HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
- .withPath(basePath)
- .withSchema(RawTripTestPayload.JSON_DATA_SCHEMA_STR)
- .withPayloadConfig(HoodiePayloadConfig.newBuilder()
- .withPayloadClass(RawTripTestPayload.class.getName())
- .withPayloadOrderingField("number").build())
- .withIndexConfig(HoodieIndexConfig.newBuilder()
- .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
- .withBloomIndexUpdatePartitionPath(true)
- .build())
- .build();
- HoodieGlobalBloomIndex index =
- new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
- HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
- HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
- final String p1 = "2016/01/31";
- final String p2 = "2016/02/28";
-
- // Create the original partition, and put a record, along with the meta file
- // "2016/01/31": 1 file (1_0_20160131101010.parquet)
- // this record will be saved in table and will be tagged to an empty record
- RawTripTestPayload originalPayload =
- new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
- HoodieRecord originalRecord =
- new HoodieAvroRecord(new HoodieKey(originalPayload.getRowKey(), originalPayload.getPartitionPath()),
- originalPayload);
-
- /*
- This record has the same record key as originalRecord but different time so different partition
- Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
- globalBloomIndex should
- - tag the original partition of the originalRecord to an empty record for deletion, and
- - tag the new partition of the incomingRecord
- */
- RawTripTestPayload incomingPayload =
- new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}");
- HoodieRecord incomingRecord =
- new HoodieAvroRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()),
- incomingPayload);
-
- /*
- This record has the same record key as originalRecord and the same partition
- Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
- globalBloomIndex should just tag the original partition
- */
- RawTripTestPayload incomingPayloadSamePartition =
- new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
- HoodieRecord incomingRecordSamePartition =
- new HoodieAvroRecord(
- new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
- incomingPayloadSamePartition);
-
- final String fileId1 = UUID.randomUUID().toString();
- final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
-
- final String commitTime = "0000001";
- Path baseFilePath = testTable.forCommit(commitTime).withInserts(p1, fileId1, Collections.singletonList(originalRecord));
- long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
- partitionToFilesNameLengthMap.computeIfAbsent(p1,
- k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
- testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Arrays.asList(p1),
- partitionToFilesNameLengthMap, false, false);
-
- // test against incoming record with a different partition
- JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
- JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable);
-
- assertEquals(2, taggedRecordRDD.count());
- for (HoodieRecord record : taggedRecordRDD.collect()) {
- switch (record.getPartitionPath()) {
- case p1:
- assertEquals("000", record.getRecordKey());
- assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
- break;
- case p2:
- assertEquals("000", record.getRecordKey());
- assertEquals(incomingPayload.getJsonDataAsMap(), ((RawTripTestPayload) record.getData()).getJsonDataAsMap());
- break;
- default:
- fail(String.format("Should not get partition path: %s",
record.getPartitionPath()));
- }
- }
-
- // test against incoming record with the same partition
- JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
- .parallelize(Collections.singletonList(incomingRecordSamePartition));
- JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = tagLocation(index, recordRDDSamePartition, hoodieTable);
-
- assertEquals(1, taggedRecordRDDSamePartition.count());
- HoodieRecord record = taggedRecordRDDSamePartition.first();
- assertEquals("000", record.getRecordKey());
- assertEquals(p1, record.getPartitionPath());
- assertEquals(incomingPayloadSamePartition.getJsonDataAsMap(), ((RawTripTestPayload) record.getData()).getJsonDataAsMap());
- }
-
- // convert list to map to avoid sorting order dependencies
+ /**
+ * convert list to map to avoid sorting order dependencies
+ */
private static Map<String, BloomIndexFileInfo> toFileMap(List<Pair<String, BloomIndexFileInfo>> filesList) {
Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();
for (Pair<String, BloomIndexFileInfo> t : filesList) {