satishkotha commented on a change in pull request #1484: [HUDI-316] : Hbase qps 
repartition writestatus
URL: https://github.com/apache/incubator-hudi/pull/1484#discussion_r407805506
 
 

 ##########
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
 ##########
 @@ -322,66 +347,94 @@ private boolean checkIfValidCommit(HoodieTableMetaClient 
metaClient, String comm
   /**
    * Helper method to facilitate performing mutations (including puts and 
deletes) in Hbase.
    */
-  private void doMutations(BufferedMutator mutator, List<Mutation> mutations) 
throws IOException {
+  private void doMutations(BufferedMutator mutator, List<Mutation> mutations, 
RateLimiter limiter) throws IOException {
     if (mutations.isEmpty()) {
       return;
     }
+    // report number of operations to account per second with rate limiter.
+    // If #limiter.getRate() operations are acquired within 1 second, 
ratelimiter will limit the rest of calls
+    // for within that second
+    limiter.acquire(mutations.size());
     mutator.mutate(mutations);
     mutator.flush();
     mutations.clear();
-    sleepForTime(SLEEP_TIME_MILLISECONDS);
-  }
-
-  private static void sleepForTime(int sleepTimeMs) {
-    try {
-      Thread.sleep(sleepTimeMs);
-    } catch (InterruptedException e) {
-      LOG.error("Sleep interrupted during throttling", e);
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> 
writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
-    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = 
createQPSResourceAllocator(this.config);
-    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
-    JavaRDD<WriteStatus> writeStatusJavaRDD = 
writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
+    final Option<Float> desiredQPSFraction =  
calculateQPSFraction(writeStatusRDD, hBaseIndexQPSResourceAllocator);
+    // Map each fileId that has inserts to a unique partition Id. This will be 
used while
+    // repartitioning RDD<WriteStatus>
+    int partitionIndex = 0;
+    final List<String> fileIds = writeStatusRDD.filter(w -> 
w.getStat().getNumInserts() > 0)
+                                   .map(w -> w.getFileId()).collect();
+    for (final String fileId : fileIds) {
+      this.fileIdPartitionMap.put(fileId, partitionIndex++);
+    }
+    JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 
? writeStatusRDD :
+                                          writeStatusRDD.mapToPair(w -> new 
Tuple2<>(w.getFileId(), w))
+                                            .partitionBy(new 
WriteStatusPartitioner(this.fileIdPartitionMap,
+                                              this.numWriteStatusWithInserts))
+                                            .map(w -> w._2());
+    acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
+    LOG.info("multiPutBatchSize before hbase puts: " + this.multiPutBatchSize);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = 
partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
+        true);
     // caching the index updated status RDD
     writeStatusJavaRDD = 
writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
+    // force trigger update location(hbase puts)
+    writeStatusJavaRDD.count();
+    this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
 
 Review comment:
   Can you help me understand this code? Why do we need to force a trigger here? 
Is this just so that we can call releaseQPSResources? releaseQPSResources seems 
to do nothing (at least in the default implementation; are there other 
implementations outside hoodie?). Is it really important to release here, as 
opposed to doing it in 'close()' (the earlier behavior)?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to