This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit a17071a08bee3fb54910a777f14cd6296a96619f Author: Kirk Lund <[email protected]> AuthorDate: Thu Jul 12 16:59:55 2018 -0700 GEODE-5405: Streamline Backup distributed tests --- .../cache/backup/BackupDistributedTest.java | 674 +++++----- .../backup/IncrementalBackupDistributedTest.java | 1283 +++++++------------- ...titionedBackupPrepareAndFinishMsgDUnitTest.java | 29 - ... => PrepareAndFinishBackupDistributedTest.java} | 179 ++- ...eplicateBackupPrepareAndFinishMsgDUnitTest.java | 29 - 5 files changed, 915 insertions(+), 1279 deletions(-) diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java index ebc4372..bda648d 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDistributedTest.java @@ -14,27 +14,30 @@ */ package org.apache.geode.internal.cache.backup; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.commons.io.FileUtils.listFiles; import static org.apache.commons.io.filefilter.DirectoryFileFilter.DIRECTORY; -import static org.apache.geode.test.dunit.Host.getHost; +import static org.apache.geode.cache.EvictionAction.OVERFLOW_TO_DISK; +import static org.apache.geode.cache.EvictionAttributes.createLIFOEntryAttributes; +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; +import static org.apache.geode.cache.RegionShortcut.REPLICATE; +import static org.apache.geode.internal.admin.remote.AdminFailureResponse.create; +import static org.apache.geode.test.dunit.Disconnect.disconnectFromDS; import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; +import static org.apache.geode.test.dunit.VM.getVM; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; +import java.io.Serializable; import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; +import java.util.List; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -46,22 +49,20 @@ import junitparams.naming.TestCaseName; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.RegexFileFilter; import org.apache.logging.log4j.Logger; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.cache.DiskStore; -import org.apache.geode.cache.EvictionAction; +import org.apache.geode.cache.DiskStoreFactory; import org.apache.geode.cache.EvictionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.PartitionedRegionStorageException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; -import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.control.RebalanceOperation; import org.apache.geode.cache.control.RebalanceResults; import org.apache.geode.cache.persistence.PartitionOfflineException; @@ -72,16 +73,20 @@ import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.distributed.internal.ReplyMessage; import org.apache.geode.internal.admin.remote.AdminFailureResponse; import org.apache.geode.internal.cache.DestroyRegionOperation.DestroyRegionMessage; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.partitioned.PersistentPartitionedRegionTestBase; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.lang.SystemUtils; import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.process.ProcessStreamReader; +import org.apache.geode.internal.process.ProcessStreamReader.ReadingMode; import org.apache.geode.management.BackupStatus; -import org.apache.geode.management.ManagementException; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedDiskDirRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; /** * Additional tests to consider adding: @@ -94,115 +99,148 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolde @Category(DistributedTest.class) @RunWith(JUnitParamsRunner.class) @SuppressWarnings("serial") -public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { +public class BackupDistributedTest implements Serializable { private static final Logger logger = LogService.getLogger(); private static final int NUM_BUCKETS = 15; - @Rule - public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder(); + private String uniqueName; + private String regionName1; + private String regionName2; + private String regionName3; + + private File backupBaseDir; private VM vm0; private VM vm1; private VM vm2; private VM vm3; - private Map<VM, File> workingDirByVm; - private File backupBaseDir; + + private transient Process process; + private transient ProcessStreamReader processReader; + + @Rule + public DistributedTestRule distributedTestRule = new DistributedTestRule(); + + @Rule + public CacheRule cacheRule = new CacheRule(); + + @Rule + public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule(); + + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); @Before public void setUp() throws Exception { - vm0 = getHost(0).getVM(0); - vm1 = getHost(0).getVM(1); - vm2 = getHost(0).getVM(2); - vm3 = getHost(0).getVM(3); + vm0 = getVM(0); + vm1 = getVM(1); + vm2 = getVM(2); + vm3 = getVM(3); - workingDirByVm = new HashMap<>(); - workingDirByVm.put(vm0, tempDir.newFolder()); - workingDirByVm.put(vm1, tempDir.newFolder()); - workingDirByVm.put(vm2, tempDir.newFolder()); - workingDirByVm.put(vm3, tempDir.newFolder()); + uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); + regionName1 = uniqueName + "_region1"; + regionName2 = uniqueName + "_region2"; + regionName3 = uniqueName + "_region3"; - backupBaseDir = tempDir.newFolder("backupDir"); + backupBaseDir = temporaryFolder.newFolder("backupDir"); } - @Override - public final void preTearDownCacheTestCase() throws Exception { + @After + public void tearDown() throws Exception { vm0.invoke(() -> { DistributionMessageObserver.setInstance(null); disconnectFromDS(); }); + + if (process != null && process.isAlive()) { + process.destroyForcibly(); + process.waitFor(2, MINUTES); + } + if (processReader != null && processReader.isRunning()) { + processReader.stop(); + } } @Test public void testBackupPR() throws Exception { - createPersistentRegions(); + createPersistentRegions(vm0, vm1); - long lastModified0 = setBackupFiles(vm0); - long lastModified1 = setBackupFiles(vm1); + long lastModified0 = vm0.invoke(() -> setBackupFiles(getDiskDirFor(vm0))); + long lastModified1 = vm1.invoke(() -> setBackupFiles(getDiskDirFor(vm1))); - createData(); + vm0.invoke(() -> { + createData(0, 5, "A", regionName1); + createData(0, 5, "B", regionName2); + }); vm0.invoke(() -> { assertThat(getCache().getDistributionManager().getNormalDistributionManagerIds()).hasSize(2); }); vm2.invoke(() -> { - getCache(); assertThat(getCache().getDistributionManager().getNormalDistributionManagerIds()).hasSize(3); }); - BackupStatus status = backupMember(vm2); + BackupStatus status = vm2.invoke(() -> backupMember()); assertThat(status.getBackedUpDiskStores()).hasSize(2); assertThat(status.getOfflineDiskStores()).isEmpty(); Collection<File> files = FileUtils.listFiles(backupBaseDir, new String[] {"txt"}, true); assertThat(files).hasSize(4); - deleteOldUserUserFile(vm0); - deleteOldUserUserFile(vm1); + vm0.invoke(() -> deleteOldUserFile(getDiskDirFor(vm0))); + vm1.invoke(() -> deleteOldUserFile(getDiskDirFor(vm1))); + validateBackupComplete(); - createData(vm0, 0, 5, "C", "region1"); - createData(vm0, 0, 5, "C", "region2"); + vm0.invoke(() -> { + createData(0, 5, "C", regionName1); + createData(0, 5, "C", regionName2); + }); assertThat(status.getBackedUpDiskStores()).hasSize(2); assertThat(status.getOfflineDiskStores()).isEmpty(); - closeCache(vm0); - closeCache(vm1); + vm0.invoke(() -> getCache().close()); + vm1.invoke(() -> getCache().close()); // destroy the current data cleanDiskDirsInEveryVM(); restoreBackup(2); - createPersistentRegions(); + createPersistentRegions(vm0, vm1); - checkData(vm0, 0, 5, "A", "region1"); - checkData(vm0, 0, 5, "B", "region2"); - verifyUserFileRestored(vm0, lastModified0); - verifyUserFileRestored(vm1, lastModified1); - } + vm0.invoke(() -> { + validateData(0, 5, "A", regionName1); + validateData(0, 5, "B", regionName2); + }); - private void createData() { - createData(vm0, 0, 5, "A", "region1"); - createData(vm0, 0, 5, "B", "region2"); + vm0.invoke(() -> verifyUserFileRestored(getDiskDirFor(vm0), lastModified0)); + vm1.invoke(() -> verifyUserFileRestored(getDiskDirFor(vm1), lastModified1)); } /** * Test of bug 42419. * * <p> - * TRAC 42419: backed up disk stores map contains null key instead of member; cannot restore + * TRAC #42419: backed up disk stores map contains null key instead of member; cannot restore * backup files */ @Test public void testBackupFromMemberWithDiskStore() throws Exception { - createPersistentRegions(); + createPersistentRegions(vm0, vm1); - createData(); + vm0.invoke(() -> { + createData(0, 5, "A", regionName1); + createData(0, 5, "B", regionName2); + }); - BackupStatus status = backupMember(vm1); + BackupStatus status = vm1.invoke(() -> backupMember()); assertThat(status.getBackedUpDiskStores()).hasSize(2); for (DistributedMember key : status.getBackedUpDiskStores().keySet()) { @@ -212,41 +250,45 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { validateBackupComplete(); - closeCache(vm0); - closeCache(vm1); + vm0.invoke(() -> getCache().close()); + vm1.invoke(() -> getCache().close()); // destroy the current data cleanDiskDirsInEveryVM(); restoreBackup(2); - createPersistentRegions(); + createPersistentRegions(vm0, vm1); - checkData(vm0, 0, 5, "A", "region1"); - checkData(vm0, 0, 5, "B", "region2"); + vm0.invoke(() -> { + validateData(0, 5, "A", regionName1); + validateData(0, 5, "B", regionName2); + }); } /** * Test for bug 42419 * * <p> - * TRAC 42419: backed up disk stores map contains null key instead of member; cannot restore + * TRAC #42419: backed up disk stores map contains null key instead of member; cannot restore * backup files */ @Test public void testBackupWhileBucketIsCreated() throws Exception { - createPersistentRegion(vm0).await(); + vm0.invoke(() -> createRegions(vm0)); // create a bucket on vm0 - createData(vm0, 0, 1, "A", "region1"); + vm0.invoke(() -> createData(0, 1, "A", regionName1)); // create the pr on vm1, which won't have any buckets - createPersistentRegion(vm1).await(); + vm1.invoke(() -> createRegions(vm1)); CompletableFuture<BackupStatus> backupStatusFuture = - CompletableFuture.supplyAsync(() -> backupMember(vm2)); + CompletableFuture.supplyAsync(() -> vm2.invoke(() -> backupMember())); + CompletableFuture<Void> createDataFuture = - CompletableFuture.runAsync(() -> createData(vm0, 1, 5, "A", "region1")); + CompletableFuture.runAsync(() -> vm0.invoke(() -> createData(1, 5, "A", regionName1))); + CompletableFuture.allOf(backupStatusFuture, createDataFuture); BackupStatus status = backupStatusFuture.get(); @@ -255,29 +297,30 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { validateBackupComplete(); - createData(vm0, 0, 5, "C", "region1"); + vm0.invoke(() -> createData(0, 5, "C", regionName1)); assertThat(status.getBackedUpDiskStores()).hasSize(2); assertThat(status.getOfflineDiskStores()).isEmpty(); - closeCache(vm0); - closeCache(vm1); + vm0.invoke(() -> getCache().close()); + vm1.invoke(() -> getCache().close()); // destroy the current data cleanDiskDirsInEveryVM(); restoreBackup(2); - createPersistentRegions(); + createPersistentRegions(vm0, vm1); + + vm0.invoke(() -> validateData(0, 1, "A", regionName1)); - checkData(vm0, 0, 1, "A", "region1"); } /** * Test for bug 42420. Invoke a backup when a bucket is in the middle of being moved. * * <p> - * TRAC 42420: Online backup files sometimes cannot be restored + * TRAC #42420: Online backup files sometimes cannot be restored */ @Test @Parameters({"BEFORE_SENDING_DESTROYREGIONMESSAGE", "BEFORE_PROCESSING_REPLYMESSAGE"}) @@ -294,41 +337,36 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { DistributionMessageObserver.setInstance(createTestHookToBackup(whenToInvokeBackup)); }); - createPersistentRegion(vm0).await(); + vm0.invoke(() -> createRegions(vm0)); // create twos bucket on vm0 - createData(vm0, 0, 2, "A", "region1"); + vm0.invoke(() -> createData(0, 2, "A", regionName1)); // create the pr on vm1, which won't have any buckets - createPersistentRegion(vm1).await(); + vm1.invoke(() -> createRegions(vm1)); // Perform a rebalance. This will trigger the backup in the middle of the bucket move. vm0.invoke("Do rebalance", () -> { RebalanceOperation op = getCache().getResourceManager().createRebalanceFactory().start(); - RebalanceResults results; - try { - results = op.getResults(); - assertEquals(1, results.getTotalBucketTransfersCompleted()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + RebalanceResults results = op.getResults(); + assertThat(results.getTotalBucketTransfersCompleted()).isEqualTo(1); }); validateBackupComplete(); - createData(vm0, 0, 5, "C", "region1"); + vm0.invoke(() -> createData(0, 5, "C", regionName1)); - closeCache(vm0); - closeCache(vm1); + vm0.invoke(() -> getCache().close()); + vm1.invoke(() -> getCache().close()); // Destroy the current data cleanDiskDirsInEveryVM(); restoreBackup(2); - createPersistentRegions(); + createPersistentRegions(vm0, vm1); - checkData(vm0, 0, 2, "A", "region1"); + vm0.invoke(() -> validateData(0, 2, "A", regionName1)); } @Test @@ -345,14 +383,18 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { createTestHookToThrowIOExceptionBeforeProcessingPrepareBackupRequest(exceptionMessage)); }); - createPersistentRegions(); + createPersistentRegions(vm0, vm1); - createData(); + vm0.invoke(() -> { + createData(0, 5, "A", regionName1); + createData(0, 5, "B", regionName2); + }); - assertThatThrownBy(() -> backupMember(vm2)).hasRootCauseInstanceOf(IOException.class); + assertThatThrownBy(() -> vm2.invoke(() -> backupMember())) + .hasRootCauseInstanceOf(IOException.class); // second backup should succeed because the observer and backup state has been cleared - BackupStatus status = backupMember(vm2); + BackupStatus status = vm2.invoke(() -> backupMember()); assertThat(status.getBackedUpDiskStores()).hasSize(2); assertThat(status.getOfflineDiskStores()).isEmpty(); } @@ -361,13 +403,16 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { * Make sure we don't report members without persistent data as backed up. */ @Test - public void testBackupOverflow() throws Exception { - createPersistentRegion(vm0).await(); - createOverflowRegion(vm1); + public void testBackupOverflow() { + vm0.invoke(() -> createRegions(vm0)); + vm1.invoke(() -> createRegionWithOverflow(getDiskStoreFor(vm1))); - createData(); + vm0.invoke(() -> { + createData(0, 5, "A", regionName1); + createData(0, 5, "B", regionName2); + }); - BackupStatus status = backupMember(vm2); + BackupStatus status = vm2.invoke(() -> backupMember()); assertThat(status.getBackedUpDiskStores()).hasSize(1); assertThat(status.getBackedUpDiskStores().values().iterator().next()).hasSize(2); assertThat(status.getOfflineDiskStores()).isEmpty(); @@ -376,62 +421,53 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { } @Test - public void testBackupPRWithOfflineMembers() throws Exception { - createPersistentRegion(vm0).await(); - createPersistentRegion(vm1).await(); - createPersistentRegion(vm2).await(); + public void testBackupPRWithOfflineMembers() { + vm0.invoke(() -> createRegions(vm0)); + vm1.invoke(() -> createRegions(vm1)); + vm2.invoke(() -> createRegions(vm2)); - createData(); + vm0.invoke(() -> { + createData(0, 5, "A", regionName1); + createData(0, 5, "B", regionName2); + }); - closeCache(vm2); + vm1.invoke(() -> getCache().close()); - BackupStatus status = backupMember(vm3); + BackupStatus status = vm3.invoke(() -> backupMember()); assertThat(status.getBackedUpDiskStores()).hasSize(2); assertThat(status.getOfflineDiskStores()).hasSize(2); } - private DistributionMessageObserver createTestHookToBackup( - WhenToInvokeBackup backupInvocationTestHook) { - switch (backupInvocationTestHook) { - case BEFORE_SENDING_DESTROYREGIONMESSAGE: - return createTestHookToBackupBeforeSendingDestroyRegionMessage(() -> backupMember(vm2)); - case BEFORE_PROCESSING_REPLYMESSAGE: - return createTestHookToBackupBeforeProcessingReplyMessage(() -> backupMember(vm2)); - default: - throw new AssertionError("Invalid backupInvocationTestHook " + backupInvocationTestHook); - } - } - /** * Test what happens when we restart persistent members while there is an accessor concurrently * performing puts. */ @Test - public void testRecoverySystemWithConcurrentPutter() throws Throwable { - createColatedPersistentRegions(vm1).await(); - createColatedPersistentRegions(vm2).await(); - - createAccessor(vm0); + public void testRecoverySystemWithConcurrentPutter() throws Exception { + vm1.invoke(() -> createColocatedRegions(vm1)); + vm2.invoke(() -> createColocatedRegions(vm2)); - createData(vm0, 0, NUM_BUCKETS, "a", "region1"); - createData(vm0, 0, NUM_BUCKETS, "a", "region2"); + vm0.invoke(() -> createAccessor()); + vm0.invoke(() -> { + createData(0, NUM_BUCKETS, "a", regionName1); + createData(0, NUM_BUCKETS, "a", regionName2); + }); // backup the system. We use this to get a snapshot of vm1 and vm2 // when they both are online. Recovering from this backup simulates - // a simulataneous kill and recovery. - backupMember(vm3); + // a simultaneous kill and recovery. + vm3.invoke(() -> backupMember()); - closeCache(vm1); - closeCache(vm2); + vm1.invoke(() -> getCache().close()); + vm2.invoke(() -> getCache().close()); cleanDiskDirsInEveryVM(); restoreBackup(2); // in vm0, start doing a bunch of concurrent puts. - AsyncInvocation async0 = vm0.invokeAsync(() -> { - Cache cache = getCache(); - Region region = cache.getRegion("region1"); + AsyncInvocation putsInVM0 = vm0.invokeAsync(() -> { + Region region = getCache().getRegion(regionName1); try { for (int i = 0;; i++) { try { @@ -445,30 +481,46 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { } }); - AsyncInvocation async1 = createColatedPersistentRegions(vm1); - AsyncInvocation async2 = createColatedPersistentRegions(vm2); - async1.await(); - async2.await(); + AsyncInvocation createRegionsInVM1 = vm1.invokeAsync(() -> createColocatedRegions(vm1)); + AsyncInvocation createRegionsInVM2 = vm2.invokeAsync(() -> createColocatedRegions(vm2)); + + createRegionsInVM1.await(); + createRegionsInVM2.await(); // close the cache in vm0 to stop the async puts. - closeCache(vm0); + vm0.invoke(() -> getCache().close()); // make sure we didn't get an exception - async0.await(); + putsInVM0.await(); + } + + private DistributionMessageObserver createTestHookToBackup( + WhenToInvokeBackup backupInvocationTestHook) { + switch (backupInvocationTestHook) { + case BEFORE_SENDING_DESTROYREGIONMESSAGE: + return createTestHookToBackupBeforeSendingDestroyRegionMessage( + () -> vm2.invoke(() -> backupMember())); + case BEFORE_PROCESSING_REPLYMESSAGE: + return createTestHookToBackupBeforeProcessingReplyMessage( + () -> vm2.invoke(() -> backupMember())); + default: + throw new RuntimeException("Invalid backupInvocationTestHook " + backupInvocationTestHook); + } } private DistributionMessageObserver createTestHookToBackupBeforeProcessingReplyMessage( - Runnable task) { + final Runnable task) { return new DistributionMessageObserver() { - private volatile boolean done; + private final AtomicInteger count = new AtomicInteger(); private volatile int replyId = -0xBAD; + private volatile boolean done; @Override public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) { // the bucket move will send a destroy region message. if (message instanceof DestroyRegionMessage && !done) { - this.replyId = message.getProcessorId(); + replyId = message.getProcessorId(); } } @@ -484,8 +536,9 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { } private DistributionMessageObserver createTestHookToBackupBeforeSendingDestroyRegionMessage( - Runnable task) { + final Runnable task) { return new DistributionMessageObserver() { + private volatile boolean done; @Override @@ -499,26 +552,23 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { }; } - private void cleanDiskDirsInEveryVM() { - workingDirByVm.forEach((vm, file) -> { - try { - FileUtils.deleteDirectory(file); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + private void cleanDiskDirsInEveryVM() throws IOException { + FileUtils.deleteDirectory(getDiskDirFor(vm0)); + FileUtils.deleteDirectory(getDiskDirFor(vm1)); + FileUtils.deleteDirectory(getDiskDirFor(vm2)); + FileUtils.deleteDirectory(getDiskDirFor(vm3)); } private DistributionMessageObserver createTestHookToThrowIOExceptionBeforeProcessingPrepareBackupRequest( final String exceptionMessage) { return new DistributionMessageObserver() { + @Override public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { if (message instanceof PrepareBackupRequest) { DistributionMessageObserver.setInstance(null); IOException exception = new IOException(exceptionMessage); - AdminFailureResponse response = - AdminFailureResponse.create(message.getSender(), exception); + AdminFailureResponse response = create(message.getSender(), exception); response.setMsgId(((PrepareBackupRequest) message).getMsgId()); dm.putOutgoing(response); throw new RuntimeException("Stop processing"); @@ -527,227 +577,207 @@ public class BackupDistributedTest extends PersistentPartitionedRegionTestBase { }; } - private void createPersistentRegions() throws ExecutionException, InterruptedException { - AsyncInvocation create1 = createPersistentRegion(vm0); - AsyncInvocation create2 = createPersistentRegion(vm1); - create1.await(); - create2.await(); + private void createPersistentRegions(final VM... vms) + throws ExecutionException, InterruptedException { + Set<AsyncInvocation> createRegionAsyncs = new HashSet<>(); + for (VM vm : vms) { + createRegionAsyncs.add(vm.invokeAsync(() -> createRegions(vm))); + } + for (AsyncInvocation createRegion : createRegionAsyncs) { + createRegion.await(); + } } private void validateBackupComplete() { Pattern pattern = Pattern.compile(".*INCOMPLETE.*"); - File[] files = backupBaseDir.listFiles((dir1, name) -> pattern.matcher(name).matches()); - assertNotNull(files); - assertTrue(files.length == 0); + File[] files = backupBaseDir.listFiles((dir, name) -> pattern.matcher(name).matches()); + + assertThat(files).isNotNull().hasSize(0); } - private void deleteOldUserUserFile(final VM vm) { - vm.invoke(() -> { - File userDir = new File(workingDirByVm.get(vm), "userbackup-"); - FileUtils.deleteDirectory(userDir); - }); + private void deleteOldUserFile(final File dir) throws IOException { + File userDir = new File(dir, "userbackup-"); + FileUtils.deleteDirectory(userDir); } - private long setBackupFiles(final VM vm) { - return vm.invoke(() -> { - File workingDir = workingDirByVm.get(vm); - File test1 = new File(workingDir, "test1"); - File test2 = new File(test1, "test2"); - File mytext = new File(test2, "my.txt"); - final ArrayList<File> backuplist = new ArrayList<>(); - test2.mkdirs(); - Files.createFile(mytext.toPath()); - long lastModified = mytext.lastModified(); - backuplist.add(test2); + private long setBackupFiles(final File dir) throws IOException { + File dir1 = new File(dir, "test1"); + File dir2 = new File(dir1, "test2"); + dir2.mkdirs(); - GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); - cache.setBackupFiles(backuplist); + File textFile = new File(dir2, "my.txt"); + Files.createFile(textFile.toPath()); - return lastModified; - }); - } + List<File> backupList = new ArrayList<>(); + backupList.add(dir2); + getCache().setBackupFiles(backupList); - private void verifyUserFileRestored(VM vm, final long lm) { - vm.invoke(() -> { - File workingDir = workingDirByVm.get(vm); - File test1 = new File(workingDir, "test1"); - File test2 = new File(test1, "test2"); - File mytext = new File(test2, "my.txt"); - assertTrue(mytext.exists()); - assertEquals(lm, mytext.lastModified()); - }); + return textFile.lastModified(); } - private AsyncInvocation createPersistentRegion(final VM vm) { - return vm.invokeAsync(() -> { - Cache cache = getCache(); - DiskStore diskStore1 = cache.createDiskStoreFactory() - .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + "diskstores_1")).setMaxOplogSize(1) - .create(getUniqueName()); + private void verifyUserFileRestored(final File dir, final long expectedLastModified) { + File dir1 = new File(dir, "test1"); + File dir2 = new File(dir1, "test2"); + File textFile = new File(dir2, "my.txt"); - DiskStore diskStore2 = cache.createDiskStoreFactory() - .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + "diskstores_2")).setMaxOplogSize(1) - .create(getUniqueName() + 2); - - RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT) - .setPartitionAttributes(new PartitionAttributesFactory().setRedundantCopies(0).create()); + assertThat(textFile).exists(); + assertThat(textFile.lastModified()).isEqualTo(expectedLastModified); + } - regionFactory.setDiskStoreName(diskStore1.getName()).setDiskSynchronous(true) - .create("region1"); - regionFactory.setDiskStoreName(diskStore2.getName()).setDiskSynchronous(true) - .create("region2"); - }); + private void createRegions(final VM vm) { + createRegion(regionName1, getUniqueName() + "-1", getDiskStoreFor(vm, 1)); + createRegion(regionName2, getUniqueName() + "-2", getDiskStoreFor(vm, 2)); } - private AsyncInvocation createColatedPersistentRegions(final VM vm) { - return vm.invokeAsync(() -> { - Cache cache = getCache(); - DiskStore diskStore1 = cache.createDiskStoreFactory() - .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + "diskstores_1")).setMaxOplogSize(1) - .create(getUniqueName()); + private void createRegion(final String regionName, final String diskStoreName, + final File diskDir) { + DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory(); + diskStoreFactory.setDiskDirs(toArray(diskDir)); + diskStoreFactory.setMaxOplogSize(1); - DiskStore diskStore2 = cache.createDiskStoreFactory() - .setDiskDirs(getDiskDirs(vm, "vm" + vm.getId() + "diskstores_2")).setMaxOplogSize(1) - .create(getUniqueName() + 2); + PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory(); + partitionAttributesFactory.setRedundantCopies(0); - RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT) - .setPartitionAttributes(new PartitionAttributesFactory().setRedundantCopies(0).create()); + RegionFactory regionFactory = getCache().createRegionFactory(PARTITION_PERSISTENT); + regionFactory.setDiskStoreName(diskStoreFactory.create(diskStoreName).getName()); + regionFactory.setDiskSynchronous(true); + regionFactory.setPartitionAttributes(partitionAttributesFactory.create()); + regionFactory.create(regionName); + } - regionFactory.setDiskStoreName(diskStore1.getName()).setDiskSynchronous(true) - .create("region1"); - regionFactory.setDiskStoreName(diskStore2.getName()).setDiskSynchronous(true) - .setPartitionAttributes(new PartitionAttributesFactory().setRedundantCopies(0) - .setColocatedWith("region1").create()) - .create("region2"); - }); + private File[] toArray(final File file) { + return new File[] {file}; } - private void createOverflowRegion(final VM vm) { - vm.invoke(() -> { - Cache cache = getCache(); - DiskStore diskStore = cache.createDiskStoreFactory() - .setDiskDirs(getDiskDirs(vm, getUniqueName())).create(getUniqueName()); + private void createColocatedRegions(final VM vm) { + DiskStoreFactory diskStoreFactory1 = getCache().createDiskStoreFactory(); + diskStoreFactory1.setDiskDirs(toArray(getDiskStoreFor(vm, 1))); + diskStoreFactory1.setMaxOplogSize(1); - cache.createRegionFactory(RegionShortcut.REPLICATE).setDiskStoreName(diskStore.getName()) - .setDiskSynchronous(true) - .setEvictionAttributes( - EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK)) - .create("region3"); - }); - } + DiskStoreFactory diskStoreFactory2 = getCache().createDiskStoreFactory(); + diskStoreFactory2.setDiskDirs(toArray(getDiskStoreFor(vm, 2))); + diskStoreFactory2.setMaxOplogSize(1); - @Override - protected void createData(VM vm, final int startKey, final int endKey, final String value) { - createData(vm, startKey, endKey, value, getPartitionedRegionName()); - } + PartitionAttributesFactory partitionAttributesFactory1 = new PartitionAttributesFactory(); + partitionAttributesFactory1.setRedundantCopies(0); - @Override - protected void createData(VM vm, final int startKey, final int endKey, final String value, - final String regionName) { - vm.invoke(() -> { - Cache cache = getCache(); - Region region = cache.getRegion(regionName); + RegionFactory regionFactory1 = getCache().createRegionFactory(PARTITION_PERSISTENT); + regionFactory1.setPartitionAttributes(partitionAttributesFactory1.create()); + regionFactory1.setDiskStoreName(diskStoreFactory1.create(getUniqueName() + "-1").getName()); + regionFactory1.setDiskSynchronous(true); + regionFactory1.create(regionName1); - for (int i = startKey; i < endKey; i++) { - region.put(i, value); - } - }); - } + PartitionAttributesFactory partitionAttributesFactory2 = new PartitionAttributesFactory(); + partitionAttributesFactory2.setColocatedWith(regionName1); + partitionAttributesFactory2.setRedundantCopies(0); - @Override - protected void checkData(VM vm, final int startKey, final int endKey, final String value) { - checkData(vm, startKey, endKey, value, getPartitionedRegionName()); + RegionFactory regionFactory2 = getCache().createRegionFactory(PARTITION_PERSISTENT); + regionFactory2.setDiskStoreName(diskStoreFactory2.create(getUniqueName() + "-2").getName()); + regionFactory2.setDiskSynchronous(true); + regionFactory2.setPartitionAttributes(partitionAttributesFactory2.create()); + regionFactory2.create(regionName2); } - @Override - protected void checkData(VM vm, final int startKey, final int endKey, final String value, - final String regionName) { - vm.invoke(() -> { - Region region = getCache().getRegion(regionName); + private void createRegionWithOverflow(final File diskDir) { + DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory(); + diskStoreFactory.setDiskDirs(toArray(diskDir)); - for (int i = startKey; i < endKey; i++) { - assertEquals(value, region.get(i)); - } - }); - } + EvictionAttributes evictionAttributes = createLIFOEntryAttributes(1, OVERFLOW_TO_DISK); - @Override - protected void closeCache(final VM vm) { - vm.invoke(() -> getCache().close()); + RegionFactory regionFactory = getCache().createRegionFactory(REPLICATE); + regionFactory.setDiskStoreName(diskStoreFactory.create(getUniqueName()).getName()); + regionFactory.setDiskSynchronous(true); + regionFactory.setEvictionAttributes(evictionAttributes); + regionFactory.create(regionName3); } - @Override - protected Set<Integer> getBucketList(VM vm) { - return getBucketList(vm, getPartitionedRegionName()); - } + private void createData(final int startKey, final int endKey, final String value, + final String regionName) { + Region<Integer, String> region = getCache().getRegion(regionName); - @Override - protected Set<Integer> getBucketList(VM vm, final String regionName) { - return vm.invoke(() -> { - Cache cache = getCache(); - PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); - return new TreeSet<>(region.getDataStore().getAllLocalBucketIds()); - }); + for (int i = startKey; i < endKey; i++) { + region.put(i, value); + } } - private File[] getDiskDirs(VM vm, String dsName) { - File[] diskStoreDirs = new File[1]; - diskStoreDirs[0] = new File(workingDirByVm.get(vm), dsName); - diskStoreDirs[0].mkdirs(); - return diskStoreDirs; + private void validateData(final int startKey, final int endKey, final String value, + final String regionName) { + Region region = getCache().getRegion(regionName); + + for (int i = startKey; i < endKey; i++) { + assertThat(region.get(i)).isEqualTo(value); + } } - private BackupStatus backupMember(final VM vm) { - return vm.invoke("backup", () -> { - try { - return new BackupOperation(getCache().getDistributionManager(), getCache()) - .backupAllMembers( - backupBaseDir.toString(), null); - } catch (ManagementException e) { - throw new RuntimeException(e); - } - }); + private BackupStatus backupMember() { + return new BackupOperation(getCache().getDistributionManager(), getCache()) + .backupAllMembers(backupBaseDir.toString(), null); } - protected void restoreBackup(final int expectedNumScripts) + private void restoreBackup(final int expectedScriptCount) throws IOException, InterruptedException { Collection<File> restoreScripts = listFiles(backupBaseDir, new RegexFileFilter(".*restore.*"), DIRECTORY); - assertThat(restoreScripts).hasSize(expectedNumScripts); + + assertThat(restoreScripts).hasSize(expectedScriptCount); + for (File script : restoreScripts) { - execute(script); + executeScript(script); } } - private void execute(final File script) throws IOException, InterruptedException { - ProcessBuilder processBuilder = new ProcessBuilder(script.getAbsolutePath()); - processBuilder.redirectErrorStream(true); - Process process = processBuilder.start(); + private void executeScript(final File script) throws IOException, InterruptedException { + process = new ProcessBuilder(script.getAbsolutePath()).redirectErrorStream(true).start(); - try (BufferedReader reader = - new BufferedReader(new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { - logger.info("OUTPUT:" + line); - } - } + processReader = new ProcessStreamReader.Builder(process).inputStream(process.getInputStream()) + .inputListener(line -> logger.info("OUTPUT: {}", line)) + .readingMode(getReadingMode()).continueReadingMillis(2 * 1000).build().start(); - assertThat(process.waitFor()).isEqualTo(0); + assertThat(process.waitFor(5, MINUTES)).isTrue(); + assertThat(process.exitValue()).isEqualTo(0); } - private void createAccessor(VM vm) { - vm.invoke(() -> { - Cache cache = getCache(); + private void createAccessor() { + PartitionAttributesFactory partitionAttributesFactory1 = new PartitionAttributesFactory(); + partitionAttributesFactory1.setLocalMaxMemory(0); + partitionAttributesFactory1.setRedundantCopies(0); - cache.createRegionFactory(RegionShortcut.PARTITION) - .setPartitionAttributes( - new PartitionAttributesFactory().setRedundantCopies(0).setLocalMaxMemory(0).create()) - .create("region1"); - cache.createRegionFactory(RegionShortcut.PARTITION) - .setPartitionAttributes(new PartitionAttributesFactory().setColocatedWith("region1") - .setRedundantCopies(0).setLocalMaxMemory(0).create()) - .create("region2"); - }); + RegionFactory regionFactory1 = getCache().createRegionFactory(PARTITION); + regionFactory1.setPartitionAttributes(partitionAttributesFactory1.create()); + regionFactory1.create(regionName1); + + PartitionAttributesFactory partitionAttributesFactory2 = new PartitionAttributesFactory(); + partitionAttributesFactory2.setLocalMaxMemory(0); + partitionAttributesFactory2.setRedundantCopies(0); + partitionAttributesFactory2.setColocatedWith(regionName1); + + RegionFactory regionFactory2 = getCache().createRegionFactory(PARTITION); + regionFactory2.setPartitionAttributes(partitionAttributesFactory2.create()); + regionFactory2.create(regionName2); + } + + private InternalCache getCache() { + return cacheRule.getOrCreateCache(); + } + + private String getUniqueName() { + return uniqueName; + } + + private File getDiskStoreFor(final VM vm) { + return new File(getDiskDirFor(vm), getUniqueName()); + } + + private File getDiskStoreFor(final VM vm, final int which) { + return new File(getDiskDirFor(vm), "vm-" + vm.getId() + "-diskstores-" + which); + } + + private File getDiskDirFor(final VM vm) { + return diskDirRule.getDiskDirFor(vm); + } + + private ReadingMode getReadingMode() { + return SystemUtils.isWindows() ? ReadingMode.NON_BLOCKING : ReadingMode.BLOCKING; } enum WhenToInvokeBackup { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java index d1316f7..7374a81 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java @@ -14,24 +14,23 @@ */ package org.apache.geode.internal.cache.backup; -import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.commons.io.FileUtils.listFiles; +import static org.apache.commons.io.filefilter.DirectoryFileFilter.DIRECTORY; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; +import static org.apache.geode.test.dunit.VM.getController; +import static org.apache.geode.test.dunit.VM.getVM; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.awaitility.Awaitility.await; -import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileFilter; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStreamReader; +import java.io.Serializable; import java.nio.file.Files; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -40,1026 +39,441 @@ import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.io.filefilter.DirectoryFileFilter; import org.apache.commons.io.filefilter.RegexFileFilter; import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.admin.AdminDistributedSystem; import org.apache.geode.admin.internal.AdminDistributedSystemImpl; -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DiskStore; -import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.DiskStoreFactory; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; -import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.ClassPathLoader; import org.apache.geode.internal.DeployedJar; import org.apache.geode.internal.cache.DiskStoreImpl; -import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.lang.SystemUtils; import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.process.ProcessStreamReader; +import org.apache.geode.internal.process.ProcessStreamReader.ReadingMode; import org.apache.geode.internal.util.TransformUtils; import org.apache.geode.management.BackupStatus; -import org.apache.geode.management.ManagementException; import org.apache.geode.test.compiler.ClassBuilder; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.SerializableCallable; -import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedDiskDirRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; /** - * Tests for the incremental backup feature. + * Distributed tests for incremental backup. */ @Category(DistributedTest.class) @SuppressWarnings("serial") -public class IncrementalBackupDistributedTest extends JUnit4CacheTestCase { - +public class IncrementalBackupDistributedTest implements Serializable { private static final Logger logger = LogService.getLogger(); - /** - * Data load increment. - */ - private static final int DATA_INCREMENT = 10000; - - /** - * Start value for data load. - */ - private int dataStart = 0; - - /** - * End value for data load. - */ - private int dataEnd = this.dataStart + DATA_INCREMENT; - - /** - * Regular expression used to search for member operation log files. - */ - private static final String OPLOG_REGEX = ".*\\.[kdc]rf$"; - - @Rule - public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder(); - - private final Map<Integer, File> baseDirectoryByVm = new HashMap<>(); - - /** - * Creates test regions for a member. - */ - private void createRegions(File baseDirectory, int vmNumber) throws IOException { - Cache cache = getCache(new CacheFactory().set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel())); - cache.createDiskStoreFactory().setDiskDirs(getDiskDirectory(baseDirectory, vmNumber)) - .create("fooStore"); - cache.createDiskStoreFactory().setDiskDirs(getDiskDirectory(baseDirectory, vmNumber)) - .create("barStore"); - getRegionFactory(cache).setDiskStoreName("fooStore").create("fooRegion"); - getRegionFactory(cache).setDiskStoreName("barStore").create("barRegion"); - } - - private File[] getDiskDirectory(File parent, int vmNumber) throws IOException { - File dir = new File(parent, "disk" + String.valueOf(vmNumber)).getAbsoluteFile(); - dir.mkdirs(); - return new File[] {dir}; - } - - private RegionFactory<Integer, String> getRegionFactory(Cache cache) { - PartitionAttributes<Integer, String> attributes = - new PartitionAttributesFactory<Integer, String>().setTotalNumBuckets(5).create(); - RegionFactory<Integer, String> factory = - cache.<Integer, String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT) - .setPartitionAttributes(attributes); - return factory; - } - - /** - * A FileFilter that looks for a timestamped gemfire backup directory. - */ - private static final FileFilter backupDirFilter = file -> { - // This will break in about 90 years... - return file.isDirectory() && file.getName().startsWith("20"); - }; - - /** - * @return the baseline backup directory. - */ - private static File getBaselineDir() { - File tmpDir = new File(System.getProperty("java.io.tmpdir")); - File dir = new File(tmpDir, "baseline"); - if (!dir.exists()) { - dir.mkdirs(); - } - - return dir; - } - - /** - * @return the second incremental backup directory. - */ - private static File getIncremental2Dir() { - File tmpDir = new File(System.getProperty("java.io.tmpdir")); - File dir = new File(tmpDir, "incremental2"); - if (!dir.exists()) { - dir.mkdirs(); - } - - return dir; - } - - /** - * @return the incremental backup directory. - */ - private static File getIncrementalDir() { - File tmpDir = new File(System.getProperty("java.io.tmpdir")); - File dir = new File(tmpDir, "incremental"); - if (!dir.exists()) { - dir.mkdirs(); - } - - return dir; - } - - /** - * Invokes {@link AdminDistributedSystem#getMissingPersistentMembers()} on a member. - * - * @param vm a member of the distributed system. - * @return a set of missing members for the distributed system. - */ - @SuppressWarnings("unchecked") - private Set<PersistentID> getMissingMembers(VM vm) { - return (Set<PersistentID>) vm.invoke(new SerializableCallable("getMissingMembers") { - @Override - public Object call() { - return AdminDistributedSystemImpl - .getMissingPersistentMembers(getSystem().getDistributionManager()); - } - }); - } + private static final int DATA_INCREMENT = 10_000; + private static final RegexFileFilter OPLOG_FILTER = new RegexFileFilter(".*\\.[kdc]rf$"); - private BackupStatus baseline(VM vm) { - return vm.invoke(() -> { - try { - return new BackupOperation(getSystem().getDistributionManager(), getCache()) - .backupAllMembers( - getBaselineDir().toString(), null); - } catch (ManagementException e) { - throw new RuntimeException(e); - } - }); - } + private int dataStart; + private int dataEnd = dataStart + DATA_INCREMENT; - private BackupStatus incremental(VM vm) { - return vm.invoke(() -> { - try { - return new BackupOperation(getSystem().getDistributionManager(), getCache()) - .backupAllMembers( - getIncrementalDir().toString(), getBaselineBackupDir().toString()); - } catch (ManagementException e) { - throw new RuntimeException(e); - } - }); - } + private String uniqueName; + private String diskStoreName1; + private String diskStoreName2; + private String regionName1; + private String regionName2; - private BackupStatus incremental2(VM vm) { - return vm.invoke(() -> { - try { - return new BackupOperation(getSystem().getDistributionManager(), getCache()) - .backupAllMembers( - getIncremental2Dir().toString(), getIncrementalBackupDir().toString()); - } catch (ManagementException e) { - throw new RuntimeException(e); - } - }); - } + private VM vm0; + private VM vm1; - /** - * Invokes {@link DistributedSystem#getDistributedMember()} on a member. - * - * @param vm a distributed system member. - * @return the member's id. - */ - private String getMemberId(VM vm) { - return vm.invoke(() -> getCache().getDistributedSystem().getDistributedMember().toString() - .replaceAll("[^\\w]+", "_")); - } + private transient Process process; + private transient ProcessStreamReader processReader; - /** - * Invokes {@link Cache#close()} on a member. - */ - private void closeCache(final VM closeVM) { - closeVM.invoke(new SerializableRunnable() { - @Override - public void run() { - getCache().close(); - } - }); - } + @Rule + public DistributedTestRule distributedTestRule = new DistributedTestRule(); - /** - * Locates the PersistentID for the testStore disk store for a distributed member. - * - * @param vm a distributed member. - * @return a PersistentID for a member's disk store. - */ - private PersistentID getPersistentID(final VM vm, final String diskStoreName) { - return vm.invoke(() -> { - PersistentID id = null; - Collection<DiskStore> diskStores = ((InternalCache) getCache()).listDiskStores(); - for (DiskStore diskStore : diskStores) { - if (diskStore.getName().equals(diskStoreName)) { - id = ((DiskStoreImpl) diskStore).getPersistentID(); - break; - } - } - return id; - }); - } + @Rule + public CacheRule cacheRule = new CacheRule(); - /** - * Locates the PersistentID for the testStore disk store for a distributed member. - * - * @param vm a distributed member. - * @return a PersistentID for a member's disk store. - */ - private PersistentID getPersistentID(final VM vm) { - return getPersistentID(vm, "fooStore"); - } + @Rule + public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule(); - /** - * Invokes {@link DistributedSystem#disconnect()} on a member. - * - * @param disconnectVM a member of the distributed system to disconnect. - * @param testVM a member of the distributed system to test for the missing member (just - * disconnected). - */ - private PersistentID disconnect(final VM disconnectVM, final VM testVM) { - final PersistentID id = disconnectVM.invoke(() -> { - PersistentID persistentID = null; - Collection<DiskStore> diskStores = getCache().listDiskStores(); - for (DiskStore diskStore : diskStores) { - if (diskStore.getName().equals("fooStore")) { - persistentID = ((DiskStoreImpl) diskStore).getPersistentID(); - break; - } - } + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); - getSystem().disconnect(); + @Rule + public SerializableTestName testName = new SerializableTestName(); - return persistentID; - }); + @Before + public void setUp() throws Exception { + vm0 = getVM(0); + vm1 = getVM(1); - final Set<PersistentID> missingMembers = new HashSet<>(); - Wait.waitForCriterion(new WaitCriterion() { - @Override - public boolean done() { - missingMembers.clear(); - missingMembers.addAll(getMissingMembers(testVM)); + uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); - return missingMembers.contains(id); - } + diskStoreName1 = uniqueName + "_diskStore-1"; + diskStoreName2 = uniqueName + "_diskStore-2"; + regionName1 = uniqueName + "_region-1"; + regionName2 = uniqueName + "_region-2"; - @Override - public String description() { - return "[IncrementalBackupDistributedTest] Waiting for missing member " + id; - } - }, 10000, 500, false); + vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0))); + vm1.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm1))); - return id; - } + createCache(diskDirRule.getDiskDirFor(getController())); - /** - * Invokes {@link CacheFactory#create()} on a member. - * - * @param vm a member of the distributed system. - */ - private void openCache(VM vm) throws IOException { - int vmNumber = vm.getId(); - File vmDir = getBaseDir(vmNumber); - vm.invoke(() -> createRegions(vmDir, vmNumber)); + performPuts(); } - private File getBaseDir(int vmNumber) throws IOException { - File baseDir = baseDirectoryByVm.get(vmNumber); - if (baseDir == null) { - baseDir = tempDir.newFolder("vm" + vmNumber); - baseDirectoryByVm.put(vmNumber, baseDir); + @After + public void tearDown() throws Exception { + if (process != null && process.isAlive()) { + process.destroyForcibly(); + process.waitFor(2, MINUTES); } - return baseDir; - } - - /** - * Blocks and waits for a backup operation to finish on a distributed member. - * - * @param vm a member of the distributed system. - */ - private void waitForBackup(VM vm) { - vm.invoke(new SerializableRunnable() { - @Override - public void run() { - Collection<DiskStore> backupInProgress = ((InternalCache) getCache()).listDiskStores(); - List<DiskStoreImpl> backupCompleteList = new LinkedList<>(); - - while (backupCompleteList.size() < backupInProgress.size()) { - for (DiskStore diskStore : backupInProgress) { - if (((DiskStoreImpl) diskStore).getInProgressBackup() == null - && !backupCompleteList.contains(diskStore)) { - backupCompleteList.add((DiskStoreImpl) diskStore); - } - } - } - } - }); - } - - /** - * Performs a full backup. - * - * @return the backup status. - */ - private BackupStatus performBaseline() { - return baseline(Host.getHost(0).getVM(1)); - } - - /** - * Performs an incremental backup. - * - * @return the backup status. - */ - private BackupStatus performIncremental() { - return incremental(Host.getHost(0).getVM(1)); - } - - /** - * Performs a second incremental backup. - */ - private BackupStatus performIncremental2() { - return incremental2(Host.getHost(0).getVM(1)); - } - - /** - * @return the directory for the completed baseline backup. - */ - private static File getBaselineBackupDir() { - File[] dirs = getBaselineDir().listFiles(backupDirFilter); - assertEquals(1, dirs.length); - return dirs[0]; - } - - /** - * @return the directory for the completed baseline backup. - */ - private static File getIncrementalBackupDir() { - File[] dirs = getIncrementalDir().listFiles(backupDirFilter); - assertEquals(1, dirs.length); - return dirs[0]; - } - - /** - * Returns an individual member's backup directory. - * - * @param rootDir the directory to begin searching for the member's backup dir from. - * @param memberId the member's identifier. - * @return the member's backup directory. - */ - private File getBackupDirForMember(final File rootDir, final String memberId) { - File[] dateDirs = rootDir.listFiles(backupDirFilter); - assertEquals(1, dateDirs.length); - - File[] memberDirs = dateDirs[0].listFiles(new FileFilter() { - @Override - public boolean accept(File file) { - return file.isDirectory() && file.getName().contains(memberId); - } - }); - - assertEquals(1, memberDirs.length); - - return memberDirs[0]; - } - - /** - * Adds the data region to every participating VM. - */ - @SuppressWarnings("serial") - private void createDataRegions() throws IOException { - Host host = Host.getHost(0); - int numberOfVms = host.getVMCount(); - - for (int i = 0; i < numberOfVms; ++i) { - openCache(host.getVM(i)); + if (processReader != null && processReader.isRunning()) { + processReader.stop(); } } /** - * Executes a shell command in an external process. - * - * @param command a shell command. - * @return the exit value of processing the shell command. - */ - private int execute(String command) throws IOException, InterruptedException { - final ProcessBuilder builder = new ProcessBuilder(command); - builder.redirectErrorStream(true); - final Process process = builder.start(); - - /* - * Consume standard out. - */ - new Thread(new Runnable() { - @Override - public void run() { - - try { - BufferedReader reader = - new BufferedReader(new InputStreamReader(process.getInputStream())); - String line; - - do { - line = reader.readLine(); - } while (null != line); - - reader.close(); - } catch (IOException e) { - logger.info(e); - } - } - }).start(); - - /* - * Consume standard error. - */ - new Thread(new Runnable() { - @Override - public void run() { - try { - BufferedReader reader = - new BufferedReader(new InputStreamReader(process.getErrorStream())); - String line; - - do { - line = reader.readLine(); - } while (null != line); - - reader.close(); - } catch (IOException e) { - logger.info(e); - } - } - }).start(); - - return process.waitFor(); - } - - /** - * Peforms an operation log restore for a member. - * - * @param backupDir the member's backup directory containing the restore script. - */ - private void performRestore(File memberDir, File backupDir) - throws IOException, InterruptedException { - /* - * The restore script will not restore if there is an if file in the copy to directory. Remove - * these files first. - */ - Collection<File> ifFiles = FileUtils.listFiles(memberDir, new RegexFileFilter(".*\\.if$"), - DirectoryFileFilter.DIRECTORY); - for (File file : ifFiles) { - file.delete(); - } - - /* - * Remove all operation logs. - */ - Collection<File> oplogs = FileUtils.listFiles(memberDir, new RegexFileFilter(OPLOG_REGEX), - DirectoryFileFilter.DIRECTORY); - for (File file : oplogs) { - file.delete(); - } - - /* - * Get a hold of the restore script and make sure it is there. - */ - File restoreScript = new File(backupDir, "restore.sh"); - if (!restoreScript.exists()) { - restoreScript = new File(backupDir, "restore.bat"); - } - assertTrue(restoreScript.exists()); - - assertEquals(0, execute(restoreScript.getAbsolutePath())); - } - - /** - * Adds an incomplete marker to the baseline backup for a member. - * - * @param vm a distributed system member. - */ - private void markAsIncomplete(VM vm) throws IOException { - File backupDir = getBackupDirForMember(getBaselineDir(), getMemberId(vm)); - assertTrue(backupDir.exists()); - - File incomplete = new File(backupDir, BackupWriter.INCOMPLETE_BACKUP_FILE); - incomplete.createNewFile(); - } - - /** - * Loads additional data into the test regions. - */ - private void loadMoreData() { - Region<Integer, String> fooRegion = getCache().getRegion("fooRegion"); - - // Fill our region data - for (int i = this.dataStart; i < this.dataEnd; ++i) { - fooRegion.put(i, Integer.toString(i)); - } - - Region<Integer, String> barRegion = getCache().getRegion("barRegion"); - - // Fill our region data - for (int i = this.dataStart; i < this.dataEnd; ++i) { - barRegion.put(i, Integer.toString(i)); - } - - this.dataStart += DATA_INCREMENT; - this.dataEnd += DATA_INCREMENT; - } - - /** - * Used to confirm valid BackupStatus data. Confirms fix for defect #45657 - * - * @param backupStatus contains a list of members that were backed up. - */ - private void assertBackupStatus(final BackupStatus backupStatus) { - Map<DistributedMember, Set<PersistentID>> backupMap = backupStatus.getBackedUpDiskStores(); - assertFalse(backupMap.isEmpty()); - - for (DistributedMember member : backupMap.keySet()) { - for (PersistentID id : backupMap.get(member)) { - assertNotNull(id.getHost()); - assertNotNull(id.getUUID()); - assertNotNull(id.getDirectory()); - } - } - } - - /** - * 1. Add partitioned persistent region to all members. 2. Fills region with data. - */ - @Override - public final void postSetUp() throws Exception { - createDataRegions(); - File dir = getBaseDir(-1); - createRegions(dir, -1); - loadMoreData(); - } - - /** - * Removes backup directories (and all backup data). - */ - @Override - public final void preTearDownCacheTestCase() throws Exception { - FileUtils.deleteDirectory(getIncremental2Dir()); - FileUtils.deleteDirectory(getIncrementalDir()); - FileUtils.deleteDirectory(getBaselineDir()); - } - - /** - * This tests the basic features of incremental backup. This means that operation logs that are - * present in both the baseline and member's disk store should not be copied during the - * incremental backup. Additionally, the restore script should reference and copy operation logs - * from the baseline backup. + * This tests the basic features of performBackupIncremental backup. This means that operation + * logs that are present in both the performBackupBaseline and member's disk store should not be + * copied during the performBackupIncremental backup. Additionally, the restore script should + * reference and copy operation logs from the performBackupBaseline backup. */ @Test public void testIncrementalBackup() throws Exception { - String memberId = getMemberId(Host.getHost(0).getVM(1)); + String memberId = vm1.invoke(() -> getModifiedMemberId()); - File memberDir = baseDirectoryByVm.get(1); - // getVMDir(Host.getHost(0).getVM(1)); - assertNotNull(memberDir); + File memberDir = diskDirRule.getDiskDirFor(vm1); // Find all of the member's oplogs in the disk directory (*.crf,*.krf,*.drf) - Collection<File> memberOplogFiles = FileUtils.listFiles(memberDir, - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(memberOplogFiles.isEmpty()); + Collection<File> memberOplogFiles = listFiles(memberDir, OPLOG_FILTER, DIRECTORY); + assertThat(memberOplogFiles).isNotEmpty(); // Perform a full backup and wait for it to finish - assertBackupStatus(performBaseline()); - waitForBackup(Host.getHost(0).getVM(1)); + validateBackupStatus(vm1.invoke(() -> performBackup(getBaselinePath()))); + vm1.invoke(() -> waitForBackup()); - // Find all of the member's oplogs in the baseline (*.crf,*.krf,*.drf) + // Find all of the member's oplogs in the performBackupBaseline (*.crf,*.krf,*.drf) Collection<File> memberBaselineOplogs = - FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), memberId), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(memberBaselineOplogs.isEmpty()); + listFiles(getBackupDirForMember(getBaselineDir(), memberId), OPLOG_FILTER, DIRECTORY); + assertThat(memberBaselineOplogs).isNotEmpty(); List<String> memberBaselineOplogNames = new LinkedList<>(); TransformUtils.transform(memberBaselineOplogs, memberBaselineOplogNames, TransformUtils.fileNameTransformer); - // Peform and incremental backup and wait for it to finish - loadMoreData(); // Doing this preserves the new oplogs created by the baseline backup - assertBackupStatus(performIncremental()); - waitForBackup(Host.getHost(0).getVM(1)); + // Perform and performBackupIncremental backup and wait for it to finish + performPuts(); // This preserves the new oplogs created by the performBackupBaseline backup + validateBackupStatus( + vm1.invoke(() -> performBackup(getIncrementalPath(), getBaselineBackupPath()))); + vm1.invoke(() -> waitForBackup()); - // Find all of the member's oplogs in the incremental (*.crf,*.krf,*.drf) + // Find all of the member's oplogs in the performBackupIncremental (*.crf,*.krf,*.drf) Collection<File> memberIncrementalOplogs = - FileUtils.listFiles(getBackupDirForMember(getIncrementalDir(), memberId), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(memberIncrementalOplogs.isEmpty()); + listFiles(getBackupDirForMember(getIncrementalDir(), memberId), OPLOG_FILTER, DIRECTORY); + assertThat(memberIncrementalOplogs).isNotEmpty(); List<String> memberIncrementalOplogNames = new LinkedList<>(); TransformUtils.transform(memberIncrementalOplogs, memberIncrementalOplogNames, TransformUtils.fileNameTransformer); - /* - * Assert that the incremental backup does not contain baseline operation logs that the member - * still has copies of. - */ - for (String oplog : memberBaselineOplogNames) { - assertFalse(memberIncrementalOplogNames.contains(oplog)); - } + // Assert that the performBackupIncremental backup does not contain performBackupBaseline + // operation logs that the member still has copies of. + assertThat(memberIncrementalOplogNames).doesNotContainAnyElementsOf(memberBaselineOplogNames); + + // Perform a second performBackupIncremental and wait for it to finish. - // Perform a second incremental and wait for it to finish. - loadMoreData(); // Doing this preserves the new oplogs created by the incremental backup - assertBackupStatus(performIncremental2()); - waitForBackup(Host.getHost(0).getVM(1)); + // Doing this preserves the new oplogs created by the performBackupIncremental backup + performPuts(); + validateBackupStatus( + vm1.invoke(() -> performBackup(getIncremental2Path(), getIncrementalBackupPath()))); + vm1.invoke(() -> waitForBackup()); Collection<File> memberIncremental2Oplogs = - FileUtils.listFiles(getBackupDirForMember(getIncremental2Dir(), memberId), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(memberIncremental2Oplogs.isEmpty()); + listFiles(getBackupDirForMember(getIncremental2Dir(), memberId), OPLOG_FILTER, DIRECTORY); + assertThat(memberIncremental2Oplogs).isNotEmpty(); List<String> memberIncremental2OplogNames = new LinkedList<>(); TransformUtils.transform(memberIncremental2Oplogs, memberIncremental2OplogNames, TransformUtils.fileNameTransformer); - /* - * Assert that the second incremental backup does not contain operation logs copied into the - * baseline. - */ - for (String oplog : memberBaselineOplogNames) { - assertFalse(memberIncremental2OplogNames.contains(oplog)); - } + // Assert that the second performBackupIncremental backup does not contain operation logs copied + // into the performBackupBaseline. + assertThat(memberIncremental2OplogNames).doesNotContainAnyElementsOf(memberBaselineOplogNames); - /* - * Also assert that the second incremental backup does not contain operation logs copied into - * the member's first incremental backup. - */ - for (String oplog : memberIncrementalOplogNames) { - assertFalse(memberIncremental2OplogNames.contains(oplog)); - } + // Also assert that the second performBackupIncremental backup does not contain operation logs + // copied into the member's first performBackupIncremental backup. + assertThat(memberIncremental2OplogNames) + .doesNotContainAnyElementsOf(memberIncrementalOplogNames); // Shut down our member so we can perform a restore - PersistentID id = getPersistentID(Host.getHost(0).getVM(1)); - closeCache(Host.getHost(0).getVM(1)); + PersistentID id = vm1.invoke(() -> getPersistentID(diskStoreName1)); + vm1.invoke(() -> cacheRule.getCache().close()); // Execute the restore performRestore(new File(id.getDirectory()), getBackupDirForMember(getIncremental2Dir(), memberId)); - /* - * Collect all of the restored operation logs. - */ - Collection<File> restoredOplogs = FileUtils.listFiles(new File(id.getDirectory()), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(restoredOplogs.isEmpty()); + // Collect all of the restored operation logs. + Collection<File> restoredOplogs = + listFiles(new File(id.getDirectory()), OPLOG_FILTER, DIRECTORY); + assertThat(restoredOplogs).isNotEmpty(); + List<String> restoredOplogNames = new LinkedList<>(); TransformUtils.transform(restoredOplogs, restoredOplogNames, TransformUtils.fileNameTransformer); - /* - * Assert that baseline operation logs have been copied over to the member's disk directory. - */ - for (String oplog : memberBaselineOplogNames) { - assertTrue(restoredOplogNames.contains(oplog)); - } + // Assert that performBackupBaseline operation logs have been copied over to the member's disk + // directory. + assertThat(restoredOplogNames).containsAll(memberBaselineOplogNames); - /* - * Assert that the incremental operation logs have been copied over to the member's disk - * directory. - */ - for (String oplog : memberIncrementalOplogNames) { - assertTrue(restoredOplogNames.contains(oplog)); - } + // Assert that the performBackupIncremental operation logs have been copied over to the member's + // disk directory. + assertThat(restoredOplogNames).containsAll(memberIncrementalOplogNames); - /* - * Assert that the second incremental operation logs have been copied over to the member's disk - * directory. - */ - for (String oplog : memberIncremental2OplogNames) { - assertTrue(restoredOplogNames.contains(oplog)); - } + // Assert that the second performBackupIncremental operation logs have been copied over to the + // member's disk directory. + assertThat(restoredOplogNames).containsAll(memberIncremental2OplogNames); - /* - * Reconnect the member. - */ - openCache(Host.getHost(0).getVM(1)); + // Reconnect the member. + vm1.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm1))); } /** * Successful if a member performs a full backup when its backup data is not present in the - * baseline (for whatever reason). This also tests what happens when a member is offline during - * the baseline backup. + * performBackupBaseline (for whatever reason). This also tests what happens when a member is + * offline during the performBackupBaseline backup. * + * <p> * The test is regarded as successful when all of the missing members oplog files are backed up - * during an incremental backup. This means that the member peformed a full backup because its - * oplogs were missing in the baseline. + * during an performBackupIncremental backup. This means that the member performed a full backup + * because its oplogs were missing in the performBackupBaseline. */ @Test public void testMissingMemberInBaseline() throws Exception { - // Simulate the missing member by forcing a persistent member - // to go offline. - final PersistentID missingMember = - disconnect(Host.getHost(0).getVM(0), Host.getHost(0).getVM(1)); - - /* - * Perform baseline and make sure that the list of offline disk stores contains our missing - * member. - */ - BackupStatus baselineStatus = performBaseline(); - assertBackupStatus(baselineStatus); - assertNotNull(baselineStatus.getOfflineDiskStores()); - assertEquals(2, baselineStatus.getOfflineDiskStores().size()); + // Simulate the missing member by forcing a persistent member to go offline. + PersistentID missingMember = vm0.invoke(() -> getPersistentID(diskStoreName1)); + vm0.invoke(() -> cacheRule.getCache().close()); + + await().atMost(2, MINUTES) + .until(() -> vm1.invoke(() -> getMissingPersistentMembers().contains(missingMember))); + + // Perform performBackupBaseline and make sure that list of offline disk stores contains our + // missing member. + BackupStatus baselineStatus = vm1.invoke(() -> performBackup(getBaselinePath())); + validateBackupStatus(baselineStatus); + assertThat(baselineStatus.getOfflineDiskStores()).isNotNull().hasSize(2); // Find all of the member's oplogs in the missing member's diskstore directory structure // (*.crf,*.krf,*.drf) Collection<File> missingMemberOplogFiles = - FileUtils.listFiles(new File(missingMember.getDirectory()), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(missingMemberOplogFiles.isEmpty()); - - /* - * Restart our missing member and make sure it is back online and part of the distributed system - */ - openCache(Host.getHost(0).getVM(0)); - - /* - * After reconnecting make sure the other members agree that the missing member is back online. - */ - final Set<PersistentID> missingMembers = new HashSet<>(); - Wait.waitForCriterion(new WaitCriterion() { - @Override - public boolean done() { - missingMembers.clear(); - missingMembers.addAll(getMissingMembers(Host.getHost(0).getVM(1))); - return !missingMembers.contains(missingMember); - } + listFiles(new File(missingMember.getDirectory()), OPLOG_FILTER, DIRECTORY); + assertThat(missingMemberOplogFiles).isNotEmpty(); - @Override - public String description() { - return "[testMissingMemberInBasline] Wait for missing member."; - } - }, 10000, 500, false); + // Restart our missing member and make sure it is back online and part of the cluster + vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0))); - assertEquals(0, missingMembers.size()); + // After reconnecting make sure the other members agree that the missing member is back online. + await().atMost(2, MINUTES) + .until(() -> assertThat(getMissingPersistentMembers()).doesNotContain(missingMember)); - /* - * Peform incremental and make sure we have no offline disk stores. - */ - BackupStatus incrementalStatus = performIncremental(); - assertBackupStatus(incrementalStatus); - assertNotNull(incrementalStatus.getOfflineDiskStores()); - assertEquals(0, incrementalStatus.getOfflineDiskStores().size()); + // Perform performBackupIncremental and make sure we have no offline disk stores. + BackupStatus incrementalStatus = + vm1.invoke(() -> performBackup(getIncrementalPath(), getBaselineBackupPath())); + validateBackupStatus(incrementalStatus); + assertThat(incrementalStatus.getOfflineDiskStores()).isNotNull().isEmpty(); // Get the missing member's member id which is different from the PersistentID - String memberId = getMemberId(Host.getHost(0).getVM(0)); - assertNotNull(memberId); + String memberId = vm0.invoke(() -> getModifiedMemberId()); - // Get list of backed up oplog files in the incremental backup for the missing member + // Get list of backed up oplog files in the performBackupIncremental backup for the missing + // member File incrementalMemberDir = getBackupDirForMember(getIncrementalDir(), memberId); - Collection<File> backupOplogFiles = FileUtils.listFiles(incrementalMemberDir, - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(backupOplogFiles.isEmpty()); + Collection<File> backupOplogFiles = listFiles(incrementalMemberDir, OPLOG_FILTER, DIRECTORY); + assertThat(backupOplogFiles).isNotEmpty(); // Transform missing member oplogs to just their file names. List<String> missingMemberOplogNames = new LinkedList<>(); TransformUtils.transform(missingMemberOplogFiles, missingMemberOplogNames, TransformUtils.fileNameTransformer); - // Transform missing member's incremental backup oplogs to just their file names. + // Transform missing member's performBackupIncremental backup oplogs to just their file names. List<String> backupOplogNames = new LinkedList<>(); TransformUtils.transform(backupOplogFiles, backupOplogNames, TransformUtils.fileNameTransformer); - /* - * Make sure that the incremental backup for the missing member contains all of the operation - * logs for that member. This proves that a full backup was performed for that member. - */ - assertTrue(backupOplogNames.containsAll(missingMemberOplogNames)); + // Make sure that the performBackupIncremental backup for the missing member contains all of the + // operation logs for that member. This proves that a full backup was performed for that member. + assertThat(backupOplogNames).containsAll(missingMemberOplogNames); } /** * Successful if a member performs a full backup if their backup is marked as incomplete in the - * baseline. + * performBackupBaseline. */ @Test public void testIncompleteInBaseline() throws Exception { - /* - * Get the member ID for VM 1 and perform a baseline. - */ - String memberId = getMemberId(Host.getHost(0).getVM(1)); - assertBackupStatus(performBaseline()); - - /* - * Find all of the member's oplogs in the baseline (*.crf,*.krf,*.drf) - */ + // Get the member ID for VM 1 and perform a performBackupBaseline. + String memberId = vm1.invoke(() -> getModifiedMemberId()); + validateBackupStatus(vm1.invoke(() -> performBackup(getBaselinePath()))); + + // Find all of the member's oplogs in the performBackupBaseline (*.crf,*.krf,*.drf) Collection<File> memberBaselineOplogs = - FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), memberId), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(memberBaselineOplogs.isEmpty()); + listFiles(getBackupDirForMember(getBaselineDir(), memberId), OPLOG_FILTER, DIRECTORY); + assertThat(memberBaselineOplogs).isNotEmpty(); List<String> memberBaselineOplogNames = new LinkedList<>(); TransformUtils.transform(memberBaselineOplogs, memberBaselineOplogNames, TransformUtils.fileNameTransformer); - // Mark the baseline as incomplete (even though it really isn't) - markAsIncomplete(Host.getHost(0).getVM(1)); + vm1.invoke(() -> { + File backupDir = getBackupDirForMember(getBaselineDir(), getModifiedMemberId()); + assertThat(backupDir).exists(); + + // Mark the performBackupBaseline as incomplete (even though it really isn't) + File incomplete = new File(backupDir, BackupWriter.INCOMPLETE_BACKUP_FILE); + assertThat(incomplete.createNewFile()).isTrue(); + }); - /* - * Do an incremental. It should discover that the baseline is incomplete and backup all of the - * operation logs that are in the baseline. - */ - assertBackupStatus(performIncremental()); + // Do an performBackupIncremental. It should discover that the performBackupBaseline is + // incomplete and backup all of the operation logs that are in the performBackupBaseline. + validateBackupStatus( + vm1.invoke(() -> performBackup(getIncrementalPath(), getBaselineBackupPath()))); - /* - * Find all of the member's oplogs in the incremental (*.crf,*.krf,*.drf) - */ + // Find all of the member's oplogs in the performBackupIncremental (*.crf,*.krf,*.drf) Collection<File> memberIncrementalOplogs = - FileUtils.listFiles(getBackupDirForMember(getIncrementalDir(), memberId), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(memberIncrementalOplogs.isEmpty()); + listFiles(getBackupDirForMember(getIncrementalDir(), memberId), OPLOG_FILTER, DIRECTORY); + assertThat(memberIncrementalOplogs).isNotEmpty(); List<String> memberIncrementalOplogNames = new LinkedList<>(); TransformUtils.transform(memberIncrementalOplogs, memberIncrementalOplogNames, TransformUtils.fileNameTransformer); - /* - * Assert that all of the baseline operation logs are in the incremental backup. If so, then the - * incomplete marker was discovered in the baseline by the incremental backup process. - */ - for (String oplog : memberBaselineOplogNames) { - assertTrue(memberIncrementalOplogNames.contains(oplog)); - } + // Assert that all of the performBackupBaseline operation logs are in the + // performBackupIncremental backup. If so, then the incomplete marker was discovered in the + // performBackupBaseline by the performBackupIncremental backup process. + assertThat(memberIncrementalOplogNames).containsAll(memberBaselineOplogNames); } /** - * Successful if all members perform a full backup when they share the baseline directory and it - * is missing. + * Successful if all members perform a full backup when they share the performBackupBaseline + * directory and it is missing. */ @Test public void testMissingBaseline() throws Exception { - /* - * Get the member ID for VM 1 and perform a baseline. - */ - String memberId = getMemberId(Host.getHost(0).getVM(1)); - assertBackupStatus(performBaseline()); - - /* - * Find all of the member's oplogs in the baseline (*.crf,*.krf,*.drf) - */ + // Get the member ID for VM 1 and perform a performBackupBaseline. + String memberId = vm1.invoke(() -> getModifiedMemberId()); + validateBackupStatus(vm1.invoke(() -> performBackup(getBaselinePath()))); + + // Find all of the member's oplogs in the performBackupBaseline (*.crf,*.krf,*.drf) Collection<File> memberBaselineOplogs = - FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), memberId), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); - assertFalse(memberBaselineOplogs.isEmpty()); + listFiles(getBackupDirForMember(getBaselineDir(), memberId), OPLOG_FILTER, DIRECTORY); + assertThat(memberBaselineOplogs).isNotEmpty(); List<String> memberBaselineOplogNames = new LinkedList<>(); TransformUtils.transform(memberBaselineOplogs, memberBaselineOplogNames, TransformUtils.fileNameTransformer); - /* - * Custom incremental backup callable that retrieves the current baseline before deletion. - */ - SerializableCallable callable = new SerializableCallable("Backup all members.") { - private final File baselineDir = getBaselineBackupDir(); - - @Override - public Object call() { - try { - return new BackupOperation(getSystem().getDistributionManager(), getCache()) - .backupAllMembers( - getIncrementalDir().toString(), this.baselineDir.toString()); - - } catch (ManagementException e) { - throw new RuntimeException(e); - } - } - }; - - /* - * Do an incremental after deleting the baseline. It should discover that the baseline is gone - * and backup all of the operation logs that are in the baseline. - */ + // Do an performBackupIncremental after deleting the performBackupBaseline. It should discover + // that the performBackupBaseline is gone and backup all of the operation logs that are in the + // performBackupBaseline. FileUtils.deleteDirectory(getBaselineDir()); - Host.getHost(0).getVM(1).invoke(callable); - /* - * Find all of the member's oplogs in the incremental (*.crf,*.krf,*.drf) - */ + // Custom performBackupIncremental backup callable that retrieves the current + // performBackupBaseline before deletion. + vm1.invoke(() -> { + new BackupOperation(cacheRule.getSystem().getDistributionManager(), cacheRule.getCache()) + .backupAllMembers(getIncrementalPath(), getBaselinePath()); + }); + + // Find all of the member's oplogs in the performBackupIncremental (*.crf,*.krf,*.drf) Collection<File> memberIncrementalOplogs = - FileUtils.listFiles(getBackupDirForMember(getIncrementalDir(), memberId), - new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY); + listFiles(getBackupDirForMember(getIncrementalDir(), memberId), OPLOG_FILTER, DIRECTORY); assertThat(memberIncrementalOplogs).isNotEmpty(); List<String> memberIncrementalOplogNames = new LinkedList<>(); TransformUtils.transform(memberIncrementalOplogs, memberIncrementalOplogNames, TransformUtils.fileNameTransformer); - /* - * Assert that all of the baseline operation logs are in the incremental backup. If so, then the - * missing baseline was discovered by the incremental backup process. - */ - for (String oplog : memberBaselineOplogNames) { - assertTrue(memberIncrementalOplogNames.contains(oplog)); - } + // Assert that all of the performBackupBaseline operation logs are in the + // performBackupIncremental backup. If so, then the missing performBackupBaseline was discovered + // by the performBackupIncremental backup process. + assertThat(memberIncrementalOplogNames).containsAll(memberBaselineOplogNames); } /** - * Successful if a user deployed jar file is included as part of the backup. + * Verifies that a user deployed jar file is included as part of the backup. */ @Test public void testBackupUserDeployedJarFiles() throws Exception { - final String jarName = "BackupJarDeploymentDUnit"; - final String jarNameRegex = ".*" + jarName + ".*"; - final ClassBuilder classBuilder = new ClassBuilder(); - final byte[] classBytes = classBuilder.createJarFromName(jarName); + String jarName = "BackupJarDeploymentDUnit"; + byte[] classBytes = new ClassBuilder().createJarFromName(jarName); - File jarFile = tempDir.newFile(); + File jarFile = temporaryFolder.newFile(); IOUtils.copyLarge(new ByteArrayInputStream(classBytes), new FileOutputStream(jarFile)); - VM vm0 = Host.getHost(0).getVM(0); - - /* - * Deploy a "dummy" jar to the VM. - */ + // Deploy a "dummy" jar to the VM. File deployedJarFile = vm0.invoke(() -> { DeployedJar deployedJar = ClassPathLoader.getLatest().getJarDeployer().deploy(jarName, jarFile); return deployedJar.getFile(); }); - assertTrue(deployedJarFile.exists()); - /* - * Perform backup. Make sure it is successful. - */ - assertBackupStatus(baseline(vm0)); + assertThat(deployedJarFile).exists(); - /* - * Make sure the user deployed jar is part of the backup. - */ + // Perform backup. Make sure it is successful. + validateBackupStatus(vm0.invoke(() -> performBackup(getBaselinePath()))); + + // Make sure the user deployed jar is part of the backup. Collection<File> memberDeployedJarFiles = - FileUtils.listFiles(getBackupDirForMember(getBaselineDir(), getMemberId(vm0)), - new RegexFileFilter(jarNameRegex), DirectoryFileFilter.DIRECTORY); - assertFalse(memberDeployedJarFiles.isEmpty()); + listFiles(getBackupDirForMember(getBaselineDir(), vm0.invoke(() -> getModifiedMemberId())), + new RegexFileFilter(".*" + jarName + ".*"), DIRECTORY); + assertThat(memberDeployedJarFiles).isNotEmpty(); // Shut down our member so we can perform a restore - PersistentID id = getPersistentID(vm0); - closeCache(vm0); - - /* - * Get the VM's user directory. - */ - final String vmDir = vm0.invoke(() -> System.getProperty("user.dir")); + PersistentID id = vm0.invoke(() -> getPersistentID(diskStoreName1)); + vm0.invoke(() -> cacheRule.getCache().close()); - File backupDir = getBackupDirForMember(getBaselineDir(), getMemberId(vm0)); + // Get the VM's user directory. + String vmDir = vm0.invoke(() -> System.getProperty("user.dir")); - vm0.bounce(); + File backupDir = + getBackupDirForMember(getBaselineDir(), vm0.invoke(() -> getModifiedMemberId())); - /* - * Cleanup "dummy" jar from file system. - */ - Pattern pattern = Pattern.compile('^' + jarName + ".*#\\d++$"); - deleteMatching(new File("."), pattern); + // Cleanup "dummy" jar from file system. + deleteMatching(new File("."), Pattern.compile('^' + jarName + ".*#\\d++$")); // Execute the restore performRestore(new File(id.getDirectory()), backupDir); - /* - * Make sure the user deployed jar is part of the restore. - */ - Collection<File> restoredJars = FileUtils.listFiles(new File(vmDir), - new RegexFileFilter(jarNameRegex), DirectoryFileFilter.DIRECTORY); - assertFalse(restoredJars.isEmpty()); + // Make sure the user deployed jar is part of the restore. + Collection<File> restoredJars = + listFiles(new File(vmDir), new RegexFileFilter(".*" + jarName + ".*"), DIRECTORY); + assertThat(restoredJars).isNotEmpty(); + List<String> restoredJarNames = new LinkedList<>(); TransformUtils.transform(memberDeployedJarFiles, restoredJarNames, TransformUtils.fileNameTransformer); for (String name : restoredJarNames) { - assertTrue(name.contains(jarName)); + assertThat(name).contains(jarName); } // Restart the member - openCache(vm0); + vm0.invoke(() -> createCache(diskDirRule.getDiskDirFor(vm0))); - /* - * Remove the "dummy" jar from the VM. - */ + // Remove the "dummy" jar from the VM. vm0.invoke(() -> { for (DeployedJar jarClassLoader : ClassPathLoader.getLatest().getJarDeployer() .findDeployedJars()) { @@ -1069,16 +483,217 @@ public class IncrementalBackupDistributedTest extends JUnit4CacheTestCase { } }); - /* - * Cleanup "dummy" jar from file system. - */ - pattern = Pattern.compile('^' + jarName + ".*#\\d++$"); - deleteMatching(new File(vmDir), pattern); + // Cleanup "dummy" jar from file system. + deleteMatching(new File(vmDir), Pattern.compile('^' + jarName + ".*#\\d++$")); + } + + private void createCache(final File diskDir) { + cacheRule.getOrCreateCache(); + + createDiskStore(diskStoreName1, diskDir); + createDiskStore(diskStoreName2, diskDir); + + createRegion(regionName1, diskStoreName1); + createRegion(regionName2, diskStoreName2); + } + + private void createDiskStore(final String diskStoreName, final File diskDir) { + DiskStoreFactory diskStoreFactory = cacheRule.getCache().createDiskStoreFactory(); + diskStoreFactory.setDiskDirs(new File[] {diskDir}); + diskStoreFactory.create(diskStoreName); + } + + private void createRegion(final String regionName, final String diskStoreName) { + PartitionAttributesFactory<Integer, String> partitionAttributesFactory = + new PartitionAttributesFactory<>(); + partitionAttributesFactory.setTotalNumBuckets(5); + + RegionFactory<Integer, String> regionFactory = + cacheRule.getCache().createRegionFactory(PARTITION_PERSISTENT); + regionFactory.setDiskStoreName(diskStoreName); + regionFactory.setPartitionAttributes(partitionAttributesFactory.create()); + regionFactory.create(regionName); + } + + private File getBaselineDir() { + File dir = new File(temporaryFolder.getRoot(), "baseline"); + if (!dir.exists()) { + dir.mkdirs(); + } + + return dir; + } + + private String getBaselinePath() { + return getBaselineDir().getAbsolutePath(); + } + + private File getIncrementalDir() { + File dir = new File(temporaryFolder.getRoot(), "incremental"); + if (!dir.exists()) { + dir.mkdirs(); + } + + return dir; + } + + private String getIncrementalPath() { + return getIncrementalDir().getAbsolutePath(); + } + + private File getIncremental2Dir() { + File dir = new File(temporaryFolder.getRoot(), "incremental2"); + if (!dir.exists()) { + dir.mkdirs(); + } + + return dir; + } + + private String getIncremental2Path() { + return getIncremental2Dir().getAbsolutePath(); + } + + private Set<PersistentID> getMissingPersistentMembers() { + return AdminDistributedSystemImpl + .getMissingPersistentMembers(cacheRule.getCache().getDistributionManager()); + } + + private BackupStatus performBackup(final String targetDirPath) { + return performBackup(targetDirPath, null); + } + + private BackupStatus performBackup(final String targetDirPath, final String baselineDirPath) { + return new BackupOperation(cacheRule.getCache().getDistributionManager(), cacheRule.getCache()) + .backupAllMembers(targetDirPath, baselineDirPath); + } + + private String getModifiedMemberId() { + return cacheRule.getCache().getDistributedSystem().getDistributedMember().toString() + .replaceAll("[^\\w]+", "_"); + } + + private PersistentID getPersistentID(final String diskStoreName) { + for (DiskStore diskStore : cacheRule.getCache().listDiskStores()) { + if (diskStore.getName().equals(diskStoreName)) { + return ((DiskStoreImpl) diskStore).getPersistentID(); + } + } + throw new Error("Failed to find disk store " + diskStoreName); + } + + private void waitForBackup() { + Collection<DiskStore> backupInProgress = cacheRule.getCache().listDiskStores(); + List<DiskStoreImpl> backupCompleteList = new LinkedList<>(); + + while (backupCompleteList.size() < backupInProgress.size()) { + for (DiskStore diskStore : backupInProgress) { + if (((DiskStoreImpl) diskStore).getInProgressBackup() == null + && !backupCompleteList.contains(diskStore)) { + backupCompleteList.add((DiskStoreImpl) diskStore); + } + } + } + } + + private String getBaselineBackupPath() { + File[] dirs = getBaselineDir().listFiles((FileFilter) DIRECTORY); + assertThat(dirs).hasSize(1); + return dirs[0].getAbsolutePath(); + } + + private String getIncrementalBackupPath() { + File[] dirs = getIncrementalDir().listFiles((FileFilter) DIRECTORY); + assertThat(dirs).hasSize(1); + return dirs[0].getAbsolutePath(); + } + + private File getBackupDirForMember(final File rootDir, final String memberId) { + File[] dateDirs = rootDir.listFiles((FileFilter) DIRECTORY); + assertThat(dateDirs).hasSize(1); + + File[] memberDirs = + dateDirs[0].listFiles(file -> file.isDirectory() && file.getName().contains(memberId)); + assertThat(memberDirs).hasSize(1); + + return memberDirs[0]; + } + + private ReadingMode getReadingMode() { + return SystemUtils.isWindows() ? ReadingMode.NON_BLOCKING : ReadingMode.BLOCKING; + } + + private void execute(final String command) throws IOException, InterruptedException { + process = new ProcessBuilder(command).redirectErrorStream(true).start(); + + processReader = new ProcessStreamReader.Builder(process).inputStream(process.getInputStream()) + .inputListener(line -> logger.info("OUTPUT: {}", line)) + .readingMode(getReadingMode()).continueReadingMillis(2 * 1000).build().start(); + + assertThat(process.waitFor(5, MINUTES)).isTrue(); + assertThat(process.exitValue()).isEqualTo(0); + } + + private void performRestore(final File memberDir, final File backupDir) + throws IOException, InterruptedException { + // The restore script will not restore if there is an if file in the copy to directory. Remove + // these files first. + Collection<File> ifFiles = listFiles(memberDir, new RegexFileFilter(".*\\.if$"), DIRECTORY); + for (File file : ifFiles) { + assertThat(file.delete()).isTrue(); + } + + // Remove all operation logs. + Collection<File> oplogs = listFiles(memberDir, OPLOG_FILTER, DIRECTORY); + for (File file : oplogs) { + assertThat(file.delete()).isTrue(); + } + + // Get a hold of the restore script and make sure it is there. + File restoreScript = new File(backupDir, "restore.sh"); + if (!restoreScript.exists()) { + restoreScript = new File(backupDir, "restore.bat"); + } + assertThat(restoreScript).exists(); + + execute(restoreScript.getAbsolutePath()); + } + + private void performPuts() { + Region<Integer, String> region = cacheRule.getCache().getRegion(regionName1); + + // Fill our region data + for (int i = dataStart; i < dataEnd; ++i) { + region.put(i, Integer.toString(i)); + } + + Region<Integer, String> barRegion = cacheRule.getCache().getRegion(regionName2); + + // Fill our region data + for (int i = dataStart; i < dataEnd; ++i) { + barRegion.put(i, Integer.toString(i)); + } + + dataStart += DATA_INCREMENT; + dataEnd += DATA_INCREMENT; + } + + private void validateBackupStatus(final BackupStatus backupStatus) { + Map<DistributedMember, Set<PersistentID>> backupMap = backupStatus.getBackedUpDiskStores(); + assertThat(backupMap).isNotEmpty(); + + for (DistributedMember member : backupMap.keySet()) { + assertThat(backupMap.get(member)).isNotEmpty(); + for (PersistentID id : backupMap.get(member)) { + assertThat(id.getHost()).isNotNull(); + assertThat(id.getUUID()).isNotNull(); + assertThat(id.getDirectory()).isNotNull(); + } + } } - private void deleteMatching(File dir, final Pattern pattern) throws IOException { - Collection<File> files = - FileUtils.listFiles(dir, new RegexFileFilter(pattern), DirectoryFileFilter.DIRECTORY); + private void deleteMatching(final File dir, final Pattern pattern) throws IOException { + Collection<File> files = listFiles(dir, new RegexFileFilter(pattern), DIRECTORY); for (File file : files) { Files.delete(file.toPath()); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PartitionedBackupPrepareAndFinishMsgDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PartitionedBackupPrepareAndFinishMsgDUnitTest.java deleted file mode 100644 index 7ebbea8..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PartitionedBackupPrepareAndFinishMsgDUnitTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache.backup; - -import java.io.IOException; - -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionShortcut; - -public class PartitionedBackupPrepareAndFinishMsgDUnitTest - extends BackupPrepareAndFinishMsgDUnitTest { - - @Override - public Region<Integer, Integer> createRegion() throws IOException { - return createRegion(RegionShortcut.PARTITION_PERSISTENT); - } -} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareAndFinishBackupDistributedTest.java similarity index 60% rename from geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java rename to geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareAndFinishBackupDistributedTest.java index 1ab9933..0c648af 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/PrepareAndFinishBackupDistributedTest.java @@ -14,14 +14,15 @@ */ package org.apache.geode.internal.cache.backup; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; +import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT; +import static org.apache.geode.test.dunit.VM.getController; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -41,7 +42,11 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Parameterized.UseParametersRunnerFactory; import org.apache.geode.cache.Cache; import org.apache.geode.cache.DiskStore; @@ -54,67 +59,92 @@ import org.apache.geode.cache.query.FunctionDomainException; import org.apache.geode.cache.query.NameResolutionException; import org.apache.geode.cache.query.QueryInvocationTargetException; import org.apache.geode.cache.query.TypeMismatchException; -import org.apache.geode.cache30.CacheTestCase; import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedDiskDirRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; +import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; @Category({DistributedTest.class}) -public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase { +@RunWith(Parameterized.class) +@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) +@SuppressWarnings({"serial", "unused"}) +public class PrepareAndFinishBackupDistributedTest { - // Although this test does not make use of other members, the current member needs to be - // a distributed member (rather than local) because it sends prepare and finish backup messages - private static final String TEST_REGION_NAME = "TestRegion"; + private String uniqueName; + private String regionName; + private Region<Integer, Integer> region; + + @Parameter + public RegionShortcut regionShortcut; + + @Parameters + public static Collection<RegionShortcut> data() { + return Arrays.asList(PARTITION_PERSISTENT, REPLICATE_PERSISTENT); + } @Rule - public TemporaryFolder tempDir = new TemporaryFolder(); + public DistributedTestRule distributedTestRule = new DistributedTestRule(); - private File[] diskDirs = null; - private Region<Integer, Integer> region; + @Rule + public CacheRule cacheRule = new CacheRule(); + + @Rule + public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule(); + + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); - protected abstract Region<Integer, Integer> createRegion() throws IOException; + @Rule + public SerializableTestName testName = new SerializableTestName(); @Before - public void setup() throws IOException { - region = createRegion(); + public void setUp() { + uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); + regionName = uniqueName + "_region"; + + region = createRegion(regionShortcut); } @Test - public void createWaitsForBackupTest() throws Throwable { + public void createWaitsForBackupTest() throws Exception { doActionAndVerifyWaitForBackup(() -> region.create(1, 1)); verifyKeyValuePair(1, 1); } @Test - public void putThatCreatesWaitsForBackupTest() throws Throwable { + public void putThatCreatesWaitsForBackupTest() throws Exception { doActionAndVerifyWaitForBackup(() -> region.put(1, 1)); verifyKeyValuePair(1, 1); } @Test - public void putWaitsForBackupTest() throws Throwable { + public void putWaitsForBackupTest() throws Exception { region.put(1, 1); doActionAndVerifyWaitForBackup(() -> region.put(1, 2)); verifyKeyValuePair(1, 2); } @Test - public void invalidateWaitsForBackupTest() throws Throwable { + public void invalidateWaitsForBackupTest() throws Exception { region.put(1, 1); doActionAndVerifyWaitForBackup(() -> region.invalidate(1)); verifyKeyValuePair(1, null); } @Test - public void destroyWaitsForBackupTest() throws Throwable { + public void destroyWaitsForBackupTest() throws Exception { region.put(1, 1); doActionAndVerifyWaitForBackup(() -> region.destroy(1)); - assertFalse(region.containsKey(1)); + assertThat(region).doesNotContainKey(1); } @Test - public void putAllWaitsForBackupTest() throws Throwable { + public void putAllWaitsForBackupTest() throws Exception { Map<Integer, Integer> entries = new HashMap<>(); entries.put(1, 1); entries.put(2, 2); @@ -125,13 +155,13 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase { } @Test - public void removeAllWaitsForBackupTest() throws Throwable { + public void removeAllWaitsForBackupTest() throws Exception { region.put(1, 1); region.put(2, 2); List<Integer> keys = Arrays.asList(1, 2); doActionAndVerifyWaitForBackup(() -> region.removeAll(keys)); - assertTrue(region.isEmpty()); + assertThat(region).isEmpty(); } @Test @@ -140,42 +170,77 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase { doReadActionsAndVerifyCompletion(); } + /** + * Create a region, installing the test hook in the backup lock + * + * @param shortcut The region shortcut to use to create the region + * @return The newly created region. + */ + private Region<Integer, Integer> createRegion(RegionShortcut shortcut) { + Cache cache = cacheRule.getOrCreateCache(); + + DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); + diskStoreFactory.setDiskDirs(new File[] {getDiskDir()}); + + DiskStore diskStore = diskStoreFactory.create(getUniqueName()); + + RegionFactory<Integer, Integer> regionFactory = cache.createRegionFactory(shortcut); + regionFactory.setDiskStoreName(diskStore.getName()); + regionFactory.setDiskSynchronous(true); + + if (shortcut.equals(PARTITION_PERSISTENT)) { + PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory(); + partitionAttributesFactory.setTotalNumBuckets(1); + regionFactory.setPartitionAttributes(partitionAttributesFactory.create()); + } + + return regionFactory.create(regionName); + } + private void doActionAndVerifyWaitForBackup(Runnable function) throws InterruptedException, TimeoutException, ExecutionException { - DistributionManager dm = GemFireCacheImpl.getInstance().getDistributionManager(); + DistributionManager dm = cacheRule.getCache().getDistributionManager(); Set recipients = dm.getOtherDistributionManagerIds(); + Properties backupProperties = new BackupConfigFactory() - .withTargetDirPath(diskDirs[0].toString()).createBackupProperties(); - Future<Void> future = null; + .withTargetDirPath(getDiskDir().toString()).createBackupProperties(); + new PrepareBackupStep(dm, dm.getId(), dm.getCache(), recipients, new PrepareBackupFactory(), backupProperties).send(); + ReentrantLock backupLock = ((LocalRegion) region).getDiskStore().getBackupLock(); - future = CompletableFuture.runAsync(function); + Future<Void> future = CompletableFuture.runAsync(function); Awaitility.await().atMost(5, TimeUnit.SECONDS) - .until(() -> assertTrue(backupLock.getQueueLength() > 0)); + .until(() -> assertThat(backupLock.getQueueLength()).isGreaterThanOrEqualTo(0)); + new FinishBackupStep(dm, dm.getId(), dm.getCache(), recipients, new FinishBackupFactory()) .send(); + future.get(5, TimeUnit.SECONDS); } private void doReadActionsAndVerifyCompletion() { - DistributionManager dm = GemFireCacheImpl.getInstance().getDistributionManager(); + DistributionManager dm = cacheRule.getCache().getDistributionManager(); Set recipients = dm.getOtherDistributionManagerIds(); + Properties backupProperties = new BackupConfigFactory() - .withTargetDirPath(diskDirs[0].toString()).createBackupProperties(); + .withTargetDirPath(getDiskDir().toString()).createBackupProperties(); + new PrepareBackupStep(dm, dm.getId(), dm.getCache(), recipients, new PrepareBackupFactory(), backupProperties).send(); + ReentrantLock backupLock = ((LocalRegion) region).getDiskStore().getBackupLock(); List<CompletableFuture<?>> futureList = doReadActions(); CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[futureList.size()])); - assertTrue(backupLock.getQueueLength() == 0); + assertThat(backupLock.getQueueLength()).isEqualTo(0); + new FinishBackupStep(dm, dm.getId(), dm.getCache(), recipients, new FinishBackupFactory()) .send(); } private void verifyKeyValuePair(Integer key, Integer expectedValue) { - assertTrue(region.containsKey(key)); - assertEquals(expectedValue, region.get(key)); + assertThat(region).containsKey(key); + assertThat(region.get(key)).isEqualTo(expectedValue); } private List<CompletableFuture<?>> doReadActions() { @@ -183,15 +248,15 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase { actions.add(() -> region.get(1)); actions.add(() -> region.containsKey(1)); actions.add(() -> region.containsValue(1)); - actions.add(region::entrySet); - actions.add(this::valueExistsCheck); + actions.add(() -> region.entrySet()); + actions.add(() -> valueExistsCheck()); actions.add(() -> region.getAll(Collections.emptyList())); actions.add(() -> region.getEntry(1)); - actions.add(region::isEmpty); - actions.add(region::keySet); - actions.add(region::size); - actions.add(region::values); - actions.add(this::queryCheck); + actions.add(() -> region.isEmpty()); + actions.add(() -> region.keySet()); + actions.add(() -> region.size()); + actions.add(() -> region.values()); + actions.add(() -> queryCheck()); return actions.stream().map(runnable -> CompletableFuture.runAsync(runnable)) .collect(Collectors.toList()); } @@ -207,34 +272,18 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase { private void queryCheck() { try { - region.query("select * from /" + TEST_REGION_NAME); + region.query("select * from /" + regionName); } catch (FunctionDomainException | TypeMismatchException | NameResolutionException | QueryInvocationTargetException e) { throw new RuntimeException(e); } } - /** - * Create a region, installing the test hook in the backup lock - * - * @param shortcut The region shortcut to use to create the region - * @return The newly created region. - */ - protected Region<Integer, Integer> createRegion(RegionShortcut shortcut) throws IOException { - Cache cache = getCache(); - DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); - diskDirs = new File[] {tempDir.newFolder()}; - diskStoreFactory.setDiskDirs(diskDirs); - DiskStore diskStore = diskStoreFactory.create(getUniqueName()); + private String getUniqueName() { + return uniqueName; + } - RegionFactory<Integer, Integer> regionFactory = cache.createRegionFactory(shortcut); - regionFactory.setDiskStoreName(diskStore.getName()); - regionFactory.setDiskSynchronous(true); - if (shortcut.equals(RegionShortcut.PARTITION_PERSISTENT)) { - PartitionAttributesFactory prFactory = new PartitionAttributesFactory(); - prFactory.setTotalNumBuckets(1); - regionFactory.setPartitionAttributes(prFactory.create()); - } - return regionFactory.create(TEST_REGION_NAME); + private File getDiskDir() { + return diskDirRule.getDiskDirFor(getController()); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/ReplicateBackupPrepareAndFinishMsgDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/ReplicateBackupPrepareAndFinishMsgDUnitTest.java deleted file mode 100644 index 45941cc..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/ReplicateBackupPrepareAndFinishMsgDUnitTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache.backup; - -import java.io.IOException; - -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionShortcut; - -public class ReplicateBackupPrepareAndFinishMsgDUnitTest - extends BackupPrepareAndFinishMsgDUnitTest { - - @Override - public Region<Integer, Integer> createRegion() throws IOException { - return createRegion(RegionShortcut.REPLICATE_PERSISTENT); - } -}
