This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ded1db7  HIVE-24114: Fix Repl Load with both staging and data copy on target (Pravin Kumar Sinha, reviewed by Aasha Medhi)
ded1db7 is described below

commit ded1db7b220f1ba51d4c84baac4d866f9d293af0
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Tue Sep 8 09:49:41 2020 +0530

    HIVE-24114: Fix Repl Load with both staging and data copy on target (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../java/org/apache/hadoop/hive/ql/ErrorMsg.java   |   3 +-
 .../apache/hadoop/hive/ql/parse/TestCopyUtils.java |   2 +-
 .../parse/TestReplicationScenariosAcidTables.java  |  12 +-
 .../TestReplicationScenariosExclusiveReplica.java  | 231 ++++++++++++++++++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   4 +-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       |  18 +-
 6 files changed, 251 insertions(+), 19 deletions(-)
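
Note: the functional change in this patch is in CopyUtils, which, as the
hunks at the end of this mail show, now takes the source FileSystem from the
first entry of the source-file list instead of deriving it from the original
source path; the rest is a new error code and test coverage for the
staging/copy-task permutations. A minimal sketch of how the tests drive those
permutations, using the harness members visible below (primary, replica,
getStagingLocationConfig and the dump/load helpers all come from the test
code in this patch, not from new API):

    // Staging directory on the target cluster, data copy task on the source:
    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
    primary.run("use " + primaryDbName).dump(primaryDbName, withClauseOptions);
    replica.load(replicatedDbName, primaryDbName, withClauseOptions);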

diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 4ce2c52..b14f906 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -622,7 +622,8 @@ public enum ErrorMsg {
   REPL_INVALID_CONFIG_FOR_SERVICE(40008, "Invalid config error : {0} for {1} service.", true),
   REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE(40009, "Invalid internal config error : {0} for {1} service.", true),
   REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true),
-  REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40011, "Replication failed with non recoverable error. Needs manual intervention")
+  REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40011, "Replication failed with non recoverable error. Needs manual intervention"),
+  REPL_INVALID_ARGUMENTS(40012, "Invalid arguments error : {0}.", true)
   ;
 
   private int errorCode;
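
Note: the new 40012 entry is parameterized ({0}) and marked as such by the
trailing "true", matching its neighbours. It is consumed later in this same
commit, in CopyUtils.copyAndVerify:

    // From the CopyUtils hunk below: reject an empty source-file list up front.
    if (CollectionUtils.isEmpty(srcFiles)) {
      throw new IOException(ErrorMsg.REPL_INVALID_ARGUMENTS.format("SrcFiles can not be empty during copy operation."));
    }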
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
index 9648c72..28b4abe 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
@@ -90,7 +90,7 @@ public class TestCopyUtils {
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
     HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
-      put(ConfVars.HIVE_IN_TEST.varname, "true");
+      put(ConfVars.HIVE_IN_TEST_REPL.varname, "true");
       put(ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1");
       put(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
       put(ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
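
Note: the override switches to HIVE_IN_TEST_REPL because, after this patch,
CopyUtils gates its copy-mode decision on that flag instead of HIVE_IN_TEST,
as the regularCopy hunk at the end of this mail shows:

    // CopyUtils.regularCopy(...) after this patch: tests take the cheap
    // FileSystem copy path unless they explicitly set the flag to false.
    if (hiveInReplTest) {
      return true;
    }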
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 4be9144..c03b252 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -1292,7 +1292,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   public void testCheckPointingDataDumpFailureBootstrapDuringIncremental() throws Throwable {
     List<String> dumpClause = Arrays.asList(
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
             "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
                     + UserGroupInformation.getCurrentUser().getUserName() + "'");
@@ -1309,7 +1309,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios

     dumpClause = Arrays.asList(
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
             "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
                     + UserGroupInformation.getCurrentUser().getUserName() + "'",
@@ -1491,7 +1491,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     //To force distcp copy
     List<String> dumpClause = Arrays.asList(
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
             "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
                     + UserGroupInformation.getCurrentUser().getUserName() + "'");
@@ -1642,7 +1642,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     //Distcp copy
     List<String> dumpClause = Arrays.asList(
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
             "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
                     + UserGroupInformation.getCurrentUser().getUserName() + "'");
@@ -1695,7 +1695,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     //To force distcp copy
     List<String> dumpClause = Arrays.asList(
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
             "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
                     + UserGroupInformation.getCurrentUser().getUserName() + "'");
@@ -1748,7 +1748,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     //To force distcp copy
     List<String> dumpClause = Arrays.asList(
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
             "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
             "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
                     + UserGroupInformation.getCurrentUser().getUserName() + "'");
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 549447e..452ba64 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -68,8 +68,9 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
   }

   @Test
-  public void externalTableReplicationWithRemoteStaging() throws Throwable {
+  public void testRemoteStagingAndCopyTaskOnTarget() throws Throwable {
     List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
         .run("create external table t1 (id int)")
@@ -123,7 +124,233 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
   }
 
   @Test
-  public void externalTableReplicationWithLocalStaging() throws Throwable {
+  public void testLocalStagingAndCopyTaskOnTarget() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (500)")
+            .run("create table t2 (id int)")
+            .run("insert into table t2 values (600)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("select id from t1")
+            .verifyResult("500")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("select id from t2")
+            .verifyResult("600");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (700)")
+            .run("create table t4 (id int)")
+            .run("insert into table t4 values (800)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("show tables like 't3'")
+            .verifyResult("t3")
+            .run("show tables like 't4'")
+            .verifyResult("t4")
+            .run("select id from t1")
+            .verifyResult("500")
+            .run("select id from t2")
+            .verifyResult("600")
+            .run("select id from t3")
+            .verifyResult("700")
+            .run("select id from t4")
+            .verifyResult("800");
+  }
+
+  @Test
+  public void testRemoteStagingAndCopyTaskOnSource() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (100)")
+            .run("create table t2 (id int)")
+            .run("insert into table t2 values (200)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("select id from t1")
+            .verifyResult("100")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("select id from t2")
+            .verifyResult("200");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (300)")
+            .run("create table t4 (id int)")
+            .run("insert into table t4 values (400)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("show tables like 't3'")
+            .verifyResult("t3")
+            .run("show tables like 't4'")
+            .verifyResult("t4")
+            .run("select id from t1")
+            .verifyResult("100")
+            .run("select id from t2")
+            .verifyResult("200")
+            .run("select id from t3")
+            .verifyResult("300")
+            .run("select id from t4")
+            .verifyResult("400");
+  }
+
+  @Test
+  public void testLocalStagingAndCopyTaskOnSource() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'");
+    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (500)")
+            .run("create table t2 (id int)")
+            .run("insert into table t2 values (600)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("select id from t1")
+            .verifyResult("500")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("select id from t2")
+            .verifyResult("600");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (700)")
+            .run("create table t4 (id int)")
+            .run("insert into table t4 values (800)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("show tables like 't3'")
+            .verifyResult("t3")
+            .run("show tables like 't4'")
+            .verifyResult("t4")
+            .run("select id from t1")
+            .verifyResult("500")
+            .run("select id from t2")
+            .verifyResult("600")
+            .run("select id from t3")
+            .verifyResult("700")
+            .run("select id from t4")
+            .verifyResult("800");
+  }
+
+  @Test
+  public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+    withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'");
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (500)")
+            .run("create table t2 (id int)")
+            .run("insert into table t2 values (600)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("select id from t1")
+            .verifyResult("500")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("select id from t2")
+            .verifyResult("600");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (700)")
+            .run("create table t4 (id int)")
+            .run("insert into table t4 values (800)")
+            .dump(primaryDbName, withClauseOptions);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("show tables like 't3'")
+            .verifyResult("t3")
+            .run("show tables like 't4'")
+            .verifyResult("t4")
+            .run("select id from t1")
+            .verifyResult("500")
+            .run("select id from t2")
+            .verifyResult("600")
+            .run("select id from t3")
+            .verifyResult("700")
+            .run("select id from t4")
+            .verifyResult("800");
+  }
+
+  @Test
+  public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable {
     List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir);
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 656c298..f261889 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -315,7 +315,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   }
 
   private Path getDumpRoot(Path currentDumpPath) {
-    if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+    if (ReplDumpWork.testDeletePreviousDumpMetaPath
+            && (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)
+                || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL))) {
       //testDeleteDumpMetaDumpPath to be used only for test.
       return null;
     } else {
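
Note: getDumpRoot previously honored only HIVE_IN_TEST_REPL; after this patch
it short-circuits for either test flag, so suites that still set plain
HIVE_IN_TEST keep getting the delete-previous-dump-metadata behavior. A
hedged sketch of the test-side wiring (setBoolVar is standard HiveConf API;
how the static ReplDumpWork hook gets toggled is assumed here):

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
    // or, for replication suites:
    // conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, true);
    // With ReplDumpWork.testDeletePreviousDumpMetaPath enabled,
    // getDumpRoot(currentDumpPath) returns null in both cases.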
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index c386aee..c313c383 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse.repl;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -29,7 +30,6 @@ import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -45,7 +45,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
 public class CopyUtils {
@@ -58,7 +57,7 @@ public class CopyUtils {
   private final HiveConf hiveConf;
   private final long maxCopyFileSize;
   private final long maxNumberOfFiles;
-  private final boolean hiveInTest;
+  private final boolean hiveInReplTest;
   private final String copyAsUser;
   private FileSystem destinationFs;
 
@@ -66,7 +65,7 @@ public class CopyUtils {
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
     maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
-    hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
+    hiveInReplTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL);
     this.copyAsUser = distCpDoAsUser;
     this.destinationFs = destinationFs;
   }
@@ -74,16 +73,19 @@ public class CopyUtils {
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah,
+  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPath,
                             boolean overwrite)
           throws IOException, LoginException, HiveFatalException {
     UserGroupInformation proxyUser = getProxyUser();
-    FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf);
+    if (CollectionUtils.isEmpty(srcFiles)) {
+      throw new IOException(ErrorMsg.REPL_INVALID_ARGUMENTS.format("SrcFiles can not be empty during copy operation."));
+    }
+    FileSystem sourceFs = srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
     try {
       if (!useRegularCopy) {
         srcFiles.clear();
-        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null));
+        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
         doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
       } else {
         Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
@@ -429,7 +431,7 @@ public class CopyUtils {
   */
   boolean regularCopy(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList)
       throws IOException {
-    if (hiveInTest) {
+    if (hiveInReplTest) {
       return true;
     }
     if (isLocal(sourceFs) || isLocal(destinationFs)) {
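
Note: the copy-mode decision visible in this hunk reads: the repl-test flag
forces a plain FileSystem copy, a local filesystem on either side does too,
and only then do the size and count thresholds come into play. A paraphrase,
not the literal method (the final comparison is an assumption based on the
maxCopyFileSize and maxNumberOfFiles fields initialized in the constructor
above):

    // Sketch of the decision after this patch:
    boolean regularCopySketch(FileSystem sourceFs, long totalFileCount, long totalBytes) {
      if (hiveInReplTest) {
        return true;                 // unit tests stay on cheap FileSystem copies
      }
      if (isLocal(sourceFs) || isLocal(destinationFs)) {
        return true;                 // distcp needs distributed filesystems on both sides
      }
      return totalFileCount <= maxNumberOfFiles && totalBytes <= maxCopyFileSize;
    }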
