This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new c891b0b GEODE-5405: Refactor backup to remove duplication and clean up
c891b0b is described below
commit c891b0b4f4c2c37e4983b8efd53fb023a798f178
Author: Kirk Lund <[email protected]>
AuthorDate: Mon Jul 9 14:45:15 2018 -0700
GEODE-5405: Refactor backup to remove duplication and clean up
---
.../apache/geode/internal/cache/DiskStoreImpl.java | 2 +-
.../internal/cache/backup/AbortBackupRequest.java | 4 +-
.../internal/cache/backup/AbortBackupStep.java | 20 +++--
.../cache/backup/BackupDataStoreResult.java | 22 +++---
.../internal/cache/backup/BackupDefinition.java | 2 +
.../internal/cache/backup/BackupFileCopier.java | 11 +--
.../geode/internal/cache/backup/BackupFilter.java | 1 +
.../internal/cache/backup/BackupInspector.java | 41 ++++-------
.../internal/cache/backup/BackupLockService.java | 3 +-
.../internal/cache/backup/BackupOperation.java | 3 +
.../internal/cache/backup/BackupResponse.java | 4 +-
.../cache/backup/BackupResultCollector.java | 1 +
.../geode/internal/cache/backup/BackupService.java | 85 ++++++++++++----------
.../geode/internal/cache/backup/BackupStep.java | 24 +++---
.../geode/internal/cache/backup/BackupTask.java | 17 ++---
.../geode/internal/cache/backup/BackupWriter.java | 2 +-
.../internal/cache/backup/BackupWriterFactory.java | 6 +-
.../internal/cache/backup/DiskStoreBackup.java | 14 ++--
.../cache/backup/FileSystemBackupWriter.java | 8 +-
.../cache/backup/FileSystemBackupWriterConfig.java | 5 +-
.../FileSystemIncrementalBackupLocation.java | 14 ++--
.../internal/cache/backup/FinishBackupRequest.java | 4 +-
.../internal/cache/backup/FinishBackupStep.java | 5 +-
.../internal/cache/backup/FlushToDiskFactory.java | 1 -
.../internal/cache/backup/FlushToDiskRequest.java | 7 +-
.../internal/cache/backup/FlushToDiskResponse.java | 7 +-
.../internal/cache/backup/FlushToDiskStep.java | 48 +++---------
.../cache/backup/IncrementalBackupLocation.java | 1 +
.../geode/internal/cache/backup/PrepareBackup.java | 1 -
.../cache/backup/PrepareBackupRequest.java | 2 +-
.../geode/internal/cache/backup/RestoreScript.java | 20 ++---
.../internal/cache/backup/ScriptGenerator.java | 1 -
.../cache/backup/TemporaryBackupFiles.java | 2 +-
.../internal/cache/backup/UnixBackupInspector.java | 2 +-
.../internal/cache/backup/UnixScriptGenerator.java | 10 ++-
.../cache/backup/WindowsBackupInspector.java | 10 +--
.../cache/DiskStoreImplIntegrationTest.java | 2 +-
.../internal/cache/backup/AbortBackupStepTest.java | 17 ++++-
.../internal/cache/backup/BackupServiceTest.java | 4 +-
39 files changed, 206 insertions(+), 227 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 33d58f1..d64591c 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -2035,7 +2035,7 @@ public class DiskStoreImpl implements DiskStore {
try (Stream<Path> stream =
Files.list(directoryHolder.getDir().toPath())) {
backupDirectories = stream
.filter((path) -> path.getFileName().toString()
- .startsWith(BackupService.DATA_STORES_TEMPORARY_DIRECTORY))
+ .startsWith(BackupService.TEMPORARY_DIRECTORY_FOR_BACKUPS))
.filter(p -> Files.isDirectory(p)).collect(Collectors.toList());
}
for (Path backupDirectory : backupDirectories) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupRequest.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupRequest.java
index 374948a..f83909f 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupRequest.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupRequest.java
@@ -30,14 +30,14 @@ public class AbortBackupRequest extends CliLegacyMessage {
private final transient AbortBackupFactory abortBackupFactory;
public AbortBackupRequest() {
- this.abortBackupFactory = new AbortBackupFactory();
+ abortBackupFactory = new AbortBackupFactory();
}
AbortBackupRequest(InternalDistributedMember sender,
Set<InternalDistributedMember> recipients,
int processorId, AbortBackupFactory abortBackupFactory) {
setSender(sender);
setRecipients(recipients);
- this.msgId = processorId;
+ msgId = processorId;
this.abortBackupFactory = abortBackupFactory;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupStep.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupStep.java
index 659a449..cc0eccc 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupStep.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/AbortBackupStep.java
@@ -25,6 +25,7 @@ import
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.cache.InternalCache;
class AbortBackupStep extends BackupStep {
+
private final InternalDistributedMember member;
private final InternalCache cache;
private final Set<InternalDistributedMember> recipients;
@@ -40,6 +41,17 @@ class AbortBackupStep extends BackupStep {
this.abortBackupFactory = abortBackupFactory;
}
+ /**
+ * AbortBackupStep overrides addToResults in order to include members with
empty persistentIds
+ * (such as the sender).
+ */
+ @Override
+ public void addToResults(InternalDistributedMember member, Set<PersistentID>
persistentIds) {
+ if (persistentIds != null) {
+ getResults().put(member, persistentIds);
+ }
+ }
+
@Override
ReplyProcessor21 createReplyProcessor() {
return abortBackupFactory.createReplyProcessor(this,
getDistributionManager(), recipients);
@@ -55,12 +67,4 @@ class AbortBackupStep extends BackupStep {
abortBackupFactory.createAbortBackup(cache).run();
addToResults(member, Collections.emptySet());
}
-
- @Override
- public void addToResults(InternalDistributedMember member, Set<PersistentID>
persistentIds) {
- if (persistentIds != null) {
- getResults().put(member, persistentIds);
- }
- }
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreResult.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreResult.java
index 294c71a..2ab665b 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreResult.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDataStoreResult.java
@@ -20,29 +20,29 @@ import java.util.Set;
import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.distributed.DistributedMember;
-public class BackupDataStoreResult {
+class BackupDataStoreResult {
- private Map<DistributedMember, Set<PersistentID>> existingDataStores;
+ private final Map<DistributedMember, Set<PersistentID>> existingDataStores;
+ private final Map<DistributedMember, Set<PersistentID>> successfulMembers;
- private Map<DistributedMember, Set<PersistentID>> successfulMembers;
-
- public BackupDataStoreResult(Map<DistributedMember, Set<PersistentID>>
existingDataStores,
+ BackupDataStoreResult(Map<DistributedMember, Set<PersistentID>>
existingDataStores,
Map<DistributedMember, Set<PersistentID>> successfulMembers) {
this.existingDataStores = existingDataStores;
this.successfulMembers = successfulMembers;
}
- public Map<DistributedMember, Set<PersistentID>> getExistingDataStores() {
- return this.existingDataStores;
+ Map<DistributedMember, Set<PersistentID>> getExistingDataStores() {
+ return existingDataStores;
}
- public Map<DistributedMember, Set<PersistentID>> getSuccessfulMembers() {
- return this.successfulMembers;
+ Map<DistributedMember, Set<PersistentID>> getSuccessfulMembers() {
+ return successfulMembers;
}
+ @Override
public String toString() {
return new StringBuilder().append(getClass().getSimpleName()).append("[")
- .append("existingDataStores=").append(this.existingDataStores)
- .append(";
successfulMembers=").append(this.successfulMembers).append("]").toString();
+ .append("existingDataStores=").append(existingDataStores)
+ .append(";
successfulMembers=").append(successfulMembers).append("]").toString();
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
index b7cd22c..c88edd7 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
@@ -25,11 +25,13 @@ import java.util.Set;
import org.apache.geode.cache.DiskStore;
class BackupDefinition {
+
private final Map<DiskStore, Set<Path>> oplogFilesByDiskStore = new
HashMap<>();
private final Set<Path> configFiles = new HashSet<>();
private final Map<Path, Path> userFiles = new HashMap<>();
private final Map<Path, Path> deployedJars = new HashMap<>();
private final Map<DiskStore, Path> diskInitFiles = new HashMap<>();
+
private RestoreScript restoreScript;
void addConfigFileToBackup(Path configFile) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
index e054cc3..d536bc8 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
@@ -40,8 +40,8 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.Oplog;
import org.apache.geode.internal.logging.LogService;
-public class BackupFileCopier {
- Logger logger = LogService.getLogger();
+class BackupFileCopier {
+ private static final Logger logger = LogService.getLogger();
private static final String CONFIG_DIRECTORY = "config";
private static final String USER_FILES = "user";
@@ -96,9 +96,9 @@ public class BackupFileCopier {
}
Set<File> copyUserFiles() throws IOException {
- Set<File> userFilesBackedUp = new HashSet<>();
ensureExistence(userDirectory);
List<File> backupFiles = cache.getBackupFiles();
+ Set<File> userFilesBackedUp = new HashSet<>();
for (File original : backupFiles) {
if (original.exists()) {
original = original.getAbsoluteFile();
@@ -116,10 +116,9 @@ public class BackupFileCopier {
}
Set<File> copyDeployedJars() throws IOException {
+ ensureExistence(userDirectory);
Set<File> userJars = new HashSet<>();
JarDeployer deployer = null;
-
- ensureExistence(userDirectory);
try {
// Suspend any user deployed jar file updates during this backup.
deployer = getJarDeployer();
@@ -177,9 +176,7 @@ public class BackupFileCopier {
backupDefinition.addOplogFileToBackup(diskStore,
tempDiskDir.resolve(file.getName()));
}
- // package access for testing purposes only
JarDeployer getJarDeployer() {
return ClassPathLoader.getLatest().getJarDeployer();
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFilter.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFilter.java
index 11c6241..3435663 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFilter.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFilter.java
@@ -20,5 +20,6 @@ import java.nio.file.Path;
import org.apache.geode.cache.DiskStore;
interface BackupFilter {
+
boolean accept(DiskStore diskStore, Path path) throws IOException;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupInspector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupInspector.java
index 80d9673..8f66bcb 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupInspector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupInspector.java
@@ -30,7 +30,8 @@ import org.apache.geode.internal.lang.SystemUtils;
* Inspects a completed backup and parses the operation log file data from the
restore script
* produced by a previous backup.
*/
-public abstract class BackupInspector {
+abstract class BackupInspector {
+
/**
* Maps operation log file names to script lines that copy previously backed
up operation log
* files. These lines will be added to future restore scripts if the
operation logs are still
@@ -44,18 +45,13 @@ public abstract class BackupInspector {
private final Set<String> oplogFileNames = new HashSet<>();
/**
- * Root directory for a member's backup.
- */
- private final File backupDir;
-
- /**
* Returns a BackupInspector for a member's backup directory.
*
* @param backupDir a member's backup directory.
* @return a new BackupInspector.
* @throws IOException the backup directory was malformed.
*/
- public static BackupInspector createInspector(final File backupDir) throws
IOException {
+ static BackupInspector createInspector(final File backupDir) throws
IOException {
if (SystemUtils.isWindows()) {
return new WindowsBackupInspector(backupDir);
}
@@ -74,8 +70,6 @@ public abstract class BackupInspector {
throw new IOException("Backup directory " + backupDir.getAbsolutePath()
+ " does not exist.");
}
- this.backupDir = backupDir;
-
File restoreFile = getRestoreFile(backupDir);
if (!restoreFile.exists()) {
throw new IOException("Restore file " + restoreFile.getName() + " does
not exist.");
@@ -103,8 +97,8 @@ public abstract class BackupInspector {
private void parseRestoreFile(final BufferedReader reader) throws
IOException {
boolean markerFound = false;
- String line = null;
- while (!markerFound && (null != (line = reader.readLine()))) {
+ String line;
+ while (!markerFound && null != (line = reader.readLine())) {
markerFound = line.contains(RestoreScript.INCREMENTAL_MARKER_COMMENT);
}
@@ -117,14 +111,7 @@ public abstract class BackupInspector {
* Returns true if the restore script is incremental.
*/
public boolean isIncremental() {
- return !this.oplogFileNames.isEmpty();
- }
-
- /**
- * @return the backup directory being inspected.
- */
- public File getBackupDir() {
- return this.backupDir;
+ return !oplogFileNames.isEmpty();
}
/**
@@ -132,16 +119,16 @@ public abstract class BackupInspector {
*
* @param oplogFileName an operation log file.
*/
- public String getScriptLineForOplogFile(final String oplogFileName) {
- return this.oplogLineMap.get(oplogFileName);
+ String getScriptLineForOplogFile(final String oplogFileName) {
+ return oplogLineMap.get(oplogFileName);
}
/**
* Returns the set of operation log files copied in the incremental backup
section of the restore
* script.
*/
- public Set<String> getIncrementalOplogFileNames() {
- return Collections.unmodifiableSet(this.oplogFileNames);
+ Set<String> getIncrementalOplogFileNames() {
+ return Collections.unmodifiableSet(oplogFileNames);
}
/**
@@ -149,26 +136,26 @@ public abstract class BackupInspector {
*
* @param backupDir a member's backup directory.
*/
- protected abstract File getRestoreFile(final File backupDir);
+ abstract File getRestoreFile(final File backupDir);
/**
* Returns the copyTo operation log file path for an operation log file name.
*
* @param oplogFileName an operation log file.
*/
- public abstract String getCopyToForOplogFile(final String oplogFileName);
+ abstract String getCopyToForOplogFile(final String oplogFileName);
/**
* Returns the copy from operation log file path for an operation log file
name.
*
* @param oplogFileName an operation log file.
*/
- public abstract String getCopyFromForOplogFile(final String oplogFileName);
+ abstract String getCopyFromForOplogFile(final String oplogFileName);
/**
* Parses out operation log data from the incremental backup portion of the
restore script.
*
* @param reader restore file reader.
*/
- protected abstract void parseOplogLines(final BufferedReader reader) throws
IOException;
+ abstract void parseOplogLines(final BufferedReader reader) throws
IOException;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupLockService.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupLockService.java
index 56d3346..71754b5 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupLockService.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupLockService.java
@@ -38,8 +38,7 @@ public class BackupLockService {
}
private DistributedLockService getLockService(DistributionManager dm) {
- DistributedLockService dls = DistributedLockService.getServiceNamed(
- LOCK_SERVICE_NAME);
+ DistributedLockService dls =
DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
if (dls == null) {
synchronized (LOCK_SYNC) {
dls = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupOperation.java
index 643b23d..9835e04 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupOperation.java
@@ -136,11 +136,14 @@ public class BackupOperation {
}
interface MissingPersistentMembersProvider {
+
Set<PersistentID> getMissingPersistentMembers(DistributionManager dm);
}
private static class DefaultMissingPersistentMembersProvider
implements MissingPersistentMembersProvider {
+
+ @Override
public Set<PersistentID> getMissingPersistentMembers(DistributionManager
dm) {
return AdminDistributedSystemImpl.getMissingPersistentMembers(dm);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResponse.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResponse.java
index 7fec551..dc90c32 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResponse.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResponse.java
@@ -37,12 +37,12 @@ public class BackupResponse extends AdminResponse {
super();
}
- public BackupResponse(InternalDistributedMember sender,
HashSet<PersistentID> persistentIds) {
+ BackupResponse(InternalDistributedMember sender, HashSet<PersistentID>
persistentIds) {
setRecipient(sender);
this.persistentIds = persistentIds;
}
- public Set<PersistentID> getPersistentIds() {
+ Set<PersistentID> getPersistentIds() {
return persistentIds;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResultCollector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResultCollector.java
index d4ab322..12d9e39 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResultCollector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupResultCollector.java
@@ -20,5 +20,6 @@ import org.apache.geode.cache.persistence.PersistentID;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
interface BackupResultCollector {
+
void addToResults(InternalDistributedMember member, Set<PersistentID>
persistentIds);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
index 3857fd0..e0377b1 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
@@ -41,41 +41,26 @@ import org.apache.geode.internal.logging.LoggingThreadGroup;
public class BackupService {
private static final Logger logger = LogService.getLogger();
- public static final String DATA_STORES_TEMPORARY_DIRECTORY = "backupTemp_";
+ public static final String TEMPORARY_DIRECTORY_FOR_BACKUPS = "backupTemp_";
+
private final ExecutorService executor;
private final MembershipListener membershipListener = new
BackupMembershipListener();
private final InternalCache cache;
+ private final AtomicReference<BackupTask> currentTask = new
AtomicReference<>();
private transient Future<HashSet<PersistentID>> taskFuture;
- final AtomicReference<BackupTask> currentTask = new AtomicReference<>();
-
public BackupService(InternalCache cache) {
this.cache = cache;
executor = createExecutor();
}
- private ExecutorService createExecutor() {
- LoggingThreadGroup group =
LoggingThreadGroup.createThreadGroup("BackupService Thread", logger);
- ThreadFactory threadFactory = new ThreadFactory() {
- private final AtomicInteger threadId = new AtomicInteger();
-
- public Thread newThread(final Runnable command) {
- Thread thread =
- new Thread(group, command, "BackupServiceThread" +
this.threadId.incrementAndGet());
- thread.setDaemon(true);
- return thread;
- }
- };
- return Executors.newSingleThreadExecutor(threadFactory);
- }
-
public HashSet<PersistentID> prepareBackup(InternalDistributedMember sender,
BackupWriter writer)
throws IOException, InterruptedException {
- validateRequestingAdmin(sender);
+ validateRequestingSender(sender);
BackupTask backupTask = new BackupTask(cache, writer);
if (!currentTask.compareAndSet(null, backupTask)) {
- throw new IOException("Another backup already in progress");
+ throw new IOException("Another backup is already in progress");
}
taskFuture = executor.submit(() -> backupTask.backup());
return backupTask.getPreparedDiskStores();
@@ -111,17 +96,6 @@ public class BackupService {
return task == null ? null : task.getBackupForDiskStore(diskStore);
}
- void validateRequestingAdmin(InternalDistributedMember sender) {
- // We need to watch for pure admin guys that depart. this
allMembershipListener set
- // looks like it should receive those events.
- Set allIds =
-
cache.getDistributionManager().addAllMembershipListenerAndGetAllIds(membershipListener);
- if (!allIds.contains(sender)) {
- cleanup();
- throw new IllegalStateException("The admin member requesting a backup
has already departed");
- }
- }
-
public boolean deferDrfDelete(DiskStoreImpl diskStore, Oplog oplog) {
DiskStoreBackup diskStoreBackup = getBackupForDiskStore(diskStore);
if (diskStoreBackup != null) {
@@ -138,11 +112,6 @@ public class BackupService {
return false;
}
- void cleanup() {
-
cache.getDistributionManager().removeAllMembershipListener(membershipListener);
- currentTask.set(null);
- }
-
public void abortBackup() {
BackupTask task = currentTask.get();
cleanup();
@@ -151,7 +120,45 @@ public class BackupService {
}
}
+ void validateRequestingSender(InternalDistributedMember sender) {
+ // We need to watch for pure admin guys that depart. this
allMembershipListener set
+ // looks like it should receive those events.
+ Set allIds =
+
cache.getDistributionManager().addAllMembershipListenerAndGetAllIds(membershipListener);
+ if (!allIds.contains(sender)) {
+ cleanup();
+ throw new IllegalStateException("The member requesting a backup has
already departed");
+ }
+ }
+
+ void setCurrentTask(BackupTask backupTask) {
+ currentTask.set(backupTask);
+ }
+
+ private ExecutorService createExecutor() {
+ LoggingThreadGroup group =
LoggingThreadGroup.createThreadGroup("BackupService Thread", logger);
+ ThreadFactory threadFactory = new ThreadFactory() {
+
+ private final AtomicInteger threadId = new AtomicInteger();
+
+ @Override
+ public Thread newThread(final Runnable command) {
+ Thread thread =
+ new Thread(group, command, "BackupServiceThread" +
threadId.incrementAndGet());
+ thread.setDaemon(true);
+ return thread;
+ }
+ };
+ return Executors.newSingleThreadExecutor(threadFactory);
+ }
+
+ private void cleanup() {
+
cache.getDistributionManager().removeAllMembershipListener(membershipListener);
+ currentTask.set(null);
+ }
+
private class BackupMembershipListener implements MembershipListener {
+
@Override
public void memberDeparted(DistributionManager distributionManager,
InternalDistributedMember id, boolean crashed) {
@@ -161,19 +168,19 @@ public class BackupService {
@Override
public void memberJoined(DistributionManager distributionManager,
InternalDistributedMember id) {
- // unused
+ // nothing
}
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures,
List<InternalDistributedMember> remaining) {
- // unused
+ // nothing
}
@Override
public void memberSuspect(DistributionManager distributionManager,
InternalDistributedMember id,
InternalDistributedMember whoSuspected, String reason) {
- // unused
+ // nothing
}
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupStep.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupStep.java
index 45ac9f1..e456827 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupStep.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupStep.java
@@ -36,9 +36,9 @@ abstract class BackupStep implements BackupResultCollector {
private final DistributionManager dm;
private final Map<DistributedMember, Set<PersistentID>> results =
- Collections.synchronizedMap(new HashMap<DistributedMember,
Set<PersistentID>>());
+ Collections.synchronizedMap(new HashMap<>());
- protected BackupStep(DistributionManager dm) {
+ BackupStep(DistributionManager dm) {
this.dm = dm;
}
@@ -48,6 +48,13 @@ abstract class BackupStep implements BackupResultCollector {
abstract void processLocally();
+ @Override
+ public void addToResults(InternalDistributedMember member, Set<PersistentID>
persistentIds) {
+ if (persistentIds != null && !persistentIds.isEmpty()) {
+ results.put(member, persistentIds);
+ }
+ }
+
Map<DistributedMember, Set<PersistentID>> send() {
ReplyProcessor21 replyProcessor = createReplyProcessor();
@@ -68,18 +75,11 @@ abstract class BackupStep implements BackupResultCollector {
return getResults();
}
- @Override
- public void addToResults(InternalDistributedMember member, Set<PersistentID>
persistentIds) {
- if (persistentIds != null && !persistentIds.isEmpty()) {
- results.put(member, persistentIds);
- }
- }
-
Map<DistributedMember, Set<PersistentID>> getResults() {
- return this.results;
+ return results;
}
- protected DistributionManager getDistributionManager() {
- return this.dm;
+ DistributionManager getDistributionManager() {
+ return dm;
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
index 651dc02..b7a4341 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
@@ -34,7 +34,7 @@ import org.apache.geode.internal.logging.LogService;
/**
* This class manages the state an logic to backup a single cache.
*/
-public class BackupTask {
+class BackupTask {
private static final Logger logger = LogService.getLogger();
private final Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new
HashMap<>();
@@ -46,13 +46,13 @@ public class BackupTask {
private final HashSet<PersistentID> diskStoresWithData = new HashSet<>();
private final BackupWriter backupWriter;
- private volatile boolean isCancelled = false;
+ private volatile boolean isCancelled;
private TemporaryBackupFiles temporaryFiles;
private BackupFileCopier fileCopier;
- BackupTask(InternalCache gemFireCache, BackupWriter backupWriter) {
- this.cache = gemFireCache;
+ BackupTask(InternalCache cache, BackupWriter backupWriter) {
+ this.cache = cache;
this.backupWriter = backupWriter;
}
@@ -245,12 +245,10 @@ public class BackupTask {
// Get an appropriate lock object for each set of oplogs.
Object childLock = childOplog.getLock();
- // TODO - We really should move this lock into the disk store, but
- // until then we need to do this magic to make sure we're actually
- // locking the latest child for both types of oplogs
+ // TODO: We really should move this lock into the disk store, but
until then we need to do
+ // this magic to make sure we're actually locking the latest child for
both types of oplogs
- // This ensures that all writing to disk is blocked while we are
- // creating the snapshot
+ // This ensures that all writing to disk is blocked while we are
creating the snapshot
synchronized (childLock) {
if (logger.isDebugEnabled()) {
logger.debug("Synchronized on lock for oplog {} on disk store {}",
@@ -294,5 +292,4 @@ public class BackupTask {
DiskStoreBackup getBackupForDiskStore(DiskStoreImpl diskStore) {
return backupByDiskStore.get(diskStore);
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriter.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriter.java
index b465b0e..c124207 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriter.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriter.java
@@ -18,6 +18,7 @@ import java.io.IOException;
import java.nio.file.Path;
public interface BackupWriter {
+
String INCOMPLETE_BACKUP_FILE = "INCOMPLETE_BACKUP_FILE";
String USER_FILES_DIRECTORY = "user";
String DEPLOYED_JARS_DIRECTORY = "user";
@@ -29,5 +30,4 @@ public interface BackupWriter {
void backupFiles(BackupDefinition backupDefinition) throws IOException;
Path getBaselineDirectory();
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriterFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriterFactory.java
index 9f29e08..085c1fb 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriterFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriterFactory.java
@@ -21,8 +21,10 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
-public enum BackupWriterFactory {
+enum BackupWriterFactory {
+
FILE_SYSTEM("FileSystem") {
+ @Override
BackupWriter createWriter(Properties properties, String memberId) {
FileSystemBackupWriterConfig config = new
FileSystemBackupWriterConfig(properties);
Path targetDir = Paths.get(config.getTargetDirectory())
@@ -37,7 +39,7 @@ public enum BackupWriterFactory {
}
};
- private String type;
+ private final String type;
BackupWriterFactory(String type) {
this.type = type;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/DiskStoreBackup.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/DiskStoreBackup.java
index 592f43b..488efa4 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/DiskStoreBackup.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/DiskStoreBackup.java
@@ -31,8 +31,8 @@ public class DiskStoreBackup {
private final Set<Oplog> deferredCrfDeletes = new HashSet<>();
private final Set<Oplog> deferredDrfDeletes = new HashSet<>();
- public DiskStoreBackup(Oplog[] allOplogs) {
- this.pendingBackup = new HashSet<>(Arrays.asList(allOplogs));
+ DiskStoreBackup(Oplog[] allOplogs) {
+ pendingBackup = new HashSet<>(Arrays.asList(allOplogs));
}
/**
@@ -41,7 +41,7 @@ public class DiskStoreBackup {
* @return true if the delete has been deferred. False if this oplog should
be deleted
* immediately.
*/
- public synchronized boolean deferCrfDelete(Oplog oplog) {
+ synchronized boolean deferCrfDelete(Oplog oplog) {
if (pendingBackup.contains(oplog)) {
deferredCrfDeletes.add(oplog);
return true;
@@ -56,7 +56,7 @@ public class DiskStoreBackup {
* @return true if the delete has been deferred. False if this oplog should
be deleted
* immediately.
*/
- public synchronized boolean deferDrfDelete(Oplog oplog) {
+ synchronized boolean deferDrfDelete(Oplog oplog) {
if (pendingBackup.contains(oplog)) {
deferredDrfDeletes.add(oplog);
return true;
@@ -65,11 +65,11 @@ public class DiskStoreBackup {
return false;
}
- public synchronized Set<Oplog> getPendingBackup() {
+ synchronized Set<Oplog> getPendingBackup() {
return new HashSet<>(pendingBackup);
}
- public synchronized void backupFinished(Oplog oplog) {
+ synchronized void backupFinished(Oplog oplog) {
pendingBackup.remove(oplog);
if (deferredCrfDeletes.remove(oplog)) {
oplog.deleteCRFFileOnly();
@@ -79,7 +79,7 @@ public class DiskStoreBackup {
}
}
- public synchronized void cleanup() {
+ synchronized void cleanup() {
for (Oplog oplog : getPendingBackup()) {
backupFinished(oplog);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
index aadc143..ca54034 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
@@ -30,7 +30,7 @@ import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.i18n.LocalizedStrings;
-public class FileSystemBackupWriter implements BackupWriter {
+class FileSystemBackupWriter implements BackupWriter {
private final Path backupDirectory;
private final FileSystemIncrementalBackupLocation
incrementalBaselineLocation;
@@ -143,7 +143,7 @@ public class FileSystemBackupWriter implements BackupWriter
{
RestoreScript restoreScript) throws IOException {
File storesDir = new File(backupDirectory.toFile(), DATA_STORES_DIRECTORY);
for (Map.Entry<DiskStore, Collection<Path>> entry : oplogFiles.entrySet())
{
- DiskStoreImpl diskStore = ((DiskStoreImpl) entry.getKey());
+ DiskStoreImpl diskStore = (DiskStoreImpl) entry.getKey();
boolean diskstoreHasFilesInBackup = false;
for (Path path : entry.getValue()) {
if (filter.accept(diskStore, path)) {
@@ -175,7 +175,7 @@ public class FileSystemBackupWriter implements BackupWriter
{
name = GemFireCacheImpl.getDefaultDiskStoreName();
}
name = name + "_" + ((DiskStoreImpl)
diskStore).getDiskStoreID().toString();
- return this.backupDirectory.resolve(DATA_STORES_DIRECTORY).resolve(name)
+ return backupDirectory.resolve(DATA_STORES_DIRECTORY).resolve(name)
.resolve(BACKUP_DIR_PREFIX + index);
}
@@ -196,7 +196,7 @@ public class FileSystemBackupWriter implements BackupWriter
{
name = GemFireCacheImpl.getDefaultDiskStoreName();
}
- return (name + "_" + diskStore.getDiskStoreID().toString());
+ return name + "_" + diskStore.getDiskStoreID().toString();
}
private void backupOplog(Path targetDir, Path path) throws IOException {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterConfig.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterConfig.java
index e54e21a..40195fd 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterConfig.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterConfig.java
@@ -19,6 +19,7 @@ import java.util.Properties;
import org.apache.commons.lang.StringUtils;
class FileSystemBackupWriterConfig extends AbstractBackupWriterConfig {
+
static final String TARGET_DIR = "TARGET_DIRECTORY";
static final String BASELINE_DIR = "BASELINE_DIRECTORY";
@@ -26,7 +27,7 @@ class FileSystemBackupWriterConfig extends
AbstractBackupWriterConfig {
super(properties);
}
- public String getTargetDirectory() {
+ String getTargetDirectory() {
String value = getProperties().getProperty(TARGET_DIR);
if (StringUtils.isBlank(value)) {
throw new IllegalStateException("Target directory is missing");
@@ -34,7 +35,7 @@ class FileSystemBackupWriterConfig extends
AbstractBackupWriterConfig {
return value;
}
- public String getBaselineDirectory() {
+ String getBaselineDirectory() {
return getProperties().getProperty(BASELINE_DIR);
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
index 55f7009..bc2f5bc 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
@@ -29,14 +29,14 @@ import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.util.TransformUtils;
-public class FileSystemIncrementalBackupLocation implements
IncrementalBackupLocation {
+class FileSystemIncrementalBackupLocation implements IncrementalBackupLocation
{
private static final String INCOMPLETE_BACKUP_FILE =
"INCOMPLETE_BACKUP_FILE";
private final Path memberBackupLocationDir;
FileSystemIncrementalBackupLocation(File backupLocationDir, String memberId)
{
- this.memberBackupLocationDir = new File(backupLocationDir,
memberId).toPath();
+ memberBackupLocationDir = new File(backupLocationDir, memberId).toPath();
}
Path getMemberBackupLocationDir() {
@@ -66,7 +66,7 @@ public class FileSystemIncrementalBackupLocation implements
IncrementalBackupLoc
BackupInspector inspector = createBackupInspector(checkedBaselineDir);
HashSet<File> oplogs = new HashSet<>();
if (inspector.isIncremental() && inspector.getIncrementalOplogFileNames()
!= null) {
- inspector.getIncrementalOplogFileNames().forEach((oplog) -> {
+ inspector.getIncrementalOplogFileNames().forEach(oplog -> {
oplog = inspector.getCopyFromForOplogFile(oplog);
oplogs.add(new File(oplog));
});
@@ -87,8 +87,7 @@ public class FileSystemIncrementalBackupLocation implements
IncrementalBackupLoc
File baselineDir = memberBackupLocationDir.toFile();
if (!baselineDir.exists()) {
- // hmmm, did this member have a restart?
- // Determine which member dir might be a match for us
+ // hmmm, did this member have a restart? Determine which member dir
might be a match for us
baselineDir =
findBaselineForThisMember(memberBackupLocationDir.getParent(), diskStore);
}
@@ -102,7 +101,7 @@ public class FileSystemIncrementalBackupLocation implements
IncrementalBackupLoc
return baselineDir;
}
- File findBaselineForThisMember(Path baselineParentDir, DiskStore diskStore) {
+ private File findBaselineForThisMember(Path baselineParentDir, DiskStore
diskStore) {
File baselineDir = null;
// Find the first matching DiskStoreId directory for this member.
@@ -126,7 +125,6 @@ public class FileSystemIncrementalBackupLocation implements
IncrementalBackupLoc
if (name == null) {
name = GemFireCacheImpl.getDefaultDiskStoreName();
}
- return (name + "_" + diskStore.getDiskStoreID().toString());
+ return name + "_" + diskStore.getDiskStoreID().toString();
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupRequest.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupRequest.java
index 717e34b..91d6cc2 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupRequest.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupRequest.java
@@ -40,14 +40,14 @@ public class FinishBackupRequest extends CliLegacyMessage {
private final transient FinishBackupFactory finishBackupFactory;
public FinishBackupRequest() {
- this.finishBackupFactory = new FinishBackupFactory();
+ finishBackupFactory = new FinishBackupFactory();
}
FinishBackupRequest(InternalDistributedMember sender,
Set<InternalDistributedMember> recipients,
int processorId, FinishBackupFactory finishBackupFactory) {
setSender(sender);
setRecipients(recipients);
- this.msgId = processorId;
+ msgId = processorId;
this.finishBackupFactory = finishBackupFactory;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupStep.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupStep.java
index cc260c8..303cb98 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupStep.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FinishBackupStep.java
@@ -36,12 +36,12 @@ class FinishBackupStep extends BackupStep {
FinishBackupStep(DistributionManager dm, InternalDistributedMember member,
InternalCache cache, Set<InternalDistributedMember> recipients,
- FinishBackupFactory FinishBackupFactory) {
+ FinishBackupFactory finishBackupFactory) {
super(dm);
this.member = member;
this.cache = cache;
this.recipients = recipients;
- this.finishBackupFactory = FinishBackupFactory;
+ this.finishBackupFactory = finishBackupFactory;
}
@Override
@@ -62,5 +62,4 @@ class FinishBackupStep extends BackupStep {
logger.fatal("Failed to FinishBackup in " + member, e);
}
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
index 453a37b..79a2387 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskFactory.java
@@ -45,5 +45,4 @@ class FlushToDiskFactory {
FlushToDiskFactory flushToDiskFactory) {
return new FlushToDiskStep(dm, member, cache, recipients,
flushToDiskFactory);
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
index 89d1f12..aa31a01 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskRequest.java
@@ -32,14 +32,14 @@ public class FlushToDiskRequest extends CliLegacyMessage {
public FlushToDiskRequest() {
super();
- this.flushToDiskFactory = new FlushToDiskFactory();
+ flushToDiskFactory = new FlushToDiskFactory();
}
FlushToDiskRequest(InternalDistributedMember sender,
Set<InternalDistributedMember> recipients,
int processorId, FlushToDiskFactory flushToDiskFactory) {
- this.setSender(sender);
+ setSender(sender);
setRecipients(recipients);
- this.msgId = processorId;
+ msgId = processorId;
this.flushToDiskFactory = flushToDiskFactory;
}
@@ -53,5 +53,4 @@ public class FlushToDiskRequest extends CliLegacyMessage {
flushToDiskFactory.createFlushToDisk(dm.getCache()).run();
return flushToDiskFactory.createResponse(getSender());
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskResponse.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskResponse.java
index 89b1b71..3489407 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskResponse.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskResponse.java
@@ -19,8 +19,6 @@ import org.apache.geode.internal.admin.remote.AdminResponse;
/**
* The response to the {@link FlushToDiskRequest}
- *
- *
*/
public class FlushToDiskResponse extends AdminResponse {
@@ -28,10 +26,11 @@ public class FlushToDiskResponse extends AdminResponse {
super();
}
- public FlushToDiskResponse(InternalDistributedMember sender) {
- this.setRecipient(sender);
+ FlushToDiskResponse(InternalDistributedMember sender) {
+ setRecipient(sender);
}
+ @Override
public int getDSFID() {
return FLUSH_TO_DISK_RESPONSE;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskStep.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskStep.java
index 6db8f44..57cd1d5 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskStep.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDiskStep.java
@@ -16,26 +16,14 @@ package org.apache.geode.internal.cache.backup;
import java.util.Set;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
-import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.logging.LogService;
-/**
- * A Operation to from an admin VM to all non admin members to start a backup.
In the prepare phase
- * of the backup, the members will suspend bucket destroys to make sure
buckets aren't missed during
- * the backup.
- */
-public class FlushToDiskStep {
- private static final Logger logger = LogService.getLogger();
+class FlushToDiskStep extends BackupStep {
- private final DistributionManager dm;
private final InternalDistributedMember member;
private final InternalCache cache;
private final Set<InternalDistributedMember> recipients;
@@ -44,42 +32,26 @@ public class FlushToDiskStep {
FlushToDiskStep(DistributionManager dm, InternalDistributedMember member,
InternalCache cache, Set<InternalDistributedMember> recipients,
FlushToDiskFactory flushToDiskFactory) {
+ super(dm);
this.flushToDiskFactory = flushToDiskFactory;
- this.dm = dm;
this.member = member;
this.recipients = recipients;
this.cache = cache;
}
- void send() {
- ReplyProcessor21 replyProcessor = createReplyProcessor();
-
- dm.putOutgoing(createDistributionMessage(replyProcessor));
-
- processLocally();
-
- try {
- replyProcessor.waitForReplies();
- } catch (ReplyException e) {
- if (!(e.getCause() instanceof CancelException)) {
- throw e;
- }
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
- }
-
- private ReplyProcessor21 createReplyProcessor() {
- return this.flushToDiskFactory.createReplyProcessor(dm, recipients);
+ @Override
+ ReplyProcessor21 createReplyProcessor() {
+ return flushToDiskFactory.createReplyProcessor(getDistributionManager(),
recipients);
}
- private DistributionMessage createDistributionMessage(ReplyProcessor21
replyProcessor) {
- return this.flushToDiskFactory.createRequest(member, recipients,
+ @Override
+ DistributionMessage createDistributionMessage(ReplyProcessor21
replyProcessor) {
+ return flushToDiskFactory.createRequest(member, recipients,
replyProcessor.getProcessorId());
}
- private void processLocally() {
+ @Override
+ void processLocally() {
flushToDiskFactory.createFlushToDisk(cache).run();
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupLocation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupLocation.java
index dbfc3fb..5e5a1f3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupLocation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupLocation.java
@@ -21,5 +21,6 @@ import java.util.Map;
import org.apache.geode.cache.DiskStore;
public interface IncrementalBackupLocation {
+
Map<String, File> getBackedUpOplogs(DiskStore diskStore) throws IOException;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java
index 8460543..24e1b72 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java
@@ -42,5 +42,4 @@ class PrepareBackup {
}
return persistentIds;
}
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java
index fc42c24..51ec733 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java
@@ -46,7 +46,7 @@ public class PrepareBackupRequest extends CliLegacyMessage {
private Properties properties;
public PrepareBackupRequest() {
- this.prepareBackupFactory = new PrepareBackupFactory();
+ prepareBackupFactory = new PrepareBackupFactory();
}
PrepareBackupRequest(InternalDistributedMember sender,
Set<InternalDistributedMember> recipients,
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
index 309dca9..b7069fe 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
@@ -35,7 +35,7 @@ import org.apache.geode.internal.lang.SystemUtils;
* <p>
* It generates either a restore.sh for unix or a restore.bat for windows.
*/
-public class RestoreScript {
+class RestoreScript {
static final String INCREMENTAL_MARKER_COMMENT =
"Incremental backup. Restore baseline originals from previous backups.";
@@ -57,7 +57,7 @@ public class RestoreScript {
private final Map<File, File> backedUpFiles = new LinkedHashMap<>();
private final List<File> existenceTests = new ArrayList<>();
- public RestoreScript() {
+ RestoreScript() {
this(SystemUtils.isWindows() ? new WindowsScriptGenerator() : new
UnixScriptGenerator());
}
@@ -65,23 +65,23 @@ public class RestoreScript {
this.generator = generator;
}
- public void addBaselineFiles(final Map<File, File> baselineFiles) {
+ void addBaselineFiles(final Map<File, File> baselineFiles) {
this.baselineFiles.putAll(baselineFiles);
}
- public void addBaselineFile(File baseline, File absoluteFile) {
- this.baselineFiles.put(baseline, absoluteFile);
+ void addBaselineFile(File baseline, File absoluteFile) {
+ baselineFiles.put(baseline, absoluteFile);
}
public void addFile(final File originalFile, final File backupFile) {
backedUpFiles.put(backupFile, originalFile.getAbsoluteFile());
}
- public void addExistenceTest(final File originalFile) {
+ void addExistenceTest(final File originalFile) {
existenceTests.add(originalFile.getAbsoluteFile());
}
- public File generate(final File outputDir) throws IOException {
+ File generate(final File outputDir) throws IOException {
File outputFile = new File(outputDir, generator.getScriptName());
try (BufferedWriter writer = Files.newBufferedWriter(outputFile.toPath()))
{
@@ -137,18 +137,18 @@ public class RestoreScript {
private void writeIncrementalData(BufferedWriter writer) throws IOException {
// Write out baseline file copies in restore script (if there are any) if
this is a restore
// for an incremental backup
- if (this.baselineFiles.isEmpty()) {
+ if (baselineFiles.isEmpty()) {
return;
}
writer.newLine();
generator.writeComment(writer, INCREMENTAL_MARKER_COMMENT);
- for (Map.Entry<File, File> entry : this.baselineFiles.entrySet()) {
+ for (Map.Entry<File, File> entry : baselineFiles.entrySet()) {
generator.writeCopyFile(writer, entry.getKey(), entry.getValue());
}
}
- public void addUserFile(File original, File dest) {
+ void addUserFile(File original, File dest) {
addExistenceTest(original);
addFile(original, dest);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/ScriptGenerator.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/ScriptGenerator.java
index 7804d13..7e7acde 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/ScriptGenerator.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/ScriptGenerator.java
@@ -34,5 +34,4 @@ interface ScriptGenerator {
void writeComment(BufferedWriter writer, String string) throws IOException;
String getScriptName();
-
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java
index 79100c9..3273a47 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java
@@ -53,7 +53,7 @@ class TemporaryBackupFiles {
*/
static TemporaryBackupFiles create() throws IOException {
long currentTime = System.currentTimeMillis();
- String diskStoreDirectoryName =
BackupService.DATA_STORES_TEMPORARY_DIRECTORY + currentTime;
+ String diskStoreDirectoryName =
BackupService.TEMPORARY_DIRECTORY_FOR_BACKUPS + currentTime;
Path temporaryDirectory = Files.createTempDirectory("backup_" +
currentTime);
return new TemporaryBackupFiles(temporaryDirectory,
diskStoreDirectoryName);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixBackupInspector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixBackupInspector.java
index cc14cb4..174ae4c 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixBackupInspector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixBackupInspector.java
@@ -56,7 +56,7 @@ class UnixBackupInspector extends BackupInspector {
@Override
protected void parseOplogLines(final BufferedReader reader) throws
IOException {
- String line = null;
+ String line;
while (null != (line = reader.readLine())) {
int beginIndex = line.lastIndexOf(File.separator) + 1;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixScriptGenerator.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixScriptGenerator.java
index 4cc22e2..5c36a01 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixScriptGenerator.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/UnixScriptGenerator.java
@@ -22,8 +22,7 @@ class UnixScriptGenerator implements ScriptGenerator {
private static final String SCRIPT_FILE_NAME = "restore.sh";
- private final String separator = System.lineSeparator();
-
+ @Override
public void writePreamble(final BufferedWriter writer) throws IOException {
writer.write("#!/bin/bash -e");
writer.newLine();
@@ -31,11 +30,13 @@ class UnixScriptGenerator implements ScriptGenerator {
writer.newLine();
}
+ @Override
public void writeComment(final BufferedWriter writer, final String string)
throws IOException {
writer.write("# " + string);
writer.newLine();
}
+ @Override
public void writeCopyDirectoryContents(final BufferedWriter writer, final
File backup,
final File original, final boolean backupHasFiles) throws IOException {
writer.write("mkdir -p '" + original + "'");
@@ -46,19 +47,22 @@ class UnixScriptGenerator implements ScriptGenerator {
}
}
+ @Override
public void writeCopyFile(final BufferedWriter writer, final File backup,
final File original)
throws IOException {
writer.write("cp -p '" + backup + "' '" + original + "'");
writer.newLine();
}
+ @Override
public void writeExistenceTest(final BufferedWriter writer, final File file)
throws IOException {
writer.write("test -e '" + file + "' && echo '" +
RestoreScript.REFUSE_TO_OVERWRITE_MESSAGE
+ file + "' && exit 1 ");
writer.newLine();
}
- public void writeExit(final BufferedWriter writer) throws IOException {
+ @Override
+ public void writeExit(final BufferedWriter writer) {
// do nothing
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/WindowsBackupInspector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/WindowsBackupInspector.java
index 2d5adf7..e9238c9 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/WindowsBackupInspector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/WindowsBackupInspector.java
@@ -55,11 +55,11 @@ class WindowsBackupInspector extends BackupInspector {
}
@Override
- protected void parseOplogLines(final BufferedReader reader) throws
IOException {
- String line = null;
+ void parseOplogLines(final BufferedReader reader) throws IOException {
+ String line;
int beginIndex, endIndex;
- String oplogName = "";
+ String oplogName;
while (null != (line = reader.readLine())) {
if (line.startsWith("IF")) {
continue;
@@ -68,14 +68,14 @@ class WindowsBackupInspector extends BackupInspector {
} else {
beginIndex = line.lastIndexOf("\"") + 1;
endIndex = line.indexOf(WindowsScriptGenerator.ROBOCOPY_NO_JOB_HEADER,
beginIndex) - 1;
- oplogName = (line.substring(beginIndex, endIndex)).trim();
+ oplogName = line.substring(beginIndex, endIndex).trim();
addOplogLine(oplogName, line);
}
}
}
@Override
- protected File getRestoreFile(final File backupDir) {
+ File getRestoreFile(final File backupDir) {
return new File(backupDir, RESTORE_FILE);
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java
index 1908eec..0830485 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java
@@ -77,7 +77,7 @@ public class DiskStoreImplIntegrationTest {
List<Path> tempDirs = new ArrayList<>();
for (File diskDir : diskStore.getDiskDirs()) {
Path tempDir =
-
diskDir.toPath().resolve(BackupService.DATA_STORES_TEMPORARY_DIRECTORY +
"testing");
+
diskDir.toPath().resolve(BackupService.TEMPORARY_DIRECTORY_FOR_BACKUPS +
"testing");
Files.createDirectories(tempDir);
tempDirs.add(tempDir);
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/AbortBackupStepTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/AbortBackupStepTest.java
index edc715d..aca41b3 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/AbortBackupStepTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/AbortBackupStepTest.java
@@ -98,9 +98,8 @@ public class AbortBackupStepTest {
@Test
public void sendReturnsResultsForAllMembers() throws Exception {
- MemberWithPersistentIds[] ids =
- new MemberWithPersistentIds[] {new MemberWithPersistentIds(member1,
new HashSet<>()),
- new MemberWithPersistentIds(member2, new HashSet<>())};
+ MemberWithPersistentIds[] ids = new MemberWithPersistentIds[] {
+ createMemberWithPersistentIds(member1),
createMemberWithPersistentIds(member2)};
doAnswer(invokeAddToResults(ids)).when(backupReplyProcessor).waitForReplies();
@@ -120,7 +119,7 @@ public class AbortBackupStepTest {
@Test
public void addToResultsShouldShowUpInGetResults() {
- abortBackupStep.addToResults(member1, new HashSet<>());
+ abortBackupStep.addToResults(member1, createPersistentIds());
assertThat(abortBackupStep.getResults()).containsOnlyKeys(member1);
}
@@ -171,6 +170,16 @@ public class AbortBackupStepTest {
};
}
+ private MemberWithPersistentIds
createMemberWithPersistentIds(InternalDistributedMember member) {
+ return new MemberWithPersistentIds(member, createPersistentIds());
+ }
+
+ private HashSet<PersistentID> createPersistentIds() {
+ HashSet<PersistentID> persistentIds = new HashSet<>();
+ persistentIds.add(mock(PersistentID.class));
+ return persistentIds;
+ }
+
private static class MemberWithPersistentIds {
InternalDistributedMember member;
Set<PersistentID> persistentIds;
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
index daa2037..2e34697 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
@@ -65,14 +65,14 @@ public class BackupServiceTest {
@Test
public void throwsExceptionWhenBackupRequesterHasLeftDistributedSystem() {
InternalDistributedMember oldSender = new
InternalDistributedMember("localhost", 5556);
- assertThatThrownBy(() -> backupService.validateRequestingAdmin(oldSender))
+ assertThatThrownBy(() -> backupService.validateRequestingSender(oldSender))
.isInstanceOf(IllegalStateException.class);
}
@Test
public void startBackupThrowsExceptionWhenAnotherBackupInProgress() throws
Exception {
BackupTask backupTask = mock(BackupTask.class);
- backupService.currentTask.set(backupTask);
+ backupService.setCurrentTask(backupTask);
assertThatThrownBy(() -> backupService.prepareBackup(sender, null))
.isInstanceOf(IOException.class);
}