lw309637554 commented on a change in pull request #2275:
URL: https://github.com/apache/hudi/pull/2275#discussion_r548934271



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -681,6 +698,75 @@ private void 
assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<Strin
     }
   }
 
+  private Pair<List<WriteStatus>, List<HoodieRecord>> 
insertBatchRecords(SparkRDDWriteClient client, String commitTime,
+                                                                         
Integer recordNum, int expectStatueSize) {
+    client.startCommitWithTime(commitTime);
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime, 
recordNum);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, 
commitTime).collect();
+    assertNoWriteErrors(statuses);
+    assertEquals(expectStatueSize, statuses.size(), "check expect statue 
size.");
+    return Pair.of(statuses, inserts1);
+  }
+
+  @Test
+  public void testUpdateRejectForClustering() throws IOException {
+    final String testPartitionPath = "2016/09/26";
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    Properties props = new Properties();
+    props.setProperty("hoodie.clustering.async", "true");
+    HoodieWriteConfig config = getSmallInsertWriteConfig(100,
+        TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), props);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkTable.create(config, context, metaClient);
+
+    //1. insert to generate 2 file group
+    String commitTime1 = "001";
+    Pair<List<WriteStatus>, List<HoodieRecord>> upsertResult = 
insertBatchRecords(client, commitTime1, 600, 2);
+    List<WriteStatus> statuses = upsertResult.getKey();
+    List<HoodieRecord> inserts1 = upsertResult.getValue();
+    List<String> fileGroupIds1 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(2, fileGroupIds1.size());
+
+    // 2. generate clustering plan for fileGroupIds1 file groups
+    String commitTime2 = "002";
+    List<List<FileSlice>> firstInsertFileSlicesList = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> 
fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
+    List<FileSlice>[] fileSlices = 
(List<FileSlice>[])firstInsertFileSlicesList.toArray(new 
List[firstInsertFileSlicesList.size()]);
+    createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
+
+    // 3. insert one record with no updating reject exception, and not merge 
the small file, just generate a new file group
+    String commitTime3 = "003";
+    statuses = insertBatchRecords(client, commitTime3, 1, 1).getKey();
+    List<String> fileGroupIds2 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(3, fileGroupIds2.size());
+
+
+    // 4. update one record for the clustering two file groups, throw reject 
update exception
+    String commitTime4 = "004";
+    client.startCommitWithTime(commitTime4);
+    List<HoodieRecord> insertsAndUpdates3 = new ArrayList<>();
+    insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime4, inserts1));
+    assertNoWriteErrors(statuses);

Review comment:
       have remove it , because in insertBatchRecords have do it .




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to