satishkotha commented on a change in pull request #2254:
URL: https://github.com/apache/hudi/pull/2254#discussion_r545364740
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -999,6 +999,103 @@ private void verifyInsertOverwritePartitionHandling(int
batch1RecordsCount, int
verifyRecordsWritten(commitTime2, inserts2, statuses);
}
/**
 * Test scenario of writing fewer file groups for the first partition than the second and third partitions.
 */
@Test
public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception {
  verifyDeletePartitionsHandling(1000, 3000, 3000);
}
+
/**
 * Test scenario of writing a similar number of file groups in every partition.
 */
@Test
public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
  verifyDeletePartitionsHandling(3000, 3000, 3000);
}
+
+ /**
+ * Test scenario of writing more file groups for first partition than second
an third partition.
+ */
+ @Test
+ public void
verifyDeletePartitionsHandlingHandlingWithFewerRecordsSecondThirdPartition()
throws Exception {
+ verifyDeletePartitionsHandling(3000, 1000, 1000);
+ }
+
+ private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient
client, int recordsCount, String commitTime1, String partitionPath) {
+ client.startCommitWithTime(commitTime1);
+ List<HoodieRecord> inserts1 =
dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
+ JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+ List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime1).collect();
+ assertNoWriteErrors(statuses);
+ Set<String> batchBuckets = statuses.stream().map(s ->
s.getFileId()).collect(Collectors.toSet());
+ verifyRecordsWritten(commitTime1, inserts1, statuses);
+ return batchBuckets;
+ }
+
+ private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client,
String commitTime, List<String> deletePartitionPath) {
+ client.startCommitWithTime(commitTime,
HoodieTimeline.REPLACE_COMMIT_ACTION);
+ HoodieWriteResult writeResult =
client.deletePartitions(deletePartitionPath, commitTime);
+ Set<String> deletePartitionReplaceFileIds =
+ writeResult.getPartitionToReplaceFileIds().entrySet()
+ .stream().flatMap(entry ->
entry.getValue().stream()).collect(Collectors.toSet());
+ return deletePartitionReplaceFileIds;
+ }
+
+ /**
+ * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for
first partition.
+ * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for
second partition.
+ * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for
third partition.
+ * 4) delete first partition and check result.
+ * 5) delete second and third partition and check result.
+ *
+ */
+ private void verifyDeletePartitionsHandling(int batch1RecordsCount, int
batch2RecordsCount, int batch3RecordsCount) throws Exception {
+ HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
+ SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+ dataGen = new HoodieTestDataGenerator();
+
+ // Do Inserts for DEFAULT_FIRST_PARTITION_PATH
+ String commitTime1 = "001";
+ Set<String> batch1Buckets =
+ this.insertPartitionRecordsWithCommit(client, batch1RecordsCount,
commitTime1, DEFAULT_FIRST_PARTITION_PATH);
+
+ // Do Inserts for DEFAULT_SECOND_PARTITION_PATH
+ String commitTime2 = "002";
+ Set<String> batch2Buckets =
+ this.insertPartitionRecordsWithCommit(client, batch2RecordsCount,
commitTime2, DEFAULT_SECOND_PARTITION_PATH);
+
+ // Do Inserts for DEFAULT_THIRD_PARTITION_PATH
+ String commitTime3 = "003";
+ Set<String> batch3Buckets =
+ this.insertPartitionRecordsWithCommit(client, batch3RecordsCount,
commitTime3, DEFAULT_THIRD_PARTITION_PATH);
+
+ // delete DEFAULT_FIRST_PARTITION_PATH
+ String commitTime4 = "004";
+ Set<String> deletePartitionReplaceFileIds1 =
+ deletePartitionWithCommit(client, commitTime4,
Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
+ assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
+ List<HoodieBaseFile> baseFiles =
HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+ String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
+ assertEquals(0, baseFiles.size());
+
Review comment:
you may want to assert that other partitions still have base files.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]