v3nkatesh commented on a change in pull request #685: default implementation for HBase index qps allocator
URL: https://github.com/apache/incubator-hudi/pull/685#discussion_r286638963
 
 

 ##########
 File path: 
hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
 ##########
 @@ -370,22 +411,38 @@ private void setPutBatchSize(JavaRDD<WriteStatus> 
writeStatusRDD,
         If a writeStatus has any insert, it implies that the corresponding 
task contacts HBase for
         doing puts, since we only do puts for inserts from HBaseIndex.
        */
-      int hbasePutAccessParallelism = 
getHBasePutAccessParallelism(writeStatusRDD);
+      final Tuple2<Long, Integer> numPutsParallelismTuple  = 
getHBasePutAccessParallelism(writeStatusRDD);
+      final long numPuts = numPutsParallelismTuple._1;
+      final int hbasePutsParallelism = numPutsParallelismTuple._2;
+      this.numRegionServersForTable = getNumRegionServersAliveForTable();
+      final float desiredQPSFraction = 
hBaseIndexQPSResourceAllocator.getQPSFractionForPutsTime(numPuts,
+          this.numRegionServersForTable);
+      logger.info("Desired QPSFraction :" + desiredQPSFraction);
+      logger.info("Number HBase puts :" + numPuts);
+      logger.info("Hbase Puts Parallelism :" + hbasePutsParallelism);
+      final float availableQpsFraction = 
hBaseIndexQPSResourceAllocator.getQPSFraction(desiredQPSFraction, numPuts);
+      logger.info("Allocated QPS Fraction :" + availableQpsFraction);
       multiPutBatchSize = putBatchSizeCalculator
           .getBatchSize(
-              getNumRegionServersAliveForTable(),
+              numRegionServersForTable,
               maxQpsPerRegionServer,
-              hbasePutAccessParallelism,
+              hbasePutsParallelism,
               maxExecutors,
               SLEEP_TIME_MILLISECONDS,
-              qpsFraction);
+              availableQpsFraction);
+      logger.info("multiPutBatchSize :" + multiPutBatchSize);
     }
   }
 
   @VisibleForTesting
-  public int getHBasePutAccessParallelism(final JavaRDD<WriteStatus> 
writeStatusRDD) {
-    return Math.toIntExact(Math.max(writeStatusRDD
-        .filter(w -> w.getStat().getNumInserts() > 0).count(), 1));
+  public Tuple2<Long, Integer> getHBasePutAccessParallelism(final 
JavaRDD<WriteStatus> writeStatusRDD) {
+    final JavaRDD<WriteStatus> filteredWriteStatusRDD = 
writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0);
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to