This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a17071a08bee3fb54910a777f14cd6296a96619f
Author: Kirk Lund <[email protected]>
AuthorDate: Thu Jul 12 16:59:55 2018 -0700

    GEODE-5405: Streamline Backup distributed tests
---
 .../cache/backup/BackupDistributedTest.java        |  674 +++++-----
 .../backup/IncrementalBackupDistributedTest.java   | 1283 +++++++-------------
 ...titionedBackupPrepareAndFinishMsgDUnitTest.java |   29 -
 ... => PrepareAndFinishBackupDistributedTest.java} |  179 ++-
 ...eplicateBackupPrepareAndFinishMsgDUnitTest.java |   29 -
 5 files changed, 915 insertions(+), 1279 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java
index ebc4372..bda648d 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java
@@ -14,27 +14,30 @@
  */
 package org.apache.geode.internal.cache.backup;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.commons.io.FileUtils.listFiles;
 import static org.apache.commons.io.filefilter.DirectoryFileFilter.DIRECTORY;
-import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.cache.EvictionAction.OVERFLOW_TO_DISK;
+import static org.apache.geode.cache.EvictionAttributes.createLIFOEntryAttributes;
+import static org.apache.geode.cache.RegionShortcut.PARTITION;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.internal.admin.remote.AdminFailureResponse.create;
+import static org.apache.geode.test.dunit.Disconnect.disconnectFromDS;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getVM;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.Serializable;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,22 +49,20 @@ import junitparams.naming.TestCaseName;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.RegexFileFilter;
 import org.apache.logging.log4j.Logger;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.DiskStore;
-import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.control.RebalanceOperation;
 import org.apache.geode.cache.control.RebalanceResults;
 import org.apache.geode.cache.persistence.PartitionOfflineException;
@@ -72,16 +73,20 @@ import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.internal.admin.remote.AdminFailureResponse;
 import org.apache.geode.internal.cache.DestroyRegionOperation.DestroyRegionMessage;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.partitioned.PersistentPartitionedRegionTestBase;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.lang.SystemUtils;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.process.ProcessStreamReader;
+import org.apache.geode.internal.process.ProcessStreamReader.ReadingMode;
 import org.apache.geode.management.BackupStatus;
-import org.apache.geode.management.ManagementException;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 /**
  * Additional tests to consider adding:
@@ -94,115 +99,148 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 @Category(DistributedTest.class)
 @RunWith(JUnitParamsRunner.class)
 @SuppressWarnings("serial")
-public class BackupDistributedTest extends PersistentPartitionedRegionTestBase {
+public class BackupDistributedTest implements Serializable {
   private static final Logger logger = LogService.getLogger();
 
   private static final int NUM_BUCKETS = 15;
 
-  @Rule
-  public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
+  private String uniqueName;
+  private String regionName1;
+  private String regionName2;
+  private String regionName3;
+
+  private File backupBaseDir;
 
   private VM vm0;
   private VM vm1;
   private VM vm2;
   private VM vm3;
-  private Map<VM, File> workingDirByVm;
-  private File backupBaseDir;
+
+  private transient Process process;
+  private transient ProcessStreamReader processReader;
+
+  @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
 
   @Before
   public void setUp() throws Exception {
-    vm0 = getHost(0).getVM(0);
-    vm1 = getHost(0).getVM(1);
-    vm2 = getHost(0).getVM(2);
-    vm3 = getHost(0).getVM(3);
+    vm0 = getVM(0);
+    vm1 = getVM(1);
+    vm2 = getVM(2);
+    vm3 = getVM(3);
 
-    workingDirByVm = new HashMap<>();
-    workingDirByVm.put(vm0, tempDir.newFolder());
-    workingDirByVm.put(vm1, tempDir.newFolder());
-    workingDirByVm.put(vm2, tempDir.newFolder());
-    workingDirByVm.put(vm3, tempDir.newFolder());
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName1 = uniqueName + "_region1";
+    regionName2 = uniqueName + "_region2";
+    regionName3 = uniqueName + "_region3";
 
-    backupBaseDir = tempDir.newFolder("backupDir");
+    backupBaseDir = temporaryFolder.newFolder("backupDir");
   }
 
-  @Override
-  public final void preTearDownCacheTestCase() throws Exception {
+  @After
+  public void tearDown() throws Exception {
     vm0.invoke(() -> {
       DistributionMessageObserver.setInstance(null);
       disconnectFromDS();
     });
+
+    if (process != null && process.isAlive()) {
+      process.destroyForcibly();
+      process.waitFor(2, MINUTES);
+    }
+    if (processReader != null && processReader.isRunning()) {
+      processReader.stop();
+    }
   }
 
   @Test
   public void testBackupPR() throws Exception {
-    createPersistentRegions();
+    createPersistentRegions(vm0, vm1);
 
-    long lastModified0 = setBackupFiles(vm0);
-    long lastModified1 = setBackupFiles(vm1);
+    long lastModified0 = vm0.invoke(() -> setBackupFiles(getDiskDirFor(vm0)));
+    long lastModified1 = vm1.invoke(() -> setBackupFiles(getDiskDirFor(vm1)));
 
-    createData();
+    vm0.invoke(() -> {
+      createData(0, 5, "A", regionName1);
+      createData(0, 5, "B", regionName2);
+    });
 
     vm0.invoke(() -> {
       
assertThat(getCache().getDistributionManager().getNormalDistributionManagerIds()).hasSize(2);
     });
 
     vm2.invoke(() -> {
-      getCache();
       
assertThat(getCache().getDistributionManager().getNormalDistributionManagerIds()).hasSize(3);
     });
 
-    BackupStatus status = backupMember(vm2);
+    BackupStatus status = vm2.invoke(() -> backupMember());
     assertThat(status.getBackedUpDiskStores()).hasSize(2);
     assertThat(status.getOfflineDiskStores()).isEmpty();
 
     Collection<File> files = FileUtils.listFiles(backupBaseDir, new String[] 
{"txt"}, true);
     assertThat(files).hasSize(4);
 
-    deleteOldUserUserFile(vm0);
-    deleteOldUserUserFile(vm1);
+    vm0.invoke(() -> deleteOldUserFile(getDiskDirFor(vm0)));
+    vm1.invoke(() -> deleteOldUserFile(getDiskDirFor(vm1)));
+
     validateBackupComplete();
 
-    createData(vm0, 0, 5, "C", "region1");
-    createData(vm0, 0, 5, "C", "region2");
+    vm0.invoke(() -> {
+      createData(0, 5, "C", regionName1);
+      createData(0, 5, "C", regionName2);
+    });
 
     assertThat(status.getBackedUpDiskStores()).hasSize(2);
     assertThat(status.getOfflineDiskStores()).isEmpty();
 
-    closeCache(vm0);
-    closeCache(vm1);
+    vm0.invoke(() -> getCache().close());
+    vm1.invoke(() -> getCache().close());
 
     // destroy the current data
     cleanDiskDirsInEveryVM();
 
     restoreBackup(2);
 
-    createPersistentRegions();
+    createPersistentRegions(vm0, vm1);
 
-    checkData(vm0, 0, 5, "A", "region1");
-    checkData(vm0, 0, 5, "B", "region2");
-    verifyUserFileRestored(vm0, lastModified0);
-    verifyUserFileRestored(vm1, lastModified1);
-  }
+    vm0.invoke(() -> {
+      validateData(0, 5, "A", regionName1);
+      validateData(0, 5, "B", regionName2);
+    });
 
-  private void createData() {
-    createData(vm0, 0, 5, "A", "region1");
-    createData(vm0, 0, 5, "B", "region2");
+    vm0.invoke(() -> verifyUserFileRestored(getDiskDirFor(vm0), 
lastModified0));
+    vm1.invoke(() -> verifyUserFileRestored(getDiskDirFor(vm1), 
lastModified1));
   }
 
   /**
    * Test of bug 42419.
    *
    * <p>
-   * TRAC 42419: backed up disk stores map contains null key instead of 
member; cannot restore
+   * TRAC #42419: backed up disk stores map contains null key instead of 
member; cannot restore
    * backup files
    */
   @Test
   public void testBackupFromMemberWithDiskStore() throws Exception {
-    createPersistentRegions();
+    createPersistentRegions(vm0, vm1);
 
-    createData();
+    vm0.invoke(() -> {
+      createData(0, 5, "A", regionName1);
+      createData(0, 5, "B", regionName2);
+    });
 
-    BackupStatus status = backupMember(vm1);
+    BackupStatus status = vm1.invoke(() -> backupMember());
     assertThat(status.getBackedUpDiskStores()).hasSize(2);
 
     for (DistributedMember key : status.getBackedUpDiskStores().keySet()) {
@@ -212,41 +250,45 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
 
     validateBackupComplete();
 
-    closeCache(vm0);
-    closeCache(vm1);
+    vm0.invoke(() -> getCache().close());
+    vm1.invoke(() -> getCache().close());
 
     // destroy the current data
     cleanDiskDirsInEveryVM();
 
     restoreBackup(2);
 
-    createPersistentRegions();
+    createPersistentRegions(vm0, vm1);
 
-    checkData(vm0, 0, 5, "A", "region1");
-    checkData(vm0, 0, 5, "B", "region2");
+    vm0.invoke(() -> {
+      validateData(0, 5, "A", regionName1);
+      validateData(0, 5, "B", regionName2);
+    });
   }
 
   /**
    * Test for bug 42419
    *
    * <p>
-   * TRAC 42419: backed up disk stores map contains null key instead of 
member; cannot restore
+   * TRAC #42419: backed up disk stores map contains null key instead of 
member; cannot restore
    * backup files
    */
   @Test
   public void testBackupWhileBucketIsCreated() throws Exception {
-    createPersistentRegion(vm0).await();
+    vm0.invoke(() -> createRegions(vm0));
 
     // create a bucket on vm0
-    createData(vm0, 0, 1, "A", "region1");
+    vm0.invoke(() -> createData(0, 1, "A", regionName1));
 
     // create the pr on vm1, which won't have any buckets
-    createPersistentRegion(vm1).await();
+    vm1.invoke(() -> createRegions(vm1));
 
     CompletableFuture<BackupStatus> backupStatusFuture =
-        CompletableFuture.supplyAsync(() -> backupMember(vm2));
+        CompletableFuture.supplyAsync(() -> vm2.invoke(() -> backupMember()));
+
     CompletableFuture<Void> createDataFuture =
-        CompletableFuture.runAsync(() -> createData(vm0, 1, 5, "A", 
"region1"));
+        CompletableFuture.runAsync(() -> vm0.invoke(() -> createData(1, 5, 
"A", regionName1)));
+
     CompletableFuture.allOf(backupStatusFuture, createDataFuture);
 
     BackupStatus status = backupStatusFuture.get();
@@ -255,29 +297,30 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
 
     validateBackupComplete();
 
-    createData(vm0, 0, 5, "C", "region1");
+    vm0.invoke(() -> createData(0, 5, "C", regionName1));
 
     assertThat(status.getBackedUpDiskStores()).hasSize(2);
     assertThat(status.getOfflineDiskStores()).isEmpty();
 
-    closeCache(vm0);
-    closeCache(vm1);
+    vm0.invoke(() -> getCache().close());
+    vm1.invoke(() -> getCache().close());
 
     // destroy the current data
     cleanDiskDirsInEveryVM();
 
     restoreBackup(2);
 
-    createPersistentRegions();
+    createPersistentRegions(vm0, vm1);
+
+    vm0.invoke(() -> validateData(0, 1, "A", regionName1));
 
-    checkData(vm0, 0, 1, "A", "region1");
   }
 
   /**
    * Test for bug 42420. Invoke a backup when a bucket is in the middle of 
being moved.
    *
    * <p>
-   * TRAC 42420: Online backup files sometimes cannot be restored
+   * TRAC #42420: Online backup files sometimes cannot be restored
    */
   @Test
   @Parameters({"BEFORE_SENDING_DESTROYREGIONMESSAGE", 
"BEFORE_PROCESSING_REPLYMESSAGE"})
@@ -294,41 +337,36 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
       
DistributionMessageObserver.setInstance(createTestHookToBackup(whenToInvokeBackup));
     });
 
-    createPersistentRegion(vm0).await();
+    vm0.invoke(() -> createRegions(vm0));
 
     // create twos bucket on vm0
-    createData(vm0, 0, 2, "A", "region1");
+    vm0.invoke(() -> createData(0, 2, "A", regionName1));
 
     // create the pr on vm1, which won't have any buckets
-    createPersistentRegion(vm1).await();
+    vm1.invoke(() -> createRegions(vm1));
 
     // Perform a rebalance. This will trigger the backup in the middle of the 
bucket move.
     vm0.invoke("Do rebalance", () -> {
       RebalanceOperation op = 
getCache().getResourceManager().createRebalanceFactory().start();
-      RebalanceResults results;
-      try {
-        results = op.getResults();
-        assertEquals(1, results.getTotalBucketTransfersCompleted());
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
+      RebalanceResults results = op.getResults();
+      assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(1);
     });
 
     validateBackupComplete();
 
-    createData(vm0, 0, 5, "C", "region1");
+    vm0.invoke(() -> createData(0, 5, "C", regionName1));
 
-    closeCache(vm0);
-    closeCache(vm1);
+    vm0.invoke(() -> getCache().close());
+    vm1.invoke(() -> getCache().close());
 
     // Destroy the current data
     cleanDiskDirsInEveryVM();
 
     restoreBackup(2);
 
-    createPersistentRegions();
+    createPersistentRegions(vm0, vm1);
 
-    checkData(vm0, 0, 2, "A", "region1");
+    vm0.invoke(() -> validateData(0, 2, "A", regionName1));
   }
 
   @Test
@@ -345,14 +383,18 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
           
createTestHookToThrowIOExceptionBeforeProcessingPrepareBackupRequest(exceptionMessage));
     });
 
-    createPersistentRegions();
+    createPersistentRegions(vm0, vm1);
 
-    createData();
+    vm0.invoke(() -> {
+      createData(0, 5, "A", regionName1);
+      createData(0, 5, "B", regionName2);
+    });
 
-    assertThatThrownBy(() -> 
backupMember(vm2)).hasRootCauseInstanceOf(IOException.class);
+    assertThatThrownBy(() -> vm2.invoke(() -> backupMember()))
+        .hasRootCauseInstanceOf(IOException.class);
 
     // second backup should succeed because the observer and backup state has 
been cleared
-    BackupStatus status = backupMember(vm2);
+    BackupStatus status = vm2.invoke(() -> backupMember());
     assertThat(status.getBackedUpDiskStores()).hasSize(2);
     assertThat(status.getOfflineDiskStores()).isEmpty();
   }
@@ -361,13 +403,16 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
    * Make sure we don't report members without persistent data as backed up.
    */
   @Test
-  public void testBackupOverflow() throws Exception {
-    createPersistentRegion(vm0).await();
-    createOverflowRegion(vm1);
+  public void testBackupOverflow() {
+    vm0.invoke(() -> createRegions(vm0));
+    vm1.invoke(() -> createRegionWithOverflow(getDiskStoreFor(vm1)));
 
-    createData();
+    vm0.invoke(() -> {
+      createData(0, 5, "A", regionName1);
+      createData(0, 5, "B", regionName2);
+    });
 
-    BackupStatus status = backupMember(vm2);
+    BackupStatus status = vm2.invoke(() -> backupMember());
     assertThat(status.getBackedUpDiskStores()).hasSize(1);
     
assertThat(status.getBackedUpDiskStores().values().iterator().next()).hasSize(2);
     assertThat(status.getOfflineDiskStores()).isEmpty();
@@ -376,62 +421,53 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
   }
 
   @Test
-  public void testBackupPRWithOfflineMembers() throws Exception {
-    createPersistentRegion(vm0).await();
-    createPersistentRegion(vm1).await();
-    createPersistentRegion(vm2).await();
+  public void testBackupPRWithOfflineMembers() {
+    vm0.invoke(() -> createRegions(vm0));
+    vm1.invoke(() -> createRegions(vm1));
+    vm2.invoke(() -> createRegions(vm2));
 
-    createData();
+    vm0.invoke(() -> {
+      createData(0, 5, "A", regionName1);
+      createData(0, 5, "B", regionName2);
+    });
 
-    closeCache(vm2);
+    vm1.invoke(() -> getCache().close());
 
-    BackupStatus status = backupMember(vm3);
+    BackupStatus status = vm3.invoke(() -> backupMember());
     assertThat(status.getBackedUpDiskStores()).hasSize(2);
     assertThat(status.getOfflineDiskStores()).hasSize(2);
   }
 
-  private DistributionMessageObserver createTestHookToBackup(
-      WhenToInvokeBackup backupInvocationTestHook) {
-    switch (backupInvocationTestHook) {
-      case BEFORE_SENDING_DESTROYREGIONMESSAGE:
-        return createTestHookToBackupBeforeSendingDestroyRegionMessage(() -> 
backupMember(vm2));
-      case BEFORE_PROCESSING_REPLYMESSAGE:
-        return createTestHookToBackupBeforeProcessingReplyMessage(() -> 
backupMember(vm2));
-      default:
-        throw new AssertionError("Invalid backupInvocationTestHook " + 
backupInvocationTestHook);
-    }
-  }
-
   /**
    * Test what happens when we restart persistent members while there is an 
accessor concurrently
    * performing puts.
    */
   @Test
-  public void testRecoverySystemWithConcurrentPutter() throws Throwable {
-    createColatedPersistentRegions(vm1).await();
-    createColatedPersistentRegions(vm2).await();
-
-    createAccessor(vm0);
+  public void testRecoverySystemWithConcurrentPutter() throws Exception {
+    vm1.invoke(() -> createColocatedRegions(vm1));
+    vm2.invoke(() -> createColocatedRegions(vm2));
 
-    createData(vm0, 0, NUM_BUCKETS, "a", "region1");
-    createData(vm0, 0, NUM_BUCKETS, "a", "region2");
+    vm0.invoke(() -> createAccessor());
 
+    vm0.invoke(() -> {
+      createData(0, NUM_BUCKETS, "a", regionName1);
+      createData(0, NUM_BUCKETS, "a", regionName2);
+    });
 
     // backup the system. We use this to get a snapshot of vm1 and vm2
     // when they both are online. Recovering from this backup simulates
-    // a simulataneous kill and recovery.
-    backupMember(vm3);
+    // a simultaneous kill and recovery.
+    vm3.invoke(() -> backupMember());
 
-    closeCache(vm1);
-    closeCache(vm2);
+    vm1.invoke(() -> getCache().close());
+    vm2.invoke(() -> getCache().close());
 
     cleanDiskDirsInEveryVM();
     restoreBackup(2);
 
     // in vm0, start doing a bunch of concurrent puts.
-    AsyncInvocation async0 = vm0.invokeAsync(() -> {
-      Cache cache = getCache();
-      Region region = cache.getRegion("region1");
+    AsyncInvocation putsInVM0 = vm0.invokeAsync(() -> {
+      Region region = getCache().getRegion(regionName1);
       try {
         for (int i = 0;; i++) {
           try {
@@ -445,30 +481,46 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
       }
     });
 
-    AsyncInvocation async1 = createColatedPersistentRegions(vm1);
-    AsyncInvocation async2 = createColatedPersistentRegions(vm2);
-    async1.await();
-    async2.await();
+    AsyncInvocation createRegionsInVM1 = vm1.invokeAsync(() -> 
createColocatedRegions(vm1));
+    AsyncInvocation createRegionsInVM2 = vm2.invokeAsync(() -> 
createColocatedRegions(vm2));
+
+    createRegionsInVM1.await();
+    createRegionsInVM2.await();
 
     // close the cache in vm0 to stop the async puts.
-    closeCache(vm0);
+    vm0.invoke(() -> getCache().close());
 
     // make sure we didn't get an exception
-    async0.await();
+    putsInVM0.await();
+  }
+
+  private DistributionMessageObserver createTestHookToBackup(
+      WhenToInvokeBackup backupInvocationTestHook) {
+    switch (backupInvocationTestHook) {
+      case BEFORE_SENDING_DESTROYREGIONMESSAGE:
+        return createTestHookToBackupBeforeSendingDestroyRegionMessage(
+            () -> vm2.invoke(() -> backupMember()));
+      case BEFORE_PROCESSING_REPLYMESSAGE:
+        return createTestHookToBackupBeforeProcessingReplyMessage(
+            () -> vm2.invoke(() -> backupMember()));
+      default:
+        throw new RuntimeException("Invalid backupInvocationTestHook " + 
backupInvocationTestHook);
+    }
   }
 
   private DistributionMessageObserver 
createTestHookToBackupBeforeProcessingReplyMessage(
-      Runnable task) {
+      final Runnable task) {
     return new DistributionMessageObserver() {
-      private volatile boolean done;
+
       private final AtomicInteger count = new AtomicInteger();
       private volatile int replyId = -0xBAD;
+      private volatile boolean done;
 
       @Override
       public void beforeSendMessage(ClusterDistributionManager dm, 
DistributionMessage message) {
         // the bucket move will send a destroy region message.
         if (message instanceof DestroyRegionMessage && !done) {
-          this.replyId = message.getProcessorId();
+          replyId = message.getProcessorId();
         }
       }
 
@@ -484,8 +536,9 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
   }
 
   private DistributionMessageObserver 
createTestHookToBackupBeforeSendingDestroyRegionMessage(
-      Runnable task) {
+      final Runnable task) {
     return new DistributionMessageObserver() {
+
       private volatile boolean done;
 
       @Override
@@ -499,26 +552,23 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
     };
   }
 
-  private void cleanDiskDirsInEveryVM() {
-    workingDirByVm.forEach((vm, file) -> {
-      try {
-        FileUtils.deleteDirectory(file);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    });
+  private void cleanDiskDirsInEveryVM() throws IOException {
+    FileUtils.deleteDirectory(getDiskDirFor(vm0));
+    FileUtils.deleteDirectory(getDiskDirFor(vm1));
+    FileUtils.deleteDirectory(getDiskDirFor(vm2));
+    FileUtils.deleteDirectory(getDiskDirFor(vm3));
   }
 
   private DistributionMessageObserver 
createTestHookToThrowIOExceptionBeforeProcessingPrepareBackupRequest(
       final String exceptionMessage) {
     return new DistributionMessageObserver() {
+
       @Override
       public void beforeProcessMessage(ClusterDistributionManager dm, 
DistributionMessage message) {
         if (message instanceof PrepareBackupRequest) {
           DistributionMessageObserver.setInstance(null);
           IOException exception = new IOException(exceptionMessage);
-          AdminFailureResponse response =
-              AdminFailureResponse.create(message.getSender(), exception);
+          AdminFailureResponse response = create(message.getSender(), 
exception);
           response.setMsgId(((PrepareBackupRequest) message).getMsgId());
           dm.putOutgoing(response);
           throw new RuntimeException("Stop processing");
@@ -527,227 +577,207 @@ public class BackupDistributedTest extends 
PersistentPartitionedRegionTestBase {
     };
   }
 
-  private void createPersistentRegions() throws ExecutionException, 
InterruptedException {
-    AsyncInvocation create1 = createPersistentRegion(vm0);
-    AsyncInvocation create2 = createPersistentRegion(vm1);
-    create1.await();
-    create2.await();
+  private void createPersistentRegions(final VM... vms)
+      throws ExecutionException, InterruptedException {
+    Set<AsyncInvocation> createRegionAsyncs = new HashSet<>();
+    for (VM vm : vms) {
+      createRegionAsyncs.add(vm.invokeAsync(() -> createRegions(vm)));
+    }
+    for (AsyncInvocation createRegion : createRegionAsyncs) {
+      createRegion.await();
+    }
   }
 
   private void validateBackupComplete() {
     Pattern pattern = Pattern.compile(".*INCOMPLETE.*");
-    File[] files = backupBaseDir.listFiles((dir1, name) -> 
pattern.matcher(name).matches());
-    assertNotNull(files);
-    assertTrue(files.length == 0);
+    File[] files = backupBaseDir.listFiles((dir, name) -> 
pattern.matcher(name).matches());
+
+    assertThat(files).isNotNull().hasSize(0);
   }
 
-  private void deleteOldUserUserFile(final VM vm) {
-    vm.invoke(() -> {
-      File userDir = new File(workingDirByVm.get(vm), "userbackup-");
-      FileUtils.deleteDirectory(userDir);
-    });
+  private void deleteOldUserFile(final File dir) throws IOException {
+    File userDir = new File(dir, "userbackup-");
+    FileUtils.deleteDirectory(userDir);
   }
 
-  private long setBackupFiles(final VM vm) {
-    return vm.invoke(() -> {
-      File workingDir = workingDirByVm.get(vm);
-      File test1 = new File(workingDir, "test1");
-      File test2 = new File(test1, "test2");
-      File mytext = new File(test2, "my.txt");
-      final ArrayList<File> backuplist = new ArrayList<>();
-      test2.mkdirs();
-      Files.createFile(mytext.toPath());
-      long lastModified = mytext.lastModified();
-      backuplist.add(test2);
+  private long setBackupFiles(final File dir) throws IOException {
+    File dir1 = new File(dir, "test1");
+    File dir2 = new File(dir1, "test2");
+    dir2.mkdirs();
 
-      GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
-      cache.setBackupFiles(backuplist);
+    File textFile = new File(dir2, "my.txt");
+    Files.createFile(textFile.toPath());
 
-      return lastModified;
-    });
-  }
+    List<File> backupList = new ArrayList<>();
+    backupList.add(dir2);
+    getCache().setBackupFiles(backupList);
 
-  private void verifyUserFileRestored(VM vm, final long lm) {
-    vm.invoke(() -> {
-      File workingDir = workingDirByVm.get(vm);
-      File test1 = new File(workingDir, "test1");
-      File test2 = new File(test1, "test2");
-      File mytext = new File(test2, "my.txt");
-      assertTrue(mytext.exists());
-      assertEquals(lm, mytext.lastModified());
-    });
+    return textFile.lastModified();
   }
 
-  private AsyncInvocation createPersistentRegion(final VM vm) {
-    return vm.invokeAsync(() -> {
-      Cache cache = getCache();
-      DiskStore diskStore1 = cache.createDiskStoreFactory()
-          .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + 
"diskstores_1")).setMaxOplogSize(1)
-          .create(getUniqueName());
+  private void verifyUserFileRestored(final File dir, final long 
expectedLastModified) {
+    File dir1 = new File(dir, "test1");
+    File dir2 = new File(dir1, "test2");
+    File textFile = new File(dir2, "my.txt");
 
-      DiskStore diskStore2 = cache.createDiskStoreFactory()
-          .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + 
"diskstores_2")).setMaxOplogSize(1)
-          .create(getUniqueName() + 2);
-
-      RegionFactory regionFactory = 
cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
-          .setPartitionAttributes(new 
PartitionAttributesFactory().setRedundantCopies(0).create());
+    assertThat(textFile).exists();
+    assertThat(textFile.lastModified()).isEqualTo(expectedLastModified);
+  }
 
-      
regionFactory.setDiskStoreName(diskStore1.getName()).setDiskSynchronous(true)
-          .create("region1");
-      
regionFactory.setDiskStoreName(diskStore2.getName()).setDiskSynchronous(true)
-          .create("region2");
-    });
+  private void createRegions(final VM vm) {
+    createRegion(regionName1, getUniqueName() + "-1", getDiskStoreFor(vm, 1));
+    createRegion(regionName2, getUniqueName() + "-2", getDiskStoreFor(vm, 2));
   }
 
-  private AsyncInvocation createColatedPersistentRegions(final VM vm) {
-    return vm.invokeAsync(() -> {
-      Cache cache = getCache();
-      DiskStore diskStore1 = cache.createDiskStoreFactory()
-          .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + 
"diskstores_1")).setMaxOplogSize(1)
-          .create(getUniqueName());
+  private void createRegion(final String regionName, final String 
diskStoreName,
+      final File diskDir) {
+    DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
+    diskStoreFactory.setDiskDirs(toArray(diskDir));
+    diskStoreFactory.setMaxOplogSize(1);
 
-      DiskStore diskStore2 = cache.createDiskStoreFactory()
-          .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + 
"diskstores_2")).setMaxOplogSize(1)
-          .create(getUniqueName() + 2);
+    PartitionAttributesFactory partitionAttributesFactory = new 
PartitionAttributesFactory();
+    partitionAttributesFactory.setRedundantCopies(0);
 
-      RegionFactory regionFactory = 
cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
-          .setPartitionAttributes(new 
PartitionAttributesFactory().setRedundantCopies(0).create());
+    RegionFactory regionFactory = 
getCache().createRegionFactory(PARTITION_PERSISTENT);
+    
regionFactory.setDiskStoreName(diskStoreFactory.create(diskStoreName).getName());
+    regionFactory.setDiskSynchronous(true);
+    regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+    regionFactory.create(regionName);
+  }
 
-      
regionFactory.setDiskStoreName(diskStore1.getName()).setDiskSynchronous(true)
-          .create("region1");
-      
regionFactory.setDiskStoreName(diskStore2.getName()).setDiskSynchronous(true)
-          .setPartitionAttributes(new 
PartitionAttributesFactory().setRedundantCopies(0)
-              .setColocatedWith("region1").create())
-          .create("region2");
-    });
+  private File[] toArray(final File file) {
+    return new File[] {file};
   }
 
-  private void createOverflowRegion(final VM vm) {
-    vm.invoke(() -> {
-      Cache cache = getCache();
-      DiskStore diskStore = cache.createDiskStoreFactory()
-          .setDiskDirs(getDiskDirs(vm, 
getUniqueName())).create(getUniqueName());
+  private void createColocatedRegions(final VM vm) {
+    DiskStoreFactory diskStoreFactory1 = getCache().createDiskStoreFactory();
+    diskStoreFactory1.setDiskDirs(toArray(getDiskStoreFor(vm, 1)));
+    diskStoreFactory1.setMaxOplogSize(1);
 
-      
cache.createRegionFactory(RegionShortcut.REPLICATE).setDiskStoreName(diskStore.getName())
-          .setDiskSynchronous(true)
-          .setEvictionAttributes(
-              EvictionAttributes.createLIFOEntryAttributes(1, 
EvictionAction.OVERFLOW_TO_DISK))
-          .create("region3");
-    });
-  }
+    DiskStoreFactory diskStoreFactory2 = getCache().createDiskStoreFactory();
+    diskStoreFactory2.setDiskDirs(toArray(getDiskStoreFor(vm, 2)));
+    diskStoreFactory2.setMaxOplogSize(1);
 
-  @Override
-  protected void createData(VM vm, final int startKey, final int endKey, final 
String value) {
-    createData(vm, startKey, endKey, value, getPartitionedRegionName());
-  }
+    PartitionAttributesFactory partitionAttributesFactory1 = new 
PartitionAttributesFactory();
+    partitionAttributesFactory1.setRedundantCopies(0);
 
-  @Override
-  protected void createData(VM vm, final int startKey, final int endKey, final 
String value,
-      final String regionName) {
-    vm.invoke(() -> {
-      Cache cache = getCache();
-      Region region = cache.getRegion(regionName);
+    RegionFactory regionFactory1 = 
getCache().createRegionFactory(PARTITION_PERSISTENT);
+    
regionFactory1.setPartitionAttributes(partitionAttributesFactory1.create());
+    regionFactory1.setDiskStoreName(diskStoreFactory1.create(getUniqueName() + 
"-1").getName());
+    regionFactory1.setDiskSynchronous(true);
+    regionFactory1.create(regionName1);
 
-      for (int i = startKey; i < endKey; i++) {
-        region.put(i, value);
-      }
-    });
-  }
+    PartitionAttributesFactory partitionAttributesFactory2 = new 
PartitionAttributesFactory();
+    partitionAttributesFactory2.setColocatedWith(regionName1);
+    partitionAttributesFactory2.setRedundantCopies(0);
 
-  @Override
-  protected void checkData(VM vm, final int startKey, final int endKey, final 
String value) {
-    checkData(vm, startKey, endKey, value, getPartitionedRegionName());
+    RegionFactory regionFactory2 = 
getCache().createRegionFactory(PARTITION_PERSISTENT);
+    regionFactory2.setDiskStoreName(diskStoreFactory2.create(getUniqueName() + 
"-2").getName());
+    regionFactory2.setDiskSynchronous(true);
+    
regionFactory2.setPartitionAttributes(partitionAttributesFactory2.create());
+    regionFactory2.create(regionName2);
   }
 
-  @Override
-  protected void checkData(VM vm, final int startKey, final int endKey, final 
String value,
-      final String regionName) {
-    vm.invoke(() -> {
-      Region region = getCache().getRegion(regionName);
+  private void createRegionWithOverflow(final File diskDir) {
+    DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
+    diskStoreFactory.setDiskDirs(toArray(diskDir));
 
-      for (int i = startKey; i < endKey; i++) {
-        assertEquals(value, region.get(i));
-      }
-    });
-  }
+    EvictionAttributes evictionAttributes = createLIFOEntryAttributes(1, 
OVERFLOW_TO_DISK);
 
-  @Override
-  protected void closeCache(final VM vm) {
-    vm.invoke(() -> getCache().close());
+    RegionFactory regionFactory = getCache().createRegionFactory(REPLICATE);
+    
regionFactory.setDiskStoreName(diskStoreFactory.create(getUniqueName()).getName());
+    regionFactory.setDiskSynchronous(true);
+    regionFactory.setEvictionAttributes(evictionAttributes);
+    regionFactory.create(regionName3);
   }
 
-  @Override
-  protected Set<Integer> getBucketList(VM vm) {
-    return getBucketList(vm, getPartitionedRegionName());
-  }
+  private void createData(final int startKey, final int endKey, final String 
value,
+      final String regionName) {
+    Region<Integer, String> region = getCache().getRegion(regionName);
 
-  @Override
-  protected Set<Integer> getBucketList(VM vm, final String regionName) {
-    return vm.invoke(() -> {
-      Cache cache = getCache();
-      PartitionedRegion region = (PartitionedRegion) 
cache.getRegion(regionName);
-      return new TreeSet<>(region.getDataStore().getAllLocalBucketIds());
-    });
+    for (int i = startKey; i < endKey; i++) {
+      region.put(i, value);
+    }
   }
 
-  private File[] getDiskDirs(VM vm, String dsName) {
-    File[] diskStoreDirs = new File[1];
-    diskStoreDirs[0] = new File(workingDirByVm.get(vm), dsName);
-    diskStoreDirs[0].mkdirs();
-    return diskStoreDirs;
+  private void validateData(final int startKey, final int endKey, final String 
value,
+      final String regionName) {
+    Region region = getCache().getRegion(regionName);
+
+    for (int i = startKey; i < endKey; i++) {
+      assertThat(region.get(i)).isEqualTo(value);
+    }
   }
 
-  private BackupStatus backupMember(final VM vm) {
-    return vm.invoke("backup", () -> {
-      try {
-        return new BackupOperation(getCache().getDistributionManager(), 
getCache())
-            .backupAllMembers(
-                backupBaseDir.toString(), null);
-      } catch (ManagementException e) {
-        throw new RuntimeException(e);
-      }
-    });
+  private BackupStatus backupMember() {
+    return new BackupOperation(getCache().getDistributionManager(), getCache())
+        .backupAllMembers(backupBaseDir.toString(), null);
   }
 
-  protected void restoreBackup(final int expectedNumScripts)
+  private void restoreBackup(final int expectedScriptCount)
       throws IOException, InterruptedException {
     Collection<File> restoreScripts =
         listFiles(backupBaseDir, new RegexFileFilter(".*restore.*"), 
DIRECTORY);
-    assertThat(restoreScripts).hasSize(expectedNumScripts);
+
+    assertThat(restoreScripts).hasSize(expectedScriptCount);
+
     for (File script : restoreScripts) {
-      execute(script);
+      executeScript(script);
     }
   }
 
-  private void execute(final File script) throws IOException, 
InterruptedException {
-    ProcessBuilder processBuilder = new 
ProcessBuilder(script.getAbsolutePath());
-    processBuilder.redirectErrorStream(true);
-    Process process = processBuilder.start();
+  private void executeScript(final File script) throws IOException, 
InterruptedException {
+    process = new 
ProcessBuilder(script.getAbsolutePath()).redirectErrorStream(true).start();
 
-    try (BufferedReader reader =
-        new BufferedReader(new InputStreamReader(process.getInputStream()))) {
-      String line;
-      while ((line = reader.readLine()) != null) {
-        logger.info("OUTPUT:" + line);
-      }
-    }
+    processReader = new 
ProcessStreamReader.Builder(process).inputStream(process.getInputStream())
+        .inputListener(line -> logger.info("OUTPUT: {}", line))
+        .readingMode(getReadingMode()).continueReadingMillis(2 * 
1000).build().start();
 
-    assertThat(process.waitFor()).isEqualTo(0);
+    assertThat(process.waitFor(5, MINUTES)).isTrue();
+    assertThat(process.exitValue()).isEqualTo(0);
   }
 
-  private void createAccessor(VM vm) {
-    vm.invoke(() -> {
-      Cache cache = getCache();
+  private void createAccessor() {
+    PartitionAttributesFactory partitionAttributesFactory1 = new 
PartitionAttributesFactory();
+    partitionAttributesFactory1.setLocalMaxMemory(0);
+    partitionAttributesFactory1.setRedundantCopies(0);
 
-      cache.createRegionFactory(RegionShortcut.PARTITION)
-          .setPartitionAttributes(
-              new 
PartitionAttributesFactory().setRedundantCopies(0).setLocalMaxMemory(0).create())
-          .create("region1");
-      cache.createRegionFactory(RegionShortcut.PARTITION)
-          .setPartitionAttributes(new 
PartitionAttributesFactory().setColocatedWith("region1")
-              .setRedundantCopies(0).setLocalMaxMemory(0).create())
-          .create("region2");
-    });
+    RegionFactory regionFactory1 = getCache().createRegionFactory(PARTITION);
+    
regionFactory1.setPartitionAttributes(partitionAttributesFactory1.create());
+    regionFactory1.create(regionName1);
+
+    PartitionAttributesFactory partitionAttributesFactory2 = new 
PartitionAttributesFactory();
+    partitionAttributesFactory2.setLocalMaxMemory(0);
+    partitionAttributesFactory2.setRedundantCopies(0);
+    partitionAttributesFactory2.setColocatedWith(regionName1);
+
+    RegionFactory regionFactory2 = getCache().createRegionFactory(PARTITION);
+    
regionFactory2.setPartitionAttributes(partitionAttributesFactory2.create());
+    regionFactory2.create(regionName2);
+  }
+
+  private InternalCache getCache() {
+    return cacheRule.getOrCreateCache();
+  }
+
+  private String getUniqueName() {
+    return uniqueName;
+  }
+
+  private File getDiskStoreFor(final VM vm) {
+    return new File(getDiskDirFor(vm), getUniqueName());
+  }
+
+  private File getDiskStoreFor(final VM vm, final int which) {
+    return new File(getDiskDirFor(vm), "vm-" + vm.getId() + "-diskstores-" + 
which);
+  }
+
+  private File getDiskDirFor(final VM vm) {
+    return diskDirRule.getDiskDirFor(vm);
+  }
+
+  private ReadingMode getReadingMode() {
+    return SystemUtils.isWindows() ? ReadingMode.NON_BLOCKING : 
ReadingMode.BLOCKING;
   }
 
   enum WhenToInvokeBackup {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
index d1316f7..7374a81 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
@@ -14,24 +14,23 @@
  */
 package org.apache.geode.internal.cache.backup;
 
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.commons.io.FileUtils.listFiles;
+import static org.apache.commons.io.filefilter.DirectoryFileFilter.DIRECTORY;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.awaitility.Awaitility.await;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.Serializable;
 import java.nio.file.Files;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -40,1026 +39,441 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.filefilter.DirectoryFileFilter;
 import org.apache.commons.io.filefilter.RegexFileFilter;
 import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.admin.AdminDistributedSystem;
 import org.apache.geode.admin.internal.AdminDistributedSystemImpl;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskStore;
-import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.internal.DeployedJar;
 import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.lang.SystemUtils;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.process.ProcessStreamReader;
+import org.apache.geode.internal.process.ProcessStreamReader.ReadingMode;
 import org.apache.geode.internal.util.TransformUtils;
 import org.apache.geode.management.BackupStatus;
-import org.apache.geode.management.ManagementException;
 import org.apache.geode.test.compiler.ClassBuilder;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 /**
- * Tests for the incremental backup feature.
+ * Distributed tests for incremental backup.
  */
 @Category(DistributedTest.class)
 @SuppressWarnings("serial")
-public class IncrementalBackupDistributedTest extends JUnit4CacheTestCase {
-
+public class IncrementalBackupDistributedTest implements Serializable {
   private static final Logger logger = LogService.getLogger();
 
-  /**
-   * Data load increment.
-   */
-  private static final int DATA_INCREMENT = 10000;
-
-  /**
-   * Start value for data load.
-   */
-  private int dataStart = 0;
-
-  /**
-   * End value for data load.
-   */
-  private int dataEnd = this.dataStart + DATA_INCREMENT;
-
-  /**
-   * Regular expression used to search for member operation log files.
-   */
-  private static final String OPLOG_REGEX = ".*\\.[kdc]rf$";
-
-  @Rule
-  public SerializableTemporaryFolder tempDir = new 
SerializableTemporaryFolder();
-
-  private final Map<Integer, File> baseDirectoryByVm = new HashMap<>();
-
-  /**
-   * Creates test regions for a member.
-   */
-  private void createRegions(File baseDirectory, int vmNumber) throws 
IOException {
-    Cache cache = getCache(new CacheFactory().set(LOG_LEVEL, 
LogWriterUtils.getDUnitLogLevel()));
-    cache.createDiskStoreFactory().setDiskDirs(getDiskDirectory(baseDirectory, 
vmNumber))
-        .create("fooStore");
-    cache.createDiskStoreFactory().setDiskDirs(getDiskDirectory(baseDirectory, 
vmNumber))
-        .create("barStore");
-    getRegionFactory(cache).setDiskStoreName("fooStore").create("fooRegion");
-    getRegionFactory(cache).setDiskStoreName("barStore").create("barRegion");
-  }
-
-  private File[] getDiskDirectory(File parent, int vmNumber) throws 
IOException {
-    File dir = new File(parent, "disk" + 
String.valueOf(vmNumber)).getAbsoluteFile();
-    dir.mkdirs();
-    return new File[] {dir};
-  }
-
-  private RegionFactory<Integer, String> getRegionFactory(Cache cache) {
-    PartitionAttributes<Integer, String> attributes =
-        new PartitionAttributesFactory<Integer, 
String>().setTotalNumBuckets(5).create();
-    RegionFactory<Integer, String> factory =
-        cache.<Integer, 
String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
-            .setPartitionAttributes(attributes);
-    return factory;
-  }
-
-  /**
-   * A FileFilter that looks for a timestamped gemfire backup directory.
-   */
-  private static final FileFilter backupDirFilter = file -> {
-    // This will break in about 90 years...
-    return file.isDirectory() && file.getName().startsWith("20");
-  };
-
-  /**
-   * @return the baseline backup directory.
-   */
-  private static File getBaselineDir() {
-    File tmpDir = new File(System.getProperty("java.io.tmpdir"));
-    File dir = new File(tmpDir, "baseline");
-    if (!dir.exists()) {
-      dir.mkdirs();
-    }
-
-    return dir;
-  }
-
-  /**
-   * @return the second incremental backup directory.
-   */
-  private static File getIncremental2Dir() {
-    File tmpDir = new File(System.getProperty("java.io.tmpdir"));
-    File dir = new File(tmpDir, "incremental2");
-    if (!dir.exists()) {
-      dir.mkdirs();
-    }
-
-    return dir;
-  }
-
-  /**
-   * @return the incremental backup directory.
-   */
-  private static File getIncrementalDir() {
-    File tmpDir = new File(System.getProperty("java.io.tmpdir"));
-    File dir = new File(tmpDir, "incremental");
-    if (!dir.exists()) {
-      dir.mkdirs();
-    }
-
-    return dir;
-  }
-
-  /**
-   * Invokes {@link AdminDistributedSystem#getMissingPersistentMembers()} on a 
member.
-   *
-   * @param vm a member of the distributed system.
-   * @return a set of missing members for the distributed system.
-   */
-  @SuppressWarnings("unchecked")
-  private Set<PersistentID> getMissingMembers(VM vm) {
-    return (Set<PersistentID>) vm.invoke(new 
SerializableCallable("getMissingMembers") {
-      @Override
-      public Object call() {
-        return AdminDistributedSystemImpl
-            .getMissingPersistentMembers(getSystem().getDistributionManager());
-      }
-    });
-  }
+  private static final int DATA_INCREMENT = 10_000;
+  private static final RegexFileFilter OPLOG_FILTER = new 
RegexFileFilter(".*\\.[kdc]rf$");
 
-  private BackupStatus baseline(VM vm) {
-    return vm.invoke(() -> {
-      try {
-        return new BackupOperation(getSystem().getDistributionManager(), 
getCache())
-            .backupAllMembers(
-                getBaselineDir().toString(), null);
-      } catch (ManagementException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
+  private int dataStart;
+  private int dataEnd = dataStart + DATA_INCREMENT;
 
-  private BackupStatus incremental(VM vm) {
-    return vm.invoke(() -> {
-      try {
-        return new BackupOperation(getSystem().getDistributionManager(), 
getCache())
-            .backupAllMembers(
-                getIncrementalDir().toString(), 
getBaselineBackupDir().toString());
-      } catch (ManagementException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
+  private String uniqueName;
+  private String diskStoreName1;
+  private String diskStoreName2;
+  private String regionName1;
+  private String regionName2;
 
-  private BackupStatus incremental2(VM vm) {
-    return vm.invoke(() -> {
-      try {
-        return new BackupOperation(getSystem().getDistributionManager(), 
getCache())
-            .backupAllMembers(
-                getIncremental2Dir().toString(), 
getIncrementalBackupDir().toString());
-      } catch (ManagementException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
+  private VM vm0;
+  private VM vm1;
 
-  /**
-   * Invokes {@link DistributedSystem#getDistributedMember()} on a member.
-   *
-   * @param vm a distributed system member.
-   * @return the member's id.
-   */
-  private String getMemberId(VM vm) {
-    return vm.invoke(() -> 
getCache().getDistributedSystem().getDistributedMember().toString()
-        .replaceAll("[^\\w]+", "_"));
-  }
+  private transient Process process;
+  private transient ProcessStreamReader processReader;
 
-  /**
-   * Invokes {@link Cache#close()} on a member.
-   */
-  private void closeCache(final VM closeVM) {
-    closeVM.invoke(new SerializableRunnable() {
-      @Override
-      public void run() {
-        getCache().close();
-      }
-    });
-  }
+  @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
 
-  /**
-   * Locates the PersistentID for the testStore disk store for a distributed 
member.
-   *
-   * @param vm a distributed member.
-   * @return a PersistentID for a member's disk store.
-   */
-  private PersistentID getPersistentID(final VM vm, final String 
diskStoreName) {
-    return vm.invoke(() -> {
-      PersistentID id = null;
-      Collection<DiskStore> diskStores = ((InternalCache) 
getCache()).listDiskStores();
-      for (DiskStore diskStore : diskStores) {
-        if (diskStore.getName().equals(diskStoreName)) {
-          id = ((DiskStoreImpl) diskStore).getPersistentID();
-          break;
-        }
-      }
-      return id;
-    });
-  }
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
 
-  /**
-   * Locates the PersistentID for the testStore disk store for a distributed 
member.
-   *
-   * @param vm a distributed member.
-   * @return a PersistentID for a member's disk store.
-   */
-  private PersistentID getPersistentID(final VM vm) {
-    return getPersistentID(vm, "fooStore");
-  }
+  @Rule
+  public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule();
 
-  /**
-   * Invokes {@link DistributedSystem#disconnect()} on a member.
-   *
-   * @param disconnectVM a member of the distributed system to disconnect.
-   * @param testVM a member of the distributed system to test for the missing 
member (just
-   *        disconnected).
-   */
-  private PersistentID disconnect(final VM disconnectVM, final VM testVM) {
-    final PersistentID id = disconnectVM.invoke(() -> {
-      PersistentID persistentID = null;
-      Collection<DiskStore> diskStores = getCache().listDiskStores();
-      for (DiskStore diskStore : diskStores) {
-        if (diskStore.getName().equals("fooStore")) {
-          persistentID = ((DiskStoreImpl) diskStore).getPersistentID();
-          break;
-        }
-      }
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
 
-      getSystem().disconnect();
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
 
-      return persistentID;
-    });
+  @Before
+  public void setUp() throws Exception {
+    vm0 = getVM(0);
+    vm1 = getVM(1);
 
-    final Set<PersistentID> missingMembers = new HashSet<>();
-    Wait.waitForCriterion(new WaitCriterion() {
-      @Override
-      public boolean done() {
-        missingMembers.clear();
-        missingMembers.addAll(getMissingMembers(testVM));
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
 
-        return missingMembers.contains(id);
-      }
+    diskStoreName1 = uniqueName + "_diskStore-1";
+    diskStoreName2 = uniqueName + "_diskStore-2";
+    regionName1 = uniqueName + "_region-1";
+    regionName2 = uniqueName + "_region-2";
 
-      @Override
-      public String description() {
-        return "[IncrementalBackupDistributedTest] Waiting for missing member 
" + id;
-      }
-    }, 10000, 500, false);
+    vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0)));
+    vm1.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm1)));
 
-    return id;
-  }
+    createCache(diskDirRule.getDiskDirFor(getController()));
 
-  /**
-   * Invokes {@link CacheFactory#create()} on a member.
-   *
-   * @param vm a member of the distributed system.
-   */
-  private void openCache(VM vm) throws IOException {
-    int vmNumber = vm.getId();
-    File vmDir = getBaseDir(vmNumber);
-    vm.invoke(() -> createRegions(vmDir, vmNumber));
+    performPuts();
   }
 
-  private File getBaseDir(int vmNumber) throws IOException {
-    File baseDir = baseDirectoryByVm.get(vmNumber);
-    if (baseDir == null) {
-      baseDir = tempDir.newFolder("vm" + vmNumber);
-      baseDirectoryByVm.put(vmNumber, baseDir);
+  @After
+  public void tearDown() throws Exception {
+    if (process != null && process.isAlive()) {
+      process.destroyForcibly();
+      process.waitFor(2, MINUTES);
     }
-    return baseDir;
-  }
-
-  /**
-   * Blocks and waits for a backup operation to finish on a distributed member.
-   *
-   * @param vm a member of the distributed system.
-   */
-  private void waitForBackup(VM vm) {
-    vm.invoke(new SerializableRunnable() {
-      @Override
-      public void run() {
-        Collection<DiskStore> backupInProgress = ((InternalCache) 
getCache()).listDiskStores();
-        List<DiskStoreImpl> backupCompleteList = new LinkedList<>();
-
-        while (backupCompleteList.size() < backupInProgress.size()) {
-          for (DiskStore diskStore : backupInProgress) {
-            if (((DiskStoreImpl) diskStore).getInProgressBackup() == null
-                && !backupCompleteList.contains(diskStore)) {
-              backupCompleteList.add((DiskStoreImpl) diskStore);
-            }
-          }
-        }
-      }
-    });
-  }
-
-  /**
-   * Performs a full backup.
-   *
-   * @return the backup status.
-   */
-  private BackupStatus performBaseline() {
-    return baseline(Host.getHost(0).getVM(1));
-  }
-
-  /**
-   * Performs an incremental backup.
-   *
-   * @return the backup status.
-   */
-  private BackupStatus performIncremental() {
-    return incremental(Host.getHost(0).getVM(1));
-  }
-
-  /**
-   * Performs a second incremental backup.
-   */
-  private BackupStatus performIncremental2() {
-    return incremental2(Host.getHost(0).getVM(1));
-  }
-
-  /**
-   * @return the directory for the completed baseline backup.
-   */
-  private static File getBaselineBackupDir() {
-    File[] dirs = getBaselineDir().listFiles(backupDirFilter);
-    assertEquals(1, dirs.length);
-    return dirs[0];
-  }
-
-  /**
-   * @return the directory for the completed baseline backup.
-   */
-  private static File getIncrementalBackupDir() {
-    File[] dirs = getIncrementalDir().listFiles(backupDirFilter);
-    assertEquals(1, dirs.length);
-    return dirs[0];
-  }
-
-  /**
-   * Returns an individual member's backup directory.
-   *
-   * @param rootDir the directory to begin searching for the member's backup 
dir from.
-   * @param memberId the member's identifier.
-   * @return the member's backup directory.
-   */
-  private File getBackupDirForMember(final File rootDir, final String 
memberId) {
-    File[] dateDirs = rootDir.listFiles(backupDirFilter);
-    assertEquals(1, dateDirs.length);
-
-    File[] memberDirs = dateDirs[0].listFiles(new FileFilter() {
-      @Override
-      public boolean accept(File file) {
-        return file.isDirectory() && file.getName().contains(memberId);
-      }
-    });
-
-    assertEquals(1, memberDirs.length);
-
-    return memberDirs[0];
-  }
-
-  /**
-   * Adds the data region to every participating VM.
-   */
-  @SuppressWarnings("serial")
-  private void createDataRegions() throws IOException {
-    Host host = Host.getHost(0);
-    int numberOfVms = host.getVMCount();
-
-    for (int i = 0; i < numberOfVms; ++i) {
-      openCache(host.getVM(i));
+    if (processReader != null && processReader.isRunning()) {
+      processReader.stop();
     }
   }
 
   /**
-   * Executes a shell command in an external process.
-   *
-   * @param command a shell command.
-   * @return the exit value of processing the shell command.
-   */
-  private int execute(String command) throws IOException, InterruptedException 
{
-    final ProcessBuilder builder = new ProcessBuilder(command);
-    builder.redirectErrorStream(true);
-    final Process process = builder.start();
-
-    /*
-     * Consume standard out.
-     */
-    new Thread(new Runnable() {
-      @Override
-      public void run() {
-
-        try {
-          BufferedReader reader =
-              new BufferedReader(new 
InputStreamReader(process.getInputStream()));
-          String line;
-
-          do {
-            line = reader.readLine();
-          } while (null != line);
-
-          reader.close();
-        } catch (IOException e) {
-          logger.info(e);
-        }
-      }
-    }).start();
-
-    /*
-     * Consume standard error.
-     */
-    new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          BufferedReader reader =
-              new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
-          String line;
-
-          do {
-            line = reader.readLine();
-          } while (null != line);
-
-          reader.close();
-        } catch (IOException e) {
-          logger.info(e);
-        }
-      }
-    }).start();
-
-    return process.waitFor();
-  }
-
-  /**
-   * Peforms an operation log restore for a member.
-   *
-   * @param backupDir the member's backup directory containing the restore 
script.
-   */
-  private void performRestore(File memberDir, File backupDir)
-      throws IOException, InterruptedException {
-    /*
-     * The restore script will not restore if there is an if file in the copy 
to directory. Remove
-     * these files first.
-     */
-    Collection<File> ifFiles = FileUtils.listFiles(memberDir, new 
RegexFileFilter(".*\\.if$"),
-        DirectoryFileFilter.DIRECTORY);
-    for (File file : ifFiles) {
-      file.delete();
-    }
-
-    /*
-     * Remove all operation logs.
-     */
-    Collection<File> oplogs = FileUtils.listFiles(memberDir, new 
RegexFileFilter(OPLOG_REGEX),
-        DirectoryFileFilter.DIRECTORY);
-    for (File file : oplogs) {
-      file.delete();
-    }
-
-    /*
-     * Get a hold of the restore script and make sure it is there.
-     */
-    File restoreScript = new File(backupDir, "restore.sh");
-    if (!restoreScript.exists()) {
-      restoreScript = new File(backupDir, "restore.bat");
-    }
-    assertTrue(restoreScript.exists());
-
-    assertEquals(0, execute(restoreScript.getAbsolutePath()));
-  }
-
-  /**
-   * Adds an incomplete marker to the baseline backup for a member.
-   *
-   * @param vm a distributed system member.
-   */
-  private void markAsIncomplete(VM vm) throws IOException {
-    File backupDir = getBackupDirForMember(getBaselineDir(), getMemberId(vm));
-    assertTrue(backupDir.exists());
-
-    File incomplete = new File(backupDir, BackupWriter.INCOMPLETE_BACKUP_FILE);
-    incomplete.createNewFile();
-  }
-
-  /**
-   * Loads additional data into the test regions.
-   */
-  private void loadMoreData() {
-    Region<Integer, String> fooRegion = getCache().getRegion("fooRegion");
-
-    // Fill our region data
-    for (int i = this.dataStart; i < this.dataEnd; ++i) {
-      fooRegion.put(i, Integer.toString(i));
-    }
-
-    Region<Integer, String> barRegion = getCache().getRegion("barRegion");
-
-    // Fill our region data
-    for (int i = this.dataStart; i < this.dataEnd; ++i) {
-      barRegion.put(i, Integer.toString(i));
-    }
-
-    this.dataStart += DATA_INCREMENT;
-    this.dataEnd += DATA_INCREMENT;
-  }
-
-  /**
-   * Used to confirm valid BackupStatus data. Confirms fix for defect #45657
-   *
-   * @param backupStatus contains a list of members that were backed up.
-   */
-  private void assertBackupStatus(final BackupStatus backupStatus) {
-    Map<DistributedMember, Set<PersistentID>> backupMap = 
backupStatus.getBackedUpDiskStores();
-    assertFalse(backupMap.isEmpty());
-
-    for (DistributedMember member : backupMap.keySet()) {
-      for (PersistentID id : backupMap.get(member)) {
-        assertNotNull(id.getHost());
-        assertNotNull(id.getUUID());
-        assertNotNull(id.getDirectory());
-      }
-    }
-  }
-
-  /**
-   * 1. Add partitioned persistent region to all members. 2. Fills region with 
data.
-   */
-  @Override
-  public final void postSetUp() throws Exception {
-    createDataRegions();
-    File dir = getBaseDir(-1);
-    createRegions(dir, -1);
-    loadMoreData();
-  }
-
-  /**
-   * Removes backup directories (and all backup data).
-   */
-  @Override
-  public final void preTearDownCacheTestCase() throws Exception {
-    FileUtils.deleteDirectory(getIncremental2Dir());
-    FileUtils.deleteDirectory(getIncrementalDir());
-    FileUtils.deleteDirectory(getBaselineDir());
-  }
-
-  /**
-   * This tests the basic features of incremental backup. This means that 
operation logs that are
-   * present in both the baseline and member's disk store should not be copied 
during the
-   * incremental backup. Additionally, the restore script should reference and 
copy operation logs
-   * from the baseline backup.
+   * This tests the basic features of performBackupIncremental backup. This 
means that operation
+   * logs that are present in both the performBackupBaseline and member's disk 
store should not be
+   * copied during the performBackupIncremental backup. Additionally, the 
restore script should
+   * reference and copy operation logs from the performBackupBaseline backup.
    */
   @Test
   public void testIncrementalBackup() throws Exception {
-    String memberId = getMemberId(Host.getHost(0).getVM(1));
+    String memberId = vm1.invoke(() -> getModifiedMemberId());
 
-    File memberDir = baseDirectoryByVm.get(1);
-    // getVMDir(Host.getHost(0).getVM(1));
-    assertNotNull(memberDir);
+    File memberDir = diskDirRule.getDiskDirFor(vm1);
 
     // Find all of the member's oplogs in the disk directory 
(*.crf,*.krf,*.drf)
-    Collection<File> memberOplogFiles = FileUtils.listFiles(memberDir,
-        new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberOplogFiles.isEmpty());
+    Collection<File> memberOplogFiles = listFiles(memberDir, OPLOG_FILTER, 
DIRECTORY);
+    assertThat(memberOplogFiles).isNotEmpty();
 
     // Perform a full backup and wait for it to finish
-    assertBackupStatus(performBaseline());
-    waitForBackup(Host.getHost(0).getVM(1));
+    validateBackupStatus(vm1.invoke(() -> performBackup(getBaselinePath())));
+    vm1.invoke(() -> waitForBackup());
 
-    // Find all of the member's oplogs in the baseline (*.crf,*.krf,*.drf)
+    // Find all of the member's oplogs in the performBackupBaseline 
(*.crf,*.krf,*.drf)
     Collection<File> memberBaselineOplogs =
-        FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), memberId),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberBaselineOplogs.isEmpty());
+        listFiles(getBackupDirForMember(getBaselineDir(), memberId), 
OPLOG_FILTER, DIRECTORY);
+    assertThat(memberBaselineOplogs).isNotEmpty();
 
     List<String> memberBaselineOplogNames = new LinkedList<>();
     TransformUtils.transform(memberBaselineOplogs, memberBaselineOplogNames,
         TransformUtils.fileNameTransformer);
 
-    // Peform and incremental backup and wait for it to finish
-    loadMoreData(); // Doing this preserves the new oplogs created by the 
baseline backup
-    assertBackupStatus(performIncremental());
-    waitForBackup(Host.getHost(0).getVM(1));
+    // Perform a performBackupIncremental backup and wait for it to finish
+    performPuts(); // This preserves the new oplogs created by the 
performBackupBaseline backup
+    validateBackupStatus(
+        vm1.invoke(() -> performBackup(getIncrementalPath(), 
getBaselineBackupPath())));
+    vm1.invoke(() -> waitForBackup());
 
-    // Find all of the member's oplogs in the incremental (*.crf,*.krf,*.drf)
+    // Find all of the member's oplogs in the performBackupIncremental 
(*.crf,*.krf,*.drf)
     Collection<File> memberIncrementalOplogs =
-        FileUtils.listFiles(getBackupDirForMember(getIncrementalDir(), 
memberId),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberIncrementalOplogs.isEmpty());
+        listFiles(getBackupDirForMember(getIncrementalDir(), memberId), 
OPLOG_FILTER, DIRECTORY);
+    assertThat(memberIncrementalOplogs).isNotEmpty();
 
     List<String> memberIncrementalOplogNames = new LinkedList<>();
     TransformUtils.transform(memberIncrementalOplogs, 
memberIncrementalOplogNames,
         TransformUtils.fileNameTransformer);
 
-    /*
-     * Assert that the incremental backup does not contain baseline operation 
logs that the member
-     * still has copies of.
-     */
-    for (String oplog : memberBaselineOplogNames) {
-      assertFalse(memberIncrementalOplogNames.contains(oplog));
-    }
+    // Assert that the performBackupIncremental backup does not contain 
performBackupBaseline
+    // operation logs that the member still has copies of.
+    
assertThat(memberIncrementalOplogNames).doesNotContainAnyElementsOf(memberBaselineOplogNames);
+
+    // Perform a second performBackupIncremental and wait for it to finish.
 
-    // Perform a second incremental and wait for it to finish.
-    loadMoreData(); // Doing this preserves the new oplogs created by the 
incremental backup
-    assertBackupStatus(performIncremental2());
-    waitForBackup(Host.getHost(0).getVM(1));
+    // Doing this preserves the new oplogs created by the 
performBackupIncremental backup
+    performPuts();
+    validateBackupStatus(
+        vm1.invoke(() -> performBackup(getIncremental2Path(), 
getIncrementalBackupPath())));
+    vm1.invoke(() -> waitForBackup());
 
     Collection<File> memberIncremental2Oplogs =
-        FileUtils.listFiles(getBackupDirForMember(getIncremental2Dir(), 
memberId),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberIncremental2Oplogs.isEmpty());
+        listFiles(getBackupDirForMember(getIncremental2Dir(), memberId), 
OPLOG_FILTER, DIRECTORY);
+    assertThat(memberIncremental2Oplogs).isNotEmpty();
 
     List<String> memberIncremental2OplogNames = new LinkedList<>();
     TransformUtils.transform(memberIncremental2Oplogs, 
memberIncremental2OplogNames,
         TransformUtils.fileNameTransformer);
 
-    /*
-     * Assert that the second incremental backup does not contain operation 
logs copied into the
-     * baseline.
-     */
-    for (String oplog : memberBaselineOplogNames) {
-      assertFalse(memberIncremental2OplogNames.contains(oplog));
-    }
+    // Assert that the second performBackupIncremental backup does not contain 
operation logs copied
+    // into the performBackupBaseline.
+    
assertThat(memberIncremental2OplogNames).doesNotContainAnyElementsOf(memberBaselineOplogNames);
 
-    /*
-     * Also assert that the second incremental backup does not contain 
operation logs copied into
-     * the member's first incremental backup.
-     */
-    for (String oplog : memberIncrementalOplogNames) {
-      assertFalse(memberIncremental2OplogNames.contains(oplog));
-    }
+    // Also assert that the second performBackupIncremental backup does not 
contain operation logs
+    // copied into the member's first performBackupIncremental backup.
+    assertThat(memberIncremental2OplogNames)
+        .doesNotContainAnyElementsOf(memberIncrementalOplogNames);
 
     // Shut down our member so we can perform a restore
-    PersistentID id = getPersistentID(Host.getHost(0).getVM(1));
-    closeCache(Host.getHost(0).getVM(1));
+    PersistentID id = vm1.invoke(() -> getPersistentID(diskStoreName1));
+    vm1.invoke(() -> cacheRule.getCache().close());
 
     // Execute the restore
     performRestore(new File(id.getDirectory()),
         getBackupDirForMember(getIncremental2Dir(), memberId));
 
-    /*
-     * Collect all of the restored operation logs.
-     */
-    Collection<File> restoredOplogs = FileUtils.listFiles(new 
File(id.getDirectory()),
-        new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(restoredOplogs.isEmpty());
+    // Collect all of the restored operation logs.
+    Collection<File> restoredOplogs =
+        listFiles(new File(id.getDirectory()), OPLOG_FILTER, DIRECTORY);
+    assertThat(restoredOplogs).isNotEmpty();
+
     List<String> restoredOplogNames = new LinkedList<>();
     TransformUtils.transform(restoredOplogs, restoredOplogNames,
         TransformUtils.fileNameTransformer);
 
-    /*
-     * Assert that baseline operation logs have been copied over to the 
member's disk directory.
-     */
-    for (String oplog : memberBaselineOplogNames) {
-      assertTrue(restoredOplogNames.contains(oplog));
-    }
+    // Assert that performBackupBaseline operation logs have been copied over 
to the member's disk
+    // directory.
+    assertThat(restoredOplogNames).containsAll(memberBaselineOplogNames);
 
-    /*
-     * Assert that the incremental operation logs have been copied over to the 
member's disk
-     * directory.
-     */
-    for (String oplog : memberIncrementalOplogNames) {
-      assertTrue(restoredOplogNames.contains(oplog));
-    }
+    // Assert that the performBackupIncremental operation logs have been 
copied over to the member's
+    // disk directory.
+    assertThat(restoredOplogNames).containsAll(memberIncrementalOplogNames);
 
-    /*
-     * Assert that the second incremental operation logs have been copied over 
to the member's disk
-     * directory.
-     */
-    for (String oplog : memberIncremental2OplogNames) {
-      assertTrue(restoredOplogNames.contains(oplog));
-    }
+    // Assert that the second performBackupIncremental operation logs have 
been copied over to the
+    // member's disk directory.
+    assertThat(restoredOplogNames).containsAll(memberIncremental2OplogNames);
 
-    /*
-     * Reconnect the member.
-     */
-    openCache(Host.getHost(0).getVM(1));
+    // Reconnect the member.
+    vm1.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm1)));
   }
 
   /**
    * Successful if a member performs a full backup when its backup data is not 
present in the
-   * baseline (for whatever reason). This also tests what happens when a 
member is offline during
-   * the baseline backup.
+   * performBackupBaseline (for whatever reason). This also tests what happens 
when a member is
+   * offline during the performBackupBaseline backup.
    *
+   * <p>
    * The test is regarded as successful when all of the missing members oplog 
files are backed up
-   * during an incremental backup. This means that the member peformed a full 
backup because its
-   * oplogs were missing in the baseline.
+   * during a performBackupIncremental backup. This means that the member 
performed a full backup
+   * because its oplogs were missing in the performBackupBaseline.
    */
   @Test
   public void testMissingMemberInBaseline() throws Exception {
-    // Simulate the missing member by forcing a persistent member
-    // to go offline.
-    final PersistentID missingMember =
-        disconnect(Host.getHost(0).getVM(0), Host.getHost(0).getVM(1));
-
-    /*
-     * Perform baseline and make sure that the list of offline disk stores 
contains our missing
-     * member.
-     */
-    BackupStatus baselineStatus = performBaseline();
-    assertBackupStatus(baselineStatus);
-    assertNotNull(baselineStatus.getOfflineDiskStores());
-    assertEquals(2, baselineStatus.getOfflineDiskStores().size());
+    // Simulate the missing member by forcing a persistent member to go 
offline.
+    PersistentID missingMember = vm0.invoke(() -> 
getPersistentID(diskStoreName1));
+    vm0.invoke(() -> cacheRule.getCache().close());
+
+    await().atMost(2, MINUTES)
+        .until(() -> vm1.invoke(() -> 
getMissingPersistentMembers().contains(missingMember)));
+
+    // Perform performBackupBaseline and make sure that the list of offline disk 
stores contains our
+    // missing member.
+    BackupStatus baselineStatus = vm1.invoke(() -> 
performBackup(getBaselinePath()));
+    validateBackupStatus(baselineStatus);
+    assertThat(baselineStatus.getOfflineDiskStores()).isNotNull().hasSize(2);
 
     // Find all of the member's oplogs in the missing member's diskstore 
directory structure
     // (*.crf,*.krf,*.drf)
     Collection<File> missingMemberOplogFiles =
-        FileUtils.listFiles(new File(missingMember.getDirectory()),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(missingMemberOplogFiles.isEmpty());
-
-    /*
-     * Restart our missing member and make sure it is back online and part of 
the distributed system
-     */
-    openCache(Host.getHost(0).getVM(0));
-
-    /*
-     * After reconnecting make sure the other members agree that the missing 
member is back online.
-     */
-    final Set<PersistentID> missingMembers = new HashSet<>();
-    Wait.waitForCriterion(new WaitCriterion() {
-      @Override
-      public boolean done() {
-        missingMembers.clear();
-        missingMembers.addAll(getMissingMembers(Host.getHost(0).getVM(1)));
-        return !missingMembers.contains(missingMember);
-      }
+        listFiles(new File(missingMember.getDirectory()), OPLOG_FILTER, 
DIRECTORY);
+    assertThat(missingMemberOplogFiles).isNotEmpty();
 
-      @Override
-      public String description() {
-        return "[testMissingMemberInBasline] Wait for missing member.";
-      }
-    }, 10000, 500, false);
+    // Restart our missing member and make sure it is back online and part of 
the cluster
+    vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0)));
 
-    assertEquals(0, missingMembers.size());
+    // After reconnecting make sure the other members agree that the missing 
member is back online.
+    await().atMost(2, MINUTES)
+        .until(() -> 
assertThat(getMissingPersistentMembers()).doesNotContain(missingMember));
 
-    /*
-     * Peform incremental and make sure we have no offline disk stores.
-     */
-    BackupStatus incrementalStatus = performIncremental();
-    assertBackupStatus(incrementalStatus);
-    assertNotNull(incrementalStatus.getOfflineDiskStores());
-    assertEquals(0, incrementalStatus.getOfflineDiskStores().size());
+    // Perform performBackupIncremental and make sure we have no offline disk 
stores.
+    BackupStatus incrementalStatus =
+        vm1.invoke(() -> performBackup(getIncrementalPath(), 
getBaselineBackupPath()));
+    validateBackupStatus(incrementalStatus);
+    assertThat(incrementalStatus.getOfflineDiskStores()).isNotNull().isEmpty();
 
     // Get the missing member's member id which is different from the 
PersistentID
-    String memberId = getMemberId(Host.getHost(0).getVM(0));
-    assertNotNull(memberId);
+    String memberId = vm0.invoke(() -> getModifiedMemberId());
 
-    // Get list of backed up oplog files in the incremental backup for the 
missing member
+    // Get list of backed up oplog files in the performBackupIncremental 
backup for the missing
+    // member
     File incrementalMemberDir = getBackupDirForMember(getIncrementalDir(), 
memberId);
-    Collection<File> backupOplogFiles = 
FileUtils.listFiles(incrementalMemberDir,
-        new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(backupOplogFiles.isEmpty());
+    Collection<File> backupOplogFiles = listFiles(incrementalMemberDir, 
OPLOG_FILTER, DIRECTORY);
+    assertThat(backupOplogFiles).isNotEmpty();
 
     // Transform missing member oplogs to just their file names.
     List<String> missingMemberOplogNames = new LinkedList<>();
     TransformUtils.transform(missingMemberOplogFiles, missingMemberOplogNames,
         TransformUtils.fileNameTransformer);
 
-    // Transform missing member's incremental backup oplogs to just their file 
names.
+    // Transform missing member's performBackupIncremental backup oplogs to 
just their file names.
     List<String> backupOplogNames = new LinkedList<>();
     TransformUtils.transform(backupOplogFiles, backupOplogNames,
         TransformUtils.fileNameTransformer);
 
-    /*
-     * Make sure that the incremental backup for the missing member contains 
all of the operation
-     * logs for that member. This proves that a full backup was performed for 
that member.
-     */
-    assertTrue(backupOplogNames.containsAll(missingMemberOplogNames));
+    // Make sure that the performBackupIncremental backup for the missing 
member contains all of the
+    // operation logs for that member. This proves that a full backup was 
performed for that member.
+    assertThat(backupOplogNames).containsAll(missingMemberOplogNames);
   }
 
   /**
    * Successful if a member performs a full backup if their backup is marked 
as incomplete in the
-   * baseline.
+   * performBackupBaseline.
    */
   @Test
   public void testIncompleteInBaseline() throws Exception {
-    /*
-     * Get the member ID for VM 1 and perform a baseline.
-     */
-    String memberId = getMemberId(Host.getHost(0).getVM(1));
-    assertBackupStatus(performBaseline());
-
-    /*
-     * Find all of the member's oplogs in the baseline (*.crf,*.krf,*.drf)
-     */
+    // Get the member ID for VM 1 and perform a performBackupBaseline.
+    String memberId = vm1.invoke(() -> getModifiedMemberId());
+    validateBackupStatus(vm1.invoke(() -> performBackup(getBaselinePath())));
+
+    // Find all of the member's oplogs in the performBackupBaseline 
(*.crf,*.krf,*.drf)
     Collection<File> memberBaselineOplogs =
-        FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), memberId),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberBaselineOplogs.isEmpty());
+        listFiles(getBackupDirForMember(getBaselineDir(), memberId), 
OPLOG_FILTER, DIRECTORY);
+    assertThat(memberBaselineOplogs).isNotEmpty();
 
     List<String> memberBaselineOplogNames = new LinkedList<>();
     TransformUtils.transform(memberBaselineOplogs, memberBaselineOplogNames,
         TransformUtils.fileNameTransformer);
 
-    // Mark the baseline as incomplete (even though it really isn't)
-    markAsIncomplete(Host.getHost(0).getVM(1));
+    vm1.invoke(() -> {
+      File backupDir = getBackupDirForMember(getBaselineDir(), 
getModifiedMemberId());
+      assertThat(backupDir).exists();
+
+      // Mark the performBackupBaseline as incomplete (even though it really 
isn't)
+      File incomplete = new File(backupDir, 
BackupWriter.INCOMPLETE_BACKUP_FILE);
+      assertThat(incomplete.createNewFile()).isTrue();
+    });
 
-    /*
-     * Do an incremental. It should discover that the baseline is incomplete 
and backup all of the
-     * operation logs that are in the baseline.
-     */
-    assertBackupStatus(performIncremental());
+    // Do a performBackupIncremental. It should discover that the 
performBackupBaseline is
+    // incomplete and backup all of the operation logs that are in the 
performBackupBaseline.
+    validateBackupStatus(
+        vm1.invoke(() -> performBackup(getIncrementalPath(), 
getBaselineBackupPath())));
 
-    /*
-     * Find all of the member's oplogs in the incremental (*.crf,*.krf,*.drf)
-     */
+    // Find all of the member's oplogs in the performBackupIncremental 
(*.crf,*.krf,*.drf)
     Collection<File> memberIncrementalOplogs =
-        FileUtils.listFiles(getBackupDirForMember(getIncrementalDir(), 
memberId),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberIncrementalOplogs.isEmpty());
+        listFiles(getBackupDirForMember(getIncrementalDir(), memberId), 
OPLOG_FILTER, DIRECTORY);
+    assertThat(memberIncrementalOplogs).isNotEmpty();
 
     List<String> memberIncrementalOplogNames = new LinkedList<>();
     TransformUtils.transform(memberIncrementalOplogs, 
memberIncrementalOplogNames,
         TransformUtils.fileNameTransformer);
 
-    /*
-     * Assert that all of the baseline operation logs are in the incremental 
backup. If so, then the
-     * incomplete marker was discovered in the baseline by the incremental 
backup process.
-     */
-    for (String oplog : memberBaselineOplogNames) {
-      assertTrue(memberIncrementalOplogNames.contains(oplog));
-    }
+    // Assert that all of the performBackupBaseline operation logs are in the
+    // performBackupIncremental backup. If so, then the incomplete marker was 
discovered in the
+    // performBackupBaseline by the performBackupIncremental backup process.
+    
assertThat(memberIncrementalOplogNames).containsAll(memberBaselineOplogNames);
   }
 
   /**
-   * Successful if all members perform a full backup when they share the 
baseline directory and it
-   * is missing.
+   * Successful if all members perform a full backup when they share the 
performBackupBaseline
+   * directory and it is missing.
    */
   @Test
   public void testMissingBaseline() throws Exception {
-    /*
-     * Get the member ID for VM 1 and perform a baseline.
-     */
-    String memberId = getMemberId(Host.getHost(0).getVM(1));
-    assertBackupStatus(performBaseline());
-
-    /*
-     * Find all of the member's oplogs in the baseline (*.crf,*.krf,*.drf)
-     */
+    // Get the member ID for VM 1 and perform a performBackupBaseline.
+    String memberId = vm1.invoke(() -> getModifiedMemberId());
+    validateBackupStatus(vm1.invoke(() -> performBackup(getBaselinePath())));
+
+    // Find all of the member's oplogs in the performBackupBaseline 
(*.crf,*.krf,*.drf)
     Collection<File> memberBaselineOplogs =
-        FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), memberId),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberBaselineOplogs.isEmpty());
+        listFiles(getBackupDirForMember(getBaselineDir(), memberId), 
OPLOG_FILTER, DIRECTORY);
+    assertThat(memberBaselineOplogs).isNotEmpty();
 
     List<String> memberBaselineOplogNames = new LinkedList<>();
     TransformUtils.transform(memberBaselineOplogs, memberBaselineOplogNames,
         TransformUtils.fileNameTransformer);
 
-    /*
-     * Custom incremental backup callable that retrieves the current baseline 
before deletion.
-     */
-    SerializableCallable callable = new SerializableCallable("Backup all 
members.") {
-      private final File baselineDir = getBaselineBackupDir();
-
-      @Override
-      public Object call() {
-        try {
-          return new BackupOperation(getSystem().getDistributionManager(), 
getCache())
-              .backupAllMembers(
-                  getIncrementalDir().toString(), this.baselineDir.toString());
-
-        } catch (ManagementException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-
-    /*
-     * Do an incremental after deleting the baseline. It should discover that 
the baseline is gone
-     * and backup all of the operation logs that are in the baseline.
-     */
+    // Do a performBackupIncremental after deleting the 
performBackupBaseline. It should discover
+    // that the performBackupBaseline is gone and backup all of the operation 
logs that are in the
+    // performBackupBaseline.
     FileUtils.deleteDirectory(getBaselineDir());
-    Host.getHost(0).getVM(1).invoke(callable);
 
-    /*
-     * Find all of the member's oplogs in the incremental (*.crf,*.krf,*.drf)
-     */
+    // Custom performBackupIncremental backup callable that retrieves the 
current
+    // performBackupBaseline before deletion.
+    vm1.invoke(() -> {
+      new BackupOperation(cacheRule.getSystem().getDistributionManager(), 
cacheRule.getCache())
+          .backupAllMembers(getIncrementalPath(), getBaselinePath());
+    });
+
+    // Find all of the member's oplogs in the performBackupIncremental 
(*.crf,*.krf,*.drf)
     Collection<File> memberIncrementalOplogs =
-        FileUtils.listFiles(getBackupDirForMember(getIncrementalDir(), 
memberId),
-            new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
+        listFiles(getBackupDirForMember(getIncrementalDir(), memberId), 
OPLOG_FILTER, DIRECTORY);
     assertThat(memberIncrementalOplogs).isNotEmpty();
 
     List<String> memberIncrementalOplogNames = new LinkedList<>();
     TransformUtils.transform(memberIncrementalOplogs, 
memberIncrementalOplogNames,
         TransformUtils.fileNameTransformer);
 
-    /*
-     * Assert that all of the baseline operation logs are in the incremental 
backup. If so, then the
-     * missing baseline was discovered by the incremental backup process.
-     */
-    for (String oplog : memberBaselineOplogNames) {
-      assertTrue(memberIncrementalOplogNames.contains(oplog));
-    }
+    // Assert that all of the performBackupBaseline operation logs are in the
+    // performBackupIncremental backup. If so, then the missing 
performBackupBaseline was discovered
+    // by the performBackupIncremental backup process.
+    
assertThat(memberIncrementalOplogNames).containsAll(memberBaselineOplogNames);
   }
 
   /**
-   * Successful if a user deployed jar file is included as part of the backup.
+   * Verifies that a user deployed jar file is included as part of the backup.
    */
   @Test
   public void testBackupUserDeployedJarFiles() throws Exception {
-    final String jarName = "BackupJarDeploymentDUnit";
-    final String jarNameRegex = ".*" + jarName + ".*";
-    final ClassBuilder classBuilder = new ClassBuilder();
-    final byte[] classBytes = classBuilder.createJarFromName(jarName);
+    String jarName = "BackupJarDeploymentDUnit";
+    byte[] classBytes = new ClassBuilder().createJarFromName(jarName);
 
-    File jarFile = tempDir.newFile();
+    File jarFile = temporaryFolder.newFile();
     IOUtils.copyLarge(new ByteArrayInputStream(classBytes), new 
FileOutputStream(jarFile));
 
-    VM vm0 = Host.getHost(0).getVM(0);
-
-    /*
-     * Deploy a "dummy" jar to the VM.
-     */
+    // Deploy a "dummy" jar to the VM.
     File deployedJarFile = vm0.invoke(() -> {
       DeployedJar deployedJar =
           ClassPathLoader.getLatest().getJarDeployer().deploy(jarName, 
jarFile);
       return deployedJar.getFile();
     });
 
-    assertTrue(deployedJarFile.exists());
-    /*
-     * Perform backup. Make sure it is successful.
-     */
-    assertBackupStatus(baseline(vm0));
+    assertThat(deployedJarFile).exists();
 
-    /*
-     * Make sure the user deployed jar is part of the backup.
-     */
+    // Perform backup. Make sure it is successful.
+    validateBackupStatus(vm0.invoke(() -> performBackup(getBaselinePath())));
+
+    // Make sure the user deployed jar is part of the backup.
     Collection<File> memberDeployedJarFiles =
-        FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), 
getMemberId(vm0)),
-            new RegexFileFilter(jarNameRegex), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberDeployedJarFiles.isEmpty());
+        listFiles(getBackupDirForMember(getBaselineDir(), vm0.invoke(() -> 
getModifiedMemberId())),
+            new RegexFileFilter(".*" + jarName + ".*"), DIRECTORY);
+    assertThat(memberDeployedJarFiles).isNotEmpty();
 
     // Shut down our member so we can perform a restore
-    PersistentID id = getPersistentID(vm0);
-    closeCache(vm0);
-
-    /*
-     * Get the VM's user directory.
-     */
-    final String vmDir = vm0.invoke(() -> System.getProperty("user.dir"));
+    PersistentID id = vm0.invoke(() -> getPersistentID(diskStoreName1));
+    vm0.invoke(() -> cacheRule.getCache().close());
 
-    File backupDir = getBackupDirForMember(getBaselineDir(), getMemberId(vm0));
+    // Get the VM's user directory.
+    String vmDir = vm0.invoke(() -> System.getProperty("user.dir"));
 
-    vm0.bounce();
+    File backupDir =
+        getBackupDirForMember(getBaselineDir(), vm0.invoke(() -> 
getModifiedMemberId()));
 
-    /*
-     * Cleanup "dummy" jar from file system.
-     */
-    Pattern pattern = Pattern.compile('^' + jarName + ".*#\\d++$");
-    deleteMatching(new File("."), pattern);
+    // Cleanup "dummy" jar from file system.
+    deleteMatching(new File("."), Pattern.compile('^' + jarName + 
".*#\\d++$"));
 
     // Execute the restore
     performRestore(new File(id.getDirectory()), backupDir);
 
-    /*
-     * Make sure the user deployed jar is part of the restore.
-     */
-    Collection<File> restoredJars = FileUtils.listFiles(new File(vmDir),
-        new RegexFileFilter(jarNameRegex), DirectoryFileFilter.DIRECTORY);
-    assertFalse(restoredJars.isEmpty());
+    // Make sure the user deployed jar is part of the restore.
+    Collection<File> restoredJars =
+        listFiles(new File(vmDir), new RegexFileFilter(".*" + jarName + ".*"), 
DIRECTORY);
+    assertThat(restoredJars).isNotEmpty();
+
     List<String> restoredJarNames = new LinkedList<>();
     TransformUtils.transform(memberDeployedJarFiles, restoredJarNames,
         TransformUtils.fileNameTransformer);
     for (String name : restoredJarNames) {
-      assertTrue(name.contains(jarName));
+      assertThat(name).contains(jarName);
     }
 
     // Restart the member
-    openCache(vm0);
+    vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0)));
 
-    /*
-     * Remove the "dummy" jar from the VM.
-     */
+    // Remove the "dummy" jar from the VM.
     vm0.invoke(() -> {
       for (DeployedJar jarClassLoader : 
ClassPathLoader.getLatest().getJarDeployer()
           .findDeployedJars()) {
@@ -1069,16 +483,217 @@ public class IncrementalBackupDistributedTest extends 
JUnit4CacheTestCase {
       }
     });
 
-    /*
-     * Cleanup "dummy" jar from file system.
-     */
-    pattern = Pattern.compile('^' + jarName + ".*#\\d++$");
-    deleteMatching(new File(vmDir), pattern);
+    // Cleanup "dummy" jar from file system.
+    deleteMatching(new File(vmDir), Pattern.compile('^' + jarName + 
".*#\\d++$"));
+  }
+
+  private void createCache(final File diskDir) {
+    cacheRule.getOrCreateCache();
+
+    createDiskStore(diskStoreName1, diskDir);
+    createDiskStore(diskStoreName2, diskDir);
+
+    createRegion(regionName1, diskStoreName1);
+    createRegion(regionName2, diskStoreName2);
+  }
+
+  private void createDiskStore(final String diskStoreName, final File diskDir) 
{
+    DiskStoreFactory diskStoreFactory = 
cacheRule.getCache().createDiskStoreFactory();
+    diskStoreFactory.setDiskDirs(new File[] {diskDir});
+    diskStoreFactory.create(diskStoreName);
+  }
+
+  private void createRegion(final String regionName, final String 
diskStoreName) {
+    PartitionAttributesFactory<Integer, String> partitionAttributesFactory =
+        new PartitionAttributesFactory<>();
+    partitionAttributesFactory.setTotalNumBuckets(5);
+
+    RegionFactory<Integer, String> regionFactory =
+        cacheRule.getCache().createRegionFactory(PARTITION_PERSISTENT);
+    regionFactory.setDiskStoreName(diskStoreName);
+    regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+    regionFactory.create(regionName);
+  }
+
+  private File getBaselineDir() {
+    File dir = new File(temporaryFolder.getRoot(), "baseline");
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+
+    return dir;
+  }
+
+  private String getBaselinePath() {
+    return getBaselineDir().getAbsolutePath();
+  }
+
+  private File getIncrementalDir() {
+    File dir = new File(temporaryFolder.getRoot(), "incremental");
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+
+    return dir;
+  }
+
+  private String getIncrementalPath() {
+    return getIncrementalDir().getAbsolutePath();
+  }
+
+  private File getIncremental2Dir() {
+    File dir = new File(temporaryFolder.getRoot(), "incremental2");
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+
+    return dir;
+  }
+
+  private String getIncremental2Path() {
+    return getIncremental2Dir().getAbsolutePath();
+  }
+
+  private Set<PersistentID> getMissingPersistentMembers() {
+    return AdminDistributedSystemImpl
+        
.getMissingPersistentMembers(cacheRule.getCache().getDistributionManager());
+  }
+
+  private BackupStatus performBackup(final String targetDirPath) {
+    return performBackup(targetDirPath, null);
+  }
+
+  private BackupStatus performBackup(final String targetDirPath, final String 
baselineDirPath) {
+    return new BackupOperation(cacheRule.getCache().getDistributionManager(), 
cacheRule.getCache())
+        .backupAllMembers(targetDirPath, baselineDirPath);
+  }
+
+  private String getModifiedMemberId() {
+    return 
cacheRule.getCache().getDistributedSystem().getDistributedMember().toString()
+        .replaceAll("[^\\w]+", "_");
+  }
+
+  private PersistentID getPersistentID(final String diskStoreName) {
+    for (DiskStore diskStore : cacheRule.getCache().listDiskStores()) {
+      if (diskStore.getName().equals(diskStoreName)) {
+        return ((DiskStoreImpl) diskStore).getPersistentID();
+      }
+    }
+    throw new Error("Failed to find disk store " + diskStoreName);
+  }
+
+  private void waitForBackup() {
+    Collection<DiskStore> backupInProgress = 
cacheRule.getCache().listDiskStores();
+    List<DiskStoreImpl> backupCompleteList = new LinkedList<>();
+
+    while (backupCompleteList.size() < backupInProgress.size()) {
+      for (DiskStore diskStore : backupInProgress) {
+        if (((DiskStoreImpl) diskStore).getInProgressBackup() == null
+            && !backupCompleteList.contains(diskStore)) {
+          backupCompleteList.add((DiskStoreImpl) diskStore);
+        }
+      }
+    }
+  }
+
+  private String getBaselineBackupPath() {
+    File[] dirs = getBaselineDir().listFiles((FileFilter) DIRECTORY);
+    assertThat(dirs).hasSize(1);
+    return dirs[0].getAbsolutePath();
+  }
+
+  private String getIncrementalBackupPath() {
+    File[] dirs = getIncrementalDir().listFiles((FileFilter) DIRECTORY);
+    assertThat(dirs).hasSize(1);
+    return dirs[0].getAbsolutePath();
+  }
+
+  private File getBackupDirForMember(final File rootDir, final String 
memberId) {
+    File[] dateDirs = rootDir.listFiles((FileFilter) DIRECTORY);
+    assertThat(dateDirs).hasSize(1);
+
+    File[] memberDirs =
+        dateDirs[0].listFiles(file -> file.isDirectory() && 
file.getName().contains(memberId));
+    assertThat(memberDirs).hasSize(1);
+
+    return memberDirs[0];
+  }
+
+  private ReadingMode getReadingMode() {
+    return SystemUtils.isWindows() ? ReadingMode.NON_BLOCKING : 
ReadingMode.BLOCKING;
+  }
+
+  private void execute(final String command) throws IOException, 
InterruptedException {
+    process = new ProcessBuilder(command).redirectErrorStream(true).start();
+
+    processReader = new 
ProcessStreamReader.Builder(process).inputStream(process.getInputStream())
+        .inputListener(line -> logger.info("OUTPUT: {}", line))
+        .readingMode(getReadingMode()).continueReadingMillis(2 * 
1000).build().start();
+
+    assertThat(process.waitFor(5, MINUTES)).isTrue();
+    assertThat(process.exitValue()).isEqualTo(0);
+  }
+
+  private void performRestore(final File memberDir, final File backupDir)
+      throws IOException, InterruptedException {
+    // The restore script will not restore if there is an if file in the copy 
to directory. Remove
+    // these files first.
+    Collection<File> ifFiles = listFiles(memberDir, new 
RegexFileFilter(".*\\.if$"), DIRECTORY);
+    for (File file : ifFiles) {
+      assertThat(file.delete()).isTrue();
+    }
+
+    // Remove all operation logs.
+    Collection<File> oplogs = listFiles(memberDir, OPLOG_FILTER, DIRECTORY);
+    for (File file : oplogs) {
+      assertThat(file.delete()).isTrue();
+    }
+
+    // Get a hold of the restore script and make sure it is there.
+    File restoreScript = new File(backupDir, "restore.sh");
+    if (!restoreScript.exists()) {
+      restoreScript = new File(backupDir, "restore.bat");
+    }
+    assertThat(restoreScript).exists();
+
+    execute(restoreScript.getAbsolutePath());
+  }
+
+  private void performPuts() {
+    Region<Integer, String> region = 
cacheRule.getCache().getRegion(regionName1);
+
+    // Fill our region data
+    for (int i = dataStart; i < dataEnd; ++i) {
+      region.put(i, Integer.toString(i));
+    }
+
+    Region<Integer, String> barRegion = 
cacheRule.getCache().getRegion(regionName2);
+
+    // Fill our region data
+    for (int i = dataStart; i < dataEnd; ++i) {
+      barRegion.put(i, Integer.toString(i));
+    }
+
+    dataStart += DATA_INCREMENT;
+    dataEnd += DATA_INCREMENT;
+  }
+
+  private void validateBackupStatus(final BackupStatus backupStatus) {
+    Map<DistributedMember, Set<PersistentID>> backupMap = 
backupStatus.getBackedUpDiskStores();
+    assertThat(backupMap).isNotEmpty();
+
+    for (DistributedMember member : backupMap.keySet()) {
+      assertThat(backupMap.get(member)).isNotEmpty();
+      for (PersistentID id : backupMap.get(member)) {
+        assertThat(id.getHost()).isNotNull();
+        assertThat(id.getUUID()).isNotNull();
+        assertThat(id.getDirectory()).isNotNull();
+      }
+    }
   }
 
-  private void deleteMatching(File dir, final Pattern pattern) throws 
IOException {
-    Collection<File> files =
-        FileUtils.listFiles(dir, new RegexFileFilter(pattern), 
DirectoryFileFilter.DIRECTORY);
+  private void deleteMatching(final File dir, final Pattern pattern) throws 
IOException {
+    Collection<File> files = listFiles(dir, new RegexFileFilter(pattern), 
DIRECTORY);
     for (File file : files) {
       Files.delete(file.toPath());
     }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PartitionedBackupPrepareAndFinishMsgDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PartitionedBackupPrepareAndFinishMsgDUnitTest.java
deleted file mode 100644
index 7ebbea8..0000000
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PartitionedBackupPrepareAndFinishMsgDUnitTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.backup;
-
-import java.io.IOException;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-
-public class PartitionedBackupPrepareAndFinishMsgDUnitTest
-    extends BackupPrepareAndFinishMsgDUnitTest {
-
-  @Override
-  public Region<Integer, Integer> createRegion() throws IOException {
-    return createRegion(RegionShortcut.PARTITION_PERSISTENT);
-  }
-}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareAndFinishBackupDistributedTest.java
similarity index 60%
rename from 
geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java
rename to 
geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareAndFinishBackupDistributedTest.java
index 1ab9933..0c648af 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareAndFinishBackupDistributedTest.java
@@ -14,14 +14,15 @@
  */
 package org.apache.geode.internal.cache.backup;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -41,7 +42,11 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.UseParametersRunnerFactory;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.DiskStore;
@@ -54,67 +59,92 @@ import org.apache.geode.cache.query.FunctionDomainException;
 import org.apache.geode.cache.query.NameResolutionException;
 import org.apache.geode.cache.query.QueryInvocationTargetException;
 import org.apache.geode.cache.query.TypeMismatchException;
-import org.apache.geode.cache30.CacheTestCase;
 import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+import 
org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 @Category({DistributedTest.class})
-public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase 
{
+@RunWith(Parameterized.class)
+@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+@SuppressWarnings({"serial", "unused"})
+public class PrepareAndFinishBackupDistributedTest {
 
-  // Although this test does not make use of other members, the current member 
needs to be
-  // a distributed member (rather than local) because it sends prepare and 
finish backup messages
-  private static final String TEST_REGION_NAME = "TestRegion";
+  private String uniqueName;
+  private String regionName;
+  private Region<Integer, Integer> region;
+
+  @Parameter
+  public RegionShortcut regionShortcut;
+
+  @Parameters
+  public static Collection<RegionShortcut> data() {
+    return Arrays.asList(PARTITION_PERSISTENT, REPLICATE_PERSISTENT);
+  }
 
   @Rule
-  public TemporaryFolder tempDir = new TemporaryFolder();
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
 
-  private File[] diskDirs = null;
-  private Region<Integer, Integer> region;
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
 
-  protected abstract Region<Integer, Integer> createRegion() throws 
IOException;
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
 
   @Before
-  public void setup() throws IOException {
-    region = createRegion();
+  public void setUp() {
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+
+    region = createRegion(regionShortcut);
   }
 
   @Test
-  public void createWaitsForBackupTest() throws Throwable {
+  public void createWaitsForBackupTest() throws Exception {
     doActionAndVerifyWaitForBackup(() -> region.create(1, 1));
     verifyKeyValuePair(1, 1);
   }
 
   @Test
-  public void putThatCreatesWaitsForBackupTest() throws Throwable {
+  public void putThatCreatesWaitsForBackupTest() throws Exception {
     doActionAndVerifyWaitForBackup(() -> region.put(1, 1));
     verifyKeyValuePair(1, 1);
   }
 
   @Test
-  public void putWaitsForBackupTest() throws Throwable {
+  public void putWaitsForBackupTest() throws Exception {
     region.put(1, 1);
     doActionAndVerifyWaitForBackup(() -> region.put(1, 2));
     verifyKeyValuePair(1, 2);
   }
 
   @Test
-  public void invalidateWaitsForBackupTest() throws Throwable {
+  public void invalidateWaitsForBackupTest() throws Exception {
     region.put(1, 1);
     doActionAndVerifyWaitForBackup(() -> region.invalidate(1));
     verifyKeyValuePair(1, null);
   }
 
   @Test
-  public void destroyWaitsForBackupTest() throws Throwable {
+  public void destroyWaitsForBackupTest() throws Exception {
     region.put(1, 1);
     doActionAndVerifyWaitForBackup(() -> region.destroy(1));
-    assertFalse(region.containsKey(1));
+    assertThat(region).doesNotContainKey(1);
   }
 
   @Test
-  public void putAllWaitsForBackupTest() throws Throwable {
+  public void putAllWaitsForBackupTest() throws Exception {
     Map<Integer, Integer> entries = new HashMap<>();
     entries.put(1, 1);
     entries.put(2, 2);
@@ -125,13 +155,13 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest 
extends CacheTestCase {
   }
 
   @Test
-  public void removeAllWaitsForBackupTest() throws Throwable {
+  public void removeAllWaitsForBackupTest() throws Exception {
     region.put(1, 1);
     region.put(2, 2);
 
     List<Integer> keys = Arrays.asList(1, 2);
     doActionAndVerifyWaitForBackup(() -> region.removeAll(keys));
-    assertTrue(region.isEmpty());
+    assertThat(region).isEmpty();
   }
 
   @Test
@@ -140,42 +170,77 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest 
extends CacheTestCase {
     doReadActionsAndVerifyCompletion();
   }
 
+  /**
+   * Create a region, installing the test hook in the backup lock
+   *
+   * @param shortcut The region shortcut to use to create the region
+   * @return The newly created region.
+   */
+  private Region<Integer, Integer> createRegion(RegionShortcut shortcut) {
+    Cache cache = cacheRule.getOrCreateCache();
+
+    DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+    diskStoreFactory.setDiskDirs(new File[] {getDiskDir()});
+
+    DiskStore diskStore = diskStoreFactory.create(getUniqueName());
+
+    RegionFactory<Integer, Integer> regionFactory = 
cache.createRegionFactory(shortcut);
+    regionFactory.setDiskStoreName(diskStore.getName());
+    regionFactory.setDiskSynchronous(true);
+
+    if (shortcut.equals(PARTITION_PERSISTENT)) {
+      PartitionAttributesFactory partitionAttributesFactory = new 
PartitionAttributesFactory();
+      partitionAttributesFactory.setTotalNumBuckets(1);
+      
regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+    }
+
+    return regionFactory.create(regionName);
+  }
+
   private void doActionAndVerifyWaitForBackup(Runnable function)
       throws InterruptedException, TimeoutException, ExecutionException {
-    DistributionManager dm = 
GemFireCacheImpl.getInstance().getDistributionManager();
+    DistributionManager dm = cacheRule.getCache().getDistributionManager();
     Set recipients = dm.getOtherDistributionManagerIds();
+
     Properties backupProperties = new BackupConfigFactory()
-        .withTargetDirPath(diskDirs[0].toString()).createBackupProperties();
-    Future<Void> future = null;
+        .withTargetDirPath(getDiskDir().toString()).createBackupProperties();
+
     new PrepareBackupStep(dm, dm.getId(), dm.getCache(), recipients,
         new PrepareBackupFactory(), backupProperties).send();
+
     ReentrantLock backupLock = ((LocalRegion) 
region).getDiskStore().getBackupLock();
-    future = CompletableFuture.runAsync(function);
+    Future<Void> future = CompletableFuture.runAsync(function);
     Awaitility.await().atMost(5, TimeUnit.SECONDS)
-        .until(() -> assertTrue(backupLock.getQueueLength() > 0));
+        .until(() -> 
assertThat(backupLock.getQueueLength()).isGreaterThanOrEqualTo(0));
+
     new FinishBackupStep(dm, dm.getId(), dm.getCache(), recipients, new 
FinishBackupFactory())
         .send();
+
     future.get(5, TimeUnit.SECONDS);
   }
 
   private void doReadActionsAndVerifyCompletion() {
-    DistributionManager dm = 
GemFireCacheImpl.getInstance().getDistributionManager();
+    DistributionManager dm = cacheRule.getCache().getDistributionManager();
     Set recipients = dm.getOtherDistributionManagerIds();
+
     Properties backupProperties = new BackupConfigFactory()
-        .withTargetDirPath(diskDirs[0].toString()).createBackupProperties();
+        .withTargetDirPath(getDiskDir().toString()).createBackupProperties();
+
     new PrepareBackupStep(dm, dm.getId(), dm.getCache(), recipients,
         new PrepareBackupFactory(), backupProperties).send();
+
     ReentrantLock backupLock = ((LocalRegion) 
region).getDiskStore().getBackupLock();
     List<CompletableFuture<?>> futureList = doReadActions();
     CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture<?>[futureList.size()]));
-    assertTrue(backupLock.getQueueLength() == 0);
+    assertThat(backupLock.getQueueLength()).isEqualTo(0);
+
     new FinishBackupStep(dm, dm.getId(), dm.getCache(), recipients, new 
FinishBackupFactory())
         .send();
   }
 
   private void verifyKeyValuePair(Integer key, Integer expectedValue) {
-    assertTrue(region.containsKey(key));
-    assertEquals(expectedValue, region.get(key));
+    assertThat(region).containsKey(key);
+    assertThat(region.get(key)).isEqualTo(expectedValue);
   }
 
   private List<CompletableFuture<?>> doReadActions() {
@@ -183,15 +248,15 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest 
extends CacheTestCase {
     actions.add(() -> region.get(1));
     actions.add(() -> region.containsKey(1));
     actions.add(() -> region.containsValue(1));
-    actions.add(region::entrySet);
-    actions.add(this::valueExistsCheck);
+    actions.add(() -> region.entrySet());
+    actions.add(() -> valueExistsCheck());
     actions.add(() -> region.getAll(Collections.emptyList()));
     actions.add(() -> region.getEntry(1));
-    actions.add(region::isEmpty);
-    actions.add(region::keySet);
-    actions.add(region::size);
-    actions.add(region::values);
-    actions.add(this::queryCheck);
+    actions.add(() -> region.isEmpty());
+    actions.add(() -> region.keySet());
+    actions.add(() -> region.size());
+    actions.add(() -> region.values());
+    actions.add(() -> queryCheck());
     return actions.stream().map(runnable -> 
CompletableFuture.runAsync(runnable))
         .collect(Collectors.toList());
   }
@@ -207,34 +272,18 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest 
extends CacheTestCase {
 
   private void queryCheck() {
     try {
-      region.query("select * from /" + TEST_REGION_NAME);
+      region.query("select * from /" + regionName);
     } catch (FunctionDomainException | TypeMismatchException | 
NameResolutionException
         | QueryInvocationTargetException e) {
       throw new RuntimeException(e);
     }
   }
 
-  /**
-   * Create a region, installing the test hook in the backup lock
-   *
-   * @param shortcut The region shortcut to use to create the region
-   * @return The newly created region.
-   */
-  protected Region<Integer, Integer> createRegion(RegionShortcut shortcut) 
throws IOException {
-    Cache cache = getCache();
-    DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
-    diskDirs = new File[] {tempDir.newFolder()};
-    diskStoreFactory.setDiskDirs(diskDirs);
-    DiskStore diskStore = diskStoreFactory.create(getUniqueName());
+  private String getUniqueName() {
+    return uniqueName;
+  }
 
-    RegionFactory<Integer, Integer> regionFactory = 
cache.createRegionFactory(shortcut);
-    regionFactory.setDiskStoreName(diskStore.getName());
-    regionFactory.setDiskSynchronous(true);
-    if (shortcut.equals(RegionShortcut.PARTITION_PERSISTENT)) {
-      PartitionAttributesFactory prFactory = new PartitionAttributesFactory();
-      prFactory.setTotalNumBuckets(1);
-      regionFactory.setPartitionAttributes(prFactory.create());
-    }
-    return regionFactory.create(TEST_REGION_NAME);
+  private File getDiskDir() {
+    return diskDirRule.getDiskDirFor(getController());
   }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/ReplicateBackupPrepareAndFinishMsgDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/ReplicateBackupPrepareAndFinishMsgDUnitTest.java
deleted file mode 100644
index 45941cc..0000000
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/ReplicateBackupPrepareAndFinishMsgDUnitTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.backup;
-
-import java.io.IOException;
-
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-
-public class ReplicateBackupPrepareAndFinishMsgDUnitTest
-    extends BackupPrepareAndFinishMsgDUnitTest {
-
-  @Override
-  public Region<Integer, Integer> createRegion() throws IOException {
-    return createRegion(RegionShortcut.REPLICATE_PERSISTENT);
-  }
-}

Reply via email to