ArkoSharma commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r759026376



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && 
conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), 
firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), 
secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest 
snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, 
second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;

Review comment:
       done.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -189,57 +191,137 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
       targetPath = new Path(Utils.replaceHost(targetPath.toString(), 
sourcePath.toUri().getHost()));
       sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), 
remoteNS));
     }
-    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
+    fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix, isBootstrap).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path sourcePath, Path 
targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
       return FALLBACK_COPY;
     }
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
-        // Delete any pre existing snapshots.
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
-        SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+      if(isBootstrap && 
conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            String prefix;
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, firstSnapshot(prefix), 
firstSnapshot(snapshotPrefix));
+              }
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              if(!prefix.equals(snapshotPrefix)) {
+                sourceDfs.renameSnapshot(sourcePath, secondSnapshot(prefix), 
secondSnapshot(snapshotPrefix));
+              }
+            }
+          }
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
       }
+      boolean firstSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean secondSnapAvailable =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {
         // We already created a snapshot for this, just refresh the latest 
snapshot and leave.
-        sourceDfs.deleteSnapshot(sourcePath, secondSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
+        // In case of reverse replication after fail-over, in some paths, 
second snapshot may not be present.
+        if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+          replSnapshotCount.incrementNumDeleted();
+        }
         SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
         replSnapshotCount.incrementNumCreated();
         snapPathFileList.add(sourcePath.toString());
         return SnapshotUtils
-            .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+                .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
       }
-      // check if second snapshot exists.
-      boolean isSecondSnapAvlb = SnapshotUtils.isSnapshotAvailable(sourceDfs, 
sourcePath, snapshotPrefix,
-          OLD_SNAPSHOT, conf);
-      if (isSecondSnapAvlb) {
-        sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
-        replSnapshotCount.incrementNumDeleted();
-        sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), 
firstSnapshot(snapshotPrefix));
-        SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
-        replSnapshotCount.incrementNumCreated();
-        snapPathFileList.add(sourcePath.toString());
-        return DIFF_COPY;
-      } else {
-        // Check if first snapshot is available
-        boolean isFirstSnapshotAvailable =
-            SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
-        if (isFirstSnapshotAvailable) {
+
+      //for bootstrap and forward replication
+      if(isBootstrap && !(!secondSnapAvailable && firstSnapAvailable)) {
+        if (conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+          //this can be used in re-bootstrap cases after irrecoverable error
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
+          snapPathFileList.add(sourcePath.toString());
+          replSnapshotCount.incrementNumCreated();
+          return SnapshotUtils
+                  .isSnapshotAvailable(sourceDfs, sourcePath, snapshotPrefix, 
OLD_SNAPSHOT, conf) ? DIFF_COPY : INITIAL_COPY;
+        } else {
+          // for normal bootstrap, use initial_copy
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumDeleted();
+          }
+          if(SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf)) {
+            replSnapshotCount.incrementNumCreated();
+          }
+          allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
+          return INITIAL_COPY;
+        }
+      }
+
+      /*
+      * We have 4 cases :
+      * i.   both old and new snapshots exist in src -
+      *        a. In case of bootstrap -   it must be a re-bootstrap case 
where incremental failed earlier. Handled above.
+      *        b. In case of incremental - denotes normal incremental flow, we 
delete old snapshot,
+      *                                    rename the new as old and create a 
new 'new-snapshot'. No changes required in target.
+      * ii.  only new snapshot exists in src -
+      *        a. In case of incremental - denotes a path where initial copy 
would have been done in the previous
+      *                                    iteration, we rename it as 'old' 
and create a new 'new-snapshot'.
+      *                                    No changes required in target.
+      *        b. in case of bootstrap, it must be a re-bootstrap case. 
Handled above.
+      * iii. only old snapshot exists in src - this can occur during 
B(src)->A(tgt) replication after failover.
+      *      If reuse snapshots conf is true, attempt to reuse the snapshots :
+      *           Assume both snapshots exist in tgt (A). Deleting the old one 
and renaming the new as old will
+      *           be done during load.
+      *      Else proceed with initial copy.
+      * iv.  none exist - new path added in conf, need to do initial copy.
+      * */
+
+      if (secondSnapAvailable) {
+        if(firstSnapAvailable) {
+          sourceDfs.deleteSnapshot(sourcePath, firstSnapshot(snapshotPrefix));
+          replSnapshotCount.incrementNumDeleted();
+          SnapshotUtils.renameSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), firstSnapshot(snapshotPrefix), conf);
+          SnapshotUtils.createSnapshot(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
+          replSnapshotCount.incrementNumCreated();
+          snapPathFileList.add(sourcePath.toString());
+          return DIFF_COPY;
+        } else {
+          //only new snapshot exists
           sourceDfs.renameSnapshot(sourcePath, secondSnapshot(snapshotPrefix), 
firstSnapshot(snapshotPrefix));

Review comment:
       done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to