Repository: hbase
Updated Branches:
  refs/heads/master 520b9efc2 -> f6582400b


HBASE-19242 Add MOB compact support for AsyncAdmin

Signed-off-by: Michael Stack <st...@apache.org>
Signed-off-by: Guanghao Zhang <zghao...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f6582400
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f6582400
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f6582400

Branch: refs/heads/master
Commit: f6582400bec7a9b2450437efbfb7139f212e5631
Parents: 520b9ef
Author: Balazs Meszaros <balazs.mesza...@cloudera.com>
Authored: Thu Nov 23 14:42:39 2017 +0100
Committer: Michael Stack <st...@apache.org>
Committed: Tue Nov 28 15:03:50 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Admin.java   | 130 +++++------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  69 +++++-
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    |  24 +-
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |  13 +-
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 232 ++++++++++++-------
 .../apache/hadoop/hbase/client/RegionInfo.java  |  11 +
 .../hbase/client/TestAsyncRegionAdminApi.java   |  38 +++
 7 files changed, 341 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f6582400/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 6f1190e..d9f8e899 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -872,6 +872,33 @@ public interface Admin extends Abortable, Closeable {
     throws IOException;
 
   /**
+   * Compact a table.  Asynchronous operation in that this method requests 
that a
+   * Compaction run and then it returns. It does not wait on the completion of 
Compaction
+   * (it can take a while).
+   *
+   * @param tableName table to compact
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  void compact(TableName tableName, CompactType compactType)
+    throws IOException, InterruptedException;
+
+  /**
+   * Compact a column family within a table.  Asynchronous operation in that 
this method
+   * requests that a Compaction run and then it returns. It does not wait on 
the
+   * completion of Compaction (it can take a while).
+   *
+   * @param tableName table to compact
+   * @param columnFamily column family within a table
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   * @throws IOException if not a mob column family or if a remote or network 
exception occurs
+   * @throws InterruptedException
+   */
+  void compact(TableName tableName, byte[] columnFamily, CompactType 
compactType)
+    throws IOException, InterruptedException;
+
+  /**
    * Major compact a table. Asynchronous operation in that this method requests
    * that a Compaction run and then it returns. It does not wait on the 
completion of Compaction
    * (it can take a while).
@@ -916,6 +943,33 @@ public interface Admin extends Abortable, Closeable {
     throws IOException;
 
   /**
+   * Major compact a table.  Asynchronous operation in that this method 
requests that a
+   * Compaction run and then it returns. It does not wait on the completion of 
Compaction
+   * (it can take a while).
+   *
+   * @param tableName table to compact
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException
+   */
+  void majorCompact(TableName tableName, CompactType compactType)
+    throws IOException, InterruptedException;
+
+  /**
+   * Major compact a column family within a table.  Asynchronous operation in 
that this method requests that a
+   * Compaction run and then it returns. It does not wait on the completion of 
Compaction
+   * (it can take a while).
+   *
+   * @param tableName table to compact
+   * @param columnFamily column family within a table
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   * @throws IOException if not a mob column family or if a remote or network 
exception occurs
+   * @throws InterruptedException
+   */
+  void majorCompact(TableName tableName, byte[] columnFamily, CompactType 
compactType)
+    throws IOException, InterruptedException;
+
+  /**
    * Compact all regions on the region server. Asynchronous operation in that 
this method requests
    * that a Compaction run and then it returns. It does not wait on the 
completion of Compaction (it
    * can take a while).
@@ -1736,6 +1790,17 @@ public interface Admin extends Abortable, Closeable {
   CompactionState getCompactionState(TableName tableName) throws IOException;
 
   /**
+   * Get the current compaction state of a table. It could be in a compaction, 
or none.
+   *
+   * @param tableName table to examine
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   * @return the current compaction state
+   * @throws IOException if a remote or network exception occurs
+   */
+  CompactionState getCompactionState(TableName tableName,
+    CompactType compactType) throws IOException;
+
+  /**
    * Get the current compaction state of region. It could be in a major 
compaction, a minor
    * compaction, both, or none.
    *
@@ -2311,71 +2376,6 @@ public interface Admin extends Abortable, Closeable {
   }
 
   /**
-   * Compact a table.  Asynchronous operation in that this method requests 
that a
-   * Compaction run and then it returns. It does not wait on the completion of 
Compaction
-   * (it can take a while).
-   *
-   * @param tableName table to compact
-   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void compact(TableName tableName, CompactType compactType)
-    throws IOException, InterruptedException;
-
-  /**
-   * Compact a column family within a table.  Asynchronous operation in that 
this method requests that a
-   * Compaction run and then it returns. It does not wait on the completion of 
Compaction
-   * (it can take a while).
-   *
-   * @param tableName table to compact
-   * @param columnFamily column family within a table
-   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
-   * @throws IOException if not a mob column family or if a remote or network 
exception occurs
-   * @throws InterruptedException
-   */
-  void compact(TableName tableName, byte[] columnFamily, CompactType 
compactType)
-    throws IOException, InterruptedException;
-
-  /**
-   * Major compact a table.  Asynchronous operation in that this method 
requests that a
-   * Compaction run and then it returns. It does not wait on the completion of 
Compaction
-   * (it can take a while).
-   *
-   * @param tableName table to compact
-   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void majorCompact(TableName tableName, CompactType compactType)
-    throws IOException, InterruptedException;
-
-  /**
-   * Major compact a column family within a table.  Asynchronous operation in 
that this method requests that a
-   * Compaction run and then it returns. It does not wait on the completion of 
Compaction
-   * (it can take a while).
-   *
-   * @param tableName table to compact
-   * @param columnFamily column family within a table
-   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
-   * @throws IOException if not a mob column family or if a remote or network 
exception occurs
-   * @throws InterruptedException
-   */
-  void majorCompact(TableName tableName, byte[] columnFamily, CompactType 
compactType)
-    throws IOException, InterruptedException;
-
-  /**
-   * Get the current compaction state of a table. It could be in a compaction, 
or none.
-   *
-   * @param tableName table to examine
-   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
-   * @return the current compaction state
-   * @throws IOException if a remote or network exception occurs
-   */
-  CompactionState getCompactionState(TableName tableName,
-    CompactType compactType) throws IOException;
-
-  /**
    * Return the set of supported security capabilities.
    * @throws IOException
    * @throws UnsupportedOperationException

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6582400/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index abd8e5a..2ae51ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -299,7 +299,9 @@ public interface AsyncAdmin {
    * was sent to HBase and may need some time to finish the compact operation.
    * @param tableName table to compact
    */
-  CompletableFuture<Void> compact(TableName tableName);
+  default CompletableFuture<Void> compact(TableName tableName) {
+    return compact(tableName, CompactType.NORMAL);
+  }
 
   /**
    * Compact a column family within a table. When the returned 
CompletableFuture is done, it only
@@ -309,7 +311,28 @@ public interface AsyncAdmin {
    * @param columnFamily column family within a table. If not present, compact 
the table's all
    *          column families.
    */
-  CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily);
+  default CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily) {
+    return compact(tableName, columnFamily, CompactType.NORMAL);
+  }
+
+  /**
+   * Compact a table. When the returned CompletableFuture is done, it only 
means the compact request
+   * was sent to HBase and may need some time to finish the compact operation.
+   * @param tableName table to compact
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   */
+  CompletableFuture<Void> compact(TableName tableName, CompactType 
compactType);
+
+  /**
+   * Compact a column family within a table. When the returned 
CompletableFuture is done, it only
+   * means the compact request was sent to HBase and may need some time to 
finish the compact
+   * operation.
+   * @param tableName table to compact
+   * @param columnFamily column family within a table
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   */
+  CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
+      CompactType compactType);
 
   /**
    * Compact an individual region. When the returned CompletableFuture is 
done, it only means the
@@ -333,7 +356,29 @@ public interface AsyncAdmin {
    * request was sent to HBase and may need some time to finish the compact 
operation.
    * @param tableName table to major compact
    */
-  CompletableFuture<Void> majorCompact(TableName tableName);
+  default CompletableFuture<Void> majorCompact(TableName tableName) {
+    return majorCompact(tableName, CompactType.NORMAL);
+  }
+
+  /**
+   * Major compact a column family within a table. When the returned 
CompletableFuture is done, it
+   * only means the compact request was sent to HBase and may need some time 
to finish the compact
+   * operation.
+   * @param tableName table to major compact
+   * @param columnFamily column family within a table. If not present, major 
compact the table's all
+   *          column families.
+   */
+  default CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily) {
+    return majorCompact(tableName, columnFamily, CompactType.NORMAL);
+  }
+
+  /**
+   * Major compact a table. When the returned CompletableFuture is done, it 
only means the compact
+   * request was sent to HBase and may need some time to finish the compact 
operation.
+   * @param tableName table to major compact
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   */
+  CompletableFuture<Void> majorCompact(TableName tableName, CompactType 
compactType);
 
   /**
    * Major compact a column family within a table. When the returned 
CompletableFuture is done, it
@@ -342,8 +387,10 @@ public interface AsyncAdmin {
    * @param tableName table to major compact
    * @param columnFamily column family within a table. If not present, major 
compact the table's all
    *          column families.
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
    */
-  CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily);
+  CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily,
+      CompactType compactType);
 
   /**
    * Major compact a region. When the returned CompletableFuture is done, it 
only means the compact
@@ -960,7 +1007,19 @@ public interface AsyncAdmin {
    * @param tableName table to examine
    * @return the current compaction state wrapped by a {@link 
CompletableFuture}
    */
-  CompletableFuture<CompactionState> getCompactionState(TableName tableName);
+  default CompletableFuture<CompactionState> getCompactionState(TableName 
tableName) {
+    return getCompactionState(tableName, CompactType.NORMAL);
+  }
+
+  /**
+   * Get the current compaction state of a table. It could be in a major 
compaction, a minor
+   * compaction, both, or none.
+   * @param tableName table to examine
+   * @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
+   * @return the current compaction state wrapped by a {@link 
CompletableFuture}
+   */
+  CompletableFuture<CompactionState> getCompactionState(TableName tableName,
+      CompactType compactType);
 
   /**
    * Get the current compaction state of region. It could be in a major 
compaction, a minor

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6582400/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index fb16fce..0f0679d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -244,13 +244,15 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> compact(TableName tableName) {
-    return wrap(rawAdmin.compact(tableName));
+  public CompletableFuture<Void> compact(TableName tableName,
+      CompactType compactType) {
+    return wrap(rawAdmin.compact(tableName, compactType));
   }
 
   @Override
-  public CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily) {
-    return wrap(rawAdmin.compact(tableName, columnFamily));
+  public CompletableFuture<Void> compact(TableName tableName,
+      byte[] columnFamily, CompactType compactType) {
+    return wrap(rawAdmin.compact(tableName, columnFamily, compactType));
   }
 
   @Override
@@ -264,13 +266,14 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> majorCompact(TableName tableName) {
-    return wrap(rawAdmin.majorCompact(tableName));
+  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType 
compactType) {
+    return wrap(rawAdmin.majorCompact(tableName, compactType));
   }
 
   @Override
-  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily) {
-    return wrap(rawAdmin.majorCompact(tableName, columnFamily));
+  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily,
+      CompactType compactType) {
+    return wrap(rawAdmin.majorCompact(tableName, columnFamily, compactType));
   }
 
   @Override
@@ -632,8 +635,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<CompactionState> getCompactionState(TableName 
tableName) {
-    return wrap(rawAdmin.getCompactionState(tableName));
+  public CompletableFuture<CompactionState> getCompactionState(
+      TableName tableName, CompactType compactType) {
+    return wrap(rawAdmin.getCompactionState(tableName, compactType));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6582400/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 05157dd..1a00efe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1282,8 +1282,8 @@ public class HBaseAdmin implements Admin {
                        CompactType compactType) throws IOException {
     switch (compactType) {
       case MOB:
-        compact(this.connection.getAdminForMaster(), 
getMobRegionInfo(tableName), major,
-          columnFamily);
+        compact(this.connection.getAdminForMaster(), 
RegionInfo.createMobRegionInfo(tableName),
+            major, columnFamily);
         break;
       case NORMAL:
         checkTableExists(tableName);
@@ -3240,7 +3240,7 @@ public class HBaseAdmin implements Admin {
           new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
             @Override
             public AdminProtos.GetRegionInfoResponse.CompactionState call() 
throws Exception {
-              RegionInfo info = getMobRegionInfo(tableName);
+              RegionInfo info = RegionInfo.createMobRegionInfo(tableName);
               GetRegionInfoRequest request =
                 
RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true);
               GetRegionInfoResponse response = 
masterAdmin.getRegionInfo(rpcController, request);
@@ -3304,7 +3304,7 @@ public class HBaseAdmin implements Admin {
         }
         break;
       default:
-        throw new IllegalArgumentException("Unknowne compactType: " + 
compactType);
+        throw new IllegalArgumentException("Unknown compactType: " + 
compactType);
     }
     if (state != null) {
       return ProtobufUtil.createCompactionState(state);
@@ -3839,11 +3839,6 @@ public class HBaseAdmin implements Admin {
     });
   }
 
-  private RegionInfo getMobRegionInfo(TableName tableName) {
-    return 
RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(".mob")).setRegionId(0)
-        .build();
-  }
-
   private RpcControllerFactory getRpcControllerFactory() {
     return this.rpcControllerFactory;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6582400/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index f56e7ca..5e9356a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -842,15 +842,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> compact(TableName tableName) {
-    return compact(tableName, null, false, CompactType.NORMAL);
+  public CompletableFuture<Void> compact(TableName tableName, CompactType 
compactType) {
+    return compact(tableName, null, false, compactType);
   }
 
   @Override
-  public CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily) {
-    Preconditions.checkNotNull(columnFamily,
-      "columnFamily is null. If you don't specify a columnFamily, use 
compact(TableName) instead");
-    return compact(tableName, columnFamily, false, CompactType.NORMAL);
+  public CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily,
+      CompactType compactType) {
+    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
+        + "If you don't specify a columnFamily, use compact(TableName) 
instead");
+    return compact(tableName, columnFamily, false, compactType);
   }
 
   @Override
@@ -866,15 +867,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> majorCompact(TableName tableName) {
-    return compact(tableName, null, true, CompactType.NORMAL);
+  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType 
compactType) {
+    return compact(tableName, null, true, compactType);
   }
 
   @Override
-  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily) {
+  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily,
+      CompactType compactType) {
     Preconditions.checkNotNull(columnFamily, "columnFamily is null."
-        + " If you don't specify a columnFamily, use majorCompact(TableName) 
instead");
-    return compact(tableName, columnFamily, true, CompactType.NORMAL);
+        + " If you don't specify a columnFamily, use majorCompact(TableName, 
CompactType) instead");
+    return compact(tableName, columnFamily, true, compactType);
   }
 
   @Override
@@ -926,6 +928,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] 
columnFamily,
       boolean major) {
     CompletableFuture<Void> future = new CompletableFuture<>();
+
     getRegionLocation(regionName).whenComplete(
       (location, err) -> {
         if (err != null) {
@@ -981,31 +984,51 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   /**
    * Compact column family of a table, Asynchronous operation even if 
CompletableFuture.get()
    */
-  private CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily, boolean major,
-      CompactType compactType) {
-    if (CompactType.MOB.equals(compactType)) {
-      // TODO support MOB compact.
-      return failedFuture(new UnsupportedOperationException("MOB compact does 
not support"));
-    }
+  private CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily,
+      boolean major, CompactType compactType) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> 
l.getRegion() != null)
-          .filter(l -> !l.getRegion().isOffline()).filter(l -> 
l.getServerName() != null)
-          .map(l -> compact(l.getServerName(), l.getRegion(), major, 
columnFamily))
-          .toArray(CompletableFuture<?>[]::new);
-      // future complete unless all of the compact futures are completed.
-      CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
-        if (err2 != null) {
-          future.completeExceptionally(err2);
-        } else {
-          future.complete(ret);
-        }
-      });
-    });
+
+    switch (compactType) {
+      case MOB:
+        connection.registry.getMasterAddress().whenComplete((serverName, err) 
-> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
+          compact(serverName, regionInfo, major, columnFamily)
+              .whenComplete((ret, err2) -> {
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                } else {
+                  future.complete(ret);
+                }
+              });
+        });
+        break;
+      case NORMAL:
+        getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+          CompletableFuture<?>[] compactFutures = locations.stream().filter(l 
-> l.getRegion() != null)
+              .filter(l -> !l.getRegion().isOffline()).filter(l -> 
l.getServerName() != null)
+              .map(l -> compact(l.getServerName(), l.getRegion(), major, 
columnFamily))
+              .toArray(CompletableFuture<?>[]::new);
+          // The future completes only once all of the compact futures have completed.
+          CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+        });
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown compactType: " + 
compactType);
+    }
     return future;
   }
 
@@ -2741,64 +2764,99 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<CompactionState> getCompactionState(TableName 
tableName) {
+  public CompletableFuture<CompactionState> getCompactionState(TableName 
tableName,
+      CompactType compactType) {
     CompletableFuture<CompactionState> future = new CompletableFuture<>();
-    getTableHRegionLocations(tableName).whenComplete(
-      (locations, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        List<CompactionState> regionStates = new ArrayList<>();
-        List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
-        locations.stream().filter(loc -> loc.getServerName() != null)
-            .filter(loc -> loc.getRegion() != null)
-            .filter(loc -> !loc.getRegion().isOffline())
-            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
-              
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, 
err2) -> {
-                // If any region compaction state is MAJOR_AND_MINOR
-                // the table compaction state is MAJOR_AND_MINOR, too.
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
 
-                  future.complete(regionState);
-                } else {
-                  regionStates.add(regionState);
-                }
-              }));
-            });
-        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[futures.size()]))
-            .whenComplete((ret, err3) -> {
-              // If future not completed, check all regions's compaction state
-              if (!future.isCompletedExceptionally() && !future.isDone()) {
-                CompactionState state = CompactionState.NONE;
-                for (CompactionState regionState : regionStates) {
-                  switch (regionState) {
-                  case MAJOR:
-                    if (state == CompactionState.MINOR) {
-                      future.complete(CompactionState.MAJOR_AND_MINOR);
+    switch (compactType) {
+      case MOB:
+        connection.registry.getMasterAddress().whenComplete((serverName, err) 
-> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
+
+          this.<GetRegionInfoResponse> 
newAdminCaller().serverName(serverName).action(
+            (controller, stub) -> this
+            .<GetRegionInfoRequest, GetRegionInfoResponse, 
GetRegionInfoResponse> adminCall(
+                controller, stub,
+                
RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
+                (s, c, req, done) -> s.getRegionInfo(controller, req, done), 
resp -> resp)
+          ).call().whenComplete((resp2, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              if (resp2.hasCompactionState()) {
+                
future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+              } else {
+                future.complete(CompactionState.NONE);
+              }
+            }
+          });
+        });
+        break;
+      case NORMAL:
+        getTableHRegionLocations(tableName).whenComplete(
+          (locations, err) -> {
+            if (err != null) {
+              future.completeExceptionally(err);
+              return;
+            }
+            List<CompactionState> regionStates = new ArrayList<>();
+            List<CompletableFuture<CompactionState>> futures = new 
ArrayList<>();
+            locations.stream().filter(loc -> loc.getServerName() != null)
+                .filter(loc -> loc.getRegion() != null)
+                .filter(loc -> !loc.getRegion().isOffline())
+                .map(loc -> loc.getRegion().getRegionName()).forEach(region -> 
{
+                  
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, 
err2) -> {
+                    // If any region compaction state is MAJOR_AND_MINOR
+                    // the table compaction state is MAJOR_AND_MINOR, too.
+                    if (err2 != null) {
+                      future.completeExceptionally(err2);
+                    } else if (regionState == CompactionState.MAJOR_AND_MINOR) 
{
+                      future.complete(regionState);
                     } else {
-                      state = CompactionState.MAJOR;
+                      regionStates.add(regionState);
                     }
-                    break;
-                  case MINOR:
-                    if (state == CompactionState.MAJOR) {
-                      future.complete(CompactionState.MAJOR_AND_MINOR);
-                    } else {
-                      state = CompactionState.MINOR;
+                  }));
+                });
+            CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[futures.size()]))
+                .whenComplete((ret, err3) -> {
+                  // If the future is not completed yet, check every region's 
compaction state
+                  if (!future.isCompletedExceptionally() && !future.isDone()) {
+                    CompactionState state = CompactionState.NONE;
+                    for (CompactionState regionState : regionStates) {
+                      switch (regionState) {
+                        case MAJOR:
+                          if (state == CompactionState.MINOR) {
+                            future.complete(CompactionState.MAJOR_AND_MINOR);
+                          } else {
+                            state = CompactionState.MAJOR;
+                          }
+                          break;
+                        case MINOR:
+                          if (state == CompactionState.MAJOR) {
+                            future.complete(CompactionState.MAJOR_AND_MINOR);
+                          } else {
+                            state = CompactionState.MINOR;
+                          }
+                          break;
+                        case NONE:
+                        default:
+                      }
+                      if (!future.isDone()) {
+                        future.complete(state);
+                      }
                     }
-                    break;
-                  case NONE:
-                  default:
                   }
-                  if (!future.isDone()) {
-                    future.complete(state);
-                  }
-                }
-              }
-            });
-      });
+                });
+          });
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown compactType: " + 
compactType);
+    }
+
     return future;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6582400/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
index 0eb4e42..cfca6da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
@@ -573,6 +573,17 @@ public interface RegionInfo {
   }
 
   /**
+   * Creates a RegionInfo object for MOB data.
+   *
+   * @param tableName the name of the table
+   * @return the MOB {@link RegionInfo}.
+   */
+  static RegionInfo createMobRegionInfo(TableName tableName) {
+    return RegionInfoBuilder.newBuilder(tableName)
+        .setStartKey(Bytes.toBytes(".mob")).setRegionId(0).build();
+  }
+
+  /**
    * Separate elements of a regionName.
    * @param regionName
    * @return Array of byte[] containing tableName, startKey and id

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6582400/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index 8a1afab..e6cffd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
@@ -422,6 +424,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
     assertEquals(count, 2);
   }
 
+  private void waitUntilMobCompactionFinished(TableName tableName)
+      throws ExecutionException, InterruptedException {
+    long finished = EnvironmentEdgeManager.currentTime() + 60000;
+    CompactionState state = admin.getCompactionState(tableName, 
CompactType.MOB).get();
+    while (EnvironmentEdgeManager.currentTime() < finished) {
+      if (state == CompactionState.NONE) {
+        break;
+      }
+      Thread.sleep(10);
+      state = admin.getCompactionState(tableName, CompactType.MOB).get();
+    }
+    assertEquals(CompactionState.NONE, state);
+  }
+
+  @Test
+  public void testCompactMob() throws Exception {
+    final byte[] mobFamily = Bytes.toBytes("mob");
+    ColumnFamilyDescriptor mobColumn = ColumnFamilyDescriptorBuilder.newBuilder(mobFamily)
+        .setMobEnabled(true).setMobThreshold(0).build();
+    TableDescriptor mobTable =
+        TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(mobColumn).build();
+
+    admin.createTable(mobTable).get();
+    loadData(tableName, new byte[][] { mobFamily }, 3000, 8);
+
+    // Request a MOB major compaction and wait for the request future to complete.
+    admin.majorCompact(tableName, CompactType.MOB).get();
+
+    // Right after the request the reported MOB compaction state must not be NONE.
+    CompactionState afterRequest = admin.getCompactionState(tableName, CompactType.MOB).get();
+    assertNotEquals(CompactionState.NONE, afterRequest);
+
+    // Finally the state has to return to NONE within the helper's timeout.
+    waitUntilMobCompactionFinished(tableName);
+  }
+
   @Test
   public void testCompactRegionServer() throws Exception {
     byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };

Reply via email to