This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2dfc8ae  GEODE-4407 (#1499): Refactoring incremental backup logic
2dfc8ae is described below

commit 2dfc8aee602d697702ef57bdaf7e12b4815572cf
Author: agingade <aging...@pivotal.io>
AuthorDate: Fri Feb 23 11:32:55 2018 -0800

    GEODE-4407 (#1499): Refactoring incremental backup logic
    
    Removed dependency on target location while fetching backup files from source.
---
 .../apache/geode/internal/cache/DiskStoreImpl.java |   1 +
 .../org/apache/geode/internal/cache/Oplog.java     |  49 +----
 .../internal/cache/backup/BackupDefinition.java    |  20 +-
 .../internal/cache/backup/BackupFileCopier.java    |   4 +-
 .../{BackupDestination.java => BackupFilter.java}  |  12 +-
 .../geode/internal/cache/backup/BackupService.java |  20 +-
 .../geode/internal/cache/backup/BackupTask.java    | 240 +++------------------
 .../{BackupDestination.java => BackupWriter.java}  |   4 +-
 .../cache/{ => backup}/DiskStoreBackup.java        |  13 +-
 .../cache/backup/FileSystemBackupDestination.java  | 145 -------------
 .../cache/backup/FileSystemBackupWriter.java       | 237 ++++++++++++++++++++
 .../FileSystemIncrementalBackupLocation.java       | 132 ++++++++++++
 ...stination.java => IncrementalBackupFilter.java} |  25 ++-
 ...ination.java => IncrementalBackupLocation.java} |  13 +-
 .../geode/internal/cache/backup/RestoreScript.java |   4 +
 .../cache/backup/BackupDefinitionTest.java         |  21 +-
 .../backup/BackupFileCopierIntegrationTest.java    |   5 +-
 .../internal/cache/backup/BackupServiceTest.java   |   5 +-
 ...onTest.java => FileSystemBackupWriterTest.java} |  69 +++---
 .../FileSystemIncrementalBackupLocationTest.java   | 189 ++++++++++++++++
 .../backup/IncrementalBackupDistributedTest.java   |   5 +-
 21 files changed, 730 insertions(+), 483 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 0ad8310..c4d8fa0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -89,6 +89,7 @@ import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.ExportDiskRegion.ExportWriter;
 import org.apache.geode.internal.cache.backup.BackupService;
+import org.apache.geode.internal.cache.backup.DiskStoreBackup;
 import org.apache.geode.internal.cache.entries.DiskEntry;
 import org.apache.geode.internal.cache.entries.DiskEntry.Helper.ValueWrapper;
 import org.apache.geode.internal.cache.entries.DiskEntry.RecoveredEntry;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 1a9a5844..00fb08e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -1215,46 +1215,8 @@ public class Oplog implements CompactableOplog, Flushable {
     return matchingFiles;
   }
 
-  /**
-   * Returns a map of baseline oplog files to copy that match this oplog's 
files for a currently
-   * running backup.
-   *
-   * @param baselineOplogFiles a List of files to match this oplog's filenames 
against.
-   * @return a map of baslineline oplog files to copy. May be empty if total 
current set for this
-   *         oplog does not match the baseline.
-   */
-  public Map<File, File> mapBaseline(Collection<File> baselineOplogFiles) {
-    // Map of baseline oplog file name to oplog file
-    Map<String, File> baselineOplogMap =
-        TransformUtils.transformAndMap(baselineOplogFiles, 
TransformUtils.fileNameTransformer);
-
-    // Returned Map of baseline file to current oplog file
-    Map<File, File> baselineToOplogMap = new HashMap<>();
-
-    // Check for crf existence
-    if ((null != this.crf.f) && this.crf.f.exists()
-        && baselineOplogMap.containsKey(this.crf.f.getName())) {
-      baselineToOplogMap.put(baselineOplogMap.get(this.crf.f.getName()),
-          IOUtils.tryGetCanonicalFileElseGetAbsoluteFile(this.crf.f));
-    }
-
-    // Check for drf existence
-    if ((null != this.drf.f) && this.drf.f.exists()
-        && baselineOplogMap.containsKey(this.drf.f.getName())) {
-      baselineToOplogMap.put(baselineOplogMap.get(this.drf.f.getName()),
-          IOUtils.tryGetCanonicalFileElseGetAbsoluteFile(this.drf.f));
-    }
-
-    // Check for krf existence
-    if (getParent().getDiskInitFile().hasKrf(this.oplogId)) {
-      File krfFile = getKrfFile();
-      if (krfFile.exists() && baselineOplogMap.containsKey(krfFile.getName())) 
{
-        baselineToOplogMap.put(baselineOplogMap.get(krfFile.getName()),
-            IOUtils.tryGetCanonicalFileElseGetAbsoluteFile(krfFile));
-      }
-    }
-
-    return baselineToOplogMap;
+  public boolean hasKrf() {
+    return getParent().getDiskInitFile().hasKrf(this.oplogId);
   }
 
   /** the oplog identifier * */
@@ -5719,9 +5681,7 @@ public class Oplog implements CompactableOplog, Flushable {
 
   public void deleteCRF() {
     oplogSet.crfDelete(this.oplogId);
-    BackupService backupService = getInternalCache().getBackupService();
-    DiskStoreBackup inProgressBackup = getParent().getInProgressBackup();
-    if (inProgressBackup == null || !inProgressBackup.deferCrfDelete(this)) {
+    if (!getInternalCache().getBackupService().deferCrfDelete(getParent(), this)) {
       deleteCRFFileOnly();
     }
   }
@@ -5754,8 +5714,7 @@ public class Oplog implements CompactableOplog, Flushable {
 
   public void deleteDRF() {
     getOplogSet().drfDelete(this.oplogId);
-    DiskStoreBackup inProgressBackup = getParent().getInProgressBackup();
-    if (inProgressBackup == null || !inProgressBackup.deferDrfDelete(this)) {
+    if (!getInternalCache().getBackupService().deferDrfDelete(getParent(), this)) {
       deleteDRFFileOnly();
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
index 685d03b..b7cd22c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDefinition.java
@@ -27,8 +27,8 @@ import org.apache.geode.cache.DiskStore;
 class BackupDefinition {
   private final Map<DiskStore, Set<Path>> oplogFilesByDiskStore = new HashMap<>();
   private final Set<Path> configFiles = new HashSet<>();
-  private final Set<Path> userFiles = new HashSet<>();
-  private final Set<Path> deployedJars = new HashSet<>();
+  private final Map<Path, Path> userFiles = new HashMap<>();
+  private final Map<Path, Path> deployedJars = new HashMap<>();
   private final Map<DiskStore, Path> diskInitFiles = new HashMap<>();
   private RestoreScript restoreScript;
 
@@ -36,12 +36,12 @@ class BackupDefinition {
     configFiles.add(configFile);
   }
 
-  void addUserFilesToBackup(Path userFile) {
-    userFiles.add(userFile);
+  void addUserFilesToBackup(Path userFile, Path source) {
+    userFiles.put(userFile, source);
   }
 
-  void addDeployedJarToBackup(Path deployedJar) {
-    deployedJars.add(deployedJar);
+  void addDeployedJarToBackup(Path deployedJar, Path source) {
+    deployedJars.put(deployedJar, source);
   }
 
   void addDiskInitFile(DiskStore diskStore, Path diskInitFile) {
@@ -60,12 +60,12 @@ class BackupDefinition {
     return Collections.unmodifiableSet(configFiles);
   }
 
-  Set<Path> getUserFiles() {
-    return Collections.unmodifiableSet(userFiles);
+  Map<Path, Path> getUserFiles() {
+    return Collections.unmodifiableMap(userFiles);
   }
 
-  Set<Path> getDeployedJars() {
-    return Collections.unmodifiableSet(deployedJars);
+  Map<Path, Path> getDeployedJars() {
+    return Collections.unmodifiableMap(deployedJars);
   }
 
   Map<DiskStore, Path> getDiskInitFiles() {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
index ce91ebf..e013e7e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFileCopier.java
@@ -108,7 +108,7 @@ public class BackupFileCopier {
         } else {
           Files.copy(original.toPath(), destination, 
StandardCopyOption.COPY_ATTRIBUTES);
         }
-        backupDefinition.addUserFilesToBackup(destination);
+        backupDefinition.addUserFilesToBackup(destination, original.toPath());
         userFilesBackedUp.add(original);
       }
     }
@@ -131,7 +131,7 @@ public class BackupFileCopier {
         String sourceFileName = source.getName();
         Path destination = userDirectory.resolve(sourceFileName);
         Files.copy(source.toPath(), destination, 
StandardCopyOption.COPY_ATTRIBUTES);
-        backupDefinition.addDeployedJarToBackup(destination);
+        backupDefinition.addDeployedJarToBackup(destination, source.toPath());
         userJars.add(source);
       }
     } finally {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFilter.java
similarity index 70%
copy from 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
copy to 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFilter.java
index a84deea..11c6241 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupFilter.java
@@ -15,14 +15,10 @@
 package org.apache.geode.internal.cache.backup;
 
 import java.io.IOException;
+import java.nio.file.Path;
 
-public interface BackupDestination {
-  String USER_FILES_DIRECTORY = "user";
-  String DEPLOYED_JARS_DIRECTORY = "user";
-  String CONFIG_DIRECTORY = "config";
-  String BACKUP_DIR_PREFIX = "dir";
-  String README_FILE = "README_FILE.txt";
-  String DATA_STORES_DIRECTORY = "diskstores";
+import org.apache.geode.cache.DiskStore;
 
-  void backupFiles(BackupDefinition backupDefinition) throws IOException;
+interface BackupFilter {
+  boolean accept(DiskStore diskStore, Path path) throws IOException;
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
index ef841af..6098f33 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java
@@ -33,9 +33,9 @@ import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.MembershipListener;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.DiskStoreBackup;
 import org.apache.geode.internal.cache.DiskStoreImpl;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.Oplog;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
 
@@ -79,7 +79,7 @@ public class BackupService {
       throw new IOException("Another backup already in progress");
     }
     taskFuture = executor.submit(() -> backupTask.backup());
-    return backupTask.awaitLockAcquisition();
+    return backupTask.getPreparedDiskStores();
   }
 
   public HashSet<PersistentID> doBackup() throws IOException {
@@ -123,6 +123,22 @@ public class BackupService {
     }
   }
 
+  public boolean deferDrfDelete(DiskStoreImpl diskStore, Oplog oplog) {
+    DiskStoreBackup diskStoreBackup = getBackupForDiskStore(diskStore);
+    if (diskStoreBackup != null) {
+      return diskStoreBackup.deferDrfDelete(oplog);
+    }
+    return false;
+  }
+
+  public boolean deferCrfDelete(DiskStoreImpl diskStore, Oplog oplog) {
+    DiskStoreBackup diskStoreBackup = getBackupForDiskStore(diskStore);
+    if (diskStoreBackup != null) {
+      return diskStoreBackup.deferCrfDelete(oplog);
+    }
+    return false;
+  }
+
   void cleanup() {
     
cache.getDistributionManager().removeAllMembershipListener(membershipListener);
     currentTask.set(null);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
index 41e3e90..27ae581 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
@@ -16,26 +16,20 @@ package org.apache.geode.internal.cache.backup;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.persistence.PersistentID;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.DirectoryHolder;
-import org.apache.geode.internal.cache.DiskStoreBackup;
 import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.Oplog;
 import org.apache.geode.internal.logging.LogService;
@@ -46,12 +40,6 @@ import org.apache.geode.internal.logging.LogService;
 public class BackupTask {
   private static final Logger logger = LogService.getLogger();
 
-  static final String INCOMPLETE_BACKUP_FILE = "INCOMPLETE_BACKUP_FILE";
-
-  private static final String BACKUP_DIR_PREFIX = "dir";
-  private static final String DATA_STORES_DIRECTORY = "diskstores";
-  private static final String USER_FILES = "user";
-
   private final Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new 
HashMap<>();
   private final RestoreScript restoreScript = new RestoreScript();
   private final InternalCache cache;
@@ -62,6 +50,7 @@ public class BackupTask {
   private final HashSet<PersistentID> diskStoresWithData = new HashSet<>();
   private final File targetDir;
   private final File baselineDir;
+  private final BackupWriter backupWriter;
 
   private volatile boolean isCancelled = false;
 
@@ -73,9 +62,23 @@ public class BackupTask {
     this.targetDir = targetDir;
     this.baselineDir = baselineDir;
     memberId = getCleanedMemberId();
+    backupWriter = createBackupWriter();
+  }
+
+  private BackupWriter createBackupWriter() {
+    BackupWriter writer;
+    Path backupDirectory = targetDir.toPath().resolve(memberId);
+    if (baselineDir == null) {
+      writer = new FileSystemBackupWriter(backupDirectory);
+    } else {
+      FileSystemIncrementalBackupLocation incrementalBaselineLocation =
+          new FileSystemIncrementalBackupLocation(baselineDir, memberId);
+      writer = new FileSystemBackupWriter(backupDirectory, incrementalBaselineLocation);
+    }
+    return writer;
   }
 
-  HashSet<PersistentID> awaitLockAcquisition() throws InterruptedException {
+  HashSet<PersistentID> getPreparedDiskStores() throws InterruptedException {
     locksAcquired.await();
     return diskStoresWithData;
   }
@@ -120,32 +123,22 @@ public class BackupTask {
     }
 
     try {
+      Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned();
       temporaryFiles = TemporaryBackupFiles.create();
       fileCopier = new BackupFileCopier(cache, temporaryFiles);
-      File memberBackupDir = new File(targetDir, memberId);
 
-      // Make sure our baseline is okay for this member, then create inspector 
for baseline backup
-      File checkedBaselineDir = checkBaseline(baselineDir);
-      BackupInspector inspector =
-          (checkedBaselineDir == null ? null : 
BackupInspector.createInspector(checkedBaselineDir));
-      File storesDir = new File(memberBackupDir, DATA_STORES_DIRECTORY);
-      Collection<DiskStore> diskStores = 
cache.listDiskStoresIncludingRegionOwned();
+      Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStores = startDiskStoreBackups(diskStores);
 
-      Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStores =
-          startDiskStoreBackups(inspector, storesDir, diskStores);
       allowDestroys.countDown();
+
       HashSet<PersistentID> persistentIds = 
finishDiskStoreBackups(backupByDiskStores);
 
       if (!backupByDiskStores.isEmpty()) {
-        // TODO: allow different strategies...
+        backupAdditionalFiles();
         BackupDefinition backupDefinition = fileCopier.getBackupDefinition();
-        backupAdditionalFiles(memberBackupDir);
         backupDefinition.setRestoreScript(restoreScript);
-        BackupDestination backupDestination =
-            new FileSystemBackupDestination(memberBackupDir.toPath());
-        backupDestination.backupFiles(backupDefinition);
+        backupWriter.backupFiles(backupDefinition);
       }
-
       return persistentIds;
     } finally {
       cleanup();
@@ -164,16 +157,15 @@ public class BackupTask {
     return persistentIds;
   }
 
-  private Map<DiskStoreImpl, DiskStoreBackup> 
startDiskStoreBackups(BackupInspector inspector,
-      File storesDir, Collection<DiskStore> diskStores) throws IOException {
+  private Map<DiskStoreImpl, DiskStoreBackup> startDiskStoreBackups(
+      Collection<DiskStore> diskStores) throws IOException {
     Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new HashMap<>();
 
     for (DiskStore store : diskStores) {
       DiskStoreImpl diskStore = (DiskStoreImpl) store;
       try {
         if (diskStore.hasPersistedData()) {
-          File diskStoreDir = new File(storesDir, getBackupDirName(diskStore));
-          DiskStoreBackup backup = startDiskStoreBackup(diskStore, 
diskStoreDir, inspector);
+          DiskStoreBackup backup = startDiskStoreBackup(diskStore);
           backupByDiskStore.put(diskStore, backup);
         }
       } finally {
@@ -215,78 +207,10 @@ public class BackupTask {
     }
   }
 
-  /**
-   * Returns the memberId directory for this member in the baseline. The 
memberId may have changed
-   * if this member has been restarted since the last backup.
-   *
-   * @param baselineParentDir parent directory of last backup.
-   * @return null if the baseline for this member could not be located.
-   */
-  private File findBaselineForThisMember(File baselineParentDir) {
-    File baselineDir = null;
-
-    // Find the first matching DiskStoreId directory for this member.
-    for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) {
-      File[] matchingFiles = baselineParentDir
-          .listFiles((file, name) -> 
name.endsWith(getBackupDirName((DiskStoreImpl) diskStore)));
-      // We found it? Good. Set this member's baseline to the backed up disk 
store's member dir (two
-      // levels up).
-      if (null != matchingFiles && matchingFiles.length > 0)
-        baselineDir = matchingFiles[0].getParentFile().getParentFile();
-    }
-    return baselineDir;
-  }
-
-  /**
-   * Performs a sanity check on the baseline directory for incremental 
backups. If a baseline
-   * directory exists for the member and there is no INCOMPLETE_BACKUP_FILE 
file then return the
-   * data stores directory for this member.
-   *
-   * @param baselineParentDir a previous backup directory. This is used with 
the incremental backup
-   *        option. May be null if the user specified a full backup.
-   * @return null if the backup is to be a full backup otherwise return the 
data store directory in
-   *         the previous backup for this member (if incremental).
-   */
-  private File checkBaseline(File baselineParentDir) {
-    File baselineDir = null;
-
-    if (null != baselineParentDir) {
-      // Start by looking for this memberId
-      baselineDir = new File(baselineParentDir, memberId);
-
-      if (!baselineDir.exists()) {
-        // hmmm, did this member have a restart?
-        // Determine which member dir might be a match for us
-        baselineDir = findBaselineForThisMember(baselineParentDir);
-      }
-
-      if (null != baselineDir) {
-        // check for existence of INCOMPLETE_BACKUP_FILE file
-        File incompleteBackup = new File(baselineDir, INCOMPLETE_BACKUP_FILE);
-        if (incompleteBackup.exists()) {
-          baselineDir = null;
-        }
-      }
-    }
-
-    return baselineDir;
-  }
-
-  private void backupAdditionalFiles(File backupDir) throws IOException {
+  private void backupAdditionalFiles() throws IOException {
     fileCopier.copyConfigFiles();
-
-    Set<File> userFiles = fileCopier.copyUserFiles();
-    File userBackupDir = new File(backupDir, USER_FILES);
-    for (File file : userFiles) {
-      File restoreScriptDestination = new File(userBackupDir, file.getName());
-      restoreScript.addUserFile(file, restoreScriptDestination);
-    }
-
-    Set<File> jars = fileCopier.copyDeployedJars();
-    for (File file : jars) {
-      File restoreScriptDestination = new File(userBackupDir, file.getName());
-      restoreScript.addFile(file, restoreScriptDestination);
-    }
+    fileCopier.copyUserFiles();
+    fileCopier.copyDeployedJars();
   }
 
   /**
@@ -318,33 +242,18 @@ public class BackupTask {
   }
 
   /**
-   * Returns the dir name used to back up this DiskStore's directories under. 
The name is a
-   * concatenation of the disk store name and id.
-   */
-  private String getBackupDirName(DiskStoreImpl diskStore) {
-    String name = diskStore.getName();
-
-    if (name == null) {
-      name = GemFireCacheImpl.getDefaultDiskStoreName();
-    }
-
-    return (name + "_" + diskStore.getDiskStoreID().toString());
-  }
-
-  /**
    * Start the backup process. This is the second step of the backup process. 
In this method, we
    * define the data we're backing up by copying the init file and rolling to 
the next file. After
    * this method returns operations can proceed as normal, except that we 
don't remove oplogs.
    */
-  private DiskStoreBackup startDiskStoreBackup(DiskStoreImpl diskStore, File 
targetDir,
-      BackupInspector baselineInspector) throws IOException {
+  private DiskStoreBackup startDiskStoreBackup(DiskStoreImpl diskStore) throws IOException {
     DiskStoreBackup backup = null;
     boolean done = false;
     try {
       for (;;) {
         Oplog childOplog = diskStore.getPersistentOplogSet().getChild();
         if (childOplog == null) {
-          backup = new DiskStoreBackup(new Oplog[0], targetDir);
+          backup = new DiskStoreBackup(new Oplog[0]);
           backupByDiskStore.put(diskStore, backup);
           break;
         }
@@ -367,24 +276,11 @@ public class BackupTask {
             logger.debug("snapshotting oplogs for disk store {}", 
diskStore.getName());
           }
 
-          addDiskStoreDirectoriesToRestoreScript(diskStore, targetDir);
-
           
restoreScript.addExistenceTest(diskStore.getDiskInitFile().getIFFile());
 
           // Contains all oplogs that will backed up
-
-          // Incremental backup so filter out oplogs that have already been
-          // backed up
-          Oplog[] allOplogs;
-          if (null != baselineInspector) {
-            allOplogs = filterBaselineOplogs(diskStore, baselineInspector);
-          } else {
-            allOplogs = diskStore.getAllOplogsForBackup();
-          }
-
-          // mark all oplogs as being backed up. This will
-          // prevent the oplogs from being deleted
-          backup = new DiskStoreBackup(allOplogs, targetDir);
+          Oplog[] allOplogs = diskStore.getAllOplogsForBackup();
+          backup = new DiskStoreBackup(allOplogs);
           backupByDiskStore.put(diskStore, backup);
 
           fileCopier.copyDiskInitFile(diskStore);
@@ -406,77 +302,6 @@ public class BackupTask {
     return backup;
   }
 
-  private void addDiskStoreDirectoriesToRestoreScript(DiskStoreImpl diskStore, 
File targetDir) {
-    DirectoryHolder[] directories = diskStore.getDirectoryHolders();
-    for (int i = 0; i < directories.length; i++) {
-      File backupDir = getBackupDirForCurrentMember(targetDir, i);
-      restoreScript.addFile(directories[i].getDir(), backupDir);
-    }
-  }
-
-  /**
-   * Filters and returns the current set of oplogs that aren't already in the 
baseline for
-   * incremental backup
-   *
-   * @param baselineInspector the inspector for the previous backup.
-   * @return an array of Oplogs to be copied for an incremental backup.
-   */
-  private Oplog[] filterBaselineOplogs(DiskStoreImpl diskStore, 
BackupInspector baselineInspector) {
-    File baselineDir = new File(baselineInspector.getBackupDir(), 
DATA_STORES_DIRECTORY);
-    baselineDir = new File(baselineDir, getBackupDirName(diskStore));
-
-    // Find all of the member's diskstore oplogs in the member's baseline
-    // diskstore directory structure (*.crf,*.krf,*.drf)
-    Collection<File> baselineOplogFiles =
-        FileUtils.listFiles(baselineDir, new String[] {"krf", "drf", "crf"}, 
true);
-    // Our list of oplogs to copy (those not already in the baseline)
-    List<Oplog> oplogList = new LinkedList<>();
-
-    // Total list of member oplogs
-    Oplog[] allOplogs = diskStore.getAllOplogsForBackup();
-
-    // Loop through operation logs and see if they are already part of the 
baseline backup.
-    for (Oplog log : allOplogs) {
-      // See if they are backed up in the current baseline
-      Map<File, File> oplogMap = log.mapBaseline(baselineOplogFiles);
-
-      // No? Then see if they were backed up in previous baselines
-      if (oplogMap.isEmpty() && baselineInspector.isIncremental()) {
-        oplogMap = addBaselineOplogToRestoreScript(baselineInspector, log);
-      }
-
-      if (oplogMap.isEmpty()) {
-        // These are fresh operation log files so lets back them up.
-        oplogList.add(log);
-      } else {
-        /*
-         * These have been backed up before so lets just add their entries 
from the previous backup
-         * or restore script into the current one.
-         */
-        restoreScript.addBaselineFiles(oplogMap);
-      }
-    }
-
-    // Convert the filtered oplog list to an array
-    return oplogList.toArray(new Oplog[oplogList.size()]);
-  }
-
-  private Map<File, File> addBaselineOplogToRestoreScript(BackupInspector 
baselineInspector,
-      Oplog log) {
-    Map<File, File> oplogMap = new HashMap<>();
-    Set<String> matchingOplogs =
-        
log.gatherMatchingOplogFiles(baselineInspector.getIncrementalOplogFileNames());
-    for (String matchingOplog : matchingOplogs) {
-      oplogMap.put(new 
File(baselineInspector.getCopyFromForOplogFile(matchingOplog)),
-          new File(baselineInspector.getCopyToForOplogFile(matchingOplog)));
-    }
-    return oplogMap;
-  }
-
-  private File getBackupDirForCurrentMember(File targetDir, int index) {
-    return new File(targetDir, BACKUP_DIR_PREFIX + index);
-  }
-
   private String getCleanedMemberId() {
     InternalDistributedMember memberId =
         cache.getInternalDistributedSystem().getDistributedMember();
@@ -491,4 +316,5 @@ public class BackupTask {
   DiskStoreBackup getBackupForDiskStore(DiskStoreImpl diskStore) {
     return backupByDiskStore.get(diskStore);
   }
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriter.java
similarity index 92%
copy from 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
copy to 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriter.java
index a84deea..328e83c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriter.java
@@ -16,7 +16,8 @@ package org.apache.geode.internal.cache.backup;
 
 import java.io.IOException;
 
-public interface BackupDestination {
+public interface BackupWriter {
+  String INCOMPLETE_BACKUP_FILE = "INCOMPLETE_BACKUP_FILE";
   String USER_FILES_DIRECTORY = "user";
   String DEPLOYED_JARS_DIRECTORY = "user";
   String CONFIG_DIRECTORY = "config";
@@ -25,4 +26,5 @@ public interface BackupDestination {
   String DATA_STORES_DIRECTORY = "diskstores";
 
   void backupFiles(BackupDefinition backupDefinition) throws IOException;
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/DiskStoreBackup.java
similarity index 91%
rename from 
geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
rename to 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/DiskStoreBackup.java
index b4428e8..592f43b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/DiskStoreBackup.java
@@ -12,13 +12,14 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache;
+package org.apache.geode.internal.cache.backup;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.geode.internal.cache.Oplog;
+
 /**
  * This class manages the state of the backup of an individual disk store. It 
holds the list of
  * oplogs that still need to be backed up, along with the lists of oplog files 
that should be
@@ -29,11 +30,9 @@ public class DiskStoreBackup {
   private final Set<Oplog> pendingBackup;
   private final Set<Oplog> deferredCrfDeletes = new HashSet<>();
   private final Set<Oplog> deferredDrfDeletes = new HashSet<>();
-  private final File targetDir;
 
-  public DiskStoreBackup(Oplog[] allOplogs, File targetDir) {
+  public DiskStoreBackup(Oplog[] allOplogs) {
     this.pendingBackup = new HashSet<>(Arrays.asList(allOplogs));
-    this.targetDir = targetDir;
   }
 
   /**
@@ -80,10 +79,6 @@ public class DiskStoreBackup {
     }
   }
 
-  public File getTargetDir() {
-    return targetDir;
-  }
-
   public synchronized void cleanup() {
     for (Oplog oplog : getPendingBackup()) {
       backupFinished(oplog);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupDestination.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupDestination.java
deleted file mode 100644
index a845766..0000000
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupDestination.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.backup;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.commons.io.FileUtils;
-
-import org.apache.geode.cache.DiskStore;
-import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-
-public class FileSystemBackupDestination implements BackupDestination {
-  static final String INCOMPLETE_BACKUP_FILE = "INCOMPLETE_BACKUP_FILE";
-
-  private final Path backupDir;
-
-  FileSystemBackupDestination(Path backupDir) {
-    this.backupDir = backupDir;
-  }
-
-  @Override
-  public void backupFiles(BackupDefinition backupDefinition) throws 
IOException {
-    Files.createDirectories(backupDir);
-    Files.createFile(backupDir.resolve(INCOMPLETE_BACKUP_FILE));
-    backupAllFilesets(backupDefinition);
-    Files.delete(backupDir.resolve(INCOMPLETE_BACKUP_FILE));
-  }
-
-  private void backupAllFilesets(BackupDefinition backupDefinition) throws 
IOException {
-    backupUserFiles(backupDefinition.getUserFiles());
-    backupDeployedJars(backupDefinition.getDeployedJars());
-    backupConfigFiles(backupDefinition.getConfigFiles());
-    backupOplogs(backupDefinition.getOplogFilesByDiskStore());
-    backupDiskInitFiles(backupDefinition.getDiskInitFiles());
-    RestoreScript script = backupDefinition.getRestoreScript();
-    if (script != null) {
-      File scriptFile = script.generate(backupDir.toFile());
-      backupRestoreScript(scriptFile.toPath());
-    }
-    writeReadMe();
-  }
-
-  private void writeReadMe() throws IOException {
-    String text = LocalizedStrings.BackupService_README.toLocalizedString();
-    Files.write(backupDir.resolve(README_FILE), text.getBytes());
-  }
-
-  private void backupRestoreScript(Path restoreScriptFile) throws IOException {
-    Files.copy(restoreScriptFile, 
backupDir.resolve(restoreScriptFile.getFileName()));
-  }
-
-  private void backupDiskInitFiles(Map<DiskStore, Path> diskInitFiles) throws 
IOException {
-    for (Map.Entry<DiskStore, Path> entry : diskInitFiles.entrySet()) {
-      Path destinationDirectory = getOplogBackupDir(entry.getKey(),
-          ((DiskStoreImpl) entry.getKey()).getInforFileDirIndex());
-      Files.createDirectories(destinationDirectory);
-      Files.copy(entry.getValue(), 
destinationDirectory.resolve(entry.getValue().getFileName()),
-          StandardCopyOption.COPY_ATTRIBUTES);
-    }
-  }
-
-  private void backupUserFiles(Collection<Path> userFiles) throws IOException {
-    Path userDirectory = backupDir.resolve(USER_FILES_DIRECTORY);
-    Files.createDirectories(userDirectory);
-    moveFilesOrDirectories(userFiles, userDirectory);
-  }
-
-  private void backupDeployedJars(Collection<Path> jarFiles) throws 
IOException {
-    Path jarsDirectory = backupDir.resolve(DEPLOYED_JARS_DIRECTORY);
-    Files.createDirectories(jarsDirectory);
-    moveFilesOrDirectories(jarFiles, jarsDirectory);
-  }
-
-  private void backupConfigFiles(Collection<Path> configFiles) throws 
IOException {
-    Path configDirectory = backupDir.resolve(CONFIG_DIRECTORY);
-    Files.createDirectories(configDirectory);
-    moveFilesOrDirectories(configFiles, configDirectory);
-  }
-
-  private void backupOplogs(Map<DiskStore, Collection<Path>> oplogFiles) 
throws IOException {
-    for (Map.Entry<DiskStore, Collection<Path>> entry : oplogFiles.entrySet()) 
{
-      for (Path path : entry.getValue()) {
-        int index = ((DiskStoreImpl) entry.getKey()).getInforFileDirIndex();
-        Path backupDir = createOplogBackupDir(entry.getKey(), index);
-        backupOplog(backupDir, path);
-      }
-    }
-  }
-
-  private Path getOplogBackupDir(DiskStore diskStore, int index) {
-    String name = diskStore.getName();
-    if (name == null) {
-      name = GemFireCacheImpl.getDefaultDiskStoreName();
-    }
-    name = name + "_" + ((DiskStoreImpl) 
diskStore).getDiskStoreID().toString();
-    return backupDir.resolve(DATA_STORES_DIRECTORY).resolve(name)
-        .resolve(BACKUP_DIR_PREFIX + index);
-  }
-
-  private Path createOplogBackupDir(DiskStore diskStore, int index) throws 
IOException {
-    Path oplogBackupDir = getOplogBackupDir(diskStore, index);
-    Files.createDirectories(oplogBackupDir);
-    return oplogBackupDir;
-  }
-
-  private void backupOplog(Path targetDir, Path path) throws IOException {
-    backupFile(targetDir, path.toFile());
-  }
-
-  private void backupFile(Path targetDir, File file) throws IOException {
-    Files.move(file.toPath(), targetDir.resolve(file.getName()));
-  }
-
-  private void moveFilesOrDirectories(Collection<Path> paths, Path 
targetDirectory)
-      throws IOException {
-    for (Path userFile : paths) {
-      Path destination = targetDirectory.resolve(userFile.getFileName());
-      if (Files.isDirectory(userFile)) {
-        FileUtils.moveDirectory(userFile.toFile(), destination.toFile());
-      } else {
-        Files.move(userFile, destination);
-      }
-    }
-  }
-}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
new file mode 100644
index 0000000..1698fc3
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.cache.DirectoryHolder;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+
+public class FileSystemBackupWriter implements BackupWriter {
+
+  private final Path backupDirectory;
+  private final FileSystemIncrementalBackupLocation 
incrementalBaselineLocation;
+  private final BackupFilter filter;
+
+  FileSystemBackupWriter(Path backupDirectory) {
+    this(backupDirectory, null);
+  }
+
+  FileSystemBackupWriter(Path backupDirectory,
+      FileSystemIncrementalBackupLocation incrementalBaselineLocation) {
+    this.backupDirectory = backupDirectory;
+    this.incrementalBaselineLocation = incrementalBaselineLocation;
+    filter = createBackupFilter(incrementalBaselineLocation);
+  }
+
+  private BackupFilter createBackupFilter(
+      FileSystemIncrementalBackupLocation incrementalBaselineLocation) {
+    BackupFilter filter;
+    if (incrementalBaselineLocation != null
+        && 
Files.exists(incrementalBaselineLocation.getMemberBackupLocationDir())) {
+      filter = new IncrementalBackupFilter(incrementalBaselineLocation);
+    } else {
+      filter = (store, path) -> true;
+    }
+    return filter;
+  }
+
+  @Override
+  public void backupFiles(BackupDefinition backupDefinition) throws 
IOException {
+    Files.createDirectories(backupDirectory);
+    Files.createFile(backupDirectory.resolve(INCOMPLETE_BACKUP_FILE));
+    backupAllFilesets(backupDefinition);
+    Files.delete(backupDirectory.resolve(INCOMPLETE_BACKUP_FILE));
+  }
+
+  private void backupAllFilesets(BackupDefinition backupDefinition) throws 
IOException {
+    RestoreScript restoreScript = backupDefinition.getRestoreScript();
+    backupDiskInitFiles(backupDefinition.getDiskInitFiles());
+    backupOplogs(backupDefinition.getOplogFilesByDiskStore(), restoreScript);
+    backupConfigFiles(backupDefinition.getConfigFiles());
+    backupUserFiles(backupDefinition.getUserFiles(), restoreScript);
+    backupDeployedJars(backupDefinition.getDeployedJars(), restoreScript);
+    File scriptFile = restoreScript.generate(backupDirectory.toFile());
+    backupRestoreScript(scriptFile.toPath());
+    writeReadMe();
+  }
+
+  private void writeReadMe() throws IOException {
+    String text = LocalizedStrings.BackupService_README.toLocalizedString();
+    Files.write(backupDirectory.resolve(README_FILE), text.getBytes());
+  }
+
+  private void backupRestoreScript(Path restoreScriptFile) throws IOException {
+    Files.copy(restoreScriptFile, 
backupDirectory.resolve(restoreScriptFile.getFileName()));
+  }
+
+  private void backupDiskInitFiles(Map<DiskStore, Path> diskInitFiles) throws 
IOException {
+    for (Map.Entry<DiskStore, Path> entry : diskInitFiles.entrySet()) {
+      Path destinationDirectory = getOplogBackupDir(entry.getKey(),
+          ((DiskStoreImpl) entry.getKey()).getInforFileDirIndex());
+      Files.createDirectories(destinationDirectory);
+      Files.copy(entry.getValue(), 
destinationDirectory.resolve(entry.getValue().getFileName()),
+          StandardCopyOption.COPY_ATTRIBUTES);
+    }
+  }
+
+  private void backupUserFiles(Map<Path, Path> userFiles, RestoreScript 
restoreScript)
+      throws IOException {
+    Path userDirectory = backupDirectory.resolve(USER_FILES_DIRECTORY);
+    Files.createDirectories(userDirectory);
+
+    for (Map.Entry<Path, Path> userFileEntry : userFiles.entrySet()) {
+      Path userFile = userFileEntry.getKey();
+      Path originalFile = userFileEntry.getValue();
+
+      Path destination = userDirectory.resolve(userFile.getFileName());
+      moveFileOrDirectory(userFile, destination);
+      restoreScript.addUserFile(originalFile.toFile(), destination.toFile());
+    }
+  }
+
+  private void backupDeployedJars(Map<Path, Path> jarFiles, RestoreScript 
restoreScript)
+      throws IOException {
+    Path jarsDirectory = backupDirectory.resolve(DEPLOYED_JARS_DIRECTORY);
+    Files.createDirectories(jarsDirectory);
+
+    for (Map.Entry<Path, Path> jarFileEntry : jarFiles.entrySet()) {
+      Path jarFile = jarFileEntry.getKey();
+      Path originalFile = jarFileEntry.getValue();
+
+      Path destination = jarsDirectory.resolve(jarFile.getFileName());
+      moveFileOrDirectory(jarFile, destination);
+      restoreScript.addFile(originalFile.toFile(), destination.toFile());
+    }
+  }
+
+  private void backupConfigFiles(Collection<Path> configFiles) throws 
IOException {
+    Path configDirectory = backupDirectory.resolve(CONFIG_DIRECTORY);
+    Files.createDirectories(configDirectory);
+    moveFilesOrDirectories(configFiles, configDirectory);
+  }
+
+  private void backupOplogs(Map<DiskStore, Collection<Path>> oplogFiles,
+      RestoreScript restoreScript) throws IOException {
+    File storesDir = new File(backupDirectory.toFile(), DATA_STORES_DIRECTORY);
+    for (Map.Entry<DiskStore, Collection<Path>> entry : oplogFiles.entrySet()) 
{
+      DiskStoreImpl diskStore = ((DiskStoreImpl) entry.getKey());
+      boolean diskstoreHasFilesInBackup = false;
+      for (Path path : entry.getValue()) {
+        if (filter.accept(diskStore, path)) {
+          diskstoreHasFilesInBackup = true;
+          int index = diskStore.getInforFileDirIndex();
+          Path backupDir = createOplogBackupDir(diskStore, index);
+          backupOplog(backupDir, path);
+        } else {
+          Map<String, File> baselineOplogMap =
+              incrementalBaselineLocation.getBackedUpOplogs(diskStore);
+          
restoreScript.addBaselineFile(baselineOplogMap.get(path.getFileName().toString()),
+              new File(path.toAbsolutePath().getParent().getParent().toFile(),
+                  path.getFileName().toString()));
+        }
+      }
+      if (diskstoreHasFilesInBackup) {
+        addDiskStoreDirectoriesToRestoreScript((DiskStoreImpl) entry.getKey(),
+            getBaseBackupDirectory().toFile(), restoreScript);
+      }
+      File targetStoresDir = new File(storesDir, getBackupDirName(diskStore));
+      addDiskStoreDirectoriesToRestoreScript(diskStore, targetStoresDir, 
restoreScript);
+
+    }
+  }
+
+  private Path getOplogBackupDir(DiskStore diskStore, int index) {
+    String name = diskStore.getName();
+    if (name == null) {
+      name = GemFireCacheImpl.getDefaultDiskStoreName();
+    }
+    name = name + "_" + ((DiskStoreImpl) 
diskStore).getDiskStoreID().toString();
+    return this.backupDirectory.resolve(DATA_STORES_DIRECTORY).resolve(name)
+        .resolve(BACKUP_DIR_PREFIX + index);
+  }
+
+  private Path createOplogBackupDir(DiskStore diskStore, int index) throws 
IOException {
+    Path oplogBackupDir = getOplogBackupDir(diskStore, index);
+    Files.createDirectories(oplogBackupDir);
+    return oplogBackupDir;
+  }
+
+  /**
+   * Returns the name of the directory under which this DiskStore's directories are backed up.
+   * The name is the disk store name (or the default name if unset), an underscore, and the id.
+   */
+  private String getBackupDirName(DiskStoreImpl diskStore) {
+    String name = diskStore.getName();
+
+    if (name == null) {
+      name = GemFireCacheImpl.getDefaultDiskStoreName();
+    }
+
+    return (name + "_" + diskStore.getDiskStoreID().toString());
+  }
+
+  private void backupOplog(Path targetDir, Path path) throws IOException {
+    backupFile(targetDir, path.toFile());
+  }
+
+  private void backupFile(Path targetDir, File file) throws IOException {
+    Files.move(file.toPath(), targetDir.resolve(file.getName()));
+  }
+
+  private void moveFilesOrDirectories(Collection<Path> paths, Path 
targetDirectory)
+      throws IOException {
+    for (Path userFile : paths) {
+      Path destination = targetDirectory.resolve(userFile.getFileName());
+      moveFileOrDirectory(userFile, destination);
+    }
+  }
+
+  private void moveFileOrDirectory(Path userFile, Path destination) throws 
IOException {
+    if (Files.isDirectory(userFile)) {
+      FileUtils.moveDirectory(userFile.toFile(), destination.toFile());
+    } else {
+      Files.move(userFile, destination);
+    }
+  }
+
+  private void addDiskStoreDirectoriesToRestoreScript(DiskStoreImpl diskStore, 
File targetDir,
+      RestoreScript restoreScript) {
+    DirectoryHolder[] directories = diskStore.getDirectoryHolders();
+    for (int i = 0; i < directories.length; i++) {
+      File backupDir = getBackupDirForCurrentMember(targetDir, i);
+      restoreScript.addFile(directories[i].getDir(), backupDir);
+    }
+  }
+
+  private File getBackupDirForCurrentMember(File targetDir, int index) {
+    return new File(targetDir, BACKUP_DIR_PREFIX + index);
+  }
+
+  private Path getBaseBackupDirectory() {
+    return backupDirectory.getParent();
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
new file mode 100644
index 0000000..55f7009
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.util.TransformUtils;
+
+public class FileSystemIncrementalBackupLocation implements 
IncrementalBackupLocation {
+
+  private static final String INCOMPLETE_BACKUP_FILE = 
"INCOMPLETE_BACKUP_FILE";
+
+  private final Path memberBackupLocationDir;
+
+  FileSystemIncrementalBackupLocation(File backupLocationDir, String memberId) 
{
+    this.memberBackupLocationDir = new File(backupLocationDir, 
memberId).toPath();
+  }
+
+  Path getMemberBackupLocationDir() {
+    return memberBackupLocationDir;
+  }
+
+  @Override
+  public Map<String, File> getBackedUpOplogs(DiskStore diskStore) throws 
IOException {
+    File checkedBaselineDir = checkBaseline(diskStore);
+    if (checkedBaselineDir == null) {
+      return Collections.emptyMap();
+    }
+    Collection<File> baselineOplogFiles = 
getBackedUpOplogs(checkedBaselineDir, diskStore);
+    baselineOplogFiles.addAll(getPreviouslyBackedUpOpLogs(checkedBaselineDir));
+
+    // Map of baseline oplog file name to oplog file
+    return TransformUtils.transformAndMap(baselineOplogFiles, 
TransformUtils.fileNameTransformer);
+  }
+
+  Collection<File> getBackedUpOplogs(File checkedBaselineDir, DiskStore 
diskStore) {
+    File baselineDir = new File(checkedBaselineDir, 
BackupWriter.DATA_STORES_DIRECTORY);
+    baselineDir = new File(baselineDir, getBackupDirName((DiskStoreImpl) 
diskStore));
+    return FileUtils.listFiles(baselineDir, new String[] {"krf", "drf", 
"crf"}, true);
+  }
+
+  Collection<File> getPreviouslyBackedUpOpLogs(File checkedBaselineDir) throws 
IOException {
+    BackupInspector inspector = createBackupInspector(checkedBaselineDir);
+    HashSet<File> oplogs = new HashSet<>();
+    if (inspector.isIncremental() && inspector.getIncrementalOplogFileNames() 
!= null) {
+      inspector.getIncrementalOplogFileNames().forEach((oplog) -> {
+        oplog = inspector.getCopyFromForOplogFile(oplog);
+        oplogs.add(new File(oplog));
+      });
+    }
+    return oplogs;
+  }
+
+  BackupInspector createBackupInspector(File checkedBaselineDir) throws 
IOException {
+    return BackupInspector.createInspector(checkedBaselineDir);
+  }
+
+  /**
+   * Performs a sanity check on the baseline directory for incremental backups. Returns the
+   * baseline directory for this member, searching for a matching one if the expected directory
+   * is missing; returns null if none is found or it contains an INCOMPLETE_BACKUP_FILE file.
+   */
+  private File checkBaseline(DiskStore diskStore) {
+    File baselineDir = memberBackupLocationDir.toFile();
+
+    if (!baselineDir.exists()) {
+      // The expected member directory is missing (perhaps the member restarted).
+      // Look for another member directory that might match this disk store.
+      baselineDir = 
findBaselineForThisMember(memberBackupLocationDir.getParent(), diskStore);
+    }
+
+    if (null != baselineDir) {
+      // check for existence of INCOMPLETE_BACKUP_FILE file
+      File incompleteBackup = new File(baselineDir, INCOMPLETE_BACKUP_FILE);
+      if (incompleteBackup.exists()) {
+        baselineDir = null;
+      }
+    }
+    return baselineDir;
+  }
+
+  File findBaselineForThisMember(Path baselineParentDir, DiskStore diskStore) {
+    File baselineDir = null;
+
+    // Find the first matching DiskStoreId directory for this member.
+    File[] matchingFiles = baselineParentDir.toFile()
+        .listFiles((file, name) -> 
name.endsWith(getBackupDirName((DiskStoreImpl) diskStore)));
+    // We found it? Good. Set this member's baseline to the backed up disk 
store's member dir (two
+    // levels up).
+    if (null != matchingFiles && matchingFiles.length > 0) {
+      baselineDir = matchingFiles[0].getParentFile().getParentFile();
+    }
+
+    return baselineDir;
+  }
+
+  /**
+   * Returns the name of the directory under which this DiskStore's directories are backed up.
+   * The name is the disk store name (or the default name if unset), an underscore, and the id.
+   */
+  String getBackupDirName(DiskStoreImpl diskStore) {
+    String name = diskStore.getName();
+    if (name == null) {
+      name = GemFireCacheImpl.getDefaultDiskStoreName();
+    }
+    return (name + "_" + diskStore.getDiskStoreID().toString());
+  }
+
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupFilter.java
similarity index 56%
copy from 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
copy to 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupFilter.java
index a84deea..6db2641 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupFilter.java
@@ -14,15 +14,24 @@
  */
 package org.apache.geode.internal.cache.backup;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
 
-public interface BackupDestination {
-  String USER_FILES_DIRECTORY = "user";
-  String DEPLOYED_JARS_DIRECTORY = "user";
-  String CONFIG_DIRECTORY = "config";
-  String BACKUP_DIR_PREFIX = "dir";
-  String README_FILE = "README_FILE.txt";
-  String DATA_STORES_DIRECTORY = "diskstores";
+import org.apache.geode.cache.DiskStore;
 
-  void backupFiles(BackupDefinition backupDefinition) throws IOException;
+public class IncrementalBackupFilter implements BackupFilter {
+
+  private final IncrementalBackupLocation incrementalBackupLocation;
+
+  IncrementalBackupFilter(IncrementalBackupLocation incrementalBackupLocation) 
{
+    this.incrementalBackupLocation = incrementalBackupLocation;
+  }
+
+  @Override
+  public boolean accept(DiskStore diskStore, Path path) throws IOException {
+    Map<String, File> baselineOplogMap = 
incrementalBackupLocation.getBackedUpOplogs(diskStore);
+    return !baselineOplogMap.containsKey(path.getFileName().toString());
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupLocation.java
similarity index 70%
rename from 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
rename to 
geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupLocation.java
index a84deea..dbfc3fb 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupDestination.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/IncrementalBackupLocation.java
@@ -14,15 +14,12 @@
  */
 package org.apache.geode.internal.cache.backup;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 
-public interface BackupDestination {
-  String USER_FILES_DIRECTORY = "user";
-  String DEPLOYED_JARS_DIRECTORY = "user";
-  String CONFIG_DIRECTORY = "config";
-  String BACKUP_DIR_PREFIX = "dir";
-  String README_FILE = "README_FILE.txt";
-  String DATA_STORES_DIRECTORY = "diskstores";
+import org.apache.geode.cache.DiskStore;
 
-  void backupFiles(BackupDefinition backupDefinition) throws IOException;
+public interface IncrementalBackupLocation {
+  Map<String, File> getBackedUpOplogs(DiskStore diskStore) throws IOException;
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
index a166e93..309dca9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/RestoreScript.java
@@ -69,6 +69,10 @@ public class RestoreScript {
     this.baselineFiles.putAll(baselineFiles);
   }
 
+  public void addBaselineFile(File baseline, File absoluteFile) {
+    this.baselineFiles.put(baseline, absoluteFile);
+  }
+
   public void addFile(final File originalFile, final File backupFile) {
     backedUpFiles.put(backupFile, originalFile.getAbsoluteFile());
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDefinitionTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDefinitionTest.java
index 5ca9a2b..5c93588 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDefinitionTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupDefinitionTest.java
@@ -48,9 +48,9 @@ public class BackupDefinitionTest {
     Path cannotBeAdded = Paths.get("");
     assertThatThrownBy(() -> 
backupDefinition.getConfigFiles().add(cannotBeAdded))
         .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> 
backupDefinition.getDeployedJars().add(cannotBeAdded))
+    assertThatThrownBy(() -> 
backupDefinition.getDeployedJars().put(cannotBeAdded, cannotBeAdded))
         .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> 
backupDefinition.getUserFiles().add(cannotBeAdded))
+    assertThatThrownBy(() -> 
backupDefinition.getUserFiles().put(cannotBeAdded, cannotBeAdded))
         .isInstanceOf(UnsupportedOperationException.class);
     assertThatThrownBy(() -> 
backupDefinition.getOplogFilesByDiskStore().put(mock(DiskStore.class),
         
Collections.emptySet())).isInstanceOf(UnsupportedOperationException.class);
@@ -68,22 +68,27 @@ public class BackupDefinitionTest {
     assertThat(backupDefinition.getConfigFiles()).containsOnly(config1, 
config2);
   }
 
+
   @Test
   public void containsDeployedJarFilesAdded() {
     Path jar1 = Paths.get("jar1");
     Path jar2 = Paths.get("jar2");
-    backupDefinition.addDeployedJarToBackup(jar1);
-    backupDefinition.addDeployedJarToBackup(jar2);
-    assertThat(backupDefinition.getDeployedJars()).containsOnly(jar1, jar2);
+    Path source = Paths.get("source");
+    backupDefinition.addDeployedJarToBackup(jar1, source);
+    backupDefinition.addDeployedJarToBackup(jar2, source);
+    assertThat(backupDefinition.getDeployedJars().keySet()).containsOnly(jar1, 
jar2);
+    
assertThat(backupDefinition.getDeployedJars().values()).containsOnly(source, 
source);
   }
 
   @Test
   public void containsUserFilesAdded() {
     Path userFile1 = Paths.get("userFile1");
     Path userFile2 = Paths.get("userFile2");
-    backupDefinition.addUserFilesToBackup(userFile1);
-    backupDefinition.addUserFilesToBackup(userFile2);
-    assertThat(backupDefinition.getUserFiles()).containsOnly(userFile1, 
userFile2);
+    Path source = Paths.get("source");
+    backupDefinition.addUserFilesToBackup(userFile1, source);
+    backupDefinition.addUserFilesToBackup(userFile2, source);
+    
assertThat(backupDefinition.getUserFiles().keySet()).containsOnly(userFile1, 
userFile2);
+    assertThat(backupDefinition.getUserFiles().values()).containsOnly(source, 
source);
   }
 
   @Test
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupFileCopierIntegrationTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupFileCopierIntegrationTest.java
index eb87070..35fb6ac 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupFileCopierIntegrationTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupFileCopierIntegrationTest.java
@@ -152,7 +152,8 @@ public class BackupFileCopierIntegrationTest {
 
     Path expectedJar = 
tempFilesLocation.resolve(USER_FILES).resolve("myJar.jar");
     assertThat(expectedJar).exists();
-    
assertThat(fileCopier.getBackupDefinition().getDeployedJars()).containsExactly(expectedJar);
+    assertThat(fileCopier.getBackupDefinition().getDeployedJars().keySet())
+        .containsExactly(expectedJar);
   }
 
   @Test
@@ -179,7 +180,7 @@ public class BackupFileCopierIntegrationTest {
     Path expectedUserSubdir = 
tempFilesLocation.resolve(USER_FILES).resolve("userSubfolder");
     assertThat(expectedUserFile).exists();
     assertThat(expectedUserSubdir).exists();
-    assertThat(fileCopier.getBackupDefinition().getUserFiles())
+    assertThat(fileCopier.getBackupDefinition().getUserFiles().keySet())
         .containsExactlyInAnyOrder(expectedUserFile, expectedUserSubdir);
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
index 2ad0107..5fa10f9 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java
@@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -73,7 +74,7 @@ public class BackupServiceTest {
   public void startBackupThrowsExceptionWhenAnotherBackupInProgress() throws 
Exception {
     BackupTask backupTask = mock(BackupTask.class);
     backupService.currentTask.set(backupTask);
-    assertThatThrownBy(() -> backupService.prepareBackup(sender, null, null))
+    assertThatThrownBy(() -> backupService.prepareBackup(sender, new File(""), 
null))
         .isInstanceOf(IOException.class);
   }
 
@@ -84,7 +85,7 @@ public class BackupServiceTest {
 
   @Test
   public void prepareBackupReturnsEmptyPersistentIdsWhenBackupNotInProgress() 
throws Exception {
-    assertThat(backupService.prepareBackup(sender, null, 
null).size()).isEqualTo(0);
+    assertThat(backupService.prepareBackup(sender, new File(""), 
null).size()).isEqualTo(0);
   }
 
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemBackupDestinationTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterTest.java
similarity index 73%
rename from 
geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemBackupDestinationTest.java
rename to 
geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterTest.java
index a969a05..7523007 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemBackupDestinationTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterTest.java
@@ -14,14 +14,15 @@
  */
 package org.apache.geode.internal.cache.backup;
 
-import static 
org.apache.geode.internal.cache.backup.BackupDestination.CONFIG_DIRECTORY;
-import static 
org.apache.geode.internal.cache.backup.BackupDestination.DATA_STORES_DIRECTORY;
-import static 
org.apache.geode.internal.cache.backup.BackupDestination.DEPLOYED_JARS_DIRECTORY;
-import static 
org.apache.geode.internal.cache.backup.BackupDestination.README_FILE;
-import static 
org.apache.geode.internal.cache.backup.BackupDestination.USER_FILES_DIRECTORY;
-import static 
org.apache.geode.internal.cache.backup.FileSystemBackupDestination.INCOMPLETE_BACKUP_FILE;
+import static 
org.apache.geode.internal.cache.backup.FileSystemBackupWriter.CONFIG_DIRECTORY;
+import static 
org.apache.geode.internal.cache.backup.FileSystemBackupWriter.DATA_STORES_DIRECTORY;
+import static 
org.apache.geode.internal.cache.backup.FileSystemBackupWriter.DEPLOYED_JARS_DIRECTORY;
+import static 
org.apache.geode.internal.cache.backup.FileSystemBackupWriter.INCOMPLETE_BACKUP_FILE;
+import static 
org.apache.geode.internal.cache.backup.FileSystemBackupWriter.README_FILE;
+import static 
org.apache.geode.internal.cache.backup.FileSystemBackupWriter.USER_FILES_DIRECTORY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -41,6 +42,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
+import org.apache.geode.internal.cache.DirectoryHolder;
 import org.apache.geode.internal.cache.DiskStoreImpl;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.Oplog;
@@ -49,7 +51,7 @@ import org.apache.geode.test.junit.categories.IntegrationTest;
 
 @Category(IntegrationTest.class)
 @RunWith(JUnitParamsRunner.class)
-public class FileSystemBackupDestinationTest {
+public class FileSystemBackupWriterTest {
   private static final Path RELATIVE_TARGET_DIR = Paths.get("backupTest");
 
   @Rule
@@ -57,14 +59,19 @@ public class FileSystemBackupDestinationTest {
 
   private BackupDefinition backupDefinition;
   private Path targetDir;
+  private Path sourceDir;
   private RestoreScript restoreScript;
+  String memberId = "member1";
 
   @Before
   public void setup() throws IOException {
     backupDefinition = new BackupDefinition();
     Path backupDirectory = tempDir.newFolder("backups").toPath();
     targetDir = backupDirectory.resolve("backupTarget");
+    sourceDir = backupDirectory.resolve("backupSource");
     restoreScript = mock(RestoreScript.class);
+    doNothing().when(restoreScript).addUserFile(any(), any());
+    doNothing().when(restoreScript).addFile(any(), any());
     when(restoreScript.generate(any())).thenReturn(tempDir.newFile());
   }
 
@@ -82,12 +89,13 @@ public class FileSystemBackupDestinationTest {
     Path userFile = tempDir.newFile("userFile").toPath();
     Path userSubdir = tempDir.newFolder("userSubDir").toPath();
     Path userFileInDir = Files.write(userSubdir.resolve("fileInDir"), new 
byte[] {});
-    backupDefinition.addUserFilesToBackup(userFile);
-    backupDefinition.addUserFilesToBackup(userSubdir);
+    backupDefinition.addUserFilesToBackup(userFile, sourceDir);
+    backupDefinition.addUserFilesToBackup(userSubdir, sourceDir);
+    backupDefinition.setRestoreScript(restoreScript);
 
     executeBackup(useRelativePath);
 
-    Path userDir = getTargetDir(useRelativePath).resolve(USER_FILES_DIRECTORY);
+    Path userDir = 
getTargetMemberDir(useRelativePath).resolve(USER_FILES_DIRECTORY);
     assertThat(userDir.resolve(userFile.getFileName())).exists();
     assertThat(userDir.resolve(userSubdir.getFileName())).exists();
     
assertThat(userDir.resolve(userSubdir.getFileName()).resolve(userFileInDir.getFileName()))
@@ -100,12 +108,13 @@ public class FileSystemBackupDestinationTest {
     Path jarFile = tempDir.newFile("jarFile").toPath();
     Path jarSubdir = tempDir.newFolder("jarSubdir").toPath();
     Path jarInSubdir = Files.write(jarSubdir.resolve("jarInSubdir"), new 
byte[] {});
-    backupDefinition.addDeployedJarToBackup(jarFile);
-    backupDefinition.addDeployedJarToBackup(jarSubdir);
+    backupDefinition.addDeployedJarToBackup(jarFile, sourceDir);
+    backupDefinition.addDeployedJarToBackup(jarSubdir, sourceDir);
+    backupDefinition.setRestoreScript(restoreScript);
 
     executeBackup(useRelativePath);
 
-    Path userDir = 
getTargetDir(useRelativePath).resolve(DEPLOYED_JARS_DIRECTORY);
+    Path userDir = 
getTargetMemberDir(useRelativePath).resolve(DEPLOYED_JARS_DIRECTORY);
     assertThat(userDir.resolve(jarFile.getFileName())).exists();
     assertThat(userDir.resolve(jarSubdir.getFileName())).exists();
     
assertThat(userDir.resolve(jarSubdir.getFileName()).resolve(jarInSubdir.getFileName()))
@@ -119,10 +128,11 @@ public class FileSystemBackupDestinationTest {
     Path propertyFile = tempDir.newFile("properties").toPath();
     backupDefinition.addConfigFileToBackup(cacheXml);
     backupDefinition.addConfigFileToBackup(propertyFile);
+    backupDefinition.setRestoreScript(restoreScript);
 
     executeBackup(useRelativePath);
 
-    Path configDir = getTargetDir(useRelativePath).resolve(CONFIG_DIRECTORY);
+    Path configDir = 
getTargetMemberDir(useRelativePath).resolve(CONFIG_DIRECTORY);
     assertThat(configDir.resolve(cacheXml.getFileName())).exists();
     assertThat(configDir.resolve(propertyFile.getFileName())).exists();
   }
@@ -137,14 +147,17 @@ public class FileSystemBackupDestinationTest {
     when(oplog.getDrfFile()).thenReturn(tempDir.newFile("drf"));
     when(oplog.getKrfFile()).thenReturn(tempDir.newFile("krf"));
     when(diskStore.getInforFileDirIndex()).thenReturn(1);
+    DirectoryHolder[] directoryHolders = new DirectoryHolder[0];
+    when(diskStore.getDirectoryHolders()).thenReturn(directoryHolders);
 
     backupDefinition.addOplogFileToBackup(diskStore, 
oplog.getCrfFile().toPath());
     backupDefinition.addOplogFileToBackup(diskStore, 
oplog.getDrfFile().toPath());
     backupDefinition.addOplogFileToBackup(diskStore, 
oplog.getKrfFile().toPath());
+    backupDefinition.setRestoreScript(restoreScript);
 
     executeBackup(useRelativePath);
 
-    Path diskStoreDir = 
getTargetDir(useRelativePath).resolve(DATA_STORES_DIRECTORY)
+    Path diskStoreDir = 
getTargetMemberDir(useRelativePath).resolve(DATA_STORES_DIRECTORY)
         .resolve(GemFireCacheImpl.getDefaultDiskStoreName() + "_1-2");
     assertThat(diskStoreDir.resolve("dir1").resolve("crf")).exists();
     assertThat(diskStoreDir.resolve("dir1").resolve("drf")).exists();
@@ -166,10 +179,11 @@ public class FileSystemBackupDestinationTest {
     Files.createFile(initFile2);
     backupDefinition.addDiskInitFile(diskStore1, initFile1);
     backupDefinition.addDiskInitFile(diskStore2, initFile2);
+    backupDefinition.setRestoreScript(restoreScript);
 
     executeBackup(useRelativePath);
 
-    Path diskStoreDir = 
getTargetDir(useRelativePath).resolve(DATA_STORES_DIRECTORY)
+    Path diskStoreDir = 
getTargetMemberDir(useRelativePath).resolve(DATA_STORES_DIRECTORY)
         .resolve(GemFireCacheImpl.getDefaultDiskStoreName() + "_1-2");
     assertThat(diskStoreDir.resolve("dir1").resolve("initFile1")).exists();
     assertThat(diskStoreDir.resolve("dir2").resolve("initFile2")).exists();
@@ -184,7 +198,7 @@ public class FileSystemBackupDestinationTest {
 
     executeBackup(useRelativePath);
 
-    
assertThat(getTargetDir(useRelativePath).resolve("restoreScript")).exists();
+    
assertThat(getTargetMemberDir(useRelativePath).resolve("restoreScript")).exists();
   }
 
   @Test
@@ -192,14 +206,14 @@ public class FileSystemBackupDestinationTest {
   public void backupContainsReadMe(boolean useRelativePath) throws IOException 
{
     executeBackup(useRelativePath);
 
-    assertThat(getTargetDir(useRelativePath).resolve(README_FILE)).exists();
+    
assertThat(getTargetMemberDir(useRelativePath).resolve(README_FILE)).exists();
   }
 
   @Test
   @Parameters({"true", "false"})
   public void leavesBehindIncompleteFileOnFailure(boolean useRelativePath) 
throws Exception {
     Path notCreatedFile = 
tempDir.newFolder("dir1").toPath().resolve("notCreated");
-    backupDefinition.addDeployedJarToBackup(notCreatedFile);
+    backupDefinition.addDeployedJarToBackup(notCreatedFile, sourceDir);
 
     try {
       executeBackup(useRelativePath);
@@ -207,23 +221,30 @@ public class FileSystemBackupDestinationTest {
       // expected to occur on missing file
     }
 
-    
assertThat(getTargetDir(useRelativePath).resolve(INCOMPLETE_BACKUP_FILE)).exists();
+    
assertThat(getTargetMemberDir(useRelativePath).resolve(INCOMPLETE_BACKUP_FILE)).exists();
   }
 
   @Test
   @Parameters({"true", "false"})
   public void doesNotLeaveBehindIncompleteFileOnSuccess(boolean 
useRelativePath) throws Exception {
     executeBackup(useRelativePath);
-    
assertThat(getTargetDir(useRelativePath).resolve(INCOMPLETE_BACKUP_FILE)).doesNotExist();
+    
assertThat(getTargetMemberDir(useRelativePath).resolve(INCOMPLETE_BACKUP_FILE)).doesNotExist();
   }
 
   private void executeBackup(boolean useRelativePath) throws IOException {
-    BackupDestination backupDestination =
-        new FileSystemBackupDestination(getTargetDir(useRelativePath));
-    backupDestination.backupFiles(backupDefinition);
+    backupDefinition.setRestoreScript(restoreScript);
+
+    BackupWriter backupWriter =
+        new 
FileSystemBackupWriter(getTargetDir(useRelativePath).resolve(memberId));
+    backupWriter.backupFiles(backupDefinition);
   }
 
   private Path getTargetDir(boolean useRelativePath) {
     return useRelativePath ? RELATIVE_TARGET_DIR : targetDir;
   }
+
+  /** Returns the member's directory under the target dir, where the writer puts the backup. */
+  private Path getTargetMemberDir(boolean useRelativePath) {
+    return getTargetDir(useRelativePath).resolve(memberId);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocationTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocationTest.java
new file mode 100644
index 0000000..cbb8254
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.backup;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.persistence.DiskStoreID;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/** Unit tests for the oplog lookups of {@link FileSystemIncrementalBackupLocation}. */
+@Category(UnitTest.class)
+public class FileSystemIncrementalBackupLocationTest {
+
+  private static final String MEMBER_ID = "member1";
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  @Test
+  public void testNonExistentBackupLocation() throws IOException {
+    DiskStore diskstore = mock(DiskStore.class);
+    // Resolve the path under the managed temp folder so it is guaranteed not to exist,
+    // whatever the current working directory happens to contain.
+    File nonExistingDir = Paths.get(tempDir.getRoot().getPath(), "nonexistent").toFile();
+    FileSystemIncrementalBackupLocation backupLocation =
+        new FileSystemIncrementalBackupLocation(nonExistingDir, MEMBER_ID);
+    assertThat(backupLocation.getBackedUpOplogs(diskstore)).isEmpty();
+  }
+
+  @Test
+  public void testNonExistentMemberBackupLocation() throws IOException {
+    // The backup root exists, but no directory is created for the member.
+    File backupLocation = tempDir.newFolder("backup");
+    DiskStore diskstore = mock(DiskStore.class);
+    FileSystemIncrementalBackupLocation fileBackupLocation =
+        new FileSystemIncrementalBackupLocation(backupLocation, MEMBER_ID);
+    assertThat(fileBackupLocation.getBackedUpOplogs(diskstore)).isEmpty();
+  }
+
+  @Test
+  public void testWhenDiskstoresAreEmpty() throws IOException {
+    File backupLocation = tempDir.newFolder("backup");
+    Path dataStoresDir = createDataStoresDir(backupLocation);
+    DiskStoreImpl diskStore = mockDiskStore();
+    FileSystemIncrementalBackupLocation fileBackupLocation =
+        new FileSystemIncrementalBackupLocation(backupLocation, MEMBER_ID);
+    // The disk store's backup directory exists but holds no files.
+    createDiskStoreDir(dataStoresDir, fileBackupLocation, diskStore);
+
+    File memberDir = fileBackupLocation.getMemberBackupLocationDir().toFile();
+    assertThat(fileBackupLocation.getBackedUpOplogs(memberDir, diskStore)).isEmpty();
+  }
+
+  @Test
+  public void returnsFilesFromDiskstoreDirectory() throws IOException {
+    File backupLocation = tempDir.newFolder("backup");
+    Path dataStoresDir = createDataStoresDir(backupLocation);
+    DiskStoreImpl diskStore = mockDiskStore();
+    FileSystemIncrementalBackupLocation fileBackupLocation =
+        new FileSystemIncrementalBackupLocation(backupLocation, MEMBER_ID);
+    Path diskStorePath = createDiskStoreDir(dataStoresDir, fileBackupLocation, diskStore);
+    Path crf = Files.createFile(diskStorePath.resolve("oplog1.crf"));
+    Path krf = Files.createFile(diskStorePath.resolve("oplog1.krf"));
+    Path drf = Files.createFile(diskStorePath.resolve("oplog1.drf"));
+
+    File memberDir = fileBackupLocation.getMemberBackupLocationDir().toFile();
+    Collection<File> logFiles = fileBackupLocation.getBackedUpOplogs(memberDir, diskStore);
+    assertThat(logFiles).contains(crf.toFile(), krf.toFile(), drf.toFile());
+  }
+
+  @Test
+  public void returnsPreviouslyBackedFilesFromBackupLocation() throws IOException {
+    File backupLocation = tempDir.newFolder("backup");
+    Files.createDirectories(backupLocation.toPath().resolve(MEMBER_ID));
+    TestableFileSystemIncrementalBackupLocation fileBackupLocation =
+        new TestableFileSystemIncrementalBackupLocation(backupLocation, MEMBER_ID);
+    initializeBackupInspector(fileBackupLocation);
+
+    File memberDir = fileBackupLocation.getMemberBackupLocationDir().toFile();
+    Collection<File> logFiles = fileBackupLocation.getPreviouslyBackedUpOpLogs(memberDir);
+    assertThat(logFiles).isNotEmpty();
+  }
+
+  @Test
+  public void returnsCurrentAndPreviouslyBackedFiles() throws IOException {
+    File backupLocation = tempDir.newFolder("backup");
+    Path dataStoresDir = createDataStoresDir(backupLocation);
+    DiskStoreImpl diskStore = mockDiskStore();
+    TestableFileSystemIncrementalBackupLocation fileBackupLocation =
+        new TestableFileSystemIncrementalBackupLocation(backupLocation, MEMBER_ID);
+    // Oplog set "2" exists in the disk store's backup directory; set "1" comes from the stub.
+    Path diskStorePath = createDiskStoreDir(dataStoresDir, fileBackupLocation, diskStore);
+    Files.createFile(diskStorePath.resolve("2.crf"));
+    Files.createFile(diskStorePath.resolve("2.krf"));
+    Files.createFile(diskStorePath.resolve("2.drf"));
+    initializeBackupInspector(fileBackupLocation);
+
+    Map<String, File> allBackedFiles = fileBackupLocation.getBackedUpOplogs(diskStore);
+    assertThat(allBackedFiles).hasSize(6);
+    assertThat(allBackedFiles.keySet()).contains("1.crf", "1.drf", "1.krf", "2.crf", "2.drf",
+        "2.krf");
+  }
+
+  /** Creates and returns the member's data-stores directory beneath {@code backupLocation}. */
+  private static Path createDataStoresDir(File backupLocation) throws IOException {
+    return Files.createDirectories(
+        backupLocation.toPath().resolve(MEMBER_ID).resolve(BackupWriter.DATA_STORES_DIRECTORY));
+  }
+
+  /** Creates and returns the backup directory of {@code diskStore} in {@code dataStoresDir}. */
+  private static Path createDiskStoreDir(Path dataStoresDir,
+      FileSystemIncrementalBackupLocation location, DiskStoreImpl diskStore) throws IOException {
+    return Files.createDirectories(dataStoresDir.resolve(location.getBackupDirName(diskStore)));
+  }
+
+  /** Returns a mock disk store that reports a fixed {@link DiskStoreID}. */
+  private static DiskStoreImpl mockDiskStore() {
+    DiskStoreImpl diskStore = mock(DiskStoreImpl.class);
+    when(diskStore.getDiskStoreID()).thenReturn(new DiskStoreID(1, 2));
+    return diskStore;
+  }
+
+  /**
+   * Installs a stub inspector that reports an incremental backup containing oplog set "1" and
+   * echoes each requested oplog file name back as its copy-from value.
+   */
+  private void initializeBackupInspector(
+      TestableFileSystemIncrementalBackupLocation fileSystemBackupLocation) {
+    BackupInspector backupInspector = mock(BackupInspector.class);
+    when(backupInspector.isIncremental()).thenReturn(true);
+    Set<String> previousBackupFiles = new HashSet<>(Arrays.asList("1.crf", "1.drf", "1.krf"));
+    when(backupInspector.getIncrementalOplogFileNames()).thenReturn(previousBackupFiles);
+    when(backupInspector.getCopyFromForOplogFile(anyString())).thenAnswer(i -> i.getArguments()[0]);
+    fileSystemBackupLocation.setBackupInspector(backupInspector);
+  }
+
+  /** Test double that hands back an injected {@link BackupInspector} instead of creating one. */
+  public static class TestableFileSystemIncrementalBackupLocation
+      extends FileSystemIncrementalBackupLocation {
+
+    private BackupInspector backupInspector;
+
+    TestableFileSystemIncrementalBackupLocation(File backupLocationDir, String memberId) {
+      super(backupLocationDir, memberId);
+    }
+
+    public void setBackupInspector(BackupInspector backupInspector) {
+      this.backupInspector = backupInspector;
+    }
+
+    @Override
+    BackupInspector createBackupInspector(File checkedBaselineDir) throws IOException {
+      return backupInspector;
+    }
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
index 960b9d7..a5a5e9e 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
@@ -15,6 +15,7 @@
 package org.apache.geode.internal.cache.backup;
 
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -581,7 +582,7 @@ public class IncrementalBackupDistributedTest extends 
JUnit4CacheTestCase {
     File backupDir = getBackupDirForMember(getBaselineDir(), getMemberId(vm));
     assertTrue(backupDir.exists());
 
-    File incomplete = new File(backupDir, BackupTask.INCOMPLETE_BACKUP_FILE);
+    File incomplete = new File(backupDir, BackupWriter.INCOMPLETE_BACKUP_FILE);
     incomplete.createNewFile();
   }
 
@@ -980,7 +981,7 @@ public class IncrementalBackupDistributedTest extends 
JUnit4CacheTestCase {
     Collection<File> memberIncrementalOplogs =
         FileUtils.listFiles(getBackupDirForMember(getIncrementalDir(), 
memberId),
             new RegexFileFilter(OPLOG_REGEX), DirectoryFileFilter.DIRECTORY);
-    assertFalse(memberIncrementalOplogs.isEmpty());
+    assertThat(memberIncrementalOplogs).isNotEmpty();
 
     List<String> memberIncrementalOplogNames = new LinkedList<>();
     TransformUtils.transform(memberIncrementalOplogs, 
memberIncrementalOplogNames,

-- 
To stop receiving notification emails like this one, please contact
aging...@apache.org.

Reply via email to