ayushtkn commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r734993099



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -593,6 +642,128 @@ public void testFailureScenarios() throws Throwable {
         .verifyResults(new String[] {"delhi", "noida"});
   }
 
+  /*
+   * test to check reuse of diff snapshots when incremental fails with 
irrecoverable error during data-copy (target modified)
+   * and re-bootstrap is required but overwrite is off.
+   */
+  @Test
+  public void testRebootstrapDiffCopy() throws Throwable {
+
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    DistributedFileSystem fsTarget = replica.miniDFSCluster.getFileSystem();
+    Path externalTableLocation1 = new Path("/" + testName.getMethodName() + 
"/table1/");
+    fs.mkdirs(externalTableLocation1, new FsPermission("777"));
+
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    withClause.add("'hive.repl.external.warehouse.single.copy.task.paths'='" + 
externalTableLocation1
+            .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString() + 
"'");
+
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create external table table1 (place string) partitioned by 
(country string) row format "
+                    + "delimited fields terminated by ',' location '" + 
externalTableLocation1.toString() + "'")
+            .run("create external table table2 (id int)")
+            .run("create external table table3 (id int)")
+            .run("insert into table1 partition(country='nepal') values 
('kathmandu')")
+            .run("insert into table1 partition(country='china') values 
('beejing')")
+            .run("insert into table2 values(1)")
+            .run("insert into table3 values(5)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 'table1'")
+            .verifyResults(new String[] {"table1"})
+            .run("select place from table1 where country='nepal'")
+            .verifyResults(new String[] {"kathmandu"})
+            .run("select place from table1 where country='china'")
+            .verifyResults(new String[] {"beejing"})
+            .run("select id from table3")
+            .verifyResults(new String[]{"5"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"1"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Check if the table1 directory is snapshotoble and the snapshot is there.
+    validateInitialSnapshotsCreated(externalTableLocation1.toString());
+
+    // Add some more data and do a dump & load
+    primary.run("use " + primaryDbName)
+            .run("insert into table1 partition(country='china') values 
('wuhan')")
+            .run("insert into table2 values(2)")
+            .run("insert into table3 values(6)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("select place from table1 where country='china'")
+            .verifyResults(new String[] {"beejing", "wuhan"})
+            .run("select id from table3")
+            .verifyResults(new String[]{"5", "6"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"1", "2"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Verify if diff snapshots is there.
+    validateDiffSnapshotsCreated(externalTableLocation1.toString());
+
+    Path targetWhPath = externalTableDataPath(replicaConf, 
REPLICA_EXTERNAL_BASE,
+            new Path(primary.getDatabase(primaryDbName).getLocationUri()));
+    DistributedFileSystem replicaDfs = (DistributedFileSystem) 
targetWhPath.getFileSystem(replicaConf);
+
+    // Emulate the situation of a rebootstrap with incomplete data copied in 
the previous incremental cycle for some paths
+    // a. add some data to some paths
+    // b. do a dump load with snapshot disabled
+    // c. Now, some paths should have outdated snapshots in both source and 
target.
+    //    Re-enable snapshot and check whether diff-copy takes place for a 
fresh bootstrap
+    withClause.add("'hive.repl.externaltable.snapshotdiff.copy'='false'");
+    tuple = primary.run("use " + primaryDbName)
+            .run("insert into table table3 values (7)")
+            .run("insert into table table2 values (3)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("select id from table3")
+            .verifyResults(new String[]{"5", "6", "7"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"1", "2", "3"});
+
+    replica.run("use " + replicatedDbName)
+            .run("drop table table1")
+            .run("drop table table2")
+            .run("drop table table3")
+            .run("drop database "+ replicatedDbName + " cascade");

Review comment:
       `drop database ... cascade` would have dropped the tables as well, right?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && 
conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), 
firstSnapshot(snapshotPrefix));

Review comment:
       Should use the retryable renameSnapshot from SnapshotUtils

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -254,23 +256,55 @@ public boolean canExecuteInParallel() {
     return true;
   }
 
-  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, 
UserGroupInformation proxyUser,
+  boolean copyUsingDistCpSnapshots(Path sourcePath, Path targetPath, 
UserGroupInformation proxyUser, boolean isBootstrap,
       HiveConf clonedConf) throws IOException {
 
     DistributedFileSystem targetFs = SnapshotUtils.getDFS(targetPath, 
clonedConf);
     boolean result = false;
+    String snapPrefix = work.getSnapshotPrefix();
+    if(isBootstrap && conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) 
{
+      // in case of bootstrap replication from B to A (reverse replication), 
rename snapshots in A
+      // as they might have been renamed during dump in B
+      FileStatus[] listing = targetFs.listStatus(new Path(targetPath, 
".snapshot"));
+      for (FileStatus elem : listing) {
+        String snapShotName = elem.getPath().getName();
+        String prefix;
+        if (snapShotName.contains(OLD_SNAPSHOT)) {
+          prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+          if (!prefix.equals(snapPrefix)) {
+            targetFs.renameSnapshot(targetPath, firstSnapshot(prefix), 
firstSnapshot(snapPrefix));
+          }
+        }
+        if (snapShotName.contains(NEW_SNAPSHOT)) {
+          prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+          if (!prefix.equals(snapPrefix)) {
+            targetFs.renameSnapshot(targetPath, secondSnapshot(prefix), 
secondSnapshot(snapPrefix));
+          }
+        }
+      }

Review comment:
       This seems to duplicate the one in ReplExternalTables; it can be 
refactored into a method in SnapshotUtils

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,

Review comment:
       ``targetPath`` is unused

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && 
conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), 
firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), 
secondSnapshot(snapshotPrefix));

Review comment:
       Should use the retryable renameSnapshot from SnapshotUtils

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1105,17 +1105,15 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, 
Path cmRoot, Hive hiveDb)
           boolean isExternalTablePresent = false;
 
           String snapshotPrefix = dbName.toLowerCase();
-          ArrayList<String> prevSnaps = new ArrayList<>(); // Will stay empty 
in case of bootstrap
+          ArrayList<String> prevSnaps = new ArrayList<>();
           if (isSnapshotEnabled) {
-            // Delete any old existing snapshot file, We always start fresh in 
case of bootstrap.
             
FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
 conf),
-                new Path(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
-                    EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT));
-            
FileUtils.deleteIfExists(getDFS(SnapshotUtils.getSnapshotFileListPath(dumpRoot),
 conf),

Review comment:
       In case reuse snapshot is not turned on, we should clean up, right? 
Then we don't need the current file either, since we will always go with 
initial copy.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && 
conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), 
firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), 
secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest 
snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, 
second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, 
sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), 
firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {
+          // for normal bootstrap, use initial_copy
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumCreated();
+          }
+          allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
+          return INITIAL_COPY;
+        }
+      }
+
+      /*
+      * We have 4 cases :
+      * i.   both old and new snapshots exist in src -
+      *        a. In case of bootstrap -   it must be a re-bootstrap case 
where incremental failed earlier. Handled above.
+      *        b. In case of incremental - denotes normal incremental flow, we 
delete old snapshot,
+      *                                    rename the new as old and create a 
new 'new-snapshot'. No changes required in target.
+      * ii.  only new snapshot exists in src -
+      *        a. In case of incremental - denotes a path where initial copy 
would have been done in the previous
+      *                                    iteration, we rename it as 'old' 
and create a new 'new-snapshot'.
+      *                                    No changes required in target.
+      *        b. in case of bootstrap, it must be a re-bootstrap case. 
Handled above.
+      * iii. only old snapshot exists in src - this can occur during 
B(src)->A(tgt) replication after failover.
+      *      If reuse snapshots conf is true, attempt to reuse the snapshots :
+      *           Assume both snapshots exist in tgt (A). Deleting the old one 
and renaming the new as old will
+      *           be done during load.
+      *      Else proceed with initial copy.
+      * iv.  none exist - new path added in conf, need to do initial copy.
+      * */
+
+      if (secondSnapAvailable) {
+        if(firstSnapAvailable) {
+          sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
+          replSnapshotCount.incrementNumDeleted();
+          SnapshotUtils.renameSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix), conf);
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
+          replSnapshotCount.incrementNumCreated();
+          snapPathFileList.add(sourcePath.toString());
+          return DIFF_COPY;
+        } else {
+          //only new snapshot exists
           sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), 
firstSnapshot(snapshotPrefix));

Review comment:
       Can you replace this with renameSnapshot from SnapshotUtils?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && 
conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), 
firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), 
secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest 
snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, 
second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, 
sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), 
firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {

Review comment:
       Shouldn't this else block always be executed in case of bootstrap if 
reuse is false, irrespective of the ``!(!secondSnapAvailable && 
firstSnapAvailable)`` guard?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && 
conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), 
firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), 
secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest 
snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, 
second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;

Review comment:
       Can we use `firstSnapAvailable` here instead of calling 
`isSnapshotAvailable` for `OLD_SNAPSHOT` again?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to