satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps repartition writestatus URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407807929
########## File path: hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java ########## @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm /** * Helper method to facilitate performing mutations (including puts and deletes) in Hbase. */ - private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException { + private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException { if (mutations.isEmpty()) { return; } + // report number of operations to account per second with rate limiter. + // If #limiter.getRate() operations are acquired within 1 second, ratelimiter will limit the rest of calls + // for within that second + limiter.acquire(mutations.size()); mutator.mutate(mutations); mutator.flush(); mutations.clear(); - sleepForTime(SLEEP_TIME_MILLISECONDS); - } - - private static void sleepForTime(int sleepTimeMs) { - try { - Thread.sleep(sleepTimeMs); - } catch (InterruptedException e) { - LOG.error("Sleep interrupted during throttling", e); - throw new RuntimeException(e); - } } @Override public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) { - final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); - setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); - LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); - JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); + final Option<Float> desiredQPSFraction = calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator); + // Map each fileId that has inserts to a unique partition Id. 
This will be used while + // repartitioning RDD<WriteStatus> + int partitionIndex = 0; + final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) + .map(w -> w.getFileId()).collect(); + for (final String fileId : fileIds) { + this.fileIdPartitionMap.put(fileId, partitionIndex++); + } + JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD : + writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w)) + .partitionBy(new WriteStatusPartitioner(this.fileIdPartitionMap, + this.numWriteStatusWithInserts)) + .map(w -> w._2()); + acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc); + LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize); + JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(), + true); // caching the index updated status RDD writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps())); + // force trigger update location(hbase puts) + writeStatusJavaRDD.count(); + this.hBaseIndexQPSResourceAllocator.releaseQPSResources(); return writeStatusJavaRDD; } - private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD, - HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, final JavaSparkContext jsc) { + private Option<Float> calculateQPSFraction(JavaRDD<WriteStatus> writeStatusRDD, + HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator) { Review comment: Not directly related to your change, so feel free to ignore this comment. But hBaseIndexQPSResourceAllocator is an instance variable; why is it passed again as an argument? This seems to be a consistent pattern in this class. Because we are also using the exact same name for the local variable, it shadows the instance variable and can easily become error-prone if the two variables evolve to mean different things. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services