This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new e34acf5  HIVE-23835: Repl Dump should dump function binaries to staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi)
e34acf5 is described below

commit e34acf5c677a23af0053ac98532a9caa9e190b6c
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Mon Jul 27 13:48:47 2020 +0530

    HIVE-23835: Repl Dump should dump function binaries to staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  2 +-
 .../hive/ql/parse/TestReplicationScenarios.java    |  4 +-
 .../TestReplicationScenariosAcrossInstances.java   | 78 ++++++++++++++++++++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |  1 +
 .../org/apache/hadoop/hive/ql/exec/CopyTask.java   |  2 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 27 +++++---
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     | 32 ++++++++-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  1 +
 .../org/apache/hadoop/hive/ql/parse/EximUtil.java  | 10 +--
 .../hive/ql/parse/repl/dump/PartitionExport.java   |  8 +--
 .../hive/ql/parse/repl/dump/TableExport.java       |  4 +-
 .../repl/dump/events/CreateFunctionHandler.java    | 42 +++++++++++-
 .../ql/parse/repl/dump/io/FunctionSerializer.java  | 23 ++++++-
 .../repl/load/message/CreateFunctionHandler.java   | 16 +++--
 .../org/apache/hadoop/hive/ql/plan/CopyWork.java   | 15 +++++
 .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java |  5 +-
 .../queries/clientpositive/repl_2_exim_basic.q     |  1 +
 17 files changed, 232 insertions(+), 39 deletions(-)
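In outline: REPL DUMP now stages each function's JAR under the dump's data directory and records the staged path in the function's serialized ResourceUri; only when hive.repl.data.copy.lazy is enabled does the dump keep the CM-encoded source URI and leave the copy to REPL LOAD. A condensed, Hive-independent model of that decision (all names here are illustrative, not Hive API; the real logic is in FunctionSerializer further down):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative model only -- see FunctionSerializer in the diff for the real code.
    public class FunctionBinaryDumpSketch {

      static final class CopyJob {
        final String src;
        final String dst;
        CopyJob(String src, String dst) { this.src = src; this.dst = dst; }
      }

      /**
       * Returns the URI to record in the function's metadata. When the copy happens
       * at dump time, it also queues a copy of the jar into the staging directory.
       */
      static String resolveResourceUri(String encodedSrcUri, String stagingDir, String jarName,
          boolean copyAtLoad, List<CopyJob> copyJobs) {
        if (copyAtLoad) {
          return encodedSrcUri;                      // REPL LOAD copies from the source later
        }
        String staged = stagingDir + "/" + jarName;  // e.g. <dumpRoot>/data/<db>/<functions-root>/<fnName>/<jar>
        copyJobs.add(new CopyJob(encodedSrcUri, staged));
        return staged;                               // metadata now points at the staged jar
      }

      public static void main(String[] args) {
        List<CopyJob> jobs = new ArrayList<>();
        String uri = resolveResourceUri("hdfs://nn/udfs/f.jar#cksum", "/staging/fn", "f.jar", false, jobs);
        System.out.println(uri + ", queued copies: " + jobs.size()); // /staging/fn/f.jar, queued copies: 1
      }
    }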
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2a32d89..d623311 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -542,7 +542,7 @@ public class HiveConf extends Configuration {
         "Indicates whether replication dump can skip copyTask and refer to \n"
             + " original path instead. This would retain all table and partition meta"),
     REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table",
-        false,
+        true,
         "Indicates whether external table replication dump only metadata information or data + metadata"),
     REPL_BOOTSTRAP_ACID_TABLES("hive.repl.bootstrap.acid.tables", false,
         "Indicates if repl dump should bootstrap the information about ACID tables along with \n"
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 66b0d07..b8e91dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -395,10 +395,10 @@ public class TestReplicationScenarios {
     String replicatedDbName = dbName + "_dupe";
 
-    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, true);
+    EximUtil.DataCopyPath.setNullSrcPath(hconf, true);
     verifyFail("REPL DUMP " + dbName, driver);
     advanceDumpDir();
-    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, false);
+    EximUtil.DataCopyPath.setNullSrcPath(hconf, false);
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName);
     advanceDumpDir();
     FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 60074ae..2953c22 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -116,6 +116,79 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   }
 
   @Test
+  public void testCreateFunctionOnHDFSIncrementalReplication() throws Throwable {
+    Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+    Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+    Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".idFunc1 as 'IdentityStringUDF' "
+        + "using jar '" + identityUdf1HdfsPath.toString() + "'");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(bootStrapDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+        .verifyResults(new String[] { replicatedDbName + ".idFunc1"})
+        .run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
+        .verifyResults(new String[] { "MyName"});
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".idFunc2 as 'IdentityStringUDF' "
+        + "using jar '" + identityUdf2HdfsPath.toString() + "'");
+
+    WarehouseInstance.Tuple incrementalDump =
+        primary.dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+        .verifyResults(new String[] { replicatedDbName + ".idFunc1",
+            replicatedDbName + ".idFunc2" })
+        .run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
+        .verifyResults(new String[] { "YourName"});
+  }
+
+  @Test
+  public void testCreateFunctionOnHDFSIncrementalReplicationLazyCopy() throws Throwable {
+    Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+    Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+    Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
+    List<String> withClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".idFunc1 as 'IdentityStringUDF' "
+        + "using jar '" + identityUdf1HdfsPath.toString() + "'");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(bootStrapDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+        .verifyResults(new String[] { replicatedDbName + ".idFunc1"})
+        .run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
+        .verifyResults(new String[] { "MyName"});
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".idFunc2 as 'IdentityStringUDF' "
+        + "using jar '" + identityUdf2HdfsPath.toString() + "'");
+
+    WarehouseInstance.Tuple incrementalDump =
+        primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+        .verifyResults(new String[] { replicatedDbName + ".idFunc1",
+            replicatedDbName + ".idFunc2" })
+        .run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
+        .verifyResults(new String[] { "YourName"});
+  }
+
+  @Test
   public void testBootstrapReplLoadRetryAfterFailureForFunctions() throws Throwable {
     String funcName1 = "f1";
     String funcName2 = "f2";
@@ -1685,4 +1758,9 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   private String quote(String str) {
     return "'" + str + "'";
   }
+
+  private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPath) throws IOException {
+    FileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 0a7d5a0..1bef351 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -141,6 +141,7 @@ public class WarehouseInstance implements Closeable {
     hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot);
     hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot);
+    hiveConf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE, false);
     hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
         "jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
     hiveConf.setVar(HiveConf.ConfVars.REPLDIR, this.repldDir);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index c11f582..5ffc110 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -91,7 +91,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
         Utilities.FILE_OP_LOGGER.debug("Copying file {} to {}", oneSrcPathStr, toPath);
         if (!FileUtils.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
             false, // delete source
-            true, // overwrite destination
+            work.isOverwrite(), // overwrite destination
             conf)) {
           console.printError("Failed to copy: '" + oneSrcPathStr + "to: '" + toPath.toString() + "'");
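CopyTask previously hard-coded overwrite=true; it now defers to the new CopyWork.isOverwrite() flag, which defaults to true so existing CopyWork callers behave as before. A small sketch of the four-argument constructor this patch adds to CopyWork (paths hypothetical); the load-side CreateFunctionHandler below passes overwrite=false, presumably so a retried load does not clobber a jar that already landed:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.plan.CopyWork;

    public class CopyWorkOverwriteSketch {
      public static void main(String[] args) {
        Path src = new Path("/repl/staging/db/_functions/idFunc1/identity_udf1.jar"); // hypothetical
        Path dst = new Path("/apps/hive/functions/idFunc1/identity_udf1.jar");        // hypothetical
        // errorOnSrcEmpty=true, overwrite=false: leave an existing destination file alone.
        CopyWork work = new CopyWork(src, dst, true, false);
        System.out.println(work.isOverwrite()); // false; the older constructors keep the default of true
      }
    }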
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index bb0ae1f..402c87e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -154,7 +154,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   public int execute() {
     try {
       SecurityUtils.reloginExpiringKeytabUser();
-      if (work.tableDataCopyIteratorsInitialized()) {
+      if (work.dataCopyIteratorsInitialized()) {
         initiateDataCopyTasks();
       } else {
         Path dumpRoot = getEncodedDumpRootPath();
@@ -178,6 +178,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         ReplChangeManager.getInstance(conf);
         Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
         Long lastReplId;
+        LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY));
         if (isBootstrap) {
           lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive());
         } else {
@@ -253,6 +254,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
     childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
     childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf));
+    childTasks.addAll(work.functionsBinariesCopyTasks(taskTracker, conf));
     if (childTasks.isEmpty()) {
       //All table data copy work finished.
       finishRemainingTasks();
@@ -792,6 +794,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       //We can't reuse the previous write id as it might be invalid due to compaction
       metadataPath.getFileSystem(conf).delete(metadataPath, true);
     }
+    List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = Collections.emptyList();
     int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
     try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize);
         FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) {
@@ -818,12 +821,12 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       work.getMetricCollector().reportStageStart(getName(), metricMap);
       Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
       Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
-      dumpFunctionMetadata(dbName, dbRoot, hiveDb);
+      boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+      functionsBinaryCopyPaths = dumpFunctionMetadata(dbName, dbRoot, dbDataRoot, hiveDb, dataCopyAtLoad);
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
       try (Writer writer = new Writer(dbRoot, conf)) {
-        List<Path> extTableLocations = new LinkedList<>();
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
           LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
           Table table = null;
@@ -836,7 +839,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               LOG.debug("Adding table {} to external tables list", tblName);
               writer.dataLocationDump(tableTuple.object, extTableFileList, conf);
             }
-            boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
             dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, bootDumpBeginReplId,
                 hiveDb, tableTuple, managedTblList, dataCopyAtLoad);
@@ -878,6 +880,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
       dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
       dmd.write(true);
+      work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
       setDataCopyIterators(extTableFileList, managedTblList);
       return bootDumpBeginReplId;
     }
@@ -1108,24 +1111,30 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return null;
   }
 
-  void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throws Exception {
-    Path functionsRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+  List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+      Hive hiveDb, boolean copyAtLoad) throws Exception {
+    List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = new ArrayList<>();
+    Path functionsMetaRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+    Path functionsDataRoot = new Path(dbDataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
     List<String> functionNames = hiveDb.getFunctions(dbName, "*");
     for (String functionName : functionNames) {
       HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
       if (tuple == null) {
         continue;
       }
-      Path functionRoot = new Path(functionsRoot, functionName);
-      Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME);
+      Path functionMetaRoot = new Path(functionsMetaRoot, functionName);
+      Path functionMetadataFile = new Path(functionMetaRoot, FUNCTION_METADATA_FILE_NAME);
+      Path functionDataRoot = new Path(functionsDataRoot, functionName);
       try (JsonWriter jsonWriter =
           new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) {
-        FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
+        FunctionSerializer serializer = new FunctionSerializer(tuple.object, functionDataRoot, copyAtLoad, conf);
         serializer.writeTo(jsonWriter, tuple.replicationSpec);
+        functionsBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
       }
       work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.FUNCTIONS.name(), 1);
       replLogger.functionLog(functionName);
     }
+    return functionsBinaryCopyPaths;
   }
 
   void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 21caf44..64b9dd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -57,6 +57,7 @@ public class ReplDumpWork implements Serializable {
   private Integer maxEventLimit;
   private transient Iterator<String> externalTblCopyPathIterator;
   private transient Iterator<String> managedTblCopyPathIterator;
+  private transient Iterator<EximUtil.DataCopyPath> functionCopyPathIterator;
   private Path currentDumpPath;
   private List<String> resultValues;
   private boolean shouldOverwrite;
@@ -147,8 +148,17 @@ public class ReplDumpWork implements Serializable {
     this.managedTblCopyPathIterator = managedTblCopyPathIterator;
   }
 
-  public boolean tableDataCopyIteratorsInitialized() {
-    return externalTblCopyPathIterator != null || managedTblCopyPathIterator != null;
+  public void setFunctionCopyPathIterator(Iterator<EximUtil.DataCopyPath> functionCopyPathIterator) {
+    if (this.functionCopyPathIterator != null) {
+      throw new IllegalStateException("Function copy path iterator has already been initialized");
+    }
+    this.functionCopyPathIterator = functionCopyPathIterator;
+  }
+
+  public boolean dataCopyIteratorsInitialized() {
+    return externalTblCopyPathIterator != null
+        || managedTblCopyPathIterator != null
+        || functionCopyPathIterator != null;
   }
 
   public Path getCurrentDumpPath() {
@@ -192,7 +202,7 @@ public class ReplDumpWork implements Serializable {
         ReplicationSpec replSpec = new ReplicationSpec();
         replSpec.setIsReplace(true);
         replSpec.setInReplicationScope(true);
-        EximUtil.ManagedTableCopyPath managedTableCopyPath = new EximUtil.ManagedTableCopyPath(replSpec);
+        EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec);
         managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
         Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
             managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
@@ -204,6 +214,22 @@ public class ReplDumpWork implements Serializable {
     return tasks;
   }
 
+  public List<Task<?>> functionsBinariesCopyTasks(TaskTracker tracker, HiveConf conf) {
+    List<Task<?>> tasks = new ArrayList<>();
+    if (functionCopyPathIterator != null) {
+      while (functionCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
+        EximUtil.DataCopyPath binaryCopyPath = functionCopyPathIterator.next();
+        Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+            binaryCopyPath.getReplicationSpec(), binaryCopyPath.getSrcPath(),
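Function-binary copies now go through the same resumable batching as table data: functionsBinariesCopyTasks above drains its iterator only while the TaskTracker has capacity, and whatever remains is picked up when the same ReplDumpWork executes again. The pattern in miniature, independent of the Hive classes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class BatchedDrainSketch {
      // Drain at most 'capacity' items per cycle; leftovers stay in the iterator.
      static <T> List<T> nextBatch(Iterator<T> pending, int capacity) {
        List<T> batch = new ArrayList<>();
        while (pending.hasNext() && batch.size() < capacity) {
          batch.add(pending.next());
        }
        return batch;
      }

      public static void main(String[] args) {
        Iterator<String> it = Arrays.asList("a.jar", "b.jar", "c.jar").iterator();
        System.out.println(nextBatch(it, 2)); // [a.jar, b.jar]
        System.out.println(nextBatch(it, 2)); // [c.jar] -- the "next execution" resumes here
      }
    }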
+            binaryCopyPath.getTargetPath(), conf);
+        tasks.add(copyTask);
+        tracker.addTask(copyTask);
+        LOG.debug("added task for {}", binaryCopyPath);
+      }
+    }
+    return tasks;
+  }
+
   public void setShouldOverwrite(boolean shouldOverwrite) {
     this.shouldOverwrite = shouldOverwrite;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 5ac9a05..3c3dc44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -119,6 +119,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     if (shouldLoadAuthorizationMetadata()) {
       initiateAuthorizationLoadTask();
     }
+    LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY));
     if (work.isIncrementalLoad()) {
       return executeIncrementalLoad();
     } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index b35f7ab..7d39f8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -162,20 +162,20 @@ public class EximUtil {
   }
 
   /**
-   * Wrapper class for mapping source and target path for copying managed table data.
+   * Wrapper class for mapping source and target path for copying managed table data and function's binary.
    */
-  public static class ManagedTableCopyPath implements StringConvertibleObject {
+  public static class DataCopyPath implements StringConvertibleObject {
     private static final String URI_SEPARATOR = "#";
     private ReplicationSpec replicationSpec;
     private static boolean nullSrcPathForTest = false;
     private Path srcPath;
     private Path tgtPath;
 
-    public ManagedTableCopyPath(ReplicationSpec replicationSpec) {
+    public DataCopyPath(ReplicationSpec replicationSpec) {
       this.replicationSpec = replicationSpec;
     }
 
-    public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
+    public DataCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
       this.replicationSpec = replicationSpec;
       if (srcPath == null) {
         throw new IllegalArgumentException("Source path can not be null.");
@@ -200,7 +200,7 @@ public class EximUtil {
     @Override
     public String toString() {
-      return "ManagedTableCopyPath{"
+      return "DataCopyPath{"
           + "fullyQualifiedSourcePath=" + srcPath
           + ", fullyQualifiedTargetPath=" + tgtPath
          + '}';
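The rename from ManagedTableCopyPath to DataCopyPath reflects that the same string-serializable source/target pair now carries function binaries as well as table data. A usage sketch of the round trip (paths hypothetical): the dump writes convertToString() lines into its file list, and the copy-task builder rebuilds each entry with loadFromString(), as ReplDumpWork does above:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.parse.EximUtil;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

    public class DataCopyPathRoundTripSketch {
      public static void main(String[] args) {
        ReplicationSpec spec = new ReplicationSpec();
        EximUtil.DataCopyPath written = new EximUtil.DataCopyPath(spec,
            new Path("/warehouse/db.db/t1"), new Path("/repl/staging/db/t1")); // hypothetical
        String line = written.convertToString();   // one line in the dump's file list

        EximUtil.DataCopyPath read = new EximUtil.DataCopyPath(spec);
        read.loadFromString(line);                  // rebuilt when copy tasks are created
        System.out.println(read.getSrcPath() + " -> " + read.getTargetPath());
      }
    }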
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 65d4fbf..aad34d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
@@ -75,11 +75,11 @@ class PartitionExport {
     this.callersSession = SessionState.get();
   }
 
-  List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+  List<DataCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
       FileList fileList, boolean dataCopyAtLoad)
       throws InterruptedException, HiveException {
     List<Future<?>> futures = new LinkedList<>();
-    List<ManagedTableCopyPath> managedTableCopyPaths = new LinkedList<>();
+    List<DataCopyPath> managedTableCopyPaths = new LinkedList<>();
     ExecutorService producer = Executors.newFixedThreadPool(1,
         new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
     futures.add(producer.submit(() -> {
@@ -126,7 +126,7 @@ class PartitionExport {
         Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName);
         LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
         if (!(isExportTask || dataCopyAtLoad)) {
-          fileList.add(new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
+          fileList.add(new DataCopyPath(forReplicationSpec, partition.getDataLocation(),
               dataDumpDir).convertToString());
         }
       } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 66cf494..1465b8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
@@ -174,7 +174,7 @@ public class TableExport {
       List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
          replicationSpec, conf);
       if (!(isExportTask || dataCopyAtLoad)) {
-        fileList.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
+        fileList.add(new DataCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
             paths.dataExportDir()).convertToString());
       }
       new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index c9e1041..69671b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -20,13 +20,25 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage> {
   CreateFunctionHandler(NotificationEvent event) {
     super(event);
@@ -41,13 +53,37 @@ class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage>
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), eventMessageAsJSON);
     Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
     FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
-
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    List<DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
     try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
-      new FunctionSerializer(eventMessage.getFunctionObj(), withinContext.hiveConf)
-          .writeTo(jsonWriter, withinContext.replicationSpec);
+      FunctionSerializer serializer = new FunctionSerializer(eventMessage.getFunctionObj(),
+          dataPath, copyAtLoad, withinContext.hiveConf);
+      serializer.writeTo(jsonWriter, withinContext.replicationSpec);
+      functionBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
     }
     withinContext.createDmd(this).write();
+    copyFunctionBinaries(functionBinaryCopyPaths, withinContext.hiveConf);
+  }
+
+  private void copyFunctionBinaries(List<DataCopyPath> functionBinaryCopyPaths, HiveConf hiveConf)
+      throws MetaException, IOException, LoginException, HiveFatalException {
+    if (!functionBinaryCopyPaths.isEmpty()) {
+      String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+      List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+      for (DataCopyPath funcBinCopyPath : functionBinaryCopyPaths) {
+        String[] decodedURISplits = ReplChangeManager.decodeFileUri(funcBinCopyPath.getSrcPath().toString());
+        ReplChangeManager.FileInfo fileInfo = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
+            decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
+        filePaths.add(fileInfo);
+        Path destRoot = funcBinCopyPath.getTargetPath().getParent();
+        FileSystem dstFs = destRoot.getFileSystem(hiveConf);
+        CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
+        copyUtils.copyAndVerify(destRoot, filePaths, funcBinCopyPath.getSrcPath(), false);
+        copyUtils.renameFileCopiedFromCmPath(destRoot, dstFs, filePaths);
+      }
+    }
   }
 
   @Override
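On the incremental path the event handler performs the binary copy itself instead of queueing dump-side tasks. Its source is the ReplChangeManager-encoded URI produced by FunctionSerializer (next diff), which bundles the file path with its checksum and CM location so the copy can fall back to the change-management directory if the original jar has since moved. A sketch of the decode step; the four-part layout is inferred from the getFileInfo call in copyFunctionBinaries above, and the encoded string itself is illustrative:

    import org.apache.hadoop.hive.metastore.ReplChangeManager;

    public class DecodeFileUriSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative value only: <path>#<checksum>#<cmRootUri>#<subDir>.
        String encoded = "hdfs://nn:8020/udfs/identity_udf.jar#3d2e#hdfs://nn:8020/cmroot#";
        String[] parts = ReplChangeManager.decodeFileUri(encoded);
        // Inferred: parts[0]=file path, parts[1]=checksum, parts[2]=CM root, parts[3]=sub-dir
        System.out.println(parts[0] + " | " + parts[1] + " | " + parts[2] + " | " + parts[3]);
      }
    }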
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index 733bab5..2e87267 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
@@ -40,10 +41,15 @@ public class FunctionSerializer implements JsonWriter.Serializer {
   public static final String FIELD_NAME = "function";
   private Function function;
   private HiveConf hiveConf;
+  private Path functionDataRoot;
+  private boolean copyAtLoad;
+  private List<EximUtil.DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
 
-  public FunctionSerializer(Function function, HiveConf hiveConf) {
+  public FunctionSerializer(Function function, Path functionDataRoot, boolean copyAtLoad, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
     this.function = function;
+    this.functionDataRoot = functionDataRoot;
+    this.copyAtLoad = copyAtLoad;
   }
 
   @Override
@@ -58,9 +64,16 @@ public class FunctionSerializer implements JsonWriter.Serializer {
         FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
         Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem);
         String checkSum = ReplChangeManager.checksumFor(qualifiedUri, fileSystem);
-        String newFileUri = ReplChangeManager.getInstance(hiveConf)
+        String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
             .encodeFileUri(qualifiedUri.toString(), checkSum, null);
-        resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri));
+        if (copyAtLoad) {
+          resourceUris.add(new ResourceUri(uri.getResourceType(), encodedSrcUri));
+        } else {
+          Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());
+          resourceUris.add(new ResourceUri(uri.getResourceType(), newBinaryPath.toString()));
+          functionBinaryCopyPaths.add(new EximUtil.DataCopyPath(additionalPropertiesProvider,
+              new Path(encodedSrcUri), newBinaryPath));
+        }
       } else {
         resourceUris.add(uri);
       }
@@ -84,4 +97,8 @@ public class FunctionSerializer implements JsonWriter.Serializer {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
     }
   }
+
+  public List<EximUtil.DataCopyPath> getFunctionBinaryCopyPaths() {
+    return functionBinaryCopyPaths;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index 948d201..f42290e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 
 import java.io.IOException;
@@ -193,15 +194,20 @@ public class CreateFunctionHandler extends AbstractMessageHandler {
           new Path(functionsRootDir).getFileSystem(context.hiveConf)
       );
 
-      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
-          metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath,
-          context.hiveConf
-      );
-      replCopyTasks.add(copyTask);
+      replCopyTasks.add(getCopyTask(sourceUri, qualifiedDestinationPath));
       ResourceUri destinationUri = new ResourceUri(resourceUri.getResourceType(), qualifiedDestinationPath.toString());
       context.log.debug("copy source uri : {} to destination uri: {}", sourceUri,
          destinationUri);
       return destinationUri;
     }
+
+    private Task<?> getCopyTask(String sourceUri, Path dest) {
+      boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+      if (copyAtLoad) {
+        return ReplCopyTask.getLoadCopyTask(metadata.getReplicationSpec(), new Path(sourceUri), dest, context.hiveConf);
+      } else {
+        return TaskFactory.get(new CopyWork(new Path(sourceUri), dest, true, false), context.hiveConf);
+      }
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 018983f..f69776a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -33,6 +33,7 @@ public class CopyWork implements Serializable {
   private Path[] fromPath;
   private Path[] toPath;
   private boolean errorOnSrcEmpty;
+  private boolean overwrite = true;
 
   public CopyWork() {
   }
@@ -42,6 +43,12 @@ public class CopyWork implements Serializable {
     this.setErrorOnSrcEmpty(errorOnSrcEmpty);
   }
 
+  public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty, boolean overwrite) {
+    this(new Path[] { fromPath }, new Path[] { toPath });
+    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+    this.setOverwrite(overwrite);
+  }
+
   public CopyWork(final Path[] fromPath, final Path[] toPath) {
     if (fromPath.length != toPath.length) {
       throw new RuntimeException(
@@ -87,4 +94,12 @@ public class CopyWork implements Serializable {
   public boolean isErrorOnSrcEmpty() {
     return errorOnSrcEmpty;
   }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+
+  public void setOverwrite(boolean overwrite) {
+    this.overwrite = overwrite;
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index a6def15..3f47678 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -87,7 +88,9 @@ public class TestReplDumpTask {
     }
 
     @Override
-    void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) {
+    List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+        Hive hiveDb, boolean copyAtLoad) {
+      return Collections.emptyList();
     }
 
     @Override
diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
index b9119a9..4c18223 100644
--- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
+++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
@@ -3,6 +3,7 @@ set hive.test.mode=true;
 set hive.test.mode.prefix=;
 set hive.test.mode.nosamplelist=managed_t,ext_t,managed_t_imported,managed_t_r_imported,ext_t_imported,ext_t_r_imported;
 set hive.repl.include.external.tables=true;
+set hive.repl.dump.metadata.only.for.external.table=false;
 
 drop table if exists managed_t;
 drop table if exists ext_t;