vinothchandar commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r450160809
##########
File path:
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
##########
@@ -125,6 +125,9 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
// Create an empty record to delete the record in the old partition
HoodieRecord<T> emptyRecord = new
HoodieRecord(recordLocationHoodieKeyPair.get()._2,
Review comment:
and `taggedRecord` to insert — this makes what this block is doing
clearer.
##########
File path:
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
}
/**
- * Test update of a record to different partition with Global Index.
+ * Tests when update partition path is set in global bloom, existing record
in old partition
+ * is deleted appropriately.
+ * @throws Exception
*/
@Test
- public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
- HoodieWriteClient client =
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
- /**
- * Write 1 (inserts and deletes) Write actual 200 insert records and
ignore 100 delete records
- */
- String newCommitTime = "001";
- List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+ public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ /**
+ * Tests when update partition path is set in simple global bloom, existing
record in
+ * old partition is deleted appropriately.
+ * @throws Exception
+ */
+ @Test
+ public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+ HoodieWriteConfig config,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> writeFn)
+ throws Exception {
+ // Force using older timeline layout
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+ .withProps(config.getProps())
+ .withCompactionConfig(
+
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+ .withBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+ .withTimelineLayoutVersion(
+ VERSION_0).build();
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
metaClient.getBasePath(),
+ metaClient.getTableType(),
+ metaClient.getTableConfig().getTableName(),
metaClient.getArchivePath(),
+ metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
// Write 1 (only inserts)
+ String newCommitTime = "001";
+ int numRecords = 10;
client.startCommitWithTime(newCommitTime);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
- JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new
ArrayList<>();
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords,
newCommitTime);
List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(inserts1, fs);
+ // Check the entire dataset has all records still
String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
+ Dataset<Row> rows = HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs = new
ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"),
row.getAs("_row_key")));
+ }
- /**
- * Write 2. Updates with different partition
- */
- newCommitTime = "004";
+ // verify all partitionpath, record key matches
+ assertEquals(expectedPartitionPathRecKeyPairs.size(),
actualPartitionPathRecKeyPairs.size());
+ for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+ assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+ assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ newCommitTime = "002";
+ numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> updates1 =
dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
- JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+ List<HoodieRecord> recordsSecondBatch =
dataGen.generateInserts(newCommitTime, numRecords);
+ for (HoodieRecord rec : recordsSecondBatch) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ result.collect();
+
+ // Check the entire dataset has all records still
+ fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
+ }
+
+ rows = HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ actualPartitionPathRecKeyPairs = new ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"),
row.getAs("_row_key")));
+ }
- JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
- List<WriteStatus> statuses1 = result1.collect();
- assertNoWriteErrors(statuses1);
+ // verify all partitionpath, record key matches
+ assertEquals(expectedPartitionPathRecKeyPairs.size(),
actualPartitionPathRecKeyPairs.size());
+ for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+ assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+ assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ // Write 2 (updates)
+ newCommitTime = "003";
+ records = records.subList(5, 10);
+
+ // update to diff partition paths
+ List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs
+ .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+ String partitionPath = rec.getPartitionPath();
+ String newPartitionPath = null;
+ if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+ } else if
(partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+ } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH))
{
+ newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+ } else {
+ throw new IllegalStateException("Unknown partition path " +
rec.getPartitionPath());
+ }
+ recordsToUpsert.add(
+ new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+ rec.getData()));
+ expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath,
rec.getRecordKey()));
+ }
+
+ writeRecords = jsc.parallelize(recordsToUpsert, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ statuses = result.collect();
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(updates1, fs);
// Check the entire dataset has all records still
fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
+
+ rows = HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ actualPartitionPathRecKeyPairs = new ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"),
row.getAs("_row_key")));
+ }
+
+ // verify all partitionpath, record key matches
Review comment:
Can we explicitly check for duplicates?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
##########
@@ -125,6 +125,9 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
// Create an empty record to delete the record in the old partition
HoodieRecord<T> emptyRecord = new
HoodieRecord(recordLocationHoodieKeyPair.get()._2,
Review comment:
Let's rename this to `deleteRecord`.
##########
File path:
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
}
/**
- * Test update of a record to different partition with Global Index.
+ * Tests when update partition path is set in global bloom, existing record
in old partition
+ * is deleted appropriately.
+ * @throws Exception
*/
@Test
- public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
- HoodieWriteClient client =
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
- /**
- * Write 1 (inserts and deletes) Write actual 200 insert records and
ignore 100 delete records
- */
- String newCommitTime = "001";
- List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+ public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ /**
+ * Tests when update partition path is set in simple global bloom, existing
record in
+ * old partition is deleted appropriately.
+ * @throws Exception
+ */
+ @Test
+ public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+ HoodieWriteConfig config,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> writeFn)
+ throws Exception {
+ // Force using older timeline layout
Review comment:
why is this relevant for this test?
##########
File path:
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
}
/**
- * Test update of a record to different partition with Global Index.
+ * Tests when update partition path is set in global bloom, existing record
in old partition
+ * is deleted appropriately.
+ * @throws Exception
*/
@Test
- public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
- HoodieWriteClient client =
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
- /**
- * Write 1 (inserts and deletes) Write actual 200 insert records and
ignore 100 delete records
- */
- String newCommitTime = "001";
- List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+ public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ /**
+ * Tests when update partition path is set in simple global bloom, existing
record in
+ * old partition is deleted appropriately.
+ * @throws Exception
+ */
+ @Test
+ public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+ HoodieWriteConfig config,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> writeFn)
+ throws Exception {
+ // Force using older timeline layout
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+ .withProps(config.getProps())
+ .withCompactionConfig(
+
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+ .withBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+ .withTimelineLayoutVersion(
+ VERSION_0).build();
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
metaClient.getBasePath(),
+ metaClient.getTableType(),
+ metaClient.getTableConfig().getTableName(),
metaClient.getArchivePath(),
+ metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
// Write 1 (only inserts)
+ String newCommitTime = "001";
+ int numRecords = 10;
client.startCommitWithTime(newCommitTime);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
- JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new
ArrayList<>();
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords,
newCommitTime);
List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(inserts1, fs);
+ // Check the entire dataset has all records still
String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
+ Dataset<Row> rows = HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs = new
ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"),
row.getAs("_row_key")));
+ }
- /**
- * Write 2. Updates with different partition
- */
- newCommitTime = "004";
+ // verify all partitionpath, record key matches
+ assertEquals(expectedPartitionPathRecKeyPairs.size(),
actualPartitionPathRecKeyPairs.size());
+ for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+ assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+ assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ newCommitTime = "002";
+ numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> updates1 =
dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
- JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+ List<HoodieRecord> recordsSecondBatch =
dataGen.generateInserts(newCommitTime, numRecords);
+ for (HoodieRecord rec : recordsSecondBatch) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ result.collect();
+
+ // Check the entire dataset has all records still
+ fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
+ }
+
+ rows = HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ actualPartitionPathRecKeyPairs = new ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"),
row.getAs("_row_key")));
+ }
- JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
- List<WriteStatus> statuses1 = result1.collect();
- assertNoWriteErrors(statuses1);
+ // verify all partitionpath, record key matches
+ assertEquals(expectedPartitionPathRecKeyPairs.size(),
actualPartitionPathRecKeyPairs.size());
+ for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+ assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+ assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ // Write 2 (updates)
+ newCommitTime = "003";
+ records = records.subList(5, 10);
+
+ // update to diff partition paths
+ List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs
+ .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+ String partitionPath = rec.getPartitionPath();
+ String newPartitionPath = null;
+ if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+ } else if
(partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+ } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH))
{
+ newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+ } else {
+ throw new IllegalStateException("Unknown partition path " +
rec.getPartitionPath());
+ }
+ recordsToUpsert.add(
+ new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+ rec.getData()));
+ expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath,
rec.getRecordKey()));
+ }
+
+ writeRecords = jsc.parallelize(recordsToUpsert, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ statuses = result.collect();
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(updates1, fs);
// Check the entire dataset has all records still
fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
+
+ rows = HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ actualPartitionPathRecKeyPairs = new ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"),
row.getAs("_row_key")));
+ }
+
+ // verify all partitionpath, record key matches
Review comment:
and also verify that at least two files are present before triggering the
update of the partition path
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]