HBASE-4224 Need a flush by regionserver rather than by table option
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f952779b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f952779b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f952779b Branch: refs/heads/HBASE-19397-branch-2 Commit: f952779ba2be34f5c566353ab9b3619e60f29c4c Parents: b1269ec Author: Chia-Ping Tsai <[email protected]> Authored: Sun Dec 17 01:59:19 2017 +0800 Committer: Chia-Ping Tsai <[email protected]> Committed: Tue Jan 23 09:47:15 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Admin.java | 7 + .../apache/hadoop/hbase/client/AsyncAdmin.java | 6 + .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 + .../apache/hadoop/hbase/client/HBaseAdmin.java | 35 ++-- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 49 ++++-- .../hbase/client/TestFlushFromClient.java | 176 +++++++++++++++++++ 6 files changed, 254 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f952779b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index f61b32e..40dac2f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -826,6 +826,13 @@ public interface Admin extends Abortable, Closeable { void flushRegion(byte[] regionName) throws IOException; /** + * Flush all regions on the region server. Synchronous operation. 
+ * @param serverName the region server name to flush + * @throws IOException if a remote or network exception occurs + */ + void flushRegionServer(ServerName serverName) throws IOException; + + /** * Compact a table. Asynchronous operation in that this method requests that a * Compaction run and then it returns. It does not wait on the completion of Compaction * (it can take a while). http://git-wip-us.apache.org/repos/asf/hbase/blob/f952779b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index a375265..35cdd3f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -295,6 +295,12 @@ public interface AsyncAdmin { CompletableFuture<Void> flushRegion(byte[] regionName); /** + * Flush all regions on the region server. + * @param serverName server to flush + */ + CompletableFuture<Void> flushRegionServer(ServerName serverName); + + /** * Compact a table. When the returned CompletableFuture is done, it only means the compact request * was sent to HBase and may need some time to finish the compact operation. 
* @param tableName table to compact http://git-wip-us.apache.org/repos/asf/hbase/blob/f952779b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index d0d19c1..9b2390c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -243,6 +243,11 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override + public CompletableFuture<Void> flushRegionServer(ServerName sn) { + return wrap(rawAdmin.flushRegionServer(sn)); + } + + @Override public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { return wrap(rawAdmin.compact(tableName, compactType)); http://git-wip-us.apache.org/repos/asf/hbase/blob/f952779b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 4ac1c21..c137383 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1182,21 +1182,28 @@ public class HBaseAdmin implements Admin { if (regionServerPair.getSecond() == null) { throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); } - final RegionInfo hRegionInfo = regionServerPair.getFirst(); + final RegionInfo regionInfo = regionServerPair.getFirst(); ServerName serverName = regionServerPair.getSecond(); - final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); - Callable<Void> 
callable = new Callable<Void>() { - @Override - public Void call() throws Exception { - // TODO: There is no timeout on this controller. Set one! - HBaseRpcController controller = rpcControllerFactory.newController(); - FlushRegionRequest request = - RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); - admin.flushRegion(controller, request); - return null; - } - }; - ProtobufUtil.call(callable); + flush(this.connection.getAdmin(serverName), regionInfo); + } + + private void flush(AdminService.BlockingInterface admin, final RegionInfo info) + throws IOException { + ProtobufUtil.call(() -> { + // TODO: There is no timeout on this controller. Set one! + HBaseRpcController controller = rpcControllerFactory.newController(); + FlushRegionRequest request = + RequestConverter.buildFlushRegionRequest(info.getRegionName()); + admin.flushRegion(controller, request); + return null; + }); + } + + @Override + public void flushRegionServer(ServerName serverName) throws IOException { + for (RegionInfo region : getRegions(serverName)) { + flush(this.connection.getAdmin(serverName), region); + } } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/f952779b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 5111bfc..a826f8c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -826,25 +826,54 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .toStringBinary(regionName))); return; } - - RegionInfo regionInfo = location.getRegion(); - this.<Void> newAdminCaller() - .serverName(serverName) - .action( - (controller, stub) -> this.<FlushRegionRequest, 
FlushRegionResponse, Void> adminCall( - controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo - .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), - resp -> null)).call().whenComplete((ret, err2) -> { + flush(serverName, location.getRegion()) + .whenComplete((ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { future.complete(ret); } - }); + }); }); return future; } + private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) { + return this.<Void> newAdminCaller() + .serverName(serverName) + .action( + (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( + controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), + resp -> null)) + .call(); + } + + @Override + public CompletableFuture<Void> flushRegionServer(ServerName sn) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getRegions(sn).whenComplete((hRegionInfos, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); + if (hRegionInfos != null) { + hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); + } + CompletableFuture + .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + @Override public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { return compact(tableName, null, false, compactType); http://git-wip-us.apache.org/repos/asf/hbase/blob/f952779b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java new file mode 100644 index 0000000..9085fa5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java @@ -0,0 +1,176 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({MediumTests.class, ClientTests.class}) +public class TestFlushFromClient { + private static final Log LOG = LogFactory.getLog(TestFlushFromClient.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static AsyncConnection asyncConn; + private static final byte[][] SPLITS = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("7")}; + private static final List<byte[]> ROWS = Arrays.asList( + Bytes.toBytes("1"), + Bytes.toBytes("4"), + Bytes.toBytes("8")); + private static final byte[] FAMILY = Bytes.toBytes("f1"); + + @Rule + public TestName name = new TestName(); + + public TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(ROWS.size()); + asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + 
@AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.cleanup(null, asyncConn); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + tableName = TableName.valueOf(name.getMethodName()); + try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) { + List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); + for (int i = 0; i != 20; ++i) { + byte[] value = Bytes.toBytes(i); + puts.forEach(p -> p.addColumn(FAMILY, value, value)); + } + t.put(puts); + } + assertFalse(getRegionInfo().isEmpty()); + assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreSize() != 0)); + } + + @After + public void tearDown() throws Exception { + for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + TEST_UTIL.deleteTable(htd.getTableName()); + } + } + + @Test + public void testFlushTable() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.flush(tableName); + assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + } + + @Test + public void testAsyncFlushTable() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + admin.flush(tableName).get(); + assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + + @Test + public void testFlushRegion() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + for (HRegion r : getRegionInfo()) { + admin.flushRegion(r.getRegionInfo().getRegionName()); + TimeUnit.SECONDS.sleep(1); + assertEquals(0, r.getMemStoreSize()); + } + } + } + + @Test + public void testAsyncFlushRegion() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + for (HRegion r : getRegionInfo()) { + admin.flushRegion(r.getRegionInfo().getRegionName()).get(); + TimeUnit.SECONDS.sleep(1); + assertEquals(0, r.getMemStoreSize()); + } + } + + @Test + public void testFlushRegionServer() throws 
Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + for (HRegionServer rs : TEST_UTIL.getHBaseCluster() + .getLiveRegionServerThreads() + .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) + .collect(Collectors.toList())) { + admin.flushRegionServer(rs.getServerName()); + assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + } + } + + @Test + public void testAsyncFlushRegionServer() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + for (HRegionServer rs : TEST_UTIL.getHBaseCluster() + .getLiveRegionServerThreads() + .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) + .collect(Collectors.toList())) { + admin.flushRegionServer(rs.getServerName()).get(); + assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0)); + } + } + + private List<HRegion> getRegionInfo() { + return TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() + .map(JVMClusterUtil.RegionServerThread::getRegionServer) + .flatMap(r -> r.getRegions().stream()) + .filter(r -> r.getTableDescriptor().getTableName().equals(tableName)) + .collect(Collectors.toList()); + } + + private List<HRegion> getRegionInfo(HRegionServer rs) { + return rs.getRegions().stream() + .filter(v -> v.getTableDescriptor().getTableName().equals(tableName)) + .collect(Collectors.toList()); + } +}
