This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a760dd  HIVE-24131: Use original src location always when data copy 
runs on target (Pravin Kumar Sinha, reviewed by Aasha Medhi)
2a760dd is described below

commit 2a760dd607e206d7f1061c01075767ecfff40d0c
Author: Anishek Agarwal <[email protected]>
AuthorDate: Mon Sep 14 16:15:34 2020 +0530

    HIVE-24131: Use original src location always when data copy runs on target 
(Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../parse/TestReplicationScenariosAcidTables.java  | 259 +++++++++++++++++++++
 .../TestReplicationScenariosExclusiveReplica.java  |  30 ++-
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   |   2 +-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       |  18 +-
 .../repl/dump/events/AbstractEventHandler.java     |   2 +-
 .../repl/dump/events/CreateFunctionHandler.java    |   2 +-
 6 files changed, 291 insertions(+), 22 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 95ad047..70151ee 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -53,6 +53,7 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -2075,4 +2076,262 @@ public class TestReplicationScenariosAcidTables extends 
BaseReplicationScenarios
             .run("show tables")
             .verifyResults(new String[]{"t1"});
   }
+
+  @Test
+  public void testORCTableRegularCopyWithCopyOnTarget() throws Throwable {
+    ArrayList<String> withClause = new ArrayList<>();
+    withClause.add("'" + 
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'");
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES 
('transactional'='true')")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart2(a int) partitioned by (name string) 
clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (11)")
+            .run("insert into t2 values (2)")
+            .run("insert into t2 values (22)")
+            .run("insert into t3 values (33)")
+            .run("insert into tpart1 partition(name='Tom') values(100)")
+            .run("insert into tpart1 partition(name='Jerry') values(101)")
+            .run("insert into tpart2 partition(name='Bob') values(200)")
+            .run("insert into tpart2 partition(name='Carl') values(201)")
+            .run("insert into text1 values ('ricky')")
+            .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t3");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", 
"text1"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "11"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"100", "101"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+            .run("select a from " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"200", "201"})
+            .run("show partitions " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"name=Bob", "name=Carl"})
+            .run("select a from " + replicatedDbName + ".text1")
+            .verifyResults(new String[]{"ricky"});
+
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + 
primaryDbName)
+            .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart4(a int) partitioned by (name string) 
clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (111)")
+            .run("insert into t2 values (222)")
+            .run("insert into t4 values (4)")
+            .run("insert into tpart1 partition(name='Tom') values(102)")
+            .run("insert into tpart1 partition(name='Jerry') values(103)")
+            .run("insert into tpart2 partition(name='Bob') values(202)")
+            .run("insert into tpart2 partition(name='Carl') values(203)")
+            .run("insert into tpart3 partition(name='Tom3') values(300)")
+            .run("insert into tpart3 partition(name='Jerry3') values(301)")
+            .run("insert into tpart4 partition(name='Bob4') values(400)")
+            .run("insert into tpart4 partition(name='Carl4') values(401)")
+            .run("insert into text1 values ('martin')")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables ")
+            .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", 
"tpart3", "tpart4", "text1"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "11", "111"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22", "222"})
+            .run("select * from " + replicatedDbName + ".t4")
+            .verifyResults(new String[]{"4"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"100", "101", "102", "103"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+            .run("select a from " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"200", "201", "202", "203"})
+            .run("show partitions " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"name=Bob", "name=Carl"})
+            .run("select a from " + replicatedDbName + ".tpart3")
+            .verifyResults(new String[]{"300", "301"})
+            .run("show partitions " + replicatedDbName + ".tpart3")
+            .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+            .run("select a from " + replicatedDbName + ".tpart4")
+            .verifyResults(new String[]{"400", "401"})
+            .run("show partitions " + replicatedDbName + ".tpart4")
+            .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+            .run("select a from " + replicatedDbName + ".text1")
+            .verifyResults(new String[]{"ricky", "martin"});
+
+    incrementalDump = primary.run("use " + primaryDbName)
+            .run("insert into t4 values (44)")
+            .run("insert into t1 values (1111)")
+            .run("DROP TABLE t1")
+            .run("insert into t2 values (2222)")
+            .run("insert into tpart1 partition(name='Tom') values(104)")
+            .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+            .run("insert into tpart1 partition(name='Harry') values(10001)")
+            .run("insert into tpart1 partition(name='Jerry') values(105)")
+            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+            .run("DROP TABLE tpart2")
+            .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t4")
+            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables ")
+            .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", 
"tpart4", "text1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22", "222", "2222"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Harry", "name=Jerry", 
"name=Tom_del"});
+  }
+
+  @Test
+  public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable {
+    //Distcp copy
+    List<String> withClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname 
+ "'='true'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + 
"'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + 
"'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + 
"'");
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES 
('transactional'='true')")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart2(a int) partitioned by (name string) 
clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (11)")
+            .run("insert into t2 values (2)")
+            .run("insert into t2 values (22)")
+            .run("insert into t3 values (33)")
+            .run("insert into tpart1 partition(name='Tom') values(100)")
+            .run("insert into tpart1 partition(name='Jerry') values(101)")
+            .run("insert into tpart2 partition(name='Bob') values(200)")
+            .run("insert into tpart2 partition(name='Carl') values(201)")
+            .run("insert into text1 values ('ricky')")
+            .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t3");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", 
"text1"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "11"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"100", "101"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+            .run("select a from " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"200", "201"})
+            .run("show partitions " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"name=Bob", "name=Carl"})
+            .run("select a from " + replicatedDbName + ".text1")
+            .verifyResults(new String[]{"ricky"});
+
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + 
primaryDbName)
+            .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart4(a int) partitioned by (name string) 
clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (111)")
+            .run("insert into t2 values (222)")
+            .run("insert into t4 values (4)")
+            .run("insert into tpart1 partition(name='Tom') values(102)")
+            .run("insert into tpart1 partition(name='Jerry') values(103)")
+            .run("insert into tpart2 partition(name='Bob') values(202)")
+            .run("insert into tpart2 partition(name='Carl') values(203)")
+            .run("insert into tpart3 partition(name='Tom3') values(300)")
+            .run("insert into tpart3 partition(name='Jerry3') values(301)")
+            .run("insert into tpart4 partition(name='Bob4') values(400)")
+            .run("insert into tpart4 partition(name='Carl4') values(401)")
+            .run("insert into text1 values ('martin')")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables ")
+            .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", 
"tpart3", "tpart4", "text1"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "11", "111"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22", "222"})
+            .run("select * from " + replicatedDbName + ".t4")
+            .verifyResults(new String[]{"4"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"100", "101", "102", "103"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+            .run("select a from " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"200", "201", "202", "203"})
+            .run("show partitions " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"name=Bob", "name=Carl"})
+            .run("select a from " + replicatedDbName + ".tpart3")
+            .verifyResults(new String[]{"300", "301"})
+            .run("show partitions " + replicatedDbName + ".tpart3")
+            .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+            .run("select a from " + replicatedDbName + ".tpart4")
+            .verifyResults(new String[]{"400", "401"})
+            .run("show partitions " + replicatedDbName + ".tpart4")
+            .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+            .run("select a from " + replicatedDbName + ".text1")
+            .verifyResults(new String[]{"ricky", "martin"});
+
+    incrementalDump = primary.run("use " + primaryDbName)
+            .run("insert into t4 values (44)")
+            .run("insert into t1 values (1111)")
+            .run("DROP TABLE t1")
+            .run("insert into t2 values (2222)")
+            .run("insert into tpart1 partition(name='Tom') values(104)")
+            .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+            .run("insert into tpart1 partition(name='Harry') values(10001)")
+            .run("insert into tpart1 partition(name='Jerry') values(105)")
+            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+            .run("DROP TABLE tpart2")
+            .dump(primaryDbName, withClause);
+
+    replica.run("DROP TABLE t4")
+            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables ")
+            .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", 
"tpart4", "text1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22", "222", "2222"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Harry", "name=Jerry", 
"name=Tom_del"});
+  }
 }
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 452ba64..07e8787 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -68,8 +68,8 @@ public class TestReplicationScenariosExclusiveReplica extends 
BaseReplicationAcr
   }
 
   @Test
-  public void testRemoteStagingAndCopyTaskOnTarget() throws Throwable {
-    List<String> withClauseOptions = 
getStagingLocationConfig(replica.repldDir);
+  public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws 
Throwable {
+    List<String> withClauseOptions = 
getStagingLocationConfig(replica.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + 
"'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -124,8 +124,8 @@ public class TestReplicationScenariosExclusiveReplica 
extends BaseReplicationAcr
   }
 
   @Test
-  public void testLocalStagingAndCopyTaskOnTarget() throws Throwable {
-    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir);
+  public void testDistCpCopyWithLocalStagingAndCopyTaskOnTarget() throws 
Throwable {
+    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + 
"'='" + false + "'");
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
@@ -180,8 +180,8 @@ public class TestReplicationScenariosExclusiveReplica 
extends BaseReplicationAcr
   }
 
   @Test
-  public void testRemoteStagingAndCopyTaskOnSource() throws Throwable {
-    List<String> withClauseOptions = 
getStagingLocationConfig(replica.repldDir);
+  public void testDistCpCopyWithRemoteStagingAndCopyTaskOnSource() throws 
Throwable {
+    List<String> withClauseOptions = 
getStagingLocationConfig(replica.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + 
"'='" + false + "'");
     withClauseOptions.add("'" + 
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + 
"'");
     WarehouseInstance.Tuple tuple = primary
@@ -237,8 +237,8 @@ public class TestReplicationScenariosExclusiveReplica 
extends BaseReplicationAcr
   }
 
   @Test
-  public void testLocalStagingAndCopyTaskOnSource() throws Throwable {
-    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir);
+  public void testDistCpCopyWithLocalStagingAndCopyTaskOnSource() throws 
Throwable {
+    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir, true);
     withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + 
"'='" + false + "'");
     withClauseOptions.add("'" + 
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + 
"'");
     WarehouseInstance.Tuple tuple = primary
@@ -295,7 +295,7 @@ public class TestReplicationScenariosExclusiveReplica 
extends BaseReplicationAcr
 
   @Test
   public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws 
Throwable {
-    List<String> withClauseOptions = 
getStagingLocationConfig(replica.repldDir);
+    List<String> withClauseOptions = 
getStagingLocationConfig(replica.repldDir, false);
     withClauseOptions.add("'" + 
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + 
"'");
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
@@ -351,7 +351,7 @@ public class TestReplicationScenariosExclusiveReplica 
extends BaseReplicationAcr
 
   @Test
   public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws 
Throwable {
-    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir);
+    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir, false);
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
             .run("create external table t1 (id int)")
@@ -409,7 +409,7 @@ public class TestReplicationScenariosExclusiveReplica 
extends BaseReplicationAcr
     String primaryDb = "primarydb1";
     String replicaDb = "repldb1";
     String tableName = "t1";
-    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir);
+    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir, false);
     WarehouseInstance.Tuple tuple = primary
             .run("create database " + primaryDb)
             .run("alter database "+ primaryDb + " set 
dbproperties('repl.source.for'='1,2,3')")
@@ -456,9 +456,15 @@ public class TestReplicationScenariosExclusiveReplica 
extends BaseReplicationAcr
     Assert.assertEquals(shouldExists, fileSystem.exists(dataFilePath));
   }
 
-  private List<String> getStagingLocationConfig(String stagingLoc) {
+  private List<String> getStagingLocationConfig(String stagingLoc, boolean 
addDistCpConfigs) throws IOException {
     List<String> confList = new ArrayList<>();
     confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc 
+ "'");
+    if (addDistCpConfigs) {
+      confList.add("'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname 
+ "'='1'");
+      confList.add("'" + 
HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'");
+      confList.add("'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + 
"'='"
+              + UserGroupInformation.getCurrentUser().getUserName() + "'");
+    }
     return confList;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 58d8e8c..1f40dd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -156,7 +156,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
       }
       // Copy the files from different source file systems to one destination 
directory
       CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
-      copyUtils.copyAndVerify(toPath, srcFiles, fromPath, work.isOverWrite());
+      copyUtils.copyAndVerify(toPath, srcFiles, fromPath, 
work.readSrcAsFilesList(), work.isOverWrite());
 
       // If a file is copied from CM path, then need to rename them using 
original source file name
       // This is needed to avoid having duplicate files in target if same 
event is applied twice
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index c313c383..5a662ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -74,7 +74,7 @@ public class CopyUtils {
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
   public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> 
srcFiles, Path origSrcPath,
-                            boolean overwrite)
+                            boolean readSrcAsFilesList, boolean overwrite)
           throws IOException, LoginException, HiveFatalException {
     UserGroupInformation proxyUser = getProxyUser();
     if (CollectionUtils.isEmpty(srcFiles)) {
@@ -83,11 +83,8 @@ public class CopyUtils {
     FileSystem sourceFs = srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
     try {
-      if (!useRegularCopy) {
-        srcFiles.clear();
-        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, 
null));
-        doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, 
overwrite);
-      } else {
+      if (useRegularCopy || readSrcAsFilesList) {
+        // Layout of data files may differ based on the type of tables.
         Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = 
fsToFileMap(srcFiles, destRoot);
         for (Map.Entry<FileSystem, Map<Path, 
List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
           Map<Path, List<ReplChangeManager.FileInfo>> destMap = 
entry.getValue();
@@ -104,9 +101,16 @@ public class CopyUtils {
             }
 
             // Copy files with retry logic on failure or source file is 
dropped or changed.
-            doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true, 
overwrite);
+            doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, 
useRegularCopy, overwrite);
           }
         }
+      } else {
+        // When distCp is to be used and the srcFiles doesn't contain subDirs 
(readSrcAsFilesList=false),
+        // original from path should be used during distCp, as distCp copies 
dirItems of srcPath,
+        // not the srcPath folder itself.
+        srcFiles.clear();
+        srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, 
null));
+        doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, 
overwrite);
       }
     } finally {
       if (proxyUser != null) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index e758c8d..1aff738 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -127,7 +127,7 @@ abstract class AbstractEventHandler<T extends EventMessage> 
implements EventHand
       filePaths.add(fileInfo);
       FileSystem dstFs = dataPath.getFileSystem(hiveConf);
       CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
-      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, false);
+      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, true, false);
       copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
     }
   }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index 0d66128..b5a910f 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -80,7 +80,7 @@ class CreateFunctionHandler extends 
AbstractEventHandler<CreateFunctionMessage>
         Path destRoot = funcBinCopyPath.getTargetPath().getParent();
         FileSystem dstFs = destRoot.getFileSystem(hiveConf);
         CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
-        copyUtils.copyAndVerify(destRoot, filePaths, 
funcBinCopyPath.getSrcPath(), false);
+        copyUtils.copyAndVerify(destRoot, filePaths, 
funcBinCopyPath.getSrcPath(), true, false);
         copyUtils.renameFileCopiedFromCmPath(destRoot, dstFs, filePaths);
       }
     }

Reply via email to the commits mailing list.