Repository: hbase Updated Branches: refs/heads/branch-1.3 bb8c3ac03 -> 52c2dcbaa
HBASE-18398: Snapshot operation fails with FileNotFoundException Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/52c2dcba Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/52c2dcba Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/52c2dcba Branch: refs/heads/branch-1.3 Commit: 52c2dcbaa321705df143e1dc9dca28c849f8f9bb Parents: bb8c3ac Author: Ashu Pachauri <a...@apache.org> Authored: Mon Aug 7 18:10:33 2017 -0700 Committer: Ashu Pachauri <a...@apache.org> Committed: Thu Aug 10 14:21:29 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 29 ++- .../hadoop/hbase/regionserver/HStore.java | 16 ++ .../hadoop/hbase/regionserver/Region.java | 9 +- .../snapshot/FlushSnapshotSubprocedure.java | 30 ++- .../hadoop/hbase/snapshot/SnapshotManifest.java | 22 +- .../hbase/snapshot/TestRegionSnapshotTask.java | 205 +++++++++++++++++++ 6 files changed, 288 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/52c2dcba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 90f3079..4ba66b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -8325,11 +8325,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi case DELETE: case BATCH_MUTATE: case COMPACT_REGION: - // when a region is in recovering state, no read, split or merge is allowed + case SNAPSHOT: + // when a region is in recovering state, no read, split, 
merge or snapshot is allowed if (isRecovering() && (this.disallowWritesInRecovering || - (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { + (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() + - " is recovering; cannot take reads"); + " is recovering; cannot take reads"); } break; default: @@ -8349,6 +8350,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi lock.readLock().unlock(); throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed"); } + // The unit for snapshot is a region. So, all stores for this region must be + // prepared for snapshot operation before proceeding. + if (op == Operation.SNAPSHOT) { + for (Store store : stores.values()) { + if (store instanceof HStore) { + ((HStore)store).preSnapshotOperation(); + } + } + } try { if (coprocessorHost != null) { coprocessorHost.postStartRegionOperation(op); @@ -8364,12 +8374,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi closeRegionOperation(Operation.ANY); } - /** - * Closes the lock. 
This needs to be called in the finally block corresponding - * to the try block of {@link #startRegionOperation(Operation)} - * @throws IOException - */ + @Override public void closeRegionOperation(Operation operation) throws IOException { + if (operation == Operation.SNAPSHOT) { + for (Store store: stores.values()) { + if (store instanceof HStore) { + ((HStore)store).postSnapshotOperation(); + } + } + } lock.readLock().unlock(); if (coprocessorHost != null) { coprocessorHost.postCloseRegionOperation(operation); http://git-wip-us.apache.org/repos/asf/hbase/blob/52c2dcba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 24ec480..d6303f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2653,6 +2653,22 @@ public class HStore implements Store { return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID; } + /** + * Sets the store up for a region level snapshot operation. + * @see #postSnapshotOperation() + */ + public void preSnapshotOperation() { + archiveLock.lock(); + } + + /** + * Perform tasks needed after the completion of snapshot operation. 
+ * @see #preSnapshotOperation() + */ + public void postSnapshotOperation() { + archiveLock.unlock(); + } + @Override public synchronized void closeAndArchiveCompactedFiles() throws IOException { // ensure other threads do not attempt to archive the same files on close() http://git-wip-us.apache.org/repos/asf/hbase/blob/52c2dcba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index c763ac0..4651021 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -221,7 +221,7 @@ public interface Region extends ConfigurationObserver { */ enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT + REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT } /** @@ -251,6 +251,13 @@ public interface Region extends ConfigurationObserver { */ void closeRegionOperation() throws IOException; + /** + * Closes the region operation lock. 
This needs to be called in the finally block corresponding + * to the try block of {@link #startRegionOperation(Operation)} + * @throws IOException + */ + void closeRegionOperation(Operation op) throws IOException; + // Row write locks /** http://git-wip-us.apache.org/repos/asf/hbase/blob/52c2dcba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java index 5669452..c56204d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java @@ -75,10 +75,18 @@ public class FlushSnapshotSubprocedure extends Subprocedure { /** * Callable for adding files to snapshot manifest working dir. Ready for multithreading. */ - private class RegionSnapshotTask implements Callable<Void> { - Region region; - RegionSnapshotTask(Region region) { + public static class RegionSnapshotTask implements Callable<Void> { + private Region region; + private boolean skipFlush; + private ForeignExceptionDispatcher monitor; + private SnapshotDescription snapshotDesc; + + public RegionSnapshotTask(Region region, SnapshotDescription snapshotDesc, + boolean skipFlush, ForeignExceptionDispatcher monitor) { this.region = region; + this.skipFlush = skipFlush; + this.monitor = monitor; + this.snapshotDesc = snapshotDesc; } @Override @@ -88,10 +96,10 @@ public class FlushSnapshotSubprocedure extends Subprocedure { // snapshots that involve multiple regions and regionservers. It is still possible to have // an interleaving such that globally regions are missing, so we still need the verification // step. 
- LOG.debug("Starting region operation on " + region); - region.startRegionOperation(); + LOG.debug("Starting snapshot operation on " + region); + region.startRegionOperation(Operation.SNAPSHOT); try { - if (snapshotSkipFlush) { + if (skipFlush) { /* * This is to take an online-snapshot without force a coordinated flush to prevent pause * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure @@ -127,15 +135,15 @@ public class FlushSnapshotSubprocedure extends Subprocedure { throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts"); } } - ((HRegion)region).addRegionToSnapshot(snapshot, monitor); - if (snapshotSkipFlush) { + ((HRegion)region).addRegionToSnapshot(snapshotDesc, monitor); + if (skipFlush) { LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed."); } else { LOG.debug("... Flush Snapshotting region " + region.toString() + " completed."); } } finally { - LOG.debug("Closing region operation on " + region); - region.closeRegionOperation(); + LOG.debug("Closing snapshot operation on " + region); + region.closeRegionOperation(Operation.SNAPSHOT); } return null; } @@ -159,7 +167,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure { // Add all hfiles already existing in region. for (Region region : regions) { // submit one task per region for parallelize by region. 
- taskManager.submitTask(new RegionSnapshotTask(region)); + taskManager.submitTask(new RegionSnapshotTask(region, snapshot, snapshotSkipFlush, monitor)); monitor.rethrowException(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/52c2dcba/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java index 004d580..d15789e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.snapshot; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.CodedInputStream; import com.google.protobuf.InvalidProtocolBufferException; @@ -110,6 +111,7 @@ public final class SnapshotManifest { final Path workingDir, final SnapshotDescription desc, final ForeignExceptionSnare monitor) { return new SnapshotManifest(conf, fs, workingDir, desc, monitor); + } /** @@ -163,9 +165,15 @@ public final class SnapshotManifest { * This is used by the "online snapshot" when the table is enabled. */ public void addRegion(final HRegion region) throws IOException { - // 0. Get the ManifestBuilder/RegionVisitor + // Get the ManifestBuilder/RegionVisitor RegionVisitor visitor = createRegionVisitor(desc); + // Visit the region and add it to the manifest + addRegion(region, visitor); + } + + @VisibleForTesting + protected void addRegion(final HRegion region, RegionVisitor visitor) throws IOException { // 1. 
dump region meta info into the snapshot directory LOG.debug("Storing '" + region + "' region-info for snapshot."); Object regionData = visitor.regionOpen(region.getRegionInfo()); @@ -203,12 +211,20 @@ public final class SnapshotManifest { * This is used by the "offline snapshot" when the table is disabled. */ public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException { - // 0. Get the ManifestBuilder/RegionVisitor + // Get the ManifestBuilder/RegionVisitor RegionVisitor visitor = createRegionVisitor(desc); + // Visit the region and add it to the manifest + addRegion(tableDir, regionInfo, visitor); + } + + @VisibleForTesting + protected void addRegion(final Path tableDir, final HRegionInfo regionInfo, RegionVisitor visitor) + throws IOException { + // Open the RegionFS HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(conf, fs, - tableDir, regionInfo, true); + tableDir, regionInfo, true); monitor.rethrowException(); // 1. dump region meta info into the snapshot directory http://git-wip-us.apache.org/repos/asf/hbase/blob/52c2dcba/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java new file mode 100644 index 0000000..403b1e6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.snapshot; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos; +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; 
+import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +/** + * Testing the region snapshot task on a cluster. + * @see org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure.RegionSnapshotTask + */ +@Category({ MediumTests.class, RegionServerTests.class}) +public class TestRegionSnapshotTask { + private final Log LOG = LogFactory.getLog(getClass()); + + private static HBaseTestingUtility TEST_UTIL; + private static Configuration conf; + private static FileSystem fs; + private static Path rootDir; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + + conf = TEST_UTIL.getConfiguration(); + + // Try to frequently clean up compacted files + conf.setInt("hbase.hfile.compaction.discharger.interval", 1000); + conf.setInt("hbase.master.hfilecleaner.ttl", 1000); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); + + rootDir = FSUtils.getRootDir(conf); + fs = TEST_UTIL.getTestFileSystem(); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Tests adding a region to the snapshot manifest while compactions are running on the region. + * The idea is to slow down the process of adding a store file to the manifest while + * triggering compactions on the region, allowing the store files to be marked for archival while + * snapshot operation is running. + * This test checks for the correct behavior in such a case that the compacted files should + * not be moved around if a snapshot operation is in progress. 
+ * See HBASE-18398 + */ + @Test(timeout = 30000) + public void testAddRegionWithCompactions() throws Exception { + final TableName tableName = TableName.valueOf("test_table"); + Table table = setupTable(tableName); + + List<HRegion> hRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName); + + final HBaseProtos.SnapshotDescription snapshot = HBaseProtos.SnapshotDescription.newBuilder() + .setTable(tableName.getNameAsString()) + .setType(HBaseProtos.SnapshotDescription.Type.FLUSH) + .setName("test_table_snapshot") + .setVersion(SnapshotManifestV2.DESCRIPTOR_VERSION) + .build(); + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName()); + + final HRegion region = spy(hRegions.get(0)); + + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); + final SnapshotManifest manifest = + SnapshotManifest.create(conf, fs, workingDir, snapshot, monitor); + manifest.addTableDescriptor(table.getTableDescriptor()); + + if (!fs.exists(workingDir)) { + fs.mkdirs(workingDir); + } + assertTrue(fs.exists(workingDir)); + SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, fs); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + addRegionToSnapshot(snapshot, region, manifest); + return null; + } + }).when(region).addRegionToSnapshot(snapshot, monitor); + + FlushSnapshotSubprocedure.RegionSnapshotTask snapshotTask = + new FlushSnapshotSubprocedure.RegionSnapshotTask(region, snapshot, true, monitor); + ExecutorService executor = Executors.newFixedThreadPool(1); + Future f = executor.submit(snapshotTask); + + // Trigger major compaction and wait for snapshot operation to finish + LOG.info("Starting major compaction"); + region.compact(true); + LOG.info("Finished major compaction"); + f.get(); + + // Consolidate region manifests into a single snapshot manifest + manifest.consolidate(); + + // Make sure that the region manifest exists, which 
means the snapshot operation succeeded + assertNotNull(manifest.getRegionManifests()); + // Sanity check, there should be only one region + assertEquals(1, manifest.getRegionManifests().size()); + + // Make sure that no files went missing after the snapshot operation + SnapshotReferenceUtil.verifySnapshot(conf, fs, manifest); + } + + private void addRegionToSnapshot(HBaseProtos.SnapshotDescription snapshot, + HRegion region, SnapshotManifest manifest) throws Exception { + LOG.info("Adding region to snapshot: " + region.getRegionInfo().getRegionNameAsString()); + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); + SnapshotManifest.RegionVisitor visitor = createRegionVisitorWithDelay(snapshot, workingDir); + manifest.addRegion(region, visitor); + LOG.info("Added the region to snapshot: " + region.getRegionInfo().getRegionNameAsString()); + } + + private SnapshotManifest.RegionVisitor createRegionVisitorWithDelay( + HBaseProtos.SnapshotDescription desc, Path workingDir) { + return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir) { + @Override + public void storeFile(final SnapshotProtos.SnapshotRegionManifest.Builder region, + final SnapshotProtos.SnapshotRegionManifest.FamilyFiles.Builder family, + final StoreFileInfo storeFile) throws IOException { + try { + LOG.debug("Introducing delay before adding store file to manifest"); + Thread.sleep(2000); + } catch (InterruptedException ex) { + LOG.error("Interrupted due to error: " + ex); + } + super.storeFile(region, family, storeFile); + } + }; + } + + private Table setupTable(TableName tableName) throws Exception { + HTableDescriptor htd = new HTableDescriptor(tableName); + // Flush many files, but do not compact immediately + htd.setMemStoreFlushSize(5000).setConfiguration("hbase.hstore.compactionThreshold", "250"); + // Make sure the region does not split + htd.setRegionSplitPolicyClassName(ConstantSizeRegionSplitPolicy.class.getName()); + htd.setMaxFileSize(100 * 1024 * 
1024); + + byte[] fam = Bytes.toBytes("fam"); + Table table = TEST_UTIL.createTable(htd, new byte[][] {fam}, + TEST_UTIL.getConfiguration()); + TEST_UTIL.loadTable(table, fam); + return table; + } +}