Repository: hbase
Updated Branches:
  refs/heads/master d5f34adcd -> ded0842ca


HBASE-18398: Snapshot operation fails with FileNotFoundException


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ded0842c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ded0842c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ded0842c

Branch: refs/heads/master
Commit: ded0842cafbccff0ad64137abf4690c781d86e65
Parents: d5f34ad
Author: Ashu Pachauri <a...@apache.org>
Authored: Mon Aug 7 18:10:33 2017 -0700
Committer: Ashu Pachauri <a...@apache.org>
Committed: Thu Aug 10 14:20:08 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  25 ++-
 .../hadoop/hbase/regionserver/HStore.java       |  16 ++
 .../hadoop/hbase/regionserver/Region.java       |   9 +-
 .../snapshot/FlushSnapshotSubprocedure.java     |  31 ++-
 .../hadoop/hbase/snapshot/SnapshotManifest.java |  30 ++-
 .../hbase/snapshot/TestRegionSnapshotTask.java  | 204 +++++++++++++++++++
 6 files changed, 293 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ded0842c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 31357ad..483cb36 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7922,7 +7922,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       case DELETE:
       case BATCH_MUTATE:
       case COMPACT_REGION:
-        // when a region is in recovering state, no read, split or merge is 
allowed
+      case SNAPSHOT:
+        // when a region is in recovering state, no read, split, merge or 
snapshot is allowed
         if (isRecovering() && (this.disallowWritesInRecovering ||
               (op != Operation.PUT && op != Operation.DELETE && op != 
Operation.BATCH_MUTATE))) {
           throw new 
RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
@@ -7946,6 +7947,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       lock.readLock().unlock();
       throw new 
NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is 
closed");
     }
+    // The unit for snapshot is a region. So, all stores for this region must 
be
+    // prepared for snapshot operation before proceeding.
+    if (op == Operation.SNAPSHOT) {
+      for (Store store : stores.values()) {
+        if (store instanceof HStore) {
+          ((HStore)store).preSnapshotOperation();
+        }
+      }
+    }
     try {
       if (coprocessorHost != null) {
         coprocessorHost.postStartRegionOperation(op);
@@ -7961,12 +7971,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     closeRegionOperation(Operation.ANY);
   }
 
-  /**
-   * Closes the lock. This needs to be called in the finally block 
corresponding
-   * to the try block of {@link #startRegionOperation(Operation)}
-   * @throws IOException
-   */
+  @Override
   public void closeRegionOperation(Operation operation) throws IOException {
+    if (operation == Operation.SNAPSHOT) {
+      for (Store store: stores.values()) {
+        if (store instanceof HStore) {
+          ((HStore)store).postSnapshotOperation();
+        }
+      }
+    }
     lock.readLock().unlock();
     if (coprocessorHost != null) {
       coprocessorHost.postCloseRegionOperation(operation);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ded0842c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 2d9e0f2..882e1fc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2486,6 +2486,22 @@ public class HStore implements Store {
           return getRegionInfo().getReplicaId() == 
HRegionInfo.DEFAULT_REPLICA_ID;
   }
 
+  /**
+   * Sets the store up for a region level snapshot operation.
+   * @see #postSnapshotOperation()
+   */
+  public void preSnapshotOperation() {
+    archiveLock.lock();
+  }
+
+  /**
+   * Perform tasks needed after the completion of snapshot operation.
+   * @see #preSnapshotOperation()
+   */
+  public void postSnapshotOperation() {
+    archiveLock.unlock();
+  }
+
   @Override
   public synchronized void closeAndArchiveCompactedFiles() throws IOException {
     // ensure other threads do not attempt to archive the same files on close()

http://git-wip-us.apache.org/repos/asf/hbase/blob/ded0842c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 48672a3..fe17cb2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -247,7 +247,7 @@ public interface Region extends ConfigurationObserver {
    */
   enum Operation {
     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, 
MERGE_REGION, BATCH_MUTATE,
-    REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT
+    REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT
   }
 
   /**
@@ -277,6 +277,13 @@ public interface Region extends ConfigurationObserver {
    */
   void closeRegionOperation() throws IOException;
 
+  /**
+   * Closes the region operation lock. This needs to be called in the finally 
block corresponding
+   * to the try block of {@link #startRegionOperation(Operation)}
+   * @throws IOException
+   */
+  void closeRegionOperation(Operation op) throws IOException;
+
   // Row write locks
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ded0842c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
index b30d622..281de18 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
+import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import 
org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -74,10 +75,18 @@ public class FlushSnapshotSubprocedure extends Subprocedure 
{
   /**
    * Callable for adding files to snapshot manifest working dir.  Ready for 
multithreading.
    */
-  private class RegionSnapshotTask implements Callable<Void> {
-    Region region;
-    RegionSnapshotTask(Region region) {
+  public static class RegionSnapshotTask implements Callable<Void> {
+    private Region region;
+    private boolean skipFlush;
+    private ForeignExceptionDispatcher monitor;
+    private SnapshotDescription snapshotDesc;
+
+    public RegionSnapshotTask(Region region, SnapshotDescription snapshotDesc,
+        boolean skipFlush, ForeignExceptionDispatcher monitor) {
       this.region = region;
+      this.skipFlush = skipFlush;
+      this.monitor = monitor;
+      this.snapshotDesc = snapshotDesc;
     }
 
     @Override
@@ -87,10 +96,10 @@ public class FlushSnapshotSubprocedure extends Subprocedure 
{
       // snapshots that involve multiple regions and regionservers.  It is 
still possible to have
       // an interleaving such that globally regions are missing, so we still 
need the verification
       // step.
-      LOG.debug("Starting region operation on " + region);
-      region.startRegionOperation();
+      LOG.debug("Starting snapshot operation on " + region);
+      region.startRegionOperation(Operation.SNAPSHOT);
       try {
-        if (snapshotSkipFlush) {
+        if (skipFlush) {
         /*
          * This is to take an online-snapshot without force a coordinated 
flush to prevent pause
          * The snapshot type is defined inside the snapshot description. 
FlushSnapshotSubprocedure
@@ -123,15 +132,15 @@ public class FlushSnapshotSubprocedure extends 
Subprocedure {
             throw new IOException("Unable to complete flush after " + 
MAX_RETRIES + " attempts");
           }
         }
-        ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
-        if (snapshotSkipFlush) {
+        ((HRegion)region).addRegionToSnapshot(snapshotDesc, monitor);
+        if (skipFlush) {
           LOG.debug("... SkipFlush Snapshotting region " + region.toString() + 
" completed.");
         } else {
           LOG.debug("... Flush Snapshotting region " + region.toString() + " 
completed.");
         }
       } finally {
-        LOG.debug("Closing region operation on " + region);
-        region.closeRegionOperation();
+        LOG.debug("Closing snapshot operation on " + region);
+        region.closeRegionOperation(Operation.SNAPSHOT);
       }
       return null;
     }
@@ -155,7 +164,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure 
{
     // Add all hfiles already existing in region.
     for (Region region : regions) {
       // submit one task per region for parallelize by region.
-      taskManager.submitTask(new RegionSnapshotTask(region));
+      taskManager.submitTask(new RegionSnapshotTask(region, snapshot, 
snapshotSkipFlush, monitor));
       monitor.rethrowException();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ded0842c/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
index 86687d9..f70fe9e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import 
org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -112,6 +113,7 @@ public final class SnapshotManifest {
       final Path workingDir, final SnapshotDescription desc,
       final ForeignExceptionSnare monitor) {
     return new SnapshotManifest(conf, fs, workingDir, desc, monitor);
+
   }
 
   /**
@@ -162,9 +164,15 @@ public final class SnapshotManifest {
   }
 
   public void addMobRegion(HRegionInfo regionInfo) throws IOException {
-    // 0. Get the ManifestBuilder/RegionVisitor
+    // Get the ManifestBuilder/RegionVisitor
     RegionVisitor visitor = createRegionVisitor(desc);
 
+    // Visit the region and add it to the manifest
+    addMobRegion(regionInfo, visitor);
+  }
+
+  @VisibleForTesting
+  protected void addMobRegion(HRegionInfo regionInfo, RegionVisitor visitor) 
throws IOException {
     // 1. dump region meta info into the snapshot directory
     LOG.debug("Storing mob region '" + regionInfo + "' region-info for 
snapshot.");
     Object regionData = visitor.regionOpen(regionInfo);
@@ -203,9 +211,15 @@ public final class SnapshotManifest {
    * This is used by the "online snapshot" when the table is enabled.
    */
   public void addRegion(final HRegion region) throws IOException {
-    // 0. Get the ManifestBuilder/RegionVisitor
+    // Get the ManifestBuilder/RegionVisitor
     RegionVisitor visitor = createRegionVisitor(desc);
 
+    // Visit the region and add it to the manifest
+    addRegion(region, visitor);
+  }
+
+  @VisibleForTesting
+  protected void addRegion(final HRegion region, RegionVisitor visitor) throws 
IOException {
     // 1. dump region meta info into the snapshot directory
     LOG.debug("Storing '" + region + "' region-info for snapshot.");
     Object regionData = visitor.regionOpen(region.getRegionInfo());
@@ -216,7 +230,8 @@ public final class SnapshotManifest {
 
     for (Store store : region.getStores()) {
       // 2.1. build the snapshot reference for the store
-      Object familyData = visitor.familyOpen(regionData, 
store.getColumnFamilyDescriptor().getName());
+      Object familyData = visitor.familyOpen(regionData,
+          store.getColumnFamilyDescriptor().getName());
       monitor.rethrowException();
 
       List<StoreFile> storeFiles = new ArrayList<>(store.getStorefiles());
@@ -243,9 +258,16 @@ public final class SnapshotManifest {
    * This is used by the "offline snapshot" when the table is disabled.
    */
   public void addRegion(final Path tableDir, final HRegionInfo regionInfo) 
throws IOException {
-    // 0. Get the ManifestBuilder/RegionVisitor
+    // Get the ManifestBuilder/RegionVisitor
     RegionVisitor visitor = createRegionVisitor(desc);
 
+    // Visit the region and add it to the manifest
+    addRegion(tableDir, regionInfo, visitor);
+  }
+
+  @VisibleForTesting
+  protected void addRegion(final Path tableDir, final HRegionInfo regionInfo, 
RegionVisitor visitor)
+      throws IOException {
     boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
     try {
       Path baseDir = tableDir;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ded0842c/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java
new file mode 100644
index 0000000..ef1c7ac
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Testing the region snapshot task on a cluster.
+ * @see 
org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure.RegionSnapshotTask
+ */
+@Category({MediumTests.class, RegionServerTests.class})
+public class TestRegionSnapshotTask {
+  private final Log LOG = LogFactory.getLog(getClass());
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path rootDir;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+
+    conf = TEST_UTIL.getConfiguration();
+
+    // Try to frequently clean up compacted files
+    conf.setInt("hbase.hfile.compaction.discharger.interval", 1000);
+    conf.setInt("hbase.master.hfilecleaner.ttl", 1000);
+
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
+
+    rootDir = FSUtils.getRootDir(conf);
+    fs = TEST_UTIL.getTestFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests adding a region to the snapshot manifest while compactions are 
running on the region.
+   * The idea is to slow down the process of adding a store file to the 
manifest while
+   * triggering compactions on the region, allowing the store files to be 
marked for archival while
+   * snapshot operation is running.
+   * This test checks for the correct behavior in such a case that the 
compacted files should
+   * not be moved around if a snapshot operation is in progress.
+   * See HBASE-18398
+   */
+  @Test(timeout = 30000)
+  public void testAddRegionWithCompactions() throws Exception {
+    final TableName tableName = TableName.valueOf("test_table");
+    Table table = setupTable(tableName);
+
+    List<HRegion> hRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
+
+    final SnapshotProtos.SnapshotDescription snapshot =
+        SnapshotProtos.SnapshotDescription.newBuilder()
+        .setTable(tableName.getNameAsString())
+        .setType(SnapshotProtos.SnapshotDescription.Type.FLUSH)
+        .setName("test_table_snapshot")
+        .setVersion(SnapshotManifestV2.DESCRIPTOR_VERSION)
+        .build();
+    ForeignExceptionDispatcher monitor = new 
ForeignExceptionDispatcher(snapshot.getName());
+
+    final HRegion region = spy(hRegions.get(0));
+
+    Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, 
rootDir);
+    final SnapshotManifest manifest =
+        SnapshotManifest.create(conf, fs, workingDir, snapshot, monitor);
+    manifest.addTableDescriptor(table.getTableDescriptor());
+
+    if (!fs.exists(workingDir)) {
+      fs.mkdirs(workingDir);
+    }
+    assertTrue(fs.exists(workingDir));
+    SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, fs);
+
+    doAnswer(__ -> {
+      addRegionToSnapshot(snapshot, region, manifest);
+      return null;
+    }).when(region).addRegionToSnapshot(snapshot, monitor);
+
+    FlushSnapshotSubprocedure.RegionSnapshotTask snapshotTask =
+        new FlushSnapshotSubprocedure.RegionSnapshotTask(region, snapshot, 
true, monitor);
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Future f = executor.submit(snapshotTask);
+
+    // Trigger major compaction and wait for snaphot operation to finish
+    LOG.info("Starting major compaction");
+    region.compact(true);
+    LOG.info("Finished major compaction");
+    f.get();
+
+    // Consolidate region manifests into a single snapshot manifest
+    manifest.consolidate();
+
+    // Make sure that the region manifest exists, which means the snapshot 
operation succeeded
+    assertNotNull(manifest.getRegionManifests());
+    // Sanity check, there should be only one region
+    assertEquals(1, manifest.getRegionManifests().size());
+
+    // Make sure that no files went missing after the snapshot operation
+    SnapshotReferenceUtil.verifySnapshot(conf, fs, manifest);
+  }
+
+  private void addRegionToSnapshot(SnapshotProtos.SnapshotDescription snapshot,
+      HRegion region, SnapshotManifest manifest) throws Exception {
+    LOG.info("Adding region to snapshot: " + 
region.getRegionInfo().getRegionNameAsString());
+    Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, 
rootDir);
+    SnapshotManifest.RegionVisitor visitor = 
createRegionVisitorWithDelay(snapshot, workingDir);
+    manifest.addRegion(region, visitor);
+    LOG.info("Added the region to snapshot: " + 
region.getRegionInfo().getRegionNameAsString());
+  }
+
+  private SnapshotManifest.RegionVisitor createRegionVisitorWithDelay(
+      SnapshotProtos.SnapshotDescription desc, Path workingDir) {
+    return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir) {
+      @Override
+      public void storeFile(final 
SnapshotProtos.SnapshotRegionManifest.Builder region,
+          final SnapshotProtos.SnapshotRegionManifest.FamilyFiles.Builder 
family,
+          final StoreFileInfo storeFile) throws IOException {
+        try {
+          LOG.debug("Introducing delay before adding store file to manifest");
+          Thread.sleep(2000);
+        } catch (InterruptedException ex) {
+          LOG.error("Interrupted due to error: " + ex);
+        }
+        super.storeFile(region, family, storeFile);
+      }
+    };
+  }
+
+  private Table setupTable(TableName tableName) throws Exception {
+    TableDescriptorBuilder builder = 
TableDescriptorBuilder.newBuilder(tableName);
+    // Flush many files, but do not compact immediately
+    // Make sure that the region does not split
+    builder
+        .setMemStoreFlushSize(5000)
+        
.setRegionSplitPolicyClassName(ConstantSizeRegionSplitPolicy.class.getName())
+        .setMaxFileSize(100 * 1024 * 1024)
+        .setConfiguration("hbase.hstore.compactionThreshold", "250");
+
+    TableDescriptor td = builder.build();
+    byte[] fam = Bytes.toBytes("fam");
+    Table table = TEST_UTIL.createTable(td, new byte[][] {fam},
+        TEST_UTIL.getConfiguration());
+    TEST_UTIL.loadTable(table, fam);
+    return table;
+  }
+}

Reply via email to