nsivabalan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r450809602



##########
File path: 
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record 
in old partition
+   * is deleted appropriately.
+   * @throws Exception
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = 
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and 
ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws 
Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, 
getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * Tests when update partition path is set in simple global bloom, existing 
record in
+   * old partition is deleted appropriately.
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws 
Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, 
getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+      HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // Force using older timeline layout
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+        .withTimelineLayoutVersion(
+            VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(),
+        metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), 
metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     // Write 1 (only inserts)
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new 
ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), 
rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, 
newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
+    // Check the entire dataset has all records still
     String[] fullPartitionPaths = new 
String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, 
dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, 
fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    Dataset<Row> rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new 
ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), 
row.getAs("_row_key")));
+    }
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), 
actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = 
dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = 
dataGen.generateInserts(newCommitTime, numRecords);
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), 
rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
+
+    // Check the entire dataset has all records still
+    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, 
dataGen.getPartitionPaths()[i]);
+    }
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), 
row.getAs("_row_key")));
+    }
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), 
actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    // Write 2 (updates)
+    newCommitTime = "003";
+    records = records.subList(5, 10);
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if 
(partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) 
{
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + 
rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, 
rec.getRecordKey()));
+    }
+
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
     // Check the entire dataset has all records still
     fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, 
dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, 
fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), 
row.getAs("_row_key")));
+    }
+
+    // verify all partitionpath, record key matches

Review comment:
       I have refactored the test now. You can check it out. I have added 
assertions to verify base file counts.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to