satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r408345007
 
 

 ##########
 File path: hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
 ##########
 @@ -329,47 +332,140 @@ public void testPutBatchSizeCalculation() {
     // All asserts cases below are derived out of the first
     // example below, with change in one parameter at a time.
 
-    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
100, 0.1f);
-    // Expected batchSize is 8 because in that case, total request sent in one 
second is below
-    // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 
(numRegionServers) * 0.1 (qpsFraction)) => 16000
-    // We assume requests get distributed to Region Servers uniformly, so each 
RS gets 1600 request
-    // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
-    Assert.assertEquals(putBatchSize, 8);
+    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
0.1f);
+    // Total puts that can be sent  in 1 second = (10 * 16667 * 0.1) = 16,667
+    // Total puts per batch will be (16,667 / parallelism) = 83.335, where 200 
is the maxExecutors
+    Assert.assertEquals(putBatchSize, 83);
 
     // Number of Region Servers are halved, total requests sent in a second 
are also halved, so batchSize is also halved
-    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 
100, 0.1f);
-    Assert.assertEquals(putBatchSize2, 4);
+    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 
0.1f);
+    Assert.assertEquals(putBatchSize2, 41);
 
     // If the parallelism is halved, batchSize has to double
-    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 
100, 0.1f);
-    Assert.assertEquals(putBatchSize3, 16);
+    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 
0.1f);
+    Assert.assertEquals(putBatchSize3, 166);
 
     // If the parallelism is halved, batchSize has to double.
     // This time parallelism is driven by numTasks rather than numExecutors
-    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 
100, 0.1f);
-    Assert.assertEquals(putBatchSize4, 16);
+    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 
0.1f);
+    Assert.assertEquals(putBatchSize4, 166);
 
     // If sleepTimeMs is halved, batchSize has to halve
-    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
100, 0.05f);
-    Assert.assertEquals(putBatchSize5, 4);
+    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 
0.05f);
+    Assert.assertEquals(putBatchSize5, 41);
 
     // If maxQPSPerRegionServer is doubled, batchSize also doubles
-    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 
100, 0.1f);
-    Assert.assertEquals(putBatchSize6, 16);
+    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 
0.1f);
+    Assert.assertEquals(putBatchSize6, 166);
   }
 
   @Test
   public void testsHBasePutAccessParallelism() {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
     final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
-        Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), 
getSampleWriteStatus(10, 0)), 10);
+        Arrays.asList(
+          getSampleWriteStatus(0, 2),
+          getSampleWriteStatus(2, 3),
+          getSampleWriteStatus(4, 3),
+          getSampleWriteStatus(6, 3),
+          getSampleWriteStatus(8, 0)),
+        10);
     final Tuple2<Long, Integer> tuple = 
index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = 
Integer.parseInt(tuple._2.toString());
     final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
     Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
-    Assert.assertEquals(2, hbasePutAccessParallelism);
-    Assert.assertEquals(11, hbaseNumPuts);
+    Assert.assertEquals(4, hbasePutAccessParallelism);
+    Assert.assertEquals(20, hbaseNumPuts);
+  }
+
+  @Test
+  public void testsWriteStatusPartitioner() {
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+    int parallelism = 4;
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+        Arrays.asList(
+          getSampleWriteStatusWithFileId(0, 2),
+          getSampleWriteStatusWithFileId(2, 3),
+          getSampleWriteStatusWithFileId(4, 3),
+          getSampleWriteStatusWithFileId(0, 3),
+          getSampleWriteStatusWithFileId(11, 0)), parallelism);
+    int partitionIndex = 0;
+    final Map<String, Integer> fileIdPartitionMap = new HashMap<>();
+
+    final List<String> fileIds = writeStatusRDD.filter(w -> 
w.getStat().getNumInserts() > 0)
 
 Review comment:
   A lot of the code in this test seems to duplicate logic from the source code. Consider 
refactoring this part into a shared library that the tests can reuse if needed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to