This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new ded1db7 HIVE-24114:Fix Repl Load with both staging and data copy on target (Pravin Kumar Sinha, reviewed by Aasha Medhi) ded1db7 is described below commit ded1db7b220f1ba51d4c84baac4d866f9d293af0 Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Tue Sep 8 09:49:41 2020 +0530 HIVE-24114:Fix Repl Load with both staging and data copy on target (Pravin Kumar Sinha, reviewed by Aasha Medhi) --- .../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 3 +- .../apache/hadoop/hive/ql/parse/TestCopyUtils.java | 2 +- .../parse/TestReplicationScenariosAcidTables.java | 12 +- .../TestReplicationScenariosExclusiveReplica.java | 231 ++++++++++++++++++++- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 4 +- .../hadoop/hive/ql/parse/repl/CopyUtils.java | 18 +- 6 files changed, 251 insertions(+), 19 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 4ce2c52..b14f906 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -622,7 +622,8 @@ public enum ErrorMsg { REPL_INVALID_CONFIG_FOR_SERVICE(40008, "Invalid config error : {0} for {1} service.", true), REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE(40009, "Invalid internal config error : {0} for {1} service.", true), REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true), - REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40011, "Replication failed with non recoverable error. Needs manual intervention") + REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40011, "Replication failed with non recoverable error. Needs manual intervention"), + REPL_INVALID_ARGUMENTS(40012, "Invalid arguments error : {0}.", true) ; private int errorCode; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java index 9648c72..28b4abe 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java @@ -90,7 +90,7 @@ public class TestCopyUtils { MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ - put(ConfVars.HIVE_IN_TEST.varname, "true"); + put(ConfVars.HIVE_IN_TEST_REPL.varname, "true"); put(ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1"); put(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false"); put(ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 4be9144..c03b252 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -1292,7 +1292,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios public void testCheckPointingDataDumpFailureBootstrapDuringIncremental() throws Throwable { List<String> dumpClause = Arrays.asList( "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'"); @@ -1309,7 +1309,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios dumpClause = Arrays.asList( "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'", @@ -1491,7 +1491,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios //To force distcp copy List<String> dumpClause = Arrays.asList( "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'"); @@ -1642,7 +1642,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios //Distcp copy List<String> dumpClause = Arrays.asList( "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'"); @@ -1695,7 +1695,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios //To force distcp copy List<String> dumpClause = Arrays.asList( "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'"); @@ -1748,7 +1748,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios //To force distcp copy List<String> dumpClause = Arrays.asList( "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'"); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java index 549447e..452ba64 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java @@ -68,8 +68,9 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr } @Test - public void externalTableReplicationWithRemoteStaging() throws Throwable { + public void testRemoteStagingAndCopyTaskOnTarget() throws Throwable { List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir); + withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'"); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -123,7 +124,233 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr } @Test - public void externalTableReplicationWithLocalStaging() throws Throwable { + public void testLocalStagingAndCopyTaskOnTarget() throws Throwable { + List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir); + withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'"); + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (500)") + .run("create table t2 (id int)") + .run("insert into table t2 values (600)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1") + .verifyResult("500") + .run("show tables like 't2'") + .verifyResult("t2") + .run("select id from t2") + .verifyResult("600"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (700)") + .run("create table t4 (id int)") + .run("insert into table t4 values (800)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .run("select id from t1") + .verifyResult("500") + .run("select id from t2") + .verifyResult("600") + .run("select id from t3") + .verifyResult("700") + .run("select id from t4") + .verifyResult("800"); + } + + @Test + public void testRemoteStagingAndCopyTaskOnSource() throws Throwable { + List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir); + withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'"); + withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'"); + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (100)") + .run("create table t2 (id int)") + .run("insert into table t2 values (200)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1") + .verifyResult("100") + .run("show tables like 't2'") + .verifyResult("t2") + .run("select id from t2") + .verifyResult("200"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (300)") + .run("create table t4 (id int)") + .run("insert into table t4 values (400)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .run("select id from t1") + .verifyResult("100") + .run("select id from t2") + .verifyResult("200") + .run("select id from t3") + .verifyResult("300") + .run("select id from t4") + .verifyResult("400"); + } + + @Test + public void testLocalStagingAndCopyTaskOnSource() throws Throwable { + List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir); + withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='" + false + "'"); + withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'"); + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (500)") + .run("create table t2 (id int)") + .run("insert into table t2 values (600)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1") + .verifyResult("500") + .run("show tables like 't2'") + .verifyResult("t2") + .run("select id from t2") + .verifyResult("600"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (700)") + .run("create table t4 (id int)") + .run("insert into table t4 values (800)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .run("select id from t1") + .verifyResult("500") + .run("select id from t2") + .verifyResult("600") + .run("select id from t3") + .verifyResult("700") + .run("select id from t4") + .verifyResult("800"); + } + + @Test + public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws Throwable { + List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir); + withClauseOptions.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='" + false + "'"); + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (500)") + .run("create table t2 (id int)") + .run("insert into table t2 values (600)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1") + .verifyResult("500") + .run("show tables like 't2'") + .verifyResult("t2") + .run("select id from t2") + .verifyResult("600"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (700)") + .run("create table t4 (id int)") + .run("insert into table t4 values (800)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .run("select id from t1") + .verifyResult("500") + .run("select id from t2") + .verifyResult("600") + .run("select id from t3") + .verifyResult("700") + .run("select id from t4") + .verifyResult("800"); + } + + @Test + public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable { List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 656c298..f261889 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -315,7 +315,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } private Path getDumpRoot(Path currentDumpPath) { - if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) { + if (ReplDumpWork.testDeletePreviousDumpMetaPath + && (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) + || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL))) { //testDeleteDumpMetaDumpPath to be used only for test. return null; } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index c386aee..c313c383 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl; import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -29,7 +30,6 @@ import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.HiveFatalException; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -45,7 +45,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.stream.Collectors; public class CopyUtils { @@ -58,7 +57,7 @@ public class CopyUtils { private final HiveConf hiveConf; private final long maxCopyFileSize; private final long maxNumberOfFiles; - private final boolean hiveInTest; + private final boolean hiveInReplTest; private final String copyAsUser; private FileSystem destinationFs; @@ -66,7 +65,7 @@ public class CopyUtils { this.hiveConf = hiveConf; maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES); maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE); - hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST); + hiveInReplTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL); this.copyAsUser = distCpDoAsUser; this.destinationFs = destinationFs; } @@ -74,16 +73,19 @@ public class CopyUtils { // Used by replication, copy files from source to destination. It is possible source file is // changed/removed during copy, so double check the checksum after copy, // if not match, copy again from cm - public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah, + public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPath, boolean overwrite) throws IOException, LoginException, HiveFatalException { UserGroupInformation proxyUser = getProxyUser(); - FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf); + if (CollectionUtils.isEmpty(srcFiles)) { + throw new IOException(ErrorMsg.REPL_INVALID_ARGUMENTS.format("SrcFiles can not be empty during copy operation.")); + } + FileSystem sourceFs = srcFiles.get(0).getSrcFs(); boolean useRegularCopy = regularCopy(sourceFs, srcFiles); try { if (!useRegularCopy) { srcFiles.clear(); - srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null)); + srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null)); doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite); } else { Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot); @@ -429,7 +431,7 @@ public class CopyUtils { */ boolean regularCopy(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList) throws IOException { - if (hiveInTest) { + if (hiveInReplTest) { return true; } if (isLocal(sourceFs) || isLocal(destinationFs)) {