bvaradar commented on a change in pull request #1004: [HUDI-328] Adding delete
api to HoodieWriteClient
URL: https://github.com/apache/incubator-hudi/pull/1004#discussion_r347753713
##########
File path:
hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -515,6 +525,164 @@ public void testSmallInsertHandlingForInserts() throws
Exception {
inserts1.size() + inserts2.size() + insert3.size());
}
+  /**
+   * Tests record deletion using the delete() API.
+   */
+ @Test
+ public void testDeletesWithDeleteApi() throws Exception {
+ final String testPartitionPath = "2016/09/26";
+ final int insertSplitLimit = 100;
+ List<String> keysSoFar = new ArrayList<>();
+ // setup the small file handling params
+    HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); //
 holds up to 200 records max
+ dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
+
+ HoodieWriteClient client = getHoodieWriteClient(config, false);
+
+ // Inserts => will write file1
+ String commitTime1 = "001";
+ client.startCommitWithTime(commitTime1);
+ List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1,
insertSplitLimit); // this writes ~500kb
+ Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);
+ keysSoFar.addAll(keys1);
+ JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
+ List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime1).collect();
+
+ assertNoWriteErrors(statuses);
+
+ assertEquals("Just 1 file needs to be added.", 1, statuses.size());
+ String file1 = statuses.get(0).getFileId();
+ Assert.assertEquals("file should contain 100 records",
+ readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath,
statuses.get(0).getStat().getPath()))
+ .size(),
+ 100);
+
+ // Delete 20 among 100 inserted
+ testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
+
+ // Insert and update 40 records
+ Pair<Set<String>, List<HoodieRecord>> updateBatch2 = testUpdates("003",
client, 40, 120);
+ keysSoFar.addAll(updateBatch2.getLeft());
+
+ // Delete 10 records among 40 updated
+ testDeletes(client, updateBatch2.getRight(), 10, file1, "004", 110,
keysSoFar);
+
+ // do another batch of updates
+ Pair<Set<String>, List<HoodieRecord>> updateBatch3 = testUpdates("005",
client, 40, 150);
+ keysSoFar.addAll(updateBatch3.getLeft());
+
+    // delete non-existent keys
+ String commitTime6 = "006";
+ client.startCommitWithTime(commitTime6);
+
+ List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6,
20);
+ List<HoodieKey> hoodieKeysToDelete3 = HoodieClientTestUtils
+ .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(dummyInserts3),
20);
+ JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
+ statuses = client.delete(deleteKeys3, commitTime6).collect();
+ assertNoWriteErrors(statuses);
+ assertEquals("Just 0 write status for delete.", 0, statuses.size());
+
+ // Check the entire dataset has all records still
+ String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
+ }
+ assertEquals("Must contain " + 150 + " records", 150,
+ HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs,
fullPartitionPaths).count());
+
+ // delete another batch. previous delete commit should have persisted the
schema. If not,
+ // this will throw exception
+ testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140,
keysSoFar);
+ }
+
+ private Pair<Set<String>, List<HoodieRecord>> testUpdates(String commitTime,
HoodieWriteClient client,
+ int sizeToInsertAndUpdate, int expectedTotalRecords)
+ throws IOException {
+ client.startCommitWithTime(commitTime);
+ List<HoodieRecord> inserts = dataGen.generateInserts(commitTime,
sizeToInsertAndUpdate);
+ Set<String> keys = HoodieClientTestUtils.getRecordKeys(inserts);
+ List<HoodieRecord> insertsAndUpdates = new ArrayList<>();
+ insertsAndUpdates.addAll(inserts);
+ insertsAndUpdates.addAll(dataGen.generateUpdates(commitTime, inserts));
+
+ JavaRDD<HoodieRecord> insertAndUpdatesRDD =
jsc.parallelize(insertsAndUpdates, 1);
+ List<WriteStatus> statuses = client.upsert(insertAndUpdatesRDD,
commitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ // Check the entire dataset has all records still
+ String[] fullPartitionPaths = new
String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath,
dataGen.getPartitionPaths()[i]);
+ }
+ assertEquals("Must contain " + expectedTotalRecords + " records",
expectedTotalRecords,
+ HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs,
fullPartitionPaths).count());
+ return Pair.of(keys, inserts);
+ }
+
+ private void testDeletes(HoodieWriteClient client, List<HoodieRecord>
previousRecords, int sizeToDelete,
Review comment:
should be ok
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services