nsivabalan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r453713674
##########
File path:
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
}
/**
- * Test update of a record to different partition with Global Index.
+ * Tests when update partition path is set in global bloom, existing record
in old partition is deleted appropriately.
*/
@Test
- public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
- HoodieWriteClient client =
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
- /**
- * Write 1 (inserts and deletes) Write actual 200 insert records and
ignore 100 delete records
- */
- String newCommitTime = "001";
- List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+ public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
- // Write 1 (only inserts)
+ /**
+ * Tests when update partition path is set in simple global bloom, existing
record in old partition is deleted appropriately.
+ */
+ @Test
+ public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ /**
+ * This test ensures in a global bloom when update partition path is set to
true in config, if an incoming record has mismatched partition
+ * compared to whats in storage, then appropriate actions are taken. i.e.
old record is deleted in old partition and new one is inserted
+ * in the new partition.
+ * test structure:
+ * 1. insert 1 batch
+ * 2. insert 2nd batch with larger no of records so that a new file group is
created for partitions
+ * 3. issue upserts to records from batch 1 with different partition path.
This should ensure records from batch 1 are deleted and new
+ * records are upserted to the new partition
+ *
+ * @param indexType index type to be tested for
+ * @param config instance of {@link HoodieWriteConfig} to use
+ * @param writeFn write function to be used for testing
+ */
+ private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
HoodieWriteConfig config,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> writeFn)
+ throws Exception {
+ // instantiate client
+
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+ .withProps(config.getProps())
+ .withCompactionConfig(
+
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+ .withBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true)
+ .build()).withTimelineLayoutVersion(VERSION_0).build();
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
metaClient.getBasePath(), metaClient.getTableType(),
+ metaClient.getTableConfig().getTableName(),
metaClient.getArchivePath(),
+ metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+ // Write 1
+ String newCommitTime = "001";
+ int numRecords = 10;
client.startCommitWithTime(newCommitTime);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
- JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new
ArrayList<>();
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords,
newCommitTime);
List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(inserts1, fs);
- String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
- for (int i = 0; i < fullPartitionPaths.length; i++) {
- fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
+ // Check the entire dataset has all records
+ String[] fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(fullPartitionPaths,
expectedPartitionPathRecKeyPairs);
+
+ // verify one basefile per partition
+ Map<String, Integer> baseFileCounts =
getBaseFileCounts(fullPartitionPaths);
+ for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+ assertEquals(1, entry.getValue());
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
- /**
- * Write 2. Updates with different partition
- */
- newCommitTime = "004";
+ // Write 2
+ newCommitTime = "002";
+ numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> updates1 =
dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
- JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+ List<HoodieRecord> recordsSecondBatch =
dataGen.generateInserts(newCommitTime, numRecords);
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : recordsSecondBatch) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ result.collect();
Review comment:
Yes, we just need to trigger the action here; the collected result itself is
not used.
##########
File path:
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
}
/**
- * Test update of a record to different partition with Global Index.
+ * Tests when update partition path is set in global bloom, existing record
in old partition is deleted appropriately.
*/
@Test
- public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
- HoodieWriteClient client =
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
- /**
- * Write 1 (inserts and deletes) Write actual 200 insert records and
ignore 100 delete records
- */
- String newCommitTime = "001";
- List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+ public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
- // Write 1 (only inserts)
+ /**
+ * Tests when update partition path is set in simple global bloom, existing
record in old partition is deleted appropriately.
+ */
+ @Test
+ public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ /**
+ * This test ensures in a global bloom when update partition path is set to
true in config, if an incoming record has mismatched partition
+ * compared to whats in storage, then appropriate actions are taken. i.e.
old record is deleted in old partition and new one is inserted
+ * in the new partition.
+ * test structure:
+ * 1. insert 1 batch
+ * 2. insert 2nd batch with larger no of records so that a new file group is
created for partitions
+ * 3. issue upserts to records from batch 1 with different partition path.
This should ensure records from batch 1 are deleted and new
+ * records are upserted to the new partition
+ *
+ * @param indexType index type to be tested for
+ * @param config instance of {@link HoodieWriteConfig} to use
+ * @param writeFn write function to be used for testing
+ */
+ private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
HoodieWriteConfig config,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> writeFn)
+ throws Exception {
+ // instantiate client
+
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+ .withProps(config.getProps())
+ .withCompactionConfig(
+
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+ .withBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true)
+ .build()).withTimelineLayoutVersion(VERSION_0).build();
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
metaClient.getBasePath(), metaClient.getTableType(),
+ metaClient.getTableConfig().getTableName(),
metaClient.getArchivePath(),
+ metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+ // Write 1
+ String newCommitTime = "001";
+ int numRecords = 10;
client.startCommitWithTime(newCommitTime);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
- JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new
ArrayList<>();
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords,
newCommitTime);
List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(inserts1, fs);
- String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
- for (int i = 0; i < fullPartitionPaths.length; i++) {
- fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
+ // Check the entire dataset has all records
+ String[] fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(fullPartitionPaths,
expectedPartitionPathRecKeyPairs);
+
+ // verify one basefile per partition
+ Map<String, Integer> baseFileCounts =
getBaseFileCounts(fullPartitionPaths);
+ for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+ assertEquals(1, entry.getValue());
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
- /**
- * Write 2. Updates with different partition
- */
- newCommitTime = "004";
+ // Write 2
+ newCommitTime = "002";
+ numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> updates1 =
dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
- JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+ List<HoodieRecord> recordsSecondBatch =
dataGen.generateInserts(newCommitTime, numRecords);
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : recordsSecondBatch) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ result.collect();
- JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
- List<WriteStatus> statuses1 = result1.collect();
- assertNoWriteErrors(statuses1);
+ // Check the entire dataset has all records
+ fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(fullPartitionPaths,
expectedPartitionPathRecKeyPairs);
+
+ // verify that there are more than 1 basefiles per partition
+ // we can't guarantee randomness in partitions where records are
distributed. So, verify atleast one partition has more than 1 basefile.
+ baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+ boolean hasMoreThanOneBaseFile = false;
+ for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+ if (entry.getValue() > 1) {
+ hasMoreThanOneBaseFile = true;
+ break;
+ }
+ }
+ assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more
than 1 base file after 2nd batch of writes");
+
+ // Write 3 (upserts to records from batch 1 with diff partition path)
+ newCommitTime = "003";
+
+ // update to diff partition paths
+ List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+ for (HoodieRecord rec : records) {
+ // remove older entry from expected partition path record key pairs
+ expectedPartitionPathRecKeyPairs
+ .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
Review comment:
I don't follow. This list tracks all (partition path, record key) pairs written
so far. In this block, for each record whose partition path is being updated, we
need to remove the old entry from the expected list and add the updated entry.
So I don't quite understand how introducing a new variable would simplify
things. Would you mind clarifying?
##########
File path:
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
}
/**
- * Test update of a record to different partition with Global Index.
+ * Tests when update partition path is set in global bloom, existing record
in old partition is deleted appropriately.
*/
@Test
- public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
- HoodieWriteClient client =
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
- /**
- * Write 1 (inserts and deletes) Write actual 200 insert records and
ignore 100 delete records
- */
- String newCommitTime = "001";
- List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+ public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
- // Write 1 (only inserts)
+ /**
+ * Tests when update partition path is set in simple global bloom, existing
record in old partition is deleted appropriately.
+ */
+ @Test
+ public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws
Exception {
+ testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE,
getConfig(),
+ HoodieWriteClient::upsert);
+ }
+
+ /**
+ * This test ensures in a global bloom when update partition path is set to
true in config, if an incoming record has mismatched partition
+ * compared to whats in storage, then appropriate actions are taken. i.e.
old record is deleted in old partition and new one is inserted
+ * in the new partition.
+ * test structure:
+ * 1. insert 1 batch
+ * 2. insert 2nd batch with larger no of records so that a new file group is
created for partitions
+ * 3. issue upserts to records from batch 1 with different partition path.
This should ensure records from batch 1 are deleted and new
+ * records are upserted to the new partition
+ *
+ * @param indexType index type to be tested for
+ * @param config instance of {@link HoodieWriteConfig} to use
+ * @param writeFn write function to be used for testing
+ */
+ private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
HoodieWriteConfig config,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> writeFn)
+ throws Exception {
+ // instantiate client
+
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+ .withProps(config.getProps())
+ .withCompactionConfig(
+
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+ .withBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true)
+ .build()).withTimelineLayoutVersion(VERSION_0).build();
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
metaClient.getBasePath(), metaClient.getTableType(),
+ metaClient.getTableConfig().getTableName(),
metaClient.getArchivePath(),
+ metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+ // Write 1
+ String newCommitTime = "001";
+ int numRecords = 10;
client.startCommitWithTime(newCommitTime);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
- JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new
ArrayList<>();
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords,
newCommitTime);
List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(inserts1, fs);
- String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
- for (int i = 0; i < fullPartitionPaths.length; i++) {
- fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
+ // Check the entire dataset has all records
+ String[] fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(fullPartitionPaths,
expectedPartitionPathRecKeyPairs);
+
+ // verify one basefile per partition
+ Map<String, Integer> baseFileCounts =
getBaseFileCounts(fullPartitionPaths);
+ for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+ assertEquals(1, entry.getValue());
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
- /**
- * Write 2. Updates with different partition
- */
- newCommitTime = "004";
+ // Write 2
+ newCommitTime = "002";
+ numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> updates1 =
dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
- JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+ List<HoodieRecord> recordsSecondBatch =
dataGen.generateInserts(newCommitTime, numRecords);
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : recordsSecondBatch) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(),
rec.getRecordKey()));
+ }
+ writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ result.collect();
- JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
- List<WriteStatus> statuses1 = result1.collect();
- assertNoWriteErrors(statuses1);
+ // Check the entire dataset has all records
+ fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(fullPartitionPaths,
expectedPartitionPathRecKeyPairs);
+
+ // verify that there are more than 1 basefiles per partition
+ // we can't guarantee randomness in partitions where records are
distributed. So, verify atleast one partition has more than 1 basefile.
+ baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+ boolean hasMoreThanOneBaseFile = false;
+ for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+ if (entry.getValue() > 1) {
+ hasMoreThanOneBaseFile = true;
+ break;
+ }
+ }
+ assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more
than 1 base file after 2nd batch of writes");
+
+ // Write 3 (upserts to records from batch 1 with diff partition path)
+ newCommitTime = "003";
+
+ // update to diff partition paths
+ List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+ for (HoodieRecord rec : records) {
+ // remove older entry from expected partition path record key pairs
+ expectedPartitionPathRecKeyPairs
+ .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+ String partitionPath = rec.getPartitionPath();
+ String newPartitionPath = null;
+ if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+ } else if
(partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+ } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH))
{
+ newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+ } else {
+ throw new IllegalStateException("Unknown partition path " +
rec.getPartitionPath());
+ }
+ recordsToUpsert.add(
+ new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+ rec.getData()));
+ // populate expected partition path and record keys
+ expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath,
rec.getRecordKey()));
+ }
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(updates1, fs);
- // Check the entire dataset has all records still
- fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ writeRecords = jsc.parallelize(recordsToUpsert, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ statuses = result.collect();
+
+ // Check the entire dataset has all records
+ fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(fullPartitionPaths,
expectedPartitionPathRecKeyPairs);
+ }
+
+ private void assertPartitionPathRecordKeys(String[] fullPartitionPaths,
List<Pair<String, String>> expectedPartitionPathRecKeyPairs) {
+ Dataset<Row> rows = getAllRows(fullPartitionPaths);
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs =
getActualPartitionPathAndRecordKeys(rows);
+ // verify all partitionpath, record key matches
+
assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs,
actualPartitionPathRecKeyPairs);
+ }
+
+ private List<Pair<String, String>>
getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs = new
ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"),
row.getAs("_row_key")));
+ }
+ return actualPartitionPathRecKeyPairs;
+ }
+
+ private Dataset<org.apache.spark.sql.Row> getAllRows(String[]
fullPartitionPaths) {
+ return HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ }
+
+ private String[] getFullPartitionPaths() {
+ String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext,
fs, fullPartitionPaths).count(),
- "Must contain 100 records");
+ return fullPartitionPaths;
+ }
+
+ private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+ return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs,
fullPartitionPaths);
+ }
+
+ private void
assertActualAndExpectedPartitionPathRecordKeyMatches(List<Pair<String, String>>
expectedPartitionPathRecKeyPairs,
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+ // verify all partitionpath, record key matches
+ assertEquals(expectedPartitionPathRecKeyPairs.size(),
actualPartitionPathRecKeyPairs.size());
+ for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+ assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+ assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+ }
Review comment:
I will change the expected entries to be a Set, but I can't do the same for the
actual entries: we want to catch duplicates, if there are any, so the actual
entries have to stay a List. That means we can't really use
assertEquals(set, list).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]