satishkotha commented on a change in pull request #2254:
URL: https://github.com/apache/hudi/pull/2254#discussion_r545364740



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -999,6 +999,103 @@ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int
     verifyRecordsWritten(commitTime2, inserts2, statuses);
   }
 
+  /**
+   * Test scenario of writing fewer file groups for the first partition than for the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception {
+    verifyDeletePartitionsHandling(1000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing a similar number of file groups in each partition.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
+    verifyDeletePartitionsHandling(3000, 3000, 3000);
+  }
+
+  /**
+   * Test scenario of writing more file groups for the first partition than for the second and third partitions.
+   */
+  @Test
+  public void verifyDeletePartitionsHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
+    verifyDeletePartitionsHandling(3000, 1000, 1000);
+  }
+
+  private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) {
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batchBuckets = statuses.stream().map(WriteStatus::getFileId).collect(Collectors.toSet());
+    verifyRecordsWritten(commitTime1, inserts1, statuses);
+    return batchBuckets;
+  }
+
+  private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
+    client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
+    return writeResult.getPartitionToReplaceFileIds().entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream())
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records for the first partition.
+   *  2) Do write2 (upsert) with 'batch2RecordsCount' number of records for the second partition.
+   *  3) Do write3 (upsert) with 'batch3RecordsCount' number of records for the third partition.
+   *  4) Delete the first partition and check the result.
+   *  5) Delete the second and third partitions and check the result.
+   */
+  private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception {
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator();
+
+    // Do Inserts for DEFAULT_FIRST_PARTITION_PATH
+    String commitTime1 = "001";
+    Set<String> batch1Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH);
+
+    // Do Inserts for DEFAULT_SECOND_PARTITION_PATH
+    String commitTime2 = "002";
+    Set<String> batch2Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH);
+
+    // Do Inserts for DEFAULT_THIRD_PARTITION_PATH
+    String commitTime3 = "003";
+    Set<String> batch3Buckets =
+        this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH);
+
+    // delete DEFAULT_FIRST_PARTITION_PATH
+    String commitTime4 = "004";
+    Set<String> deletePartitionReplaceFileIds1 =
+        deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
+    List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
+        String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
+    assertEquals(0, baseFiles.size());
+

Review comment:
       You may want to assert that the other partitions still have base files.
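
       A minimal sketch of the suggested assertion, assuming the same test fixtures already used above (`basePath`, `fs`, `HoodieClientTestUtils.getLatestBaseFiles`, and the `DEFAULT_SECOND_PARTITION_PATH`/`DEFAULT_THIRD_PARTITION_PATH` constants) plus JUnit's `assertFalse`:

       ```java
       // Sketch only: after replacing the first partition, the second and third
       // partitions should still expose base files in the latest file-system view.
       List<HoodieBaseFile> secondPartitionBaseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
           String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
       assertFalse(secondPartitionBaseFiles.isEmpty());

       List<HoodieBaseFile> thirdPartitionBaseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
           String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
       assertFalse(thirdPartitionBaseFiles.isEmpty());
       ```

       The same check could be repeated after step 5 with the expectation inverted, since deleting the second and third partitions should leave them without base files as well.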



