xushiyan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r454059760



##########
File path: 
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record 
in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = 
getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and 
ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws 
Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, 
getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing 
record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws 
Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, 
getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures in a global bloom when update partition path is set to 
true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. 
old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is 
created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. 
This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType, 
HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), 
metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new 
ArrayList<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), 
rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, 
newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new 
String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, 
dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, 
expectedPartitionPathRecKeyPairs);
+
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = 
getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, 
fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = 
dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = 
dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), 
rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, 
expectedPartitionPathRecKeyPairs);
+
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are 
distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more 
than 1 base file after 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));

Review comment:
       Ah, OK, I see. Thanks for clarifying.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to