ayushtkn commented on a change in pull request #2539:
URL: https://github.com/apache/hive/pull/2539#discussion_r689459394



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,135 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> 
createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
+      }
+      boolean isFirstSnapshotAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean isSecondSnapAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
+      //for bootstrap and non - failback case, use initial_copy
+      if(isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)) {

Review comment:
       Is this all to find out. the prefix, which is always the database name?  
This seems to be done for even incremental also now?
   Can we just not rename the snapshot in case of control failover as per the 
new standards and use the normal flow post that and get rid of maintaining 
prefix

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -192,64 +196,135 @@ private void dirLocationToCopy(String tableName, 
FileList fileList, Path sourceP
     fileList.add(new DirCopyWork(tableName, sourcePath, targetPath, copyMode, 
snapshotPrefix).convertToString());
   }
 
-  private SnapshotUtils.SnapshotCopyMode createSnapshotsAtSource(Path 
sourcePath, String snapshotPrefix,
-      boolean isSnapshotEnabled, HiveConf conf, 
SnapshotUtils.ReplSnapshotCount replSnapshotCount, FileList snapPathFileList,
-      ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+  private Map<String, SnapshotUtils.SnapshotCopyMode> 
createSnapshotsAtSource(Path sourcePath, Path targetPath, String snapshotPrefix,
+                                                                              
boolean isSnapshotEnabled, HiveConf conf, SnapshotUtils.ReplSnapshotCount 
replSnapshotCount, FileList snapPathFileList,
+                                                                              
ArrayList<String> prevSnaps, boolean isBootstrap) throws IOException {
+    Map<String, SnapshotUtils.SnapshotCopyMode> ret = new HashMap<>();
+    ret.put(snapshotPrefix, FALLBACK_COPY);
     if (!isSnapshotEnabled) {
       LOG.info("Snapshot copy not enabled for path {} Will use normal distCp 
for copying data.", sourcePath);
-      return FALLBACK_COPY;
+      return ret;
     }
+    String prefix = snapshotPrefix;
+    SnapshotUtils.SnapshotCopyMode copyMode = FALLBACK_COPY;
     DistributedFileSystem sourceDfs = SnapshotUtils.getDFS(sourcePath, conf);
     try {
-      if(isBootstrap) {
+      if(conf.getBoolVar(HiveConf.ConfVars.REPL_REUSE_SNAPSHOTS)) {
+        try {
+          FileStatus[] listing = sourceDfs.listStatus(new Path(sourcePath, 
".snapshot"));
+          for (FileStatus elem : listing) {
+            String snapShotName = elem.getPath().getName();
+            if (snapShotName.contains(OLD_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(OLD_SNAPSHOT));
+              break;
+            }
+            if (snapShotName.contains(NEW_SNAPSHOT)) {
+              prefix = snapShotName.substring(0, 
snapShotName.lastIndexOf(NEW_SNAPSHOT));
+              break;
+            }
+          }
+          ret.clear();
+          ret.put(prefix, copyMode);
+          snapshotPrefix = prefix;
+        } catch (SnapshotException e) {
+          //dir not snapshottable, continue
+        }
+      }
+      boolean isFirstSnapshotAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, OLD_SNAPSHOT, conf);
+      boolean isSecondSnapAvl =
+              SnapshotUtils.isSnapshotAvailable(sourceDfs, sourcePath, 
snapshotPrefix, NEW_SNAPSHOT, conf);
+      //for bootstrap and non - failback case, use initial_copy
+      if(isBootstrap && !(!isSecondSnapAvl && isFirstSnapshotAvl)) {
         // Delete any pre existing snapshots.
         SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
firstSnapshot(snapshotPrefix), conf);
         SnapshotUtils.deleteSnapshotIfExists(sourceDfs, sourcePath, 
secondSnapshot(snapshotPrefix), conf);
         allowAndCreateInitialSnapshot(sourcePath, snapshotPrefix, conf, 
replSnapshotCount, snapPathFileList, sourceDfs);
-        return INITIAL_COPY;
+        ret.put(prefix, INITIAL_COPY);
+        return ret;
       }
 
+      //While resuming a failed replication
       if (prevSnaps.contains(sourcePath.toString())) {

Review comment:
       is prevSnaps initialised for bootstrap? better to add a test as well for 
the failure and checkpointing flow

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosUsingSnapshots.java
##########
@@ -79,11 +80,11 @@ public static void classLevelSetup() throws Exception {
     overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
     overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
         UserGroupInformation.getCurrentUser().getUserName());
-    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"false");
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
     
overrides.put(HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname,
 "true");
     overrides.put(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY.varname, "true");
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationScenarios.class);

Review comment:
       why do we need this? all the test should work as is without bothering 
existing tests. 

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -103,38 +109,49 @@ static void internalBeforeClassSetup(Map<String, String> 
overrides,
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("metastore.warehouse.tenant.colocation", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
+    String primaryBaseDir = 
Files.createTempDirectory("primary").toFile().getAbsolutePath();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir);
+    replicaConf = new HiveConf(clazz);
+    replicaConf.set("dfs.client.use.datanode.hostname", "true");
+    replicaConf.set("metastore.warehouse.tenant.colocation", "true");
+    replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
+    String replicaBaseDir = 
Files.createTempDirectory("replica").toFile().getAbsolutePath();
+    replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
     MiniDFSCluster miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+            new 
MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+    MiniDFSCluster miniDFSClusterReplica =
+            new 
MiniDFSCluster.Builder(replicaConf).numDataNodes(2).format(true).build();
     Map<String, String> acidEnableConf = new HashMap<String, String>() {{
-        put("fs.defaultFS", 
miniDFSCluster.getFileSystem().getUri().toString());
-        put("hive.support.concurrency", "true");
-        put("hive.txn.manager", 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.metastore.client.capability.check", "false");
-        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
-        put("hive.strict.checks.bucketing", "false");
-        put("hive.mapred.mode", "nonstrict");
-        put("mapred.input.dir.recursive", "true");
-        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
-        put("metastore.warehouse.tenant.colocation", "true");
-        put("hive.in.repl.test", "true");
-        put("hive.txn.readonly.enabled", "true");
-        //HIVE-25267
-        put(MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT.getVarname(), "2000");
-        put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"false");
-        
put(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET.varname, 
"false");
-      }};
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("metastore.warehouse.tenant.colocation", "true");
+      put("hive.in.repl.test", "true");
+      put("hive.txn.readonly.enabled", "true");

Review comment:
       nit
   looks like only indentation change, avoid unrelated changes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to