HBASE-21582 If HBaseAdmin#snapshotAsync is called but isSnapshotFinished is never called, SnapshotHFileCleaner will skip every run
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f32d2618 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f32d2618 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f32d2618 Branch: refs/heads/HBASE-21512 Commit: f32d2618430f70e1b0db92785294b2c7892cc02b Parents: 4640ff5 Author: huzheng <[email protected]> Authored: Tue Dec 11 20:27:56 2018 +0800 Committer: huzheng <[email protected]> Committed: Thu Dec 13 10:35:20 2018 +0800 ---------------------------------------------------------------------- .../hbase/master/snapshot/SnapshotManager.java | 48 ++++++++++++++------ .../master/cleaner/TestSnapshotFromMaster.java | 27 ++++++++++- .../master/snapshot/TestSnapshotManager.java | 36 +++++++++++++-- 3 files changed, 92 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 2b963b2..05db4ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -28,7 +28,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; @@ -91,6 +95,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringP import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * This class manages the procedure of taking and restoring snapshots. There is only one @@ -120,7 +126,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable * At this point, if the user asks for the snapshot/restore status, the result will be * snapshot done if exists or failed if it doesn't exists. */ - private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000; + public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS = + "hbase.snapshot.sentinels.cleanup.timeoutMillis"; + public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L; /** Enable or disable snapshot support */ public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled"; @@ -151,7 +159,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // The map is always accessed and modified under the object lock using synchronized. // snapshotTable() will insert an Handler in the table. // isSnapshotDone() will remove the handler requested if the operation is finished. 
- private Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>(); + private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduleThreadPool = + Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build()); + private ScheduledFuture<?> snapshotHandlerChoreCleanerTask; // Restore map, with table name as key, procedure ID as value. // The map is always accessed and modified under the object lock using synchronized. @@ -181,17 +193,21 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable * @param coordinator procedure coordinator instance. exposed for testing. * @param pool HBase ExecutorServcie instance, exposed for testing. */ - public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster, - ProcedureCoordinator coordinator, ExecutorService pool) + @VisibleForTesting + SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator, + ExecutorService pool, int sentinelCleanInterval) throws IOException, UnsupportedOperationException { this.master = master; this.rootDir = master.getMasterFileSystem().getRootDir(); - checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem()); + Configuration conf = master.getConfiguration(); + checkSnapshotSupport(conf, master.getMasterFileSystem()); this.coordinator = coordinator; this.executorService = pool; resetTempDir(); + snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate( + this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS); } /** @@ -274,7 +290,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable * * @throws IOException if we can't reach the filesystem */ - void resetTempDir() throws IOException { + private void resetTempDir() throws IOException { // cleanup any existing 
snapshots. Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration()); @@ -290,7 +306,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable * @throws SnapshotDoesNotExistException If the specified snapshot does not exist. * @throws IOException For filesystem IOExceptions */ - public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException { + public void deleteSnapshot(SnapshotDescription snapshot) throws IOException { // check to see if it is completed if (!isSnapshotCompleted(snapshot)) { throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot)); @@ -934,7 +950,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable this.restoreTableToProcIdMap.remove(tableName); return false; } - } /** @@ -989,14 +1004,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable */ private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) { long currentTime = EnvironmentEdgeManager.currentTime(); - Iterator<Map.Entry<TableName, SnapshotSentinel>> it = - sentinels.entrySet().iterator(); + long sentinelsCleanupTimeoutMillis = + master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, + SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT); + Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator(); while (it.hasNext()) { Map.Entry<TableName, SnapshotSentinel> entry = it.next(); SnapshotSentinel sentinel = entry.getValue(); - if (sentinel.isFinished() && - (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT) - { + if (sentinel.isFinished() + && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) { it.remove(); } } @@ -1031,7 +1047,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable for (SnapshotSentinel 
snapshotHandler: this.snapshotHandlers.values()) { snapshotHandler.cancel(why); } - + if (snapshotHandlerChoreCleanerTask != null) { + snapshotHandlerChoreCleanerTask.cancel(true); + } try { if (coordinator != null) { coordinator.close(); @@ -1166,6 +1184,8 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.executorService = master.getExecutorService(); resetTempDir(); + snapshotHandlerChoreCleanerTask = + scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java index 9d76ede..cc2ee06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.cleaner; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -25,6 +26,8 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,8 +35,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import 
org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.master.HMaster; @@ -129,11 +136,11 @@ public class TestSnapshotFromMaster { conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, ""); // Enable snapshot conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 3 * 1000L); conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod); conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName()); conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000); - } @Before @@ -419,4 +426,22 @@ public class TestSnapshotFromMaster { builder.commit(); return builder.getSnapshotDescription(); } + + @Test + public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception { + // Write some data + Table table = UTIL.getConnection().getTable(TABLE_NAME); + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes(i)).addColumn(TEST_FAM, Bytes.toBytes("q"), Bytes.toBytes(i)); + table.put(put); + } + String snapshotName = "testAsyncSnapshotWillNotBlockSnapshotHFileCleaner01"; + UTIL.getAdmin().snapshotAsync(new org.apache.hadoop.hbase.client.SnapshotDescription( + snapshotName, TABLE_NAME, SnapshotType.FLUSH)); + Waiter.waitFor(UTIL.getConfiguration(), 10 * 1000L, 200L, + () -> UTIL.getAdmin().listSnapshots(Pattern.compile(snapshotName)).size() == 1); + assertTrue(master.getSnapshotManager().isTakingAnySnapshot()); + Thread.sleep(11 * 1000L); + 
assertFalse(master.getSnapshotManager().isTakingAnySnapshot()); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f32d2618/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java index 3a6a61f..ff903c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.MetricsMaster; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; @@ -62,7 +61,6 @@ public class TestSnapshotManager { public TestName name = new TestName(); MasterServices services = Mockito.mock(MasterServices.class); - MetricsMaster metrics = Mockito.mock(MetricsMaster.class); ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class); ExecutorService pool = Mockito.mock(ExecutorService.class); MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class); @@ -79,14 +77,44 @@ public class TestSnapshotManager { return getNewManager(UTIL.getConfiguration()); } - private SnapshotManager getNewManager(final Configuration conf) + private SnapshotManager getNewManager(Configuration conf) throws IOException, KeeperException { + return getNewManager(conf, 1); + } + + private SnapshotManager getNewManager(Configuration conf, int intervalSeconds) throws 
IOException, KeeperException { Mockito.reset(services); Mockito.when(services.getConfiguration()).thenReturn(conf); Mockito.when(services.getMasterFileSystem()).thenReturn(mfs); Mockito.when(mfs.getFileSystem()).thenReturn(fs); Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir()); - return new SnapshotManager(services, metrics, coordinator, pool); + return new SnapshotManager(services, coordinator, pool, intervalSeconds); + } + + @Test + public void testCleanFinishedHandler() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + Configuration conf = UTIL.getConfiguration(); + try { + conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 5 * 1000L); + SnapshotManager manager = getNewManager(conf, 1); + TakeSnapshotHandler handler = Mockito.mock(TakeSnapshotHandler.class); + assertFalse("Manager is in process when there is no current handler", + manager.isTakingSnapshot(tableName)); + manager.setSnapshotHandlerForTesting(tableName, handler); + Mockito.when(handler.isFinished()).thenReturn(false); + assertTrue(manager.isTakingAnySnapshot()); + assertTrue("Manager isn't in process when handler is running", + manager.isTakingSnapshot(tableName)); + Mockito.when(handler.isFinished()).thenReturn(true); + assertFalse("Manager is process when handler isn't running", + manager.isTakingSnapshot(tableName)); + assertTrue(manager.isTakingAnySnapshot()); + Thread.sleep(6 * 1000); + assertFalse(manager.isTakingAnySnapshot()); + } finally { + conf.unset(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS); + } } @Test
