This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 04ecc7712e027cac97545b7366cae8b75fd46247
Author: zhangduo <[email protected]>
AuthorDate: Sat Jun 15 10:37:06 2019 +0800

    HBASE-22577 BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture consumes too much CPU time
---
 .../BufferedMutatorOverAsyncBufferedMutator.java   | 86 +++++++++++++---------
 .../hadoop/hbase/client/TestBufferedMutator.java   |  3 +-
 2 files changed, 52 insertions(+), 37 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
index a7d4595..b8bc55c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
@@ -23,16 +23,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link BufferedMutator} implementation based on {@link 
AsyncBufferedMutator}.
@@ -40,17 +44,20 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {
 
+  private static final Logger LOG =
+    LoggerFactory.getLogger(BufferedMutatorOverAsyncBufferedMutator.class);
+
   private final AsyncBufferedMutator mutator;
 
   private final ExceptionListener listener;
 
-  private List<CompletableFuture<Void>> futures = new ArrayList<>();
+  private final Set<CompletableFuture<Void>> futures = 
ConcurrentHashMap.newKeySet();
+
+  private final AtomicLong bufferedSize = new AtomicLong(0);
 
   private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors =
     new ConcurrentLinkedQueue<>();
 
-  private final static int BUFFERED_FUTURES_THRESHOLD = 1024;
-
   BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator,
       ExceptionListener listener) {
     this.mutator = mutator;
@@ -100,62 +107,69 @@ class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {
     return new RetriesExhaustedWithDetailsException(throwables, rows, 
hostnameAndPorts);
   }
 
+  private void internalFlush() throws RetriesExhaustedWithDetailsException {
+    // should get the future array before calling mutator.flush, otherwise we 
may hit an infinite
+    // wait, since someone may add new future to the map after we calling the 
flush.
+    CompletableFuture<?>[] toWait = futures.toArray(new 
CompletableFuture<?>[0]);
+    mutator.flush();
+    try {
+      CompletableFuture.allOf(toWait).join();
+    } catch (CompletionException e) {
+      // just ignore, we will record the actual error in the errors field
+      LOG.debug("Flush failed, you should get an exception thrown to your 
code", e);
+    }
+    if (!errors.isEmpty()) {
+      RetriesExhaustedWithDetailsException error = makeError();
+      listener.onException(error, this);
+    }
+  }
+
   @Override
   public void mutate(List<? extends Mutation> mutations) throws IOException {
-    List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
     List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
     for (int i = 0, n = fs.size(); i < n; i++) {
       CompletableFuture<Void> toComplete = new CompletableFuture<>();
-      final int index = i;
-      addListener(fs.get(index), (r, e) -> {
+      futures.add(toComplete);
+      Mutation mutation = mutations.get(i);
+      long heapSize = mutation.heapSize();
+      bufferedSize.addAndGet(heapSize);
+      addListener(fs.get(i), (r, e) -> {
+        futures.remove(toComplete);
+        bufferedSize.addAndGet(-heapSize);
         if (e != null) {
-          errors.add(Pair.newPair(mutations.get(index), e));
+          errors.add(Pair.newPair(mutation, e));
           toComplete.completeExceptionally(e);
         } else {
           toComplete.complete(r);
         }
       });
-      toBuffered.add(toComplete);
     }
     synchronized (this) {
-      futures.addAll(toBuffered);
-      if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
-        tryCompleteFuture();
-      }
-      if (!errors.isEmpty()) {
+      if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) {
+        // We have too many mutations which are not completed yet, let's call 
a flush to release the
+        // memory to prevent OOM
+        // We use buffer size * 2 is because that, the async buffered mutator 
will flush
+        // automatically when the write buffer size limit is reached, so 
usually we do not need to
+        // call flush explicitly if the buffered size is only a little larger 
than the buffer size
+        // limit. But if the buffered size is too large(2 times of the buffer 
size), we still need
+        // to block here to prevent OOM.
+        internalFlush();
+      } else if (!errors.isEmpty()) {
         RetriesExhaustedWithDetailsException error = makeError();
         listener.onException(error, this);
       }
     }
   }
 
-  private void tryCompleteFuture() {
-    futures = futures.stream().filter(f -> 
!f.isDone()).collect(Collectors.toList());
-  }
-
   @Override
-  public void close() throws IOException {
-    flush();
+  public synchronized void close() throws IOException {
+    internalFlush();
     mutator.close();
   }
 
   @Override
-  public void flush() throws IOException {
-    mutator.flush();
-    synchronized (this) {
-      List<CompletableFuture<Void>> toComplete = this.futures;
-      this.futures = new ArrayList<>();
-      try {
-        CompletableFuture.allOf(toComplete.toArray(new 
CompletableFuture<?>[toComplete.size()]))
-          .join();
-      } catch (CompletionException e) {
-        // just ignore, we will record the actual error in the errors field
-      }
-      if (!errors.isEmpty()) {
-        RetriesExhaustedWithDetailsException error = makeError();
-        listener.onException(error, this);
-      }
-    }
+  public synchronized void flush() throws IOException {
+    internalFlush();
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
index 23e69ee..3c660d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
@@ -66,7 +66,8 @@ public class TestBufferedMutator {
 
   @Test
   public void test() throws Exception {
-    try (BufferedMutator mutator = 
TEST_UTIL.getConnection().getBufferedMutator(TABLE_NAME)) {
+    try (BufferedMutator mutator = TEST_UTIL.getConnection()
+      .getBufferedMutator(new 
BufferedMutatorParams(TABLE_NAME).writeBufferSize(64 * 1024))) {
       mutator.mutate(IntStream.range(0, COUNT / 2)
         .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
         .collect(Collectors.toList()));

Reply via email to