v3nkatesh commented on a change in pull request #685: default implementation
for HBase index qps allocator
URL: https://github.com/apache/incubator-hudi/pull/685#discussion_r286607404
##########
File path:
hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
##########
@@ -370,22 +411,38 @@ private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
If a writeStatus has any insert, it implies that the corresponding task contacts HBase for
doing puts, since we only do puts for inserts from HBaseIndex.
*/
- int hbasePutAccessParallelism = getHBasePutAccessParallelism(writeStatusRDD);
+ final Tuple2<Long, Integer> numPutsParallelismTuple = getHBasePutAccessParallelism(writeStatusRDD);
+ final long numPuts = numPutsParallelismTuple._1;
+ final int hbasePutsParallelism = numPutsParallelismTuple._2;
+ this.numRegionServersForTable = getNumRegionServersAliveForTable();
+ final float desiredQPSFraction = hBaseIndexQPSResourceAllocator.getQPSFractionForPutsTime(numPuts,
+ this.numRegionServersForTable);
+ logger.info("Desired QPSFraction :" + desiredQPSFraction);
+ logger.info("Number HBase puts :" + numPuts);
+ logger.info("Hbase Puts Parallelism :" + hbasePutsParallelism);
+ final float availableQpsFraction = hBaseIndexQPSResourceAllocator.getQPSFraction(desiredQPSFraction, numPuts);
+ logger.info("Allocated QPS Fraction :" + availableQpsFraction);
multiPutBatchSize = putBatchSizeCalculator
.getBatchSize(
- getNumRegionServersAliveForTable(),
+ numRegionServersForTable,
maxQpsPerRegionServer,
- hbasePutAccessParallelism,
+ hbasePutsParallelism,
maxExecutors,
SLEEP_TIME_MILLISECONDS,
- qpsFraction);
+ availableQpsFraction);
+ logger.info("multiPutBatchSize :" + multiPutBatchSize);
}
}
@VisibleForTesting
- public int getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
- return Math.toIntExact(Math.max(writeStatusRDD
- .filter(w -> w.getStat().getNumInserts() > 0).count(), 1));
+ public Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
+ final JavaRDD<WriteStatus> filteredWriteStatusRDD = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0);
+ final JavaPairRDD<Long, Integer> putsParallelismRDD = filteredWriteStatusRDD
+ .mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1));
+
+ final Tuple2<Long, Integer> putsAndParallelismTuple = putsParallelismRDD
Review comment:
I want to calculate both (1) the number of inserts and (2) the number of
WriteStatus objects that have inserts, in a single reduce.
I could do a `count()` on `writeStatusRDD.filter(w ->
w.getStat().getNumInserts() > 0)`, but since I need to reduce the RDD anyway, I
am counting there. Let me know if I am missing anything.
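
To illustrate the idea, here is a minimal sketch that uses plain Java streams rather than Spark, so it runs without a cluster. The class and method names are hypothetical, and each `Long` in the input stands in for one task's `getNumInserts()` value. Each element is mapped to a pair `(numInserts, 1)`, and one reduce sums both components, yielding the total puts and the number of tasks that do puts. The fallback of `(0, 1)` for the empty case is an assumption carried over from the `Math.max(..., 1)` in the removed code, not something confirmed by this diff.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Spark job: one Long per task = its number of inserts.
class PutsAndParallelismSketch {

  // Returns (total inserts, number of tasks with at least one insert) using one reduce.
  static Map.Entry<Long, Integer> putsAndParallelism(List<Long> numInsertsPerTask) {
    return numInsertsPerTask.stream()
        // Keep only tasks that actually insert, as in the filter in the diff.
        .filter(n -> n > 0)
        // Pair each insert count with 1 so the reduce can count tasks as well.
        .<Map.Entry<Long, Integer>>map(n -> new SimpleImmutableEntry<>(n, 1))
        // Sum both components: inserts on the left, task count on the right.
        .reduce((a, b) -> new SimpleImmutableEntry<>(
            a.getKey() + b.getKey(), a.getValue() + b.getValue()))
        // Assumption: with no inserts, report 0 puts and a parallelism floor of 1.
        .orElse(new SimpleImmutableEntry<>(0L, 1));
  }
}
```

For example, calling `putsAndParallelism` with the per-task counts 5, 0, 3, 0, 2 gives 10 puts across 3 tasks. With Spark, the same two-argument combiner could presumably be passed to `reduce` on the `JavaPairRDD<Long, Integer>`, which avoids a separate `count()` action over the filtered RDD.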