This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2a760dd HIVE-24131: Use original src location always when data copy
runs on target (Pravin Kumar Sinha, reviewed by Aasha Medhi)
2a760dd is described below
commit 2a760dd607e206d7f1061c01075767ecfff40d0c
Author: Anishek Agarwal <[email protected]>
AuthorDate: Mon Sep 14 16:15:34 2020 +0530
HIVE-24131: Use original src location always when data copy runs on target
(Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../parse/TestReplicationScenariosAcidTables.java | 259 +++++++++++++++++++++
.../TestReplicationScenariosExclusiveReplica.java | 30 ++-
.../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 2 +-
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 18 +-
.../repl/dump/events/AbstractEventHandler.java | 2 +-
.../repl/dump/events/CreateFunctionHandler.java | 2 +-
6 files changed, 291 insertions(+), 22 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 95ad047..70151ee 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -53,6 +53,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
@@ -2075,4 +2076,262 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
.run("show tables")
.verifyResults(new String[]{"t1"});
}
+
+ @Test
+ public void testORCTableRegularCopyWithCopyOnTarget() throws Throwable {
+ ArrayList<String> withClause = new ArrayList<>();
+ withClause.add("'" +
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'");
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES
('transactional'='true')")
+ .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart2(a int) partitioned by (name string)
clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (11)")
+ .run("insert into t2 values (2)")
+ .run("insert into t2 values (22)")
+ .run("insert into t3 values (33)")
+ .run("insert into tpart1 partition(name='Tom') values(100)")
+ .run("insert into tpart1 partition(name='Jerry') values(101)")
+ .run("insert into tpart2 partition(name='Bob') values(200)")
+ .run("insert into tpart2 partition(name='Carl') values(201)")
+ .run("insert into text1 values ('ricky')")
+ .dump(primaryDbName, withClause);
+
+ replica.run("DROP TABLE t3");
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2",
"text1"})
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[] {"1", "11"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"100", "101"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+ .run("select a from " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"200", "201"})
+ .run("show partitions " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"name=Bob", "name=Carl"})
+ .run("select a from " + replicatedDbName + ".text1")
+ .verifyResults(new String[]{"ricky"});
+
+ WarehouseInstance.Tuple incrementalDump = primary.run("use " +
primaryDbName)
+ .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart4(a int) partitioned by (name string)
clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("insert into t1 values (111)")
+ .run("insert into t2 values (222)")
+ .run("insert into t4 values (4)")
+ .run("insert into tpart1 partition(name='Tom') values(102)")
+ .run("insert into tpart1 partition(name='Jerry') values(103)")
+ .run("insert into tpart2 partition(name='Bob') values(202)")
+ .run("insert into tpart2 partition(name='Carl') values(203)")
+ .run("insert into tpart3 partition(name='Tom3') values(300)")
+ .run("insert into tpart3 partition(name='Jerry3') values(301)")
+ .run("insert into tpart4 partition(name='Bob4') values(400)")
+ .run("insert into tpart4 partition(name='Carl4') values(401)")
+ .run("insert into text1 values ('martin')")
+ .dump(primaryDbName, withClause);
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables ")
+ .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2",
"tpart3", "tpart4", "text1"})
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[] {"1", "11", "111"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22", "222"})
+ .run("select * from " + replicatedDbName + ".t4")
+ .verifyResults(new String[]{"4"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"100", "101", "102", "103"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+ .run("select a from " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"200", "201", "202", "203"})
+ .run("show partitions " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"name=Bob", "name=Carl"})
+ .run("select a from " + replicatedDbName + ".tpart3")
+ .verifyResults(new String[]{"300", "301"})
+ .run("show partitions " + replicatedDbName + ".tpart3")
+ .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+ .run("select a from " + replicatedDbName + ".tpart4")
+ .verifyResults(new String[]{"400", "401"})
+ .run("show partitions " + replicatedDbName + ".tpart4")
+ .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+ .run("select a from " + replicatedDbName + ".text1")
+ .verifyResults(new String[]{"ricky", "martin"});
+
+ incrementalDump = primary.run("use " + primaryDbName)
+ .run("insert into t4 values (44)")
+ .run("insert into t1 values (1111)")
+ .run("DROP TABLE t1")
+ .run("insert into t2 values (2222)")
+ .run("insert into tpart1 partition(name='Tom') values(104)")
+ .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+ .run("insert into tpart1 partition(name='Harry') values(10001)")
+ .run("insert into tpart1 partition(name='Jerry') values(105)")
+ .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+ .run("DROP TABLE tpart2")
+ .dump(primaryDbName, withClause);
+
+ replica.run("DROP TABLE t4")
+ .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables ")
+ .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3",
"tpart4", "text1"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22", "222", "2222"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Harry", "name=Jerry",
"name=Tom_del"});
+ }
+
+ @Test
+ public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable {
+ //Distcp copy
+ List<String> withClause = Arrays.asList(
+ "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname
+ "'='true'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname +
"'='1'",
+ "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
+ "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname +
"'='0'",
+ "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+ + UserGroupInformation.getCurrentUser().getUserName() +
"'");
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES
('transactional'='true')")
+ .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart2(a int) partitioned by (name string)
clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+ .run("insert into t1 values (1)")
+ .run("insert into t1 values (11)")
+ .run("insert into t2 values (2)")
+ .run("insert into t2 values (22)")
+ .run("insert into t3 values (33)")
+ .run("insert into tpart1 partition(name='Tom') values(100)")
+ .run("insert into tpart1 partition(name='Jerry') values(101)")
+ .run("insert into tpart2 partition(name='Bob') values(200)")
+ .run("insert into tpart2 partition(name='Carl') values(201)")
+ .run("insert into text1 values ('ricky')")
+ .dump(primaryDbName, withClause);
+
+ replica.run("DROP TABLE t3");
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2",
"text1"})
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[] {"1", "11"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"100", "101"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+ .run("select a from " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"200", "201"})
+ .run("show partitions " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"name=Bob", "name=Carl"})
+ .run("select a from " + replicatedDbName + ".text1")
+ .verifyResults(new String[]{"ricky"});
+
+ WarehouseInstance.Tuple incrementalDump = primary.run("use " +
primaryDbName)
+ .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("CREATE TABLE tpart4(a int) partitioned by (name string)
clustered by (a) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='true')")
+ .run("insert into t1 values (111)")
+ .run("insert into t2 values (222)")
+ .run("insert into t4 values (4)")
+ .run("insert into tpart1 partition(name='Tom') values(102)")
+ .run("insert into tpart1 partition(name='Jerry') values(103)")
+ .run("insert into tpart2 partition(name='Bob') values(202)")
+ .run("insert into tpart2 partition(name='Carl') values(203)")
+ .run("insert into tpart3 partition(name='Tom3') values(300)")
+ .run("insert into tpart3 partition(name='Jerry3') values(301)")
+ .run("insert into tpart4 partition(name='Bob4') values(400)")
+ .run("insert into tpart4 partition(name='Carl4') values(401)")
+ .run("insert into text1 values ('martin')")
+ .dump(primaryDbName, withClause);
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables ")
+ .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2",
"tpart3", "tpart4", "text1"})
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(new String[] {"1", "11", "111"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22", "222"})
+ .run("select * from " + replicatedDbName + ".t4")
+ .verifyResults(new String[]{"4"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"100", "101", "102", "103"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+ .run("select a from " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"200", "201", "202", "203"})
+ .run("show partitions " + replicatedDbName + ".tpart2")
+ .verifyResults(new String[]{"name=Bob", "name=Carl"})
+ .run("select a from " + replicatedDbName + ".tpart3")
+ .verifyResults(new String[]{"300", "301"})
+ .run("show partitions " + replicatedDbName + ".tpart3")
+ .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+ .run("select a from " + replicatedDbName + ".tpart4")
+ .verifyResults(new String[]{"400", "401"})
+ .run("show partitions " + replicatedDbName + ".tpart4")
+ .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+ .run("select a from " + replicatedDbName + ".text1")
+ .verifyResults(new String[]{"ricky", "martin"});
+
+ incrementalDump = primary.run("use " + primaryDbName)
+ .run("insert into t4 values (44)")
+ .run("insert into t1 values (1111)")
+ .run("DROP TABLE t1")
+ .run("insert into t2 values (2222)")
+ .run("insert into tpart1 partition(name='Tom') values(104)")
+ .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+ .run("insert into tpart1 partition(name='Harry') values(10001)")
+ .run("insert into tpart1 partition(name='Jerry') values(105)")
+ .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+ .run("DROP TABLE tpart2")
+ .dump(primaryDbName, withClause);
+
+ replica.run("DROP TABLE t4")
+ .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables ")
+ .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3",
"tpart4", "text1"})
+ .run("select * from " + replicatedDbName + ".t2")
+ .verifyResults(new String[]{"2", "22", "222", "2222"})
+ .run("select a from " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+ .run("show partitions " + replicatedDbName + ".tpart1")
+ .verifyResults(new String[]{"name=Harry", "name=Jerry",
"name=Tom_del"});
+ }
}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 452ba64..07e8787 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -68,8 +68,8 @@ public class TestReplicationScenariosExclusiveReplica extends
BaseReplicationAcr
}
@Test
- public void testRemoteStagingAndCopyTaskOnTarget() throws Throwable {
- List<String> withClauseOptions =
getStagingLocationConfig(replica.repldDir);
+ public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws
Throwable {
+ List<String> withClauseOptions =
getStagingLocationConfig(replica.repldDir, true);
withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname +
"'='" + false + "'");
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
@@ -124,8 +124,8 @@ public class TestReplicationScenariosExclusiveReplica
extends BaseReplicationAcr
}
@Test
- public void testLocalStagingAndCopyTaskOnTarget() throws Throwable {
- List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir);
+ public void testDistCpCopyWithLocalStagingAndCopyTaskOnTarget() throws
Throwable {
+ List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir, true);
withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname +
"'='" + false + "'");
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
@@ -180,8 +180,8 @@ public class TestReplicationScenariosExclusiveReplica
extends BaseReplicationAcr
}
@Test
- public void testRemoteStagingAndCopyTaskOnSource() throws Throwable {
- List<String> withClauseOptions =
getStagingLocationConfig(replica.repldDir);
+ public void testDistCpCopyWithRemoteStagingAndCopyTaskOnSource() throws
Throwable {
+ List<String> withClauseOptions =
getStagingLocationConfig(replica.repldDir, true);
withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname +
"'='" + false + "'");
withClauseOptions.add("'" +
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false +
"'");
WarehouseInstance.Tuple tuple = primary
@@ -237,8 +237,8 @@ public class TestReplicationScenariosExclusiveReplica
extends BaseReplicationAcr
}
@Test
- public void testLocalStagingAndCopyTaskOnSource() throws Throwable {
- List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir);
+ public void testDistCpCopyWithLocalStagingAndCopyTaskOnSource() throws
Throwable {
+ List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir, true);
withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname +
"'='" + false + "'");
withClauseOptions.add("'" +
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false +
"'");
WarehouseInstance.Tuple tuple = primary
@@ -295,7 +295,7 @@ public class TestReplicationScenariosExclusiveReplica
extends BaseReplicationAcr
@Test
public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws
Throwable {
- List<String> withClauseOptions =
getStagingLocationConfig(replica.repldDir);
+ List<String> withClauseOptions =
getStagingLocationConfig(replica.repldDir, false);
withClauseOptions.add("'" +
HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false +
"'");
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
@@ -351,7 +351,7 @@ public class TestReplicationScenariosExclusiveReplica
extends BaseReplicationAcr
@Test
public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws
Throwable {
- List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir);
+ List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir, false);
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
.run("create external table t1 (id int)")
@@ -409,7 +409,7 @@ public class TestReplicationScenariosExclusiveReplica
extends BaseReplicationAcr
String primaryDb = "primarydb1";
String replicaDb = "repldb1";
String tableName = "t1";
- List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir);
+ List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir, false);
WarehouseInstance.Tuple tuple = primary
.run("create database " + primaryDb)
.run("alter database "+ primaryDb + " set
dbproperties('repl.source.for'='1,2,3')")
@@ -456,9 +456,15 @@ public class TestReplicationScenariosExclusiveReplica
extends BaseReplicationAcr
Assert.assertEquals(shouldExists, fileSystem.exists(dataFilePath));
}
- private List<String> getStagingLocationConfig(String stagingLoc) {
+ private List<String> getStagingLocationConfig(String stagingLoc, boolean
addDistCpConfigs) throws IOException {
List<String> confList = new ArrayList<>();
confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc
+ "'");
+ if (addDistCpConfigs) {
+ confList.add("'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname
+ "'='1'");
+ confList.add("'" +
HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'");
+ confList.add("'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname +
"'='"
+ + UserGroupInformation.getCurrentUser().getUserName() + "'");
+ }
return confList;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 58d8e8c..1f40dd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -156,7 +156,7 @@ public class ReplCopyTask extends Task<ReplCopyWork>
implements Serializable {
}
// Copy the files from different source file systems to one destination
directory
CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
- copyUtils.copyAndVerify(toPath, srcFiles, fromPath, work.isOverWrite());
+ copyUtils.copyAndVerify(toPath, srcFiles, fromPath,
work.readSrcAsFilesList(), work.isOverWrite());
// If a file is copied from CM path, then need to rename them using
original source file name
// This is needed to avoid having duplicate files in target if same
event is applied twice
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index c313c383..5a662ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -74,7 +74,7 @@ public class CopyUtils {
// changed/removed during copy, so double check the checksum after copy,
// if not match, copy again from cm
public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo>
srcFiles, Path origSrcPath,
- boolean overwrite)
+ boolean readSrcAsFilesList, boolean overwrite)
throws IOException, LoginException, HiveFatalException {
UserGroupInformation proxyUser = getProxyUser();
if (CollectionUtils.isEmpty(srcFiles)) {
@@ -83,11 +83,8 @@ public class CopyUtils {
FileSystem sourceFs = srcFiles.get(0).getSrcFs();
boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
try {
- if (!useRegularCopy) {
- srcFiles.clear();
- srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath,
null));
- doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy,
overwrite);
- } else {
+ if (useRegularCopy || readSrcAsFilesList) {
+ // Layout of data files may differ based on the type of tables.
Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map =
fsToFileMap(srcFiles, destRoot);
for (Map.Entry<FileSystem, Map<Path,
List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
Map<Path, List<ReplChangeManager.FileInfo>> destMap =
entry.getValue();
@@ -104,9 +101,16 @@ public class CopyUtils {
}
// Copy files with retry logic on failure or source file is
dropped or changed.
- doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true,
overwrite);
+ doCopyRetry(sourceFs, fileInfoList, destination, proxyUser,
useRegularCopy, overwrite);
}
}
+ } else {
+ // When distCp is to be used and the srcFiles doesn't contain subDirs
(readSrcAsFilesList=false),
+ // original from path should be used during distCp, as distCp copies
dirItems of srcPath,
+ // not the srcPath folder itself.
+ srcFiles.clear();
+ srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath,
null));
+ doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy,
overwrite);
}
} finally {
if (proxyUser != null) {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index e758c8d..1aff738 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -127,7 +127,7 @@ abstract class AbstractEventHandler<T extends EventMessage>
implements EventHand
filePaths.add(fileInfo);
FileSystem dstFs = dataPath.getFileSystem(hiveConf);
CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
- copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, false);
+ copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, true, false);
copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
}
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index 0d66128..b5a910f 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -80,7 +80,7 @@ class CreateFunctionHandler extends
AbstractEventHandler<CreateFunctionMessage>
Path destRoot = funcBinCopyPath.getTargetPath().getParent();
FileSystem dstFs = destRoot.getFileSystem(hiveConf);
CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
- copyUtils.copyAndVerify(destRoot, filePaths,
funcBinCopyPath.getSrcPath(), false);
+ copyUtils.copyAndVerify(destRoot, filePaths,
funcBinCopyPath.getSrcPath(), true, false);
copyUtils.renameFileCopiedFromCmPath(destRoot, dstFs, filePaths);
}
}