Repository: hbase
Updated Branches:
  refs/heads/master 25ee5f7f8 -> d12eb7a4a


HBASE-18347 Implement a BufferedMutator for async client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d12eb7a4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d12eb7a4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d12eb7a4

Branch: refs/heads/master
Commit: d12eb7a4aae5c2dc7b230bf2a12d2313b93b8ba9
Parents: 25ee5f7
Author: zhangduo <zhang...@apache.org>
Authored: Mon Aug 21 18:37:26 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Aug 24 15:55:43 2017 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncBufferedMutator.java      |  84 +++++++++++
 .../client/AsyncBufferedMutatorBuilder.java     |  85 +++++++++++
 .../client/AsyncBufferedMutatorBuilderImpl.java |  85 +++++++++++
 .../hbase/client/AsyncBufferedMutatorImpl.java  | 144 +++++++++++++++++++
 .../hadoop/hbase/client/AsyncConnection.java    |  39 +++++
 .../client/AsyncConnectionConfiguration.java    |   9 ++
 .../hbase/client/AsyncConnectionImpl.java       |  11 ++
 .../hbase/client/TestAsyncBufferMutator.java    | 128 +++++++++++++++++
 8 files changed, 585 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
new file mode 100644
index 0000000..ad9279b
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to communicate with a single HBase table in batches. Obtain an 
instance from a
+ * {@link AsyncConnection} and call {@link #close()} afterwards.
+ * <p>
+ * The implementation is required to be thread safe.
+ */
+@InterfaceAudience.Public
+public interface AsyncBufferedMutator extends Closeable {
+
+  /**
+   * Gets the fully qualified table name instance of the table that this
+   * {@code AsyncBufferedMutator} writes to.
+   */
+  TableName getName();
+
+  /**
+   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by 
this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will 
affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Sends a {@link Mutation} to the table. The mutations will be buffered and 
sent over the wire as
+   * part of a batch. Currently only supports {@link Put} and {@link Delete} 
mutations.
+   * @param mutation The data to send.
+   */
+  CompletableFuture<Void> mutate(Mutation mutation);
+
+  /**
+   * Send some {@link Mutation}s to the table. The mutations will be buffered 
and sent over the wire
+   * as part of a batch. There is no guarantee of sending entire content of 
{@code mutations} in a
+   * single batch, the implementations are free to break it up according to 
the write buffer
+   * capacity.
+   * @param mutations The data to send.
+   */
+  List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations);
+
+  /**
+   * Executes all the buffered, asynchronous operations.
+   */
+  void flush();
+
+  /**
+   * Performs a {@link #flush()} and releases any resources held.
+   */
+  @Override
+  void close();
+
+  /**
+   * Returns the maximum size in bytes of the write buffer.
+   * <p>
+   * The default value comes from the configuration parameter {@code 
hbase.client.write.buffer}.
+   * @return The size of the write buffer in bytes.
+   */
+  long getWriteBufferSize();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
new file mode 100644
index 0000000..d47ba00
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * For creating {@link AsyncBufferedMutator}.
+ */
+@InterfaceAudience.Public
+public interface AsyncBufferedMutatorBuilder {
+
+  /**
+   * Set timeout for the background flush operation.
+   */
+  AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set timeout for each rpc request when doing background flush.
+   */
+  AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set the base pause time for retrying. We use an exponential policy to 
generate sleep time when
+   * retrying.
+   */
+  AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
+
+  /**
+   * Set the max retry times for an operation. Usually it is the max attempt 
times minus 1.
+   * <p>
+   * Operation timeout and max attempt times(or max retry times) are both 
limitations for retrying,
+   * we will stop retrying when we reach any of the limitations.
+   * @see #setMaxAttempts(int)
+   * @see #setOperationTimeout(long, TimeUnit)
+   */
+  default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
+    return setMaxAttempts(retries2Attempts(maxRetries));
+  }
+
+  /**
+   * Set the max attempt times for an operation. Usually it is the max retry 
times plus 1. Operation
+   * timeout and max attempt times(or max retry times) are both limitations 
for retrying, we will
+   * stop retrying when we reach any of the limitations.
+   * @see #setMaxRetries(int)
+   * @see #setOperationTimeout(long, TimeUnit)
+   */
+  AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts);
+
+  /**
+   * Set the number of retries that are allowed before we start to log.
+   */
+  AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt);
+
+  /**
+   * Override the write buffer size specified by the provided {@link 
AsyncConnection}'s
+   * {@link org.apache.hadoop.conf.Configuration} instance, via the 
configuration key
+   * {@code hbase.client.write.buffer}.
+   */
+  AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);
+
+  /**
+   * Create the {@link AsyncBufferedMutator} instance.
+   */
+  AsyncBufferedMutator build();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
new file mode 100644
index 0000000..0c5ab5a
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The implementation of {@link AsyncBufferedMutatorBuilder}.
+ */
+@InterfaceAudience.Private
+class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
+
+  private final AsyncTableBuilder<? extends AsyncTableBase> tableBuilder;
+
+  private long writeBufferSize;
+
+  public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
+      AsyncTableBuilder<? extends AsyncTableBase> tableBuilder) {
+    this.tableBuilder = tableBuilder;
+    this.writeBufferSize = connConf.getWriteBufferSize();
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, 
TimeUnit unit) {
+    tableBuilder.setOperationTimeout(timeout, unit);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit 
unit) {
+    tableBuilder.setRpcTimeout(timeout, unit);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit) {
+    tableBuilder.setRetryPause(pause, unit);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts) {
+    tableBuilder.setMaxAttempts(maxAttempts);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int 
startLogErrorsCnt) {
+    tableBuilder.setStartLogErrorsCnt(startLogErrorsCnt);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
+    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must 
be >= 0",
+      writeBufferSize);
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutator build() {
+    return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
new file mode 100644
index 0000000..0118017
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link 
AsyncTableBase}.
+ */
+@InterfaceAudience.Private
+class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
+
+  private final AsyncTableBase table;
+
+  private final long writeBufferSize;
+
+  private List<Mutation> mutations = new ArrayList<>();
+
+  private List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+  private long bufferedSize;
+
+  private boolean closed;
+
+  AsyncBufferedMutatorImpl(AsyncTableBase table, long writeBufferSize) {
+    this.table = table;
+    this.writeBufferSize = writeBufferSize;
+  }
+
+  @Override
+  public TableName getName() {
+    return table.getName();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return table.getConfiguration();
+  }
+
+  private void internalFlush() {
+    List<Mutation> toSend = this.mutations;
+    if (toSend.isEmpty()) {
+      return;
+    }
+    List<CompletableFuture<Void>> toComplete = this.futures;
+    assert toSend.size() == toComplete.size();
+    this.mutations = new ArrayList<>();
+    this.futures = new ArrayList<>();
+    bufferedSize = 0L;
+    Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
+    for (CompletableFuture<?> future : table.batch(toSend)) {
+      future.whenComplete((r, e) -> {
+        CompletableFuture<Void> f = toCompleteIter.next();
+        if (e != null) {
+          f.completeExceptionally(e);
+        } else {
+          f.complete(null);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> mutate(Mutation mutation) {
+    CompletableFuture<Void> future = new CompletableFuture<Void>();
+    long heapSize = mutation.heapSize();
+    synchronized (this) {
+      if (closed) {
+        future.completeExceptionally(new IOException("Already closed"));
+        return future;
+      }
+      mutations.add(mutation);
+      futures.add(future);
+      bufferedSize += heapSize;
+      if (bufferedSize >= writeBufferSize) {
+        internalFlush();
+      }
+    }
+    return future;
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> mutate(List<? extends Mutation> 
mutations) {
+    List<CompletableFuture<Void>> futures =
+        Stream.<CompletableFuture<Void>> 
generate(CompletableFuture::new).limit(mutations.size())
+            .collect(Collectors.toList());
+    long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
+    synchronized (this) {
+      if (closed) {
+        IOException ioe = new IOException("Already closed");
+        futures.forEach(f -> f.completeExceptionally(ioe));
+        return futures;
+      }
+      this.mutations.addAll(mutations);
+      this.futures.addAll(futures);
+      bufferedSize += heapSize;
+      if (bufferedSize >= writeBufferSize) {
+        internalFlush();
+      }
+    }
+    return futures;
+  }
+
+  @Override
+  public synchronized void flush() {
+    internalFlush();
+  }
+
+  @Override
+  public synchronized void close() {
+    internalFlush();
+    closed = true;
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 24907ba..8d26368 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -136,4 +136,43 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    */
   AsyncAdminBuilder getAdminBuilder(ExecutorService pool);
+
+  /**
+   * Retrieve an {@link AsyncBufferedMutator} for performing client-side 
buffering of writes.
+   * <p>
+   * The returned instance will use default configs. Use
+   * {@link #getBufferedMutatorBuilder(TableName)} if you want to customize 
some configs.
+   * @param tableName the name of the table
+   * @return an {@link AsyncBufferedMutator} for the supplied tableName.
+   */
+  default AsyncBufferedMutator getBufferedMutator(TableName tableName) {
+    return getBufferedMutatorBuilder(tableName).build();
+  }
+
+  /**
+   * Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link 
AsyncBufferedMutator}.
+   * @param tableName the name of the table
+   */
+  AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName);
+
+  /**
+   * Retrieve an {@link AsyncBufferedMutator} for performing client-side 
buffering of writes.
+   * <p>
+   * The returned instance will use default configs. Use
+   * {@link #getBufferedMutatorBuilder(TableName, ExecutorService)} if you 
want to customize some
+   * configs.
+   * @param tableName the name of the table
+   * @param pool the thread pool to use for executing callback
+   * @return an {@link AsyncBufferedMutator} for the supplied tableName.
+   */
+  default AsyncBufferedMutator getBufferedMutator(TableName tableName, 
ExecutorService pool) {
+    return getBufferedMutatorBuilder(tableName, pool).build();
+  }
+
+  /**
+   * Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link 
AsyncBufferedMutator}.
+   * @param tableName the name of the table
+   * @param pool the thread pool to use for executing callback
+   */
+  AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 
ExecutorService pool);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 83caea2..a15ff8d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,6 +39,8 @@ import static 
org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static 
org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static 
org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
 
 import java.util.concurrent.TimeUnit;
 
@@ -87,6 +89,8 @@ class AsyncConnectionConfiguration {
 
   private final long scannerMaxResultSize;
 
+  private final long writeBufferSize;
+
   @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -112,6 +116,7 @@ class AsyncConnectionConfiguration {
     this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, 
DEFAULT_HBASE_META_SCANNER_CACHING);
     this.scannerMaxResultSize = 
conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+    this.writeBufferSize =  conf.getLong(WRITE_BUFFER_SIZE_KEY, 
WRITE_BUFFER_SIZE_DEFAULT);
   }
 
   long getMetaOperationTimeoutNs() {
@@ -161,4 +166,8 @@ class AsyncConnectionConfiguration {
   long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }
+
+  long getWriteBufferSize() {
+    return writeBufferSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index d8f051f..800ce15 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -297,4 +297,15 @@ class AsyncConnectionImpl implements AsyncConnection {
       }
     };
   }
+
+  @Override
+  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName 
tableName) {
+    return new AsyncBufferedMutatorBuilderImpl(connConf, 
getRawTableBuilder(tableName));
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName 
tableName,
+      ExecutorService pool) {
+    return new AsyncBufferedMutatorBuilderImpl(connConf, 
getTableBuilder(tableName, pool));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d12eb7a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
new file mode 100644
index 0000000..dca66d5
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncBufferMutator {
+
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 100;
+
+  private static byte[] VALUE = new byte[1024];
+
+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, CF);
+    CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    ThreadLocalRandom.current().nextBytes(VALUE);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException {
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    try (AsyncBufferedMutator mutator =
+        CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 
1024).build()) {
+      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, 
COUNT / 2)
+          .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+          .collect(Collectors.toList()));
+      // exceeded the write buffer size, a flush will be called directly
+      fs.forEach(f -> f.join());
+      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
+        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, 
VALUE)));
+      });
+      // the first future should have been sent out.
+      futures.get(0).join();
+      Thread.sleep(2000);
+      // the last one should still be in write buffer
+      assertFalse(futures.get(futures.size() - 1).isDone());
+    }
+    // mutator.close will call mutator.flush automatically so all tasks should 
have been done.
+    futures.forEach(f -> f.join());
+    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
+    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g 
-> table.get(g).join())
+        .forEach(r -> {
+          assertArrayEquals(VALUE, r.getValue(CF, CQ));
+        });
+  }
+
+  @Test
+  public void testClosedMutate() throws InterruptedException {
+    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
+    mutator.close();
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    try {
+      mutator.mutate(put).get();
+      fail("Close check failed");
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(IOException.class));
+      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
+    }
+    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
+      try {
+        f.get();
+        fail("Close check failed");
+      } catch (ExecutionException e) {
+        assertThat(e.getCause(), instanceOf(IOException.class));
+        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
+      }
+    }
+  }
+}

Reply via email to