Apache9 commented on a change in pull request #305: HBASE-22577 
BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture…
URL: https://github.com/apache/hbase/pull/305#discussion_r293854876
 
 

 ##########
 File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
 ##########
 @@ -100,62 +107,64 @@ private RetriesExhaustedWithDetailsException makeError() 
{
     return new RetriesExhaustedWithDetailsException(throwables, rows, 
hostnameAndPorts);
   }
 
+  private void internalFlush() throws RetriesExhaustedWithDetailsException {
+    // should get the future array before calling mutator.flush, otherwise we 
may hit an infinite
+    // wait, since someone may add new future to the map after we calling the 
flush.
+    CompletableFuture<?>[] toWait = futures.keySet().toArray(new 
CompletableFuture<?>[0]);
+    mutator.flush();
+    try {
+      CompletableFuture.allOf(toWait).join();
+    } catch (CompletionException e) {
+      // just ignore, we will record the actual error in the errors field
+      LOG.debug("Flush failed, you should get an exception thrown to your 
code", e);
+    }
+    if (!errors.isEmpty()) {
+      RetriesExhaustedWithDetailsException error = makeError();
+      listener.onException(error, this);
+    }
+  }
+
   @Override
   public void mutate(List<? extends Mutation> mutations) throws IOException {
-    List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
     List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
     for (int i = 0, n = fs.size(); i < n; i++) {
       CompletableFuture<Void> toComplete = new CompletableFuture<>();
-      final int index = i;
-      addListener(fs.get(index), (r, e) -> {
+      futures.put(toComplete, toComplete);
+      Mutation mutation = mutations.get(i);
+      long heapSize = mutation.heapSize();
+      bufferedSize.addAndGet(heapSize);
+      addListener(fs.get(i), (r, e) -> {
+        futures.remove(toComplete);
+        bufferedSize.addAndGet(-heapSize);
         if (e != null) {
-          errors.add(Pair.newPair(mutations.get(index), e));
+          errors.add(Pair.newPair(mutation, e));
           toComplete.completeExceptionally(e);
         } else {
           toComplete.complete(r);
         }
       });
-      toBuffered.add(toComplete);
     }
     synchronized (this) {
-      futures.addAll(toBuffered);
-      if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
-        tryCompleteFuture();
-      }
-      if (!errors.isEmpty()) {
+      if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) {
 
 Review comment:
   The comment is just below this line. It is not a good idea to check only against 
writeBufferSize, as we would likely always enter the branch, so a * 2 is used.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to