xushiyan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r453215643
##########
File path: hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
##########
@@ -123,11 +123,13 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
       if (config.getBloomIndexUpdatePartitionPath()
           && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
         // Create an empty record to delete the record in the old partition
-        HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+        HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
             new EmptyHoodieRecordPayload());
+        deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
+        deleteRecord.seal();
         // Tag the incoming record for inserting to the new partition
-        HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
-        return Arrays.asList(emptyRecord, taggedRecord).iterator();
+        HoodieRecord<T> insetRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
+        return Arrays.asList(deleteRecord, insetRecord).iterator();
Review comment:
nit: typo — `insetRecord` should be `insertRecord`.
##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more than 1 base file after 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      // populate expected partition path and record keys
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
-    // Check the entire dataset has all records still
-    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
+
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+  }
+
+  private void assertPartitionPathRecordKeys(String[] fullPartitionPaths, List<Pair<String, String>> expectedPartitionPathRecKeyPairs) {
Review comment:
To align with JUnit assertion conventions, can we make the expected value the first argument?
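Something along these lines, as a sketch (helper names as in this PR):
```java
// Expected value first, mirroring assertEquals(expected, actual).
private void assertPartitionPathRecordKeys(List<Pair<String, String>> expectedPartitionPathRecKeyPairs,
    String[] fullPartitionPaths) {
  Dataset<Row> rows = getAllRows(fullPartitionPaths);
  List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
  assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
}
```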
##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+  private void assertPartitionPathRecordKeys(String[] fullPartitionPaths, List<Pair<String, String>> expectedPartitionPathRecKeyPairs) {
+    Dataset<Row> rows = getAllRows(fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
+    // verify all partitionpath, record key matches
+    assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
+  }
+
+  private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+    return actualPartitionPathRecKeyPairs;
+  }
+
+  private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+  }
+
+  private String[] getFullPartitionPaths() {
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    return fullPartitionPaths;
+  }
+
+  private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
+  }
+
+  private void assertActualAndExpectedPartitionPathRecordKeyMatches(List<Pair<String, String>> expectedPartitionPathRecKeyPairs,
+      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
Review comment:
Looks like `Set<Pair<String, String>>` is better suited for `expectedPartitionPathRecKeyPairs`, so these checks can be simplified to `assertEquals(expected, actual);`.
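A rough sketch of that simplification (assuming `java.util.Set`/`HashSet` and JUnit's `assertEquals`; record keys are unique here, so collapsing duplicates is safe):
```java
private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
    List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
  // Set equality covers the size check plus both membership loops,
  // and prints a readable diff of the mismatched pairs on failure.
  assertEquals(expectedPartitionPathRecKeyPairs, new HashSet<>(actualPartitionPathRecKeyPairs));
}
```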
##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   @Test
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
Review comment:
These two tests can be combined into a `@ParameterizedTest` with `IndexType` as the argument.
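Something like this, as a sketch using JUnit 5's `@EnumSource` (the combined method name is just a suggestion):
```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

// One test method covers both global index types; JUnit runs it once per enum value.
@ParameterizedTest
@EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
public void testUpsertsUpdatePartitionPathGlobalIndexes(IndexType indexType) throws Exception {
  testUpsertsUpdatePartitionPathGlobalBloom(indexType, getConfig(), HoodieWriteClient::upsert);
}
```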
##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
Review comment:
The return value of `result.collect()` is unused — is that intentional?
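If the intent is the same as write 1, a minimal sketch of the fix would be to keep the statuses and assert on them:
```java
// collect() both triggers the write and returns the statuses; checking them
// catches failures in the second batch instead of silently discarding them.
statuses = result.collect();
assertNoWriteErrors(statuses);
```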
##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
Review comment:
Can this be replaced by using a new variable instead of mutating `expectedPartitionPathRecKeyPairs` in place? It is a bit hard to track what the list contains.
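For example, a fresh list per write would make the expected state explicit. A sketch, where `secondBatchPairs` and `rotatePartition` are hypothetical names for the batch-2 pairs and the extracted if/else rotation above:
```java
// Hypothetical sketch: expected state after write 3 is all of batch 2
// plus every batch-1 record moved to its rotated partition.
List<Pair<String, String>> expectedAfterPartitionChange = new ArrayList<>(secondBatchPairs);
for (HoodieRecord rec : records) {
  expectedAfterPartitionChange.add(Pair.of(rotatePartition(rec.getPartitionPath()), rec.getRecordKey()));
}
assertPartitionPathRecordKeys(fullPartitionPaths, expectedAfterPartitionChange);
```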
##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more than 1 base file after 2nd batch of writes");
Review comment:
These lines can be simplified to `assertTrue(entrySet().stream().filter(...).count() >= 1)`.
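Filled in, that could read as below (using `anyMatch`, equivalent to the `filter().count() >= 1` form):
```java
// anyMatch() expresses "at least one partition has more than one base file" directly,
// replacing the flag variable and the explicit loop.
assertTrue(baseFileCounts.values().stream().anyMatch(count -> count > 1),
    "Atleast one partition should have more than 1 base file after 2nd batch of writes");
```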
##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
Review comment:
`assertAll()` works better for iterated assertions like this.
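For example, a sketch with JUnit 5's `assertAll` over a `Stream<Executable>`:
```java
import org.junit.jupiter.api.function.Executable;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;

// assertAll runs every executable and reports all failing partitions at once,
// instead of stopping at the first mismatched count.
assertAll("one base file per partition",
    baseFileCounts.entrySet().stream()
        .map(entry -> (Executable) () -> assertEquals(1, entry.getValue().intValue())));
```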
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]