pkumarsinha commented on a change in pull request #1648:
URL: https://github.com/apache/hive/pull/1648#discussion_r520063304
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
##########
@@ -82,27 +87,31 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
}
FileSystem sourceFs = srcFiles.get(0).getSrcFs();
boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
+ ExecutorService executorService = null;
try {
if (useRegularCopy || readSrcAsFilesList) {
+ executorService = Executors.newFixedThreadPool(maxParallelCopyTask);
Review comment:
When readSrcAsFilesList is true, shouldn't we always use regular copy?
Going the distcp way currently has an adverse effect and no benefit in that case.
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -647,6 +647,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
"Provide the maximum number of partitions of a table that will be batched together during \n"
+ "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
+ "The data for these partitions will be copied before copying the metadata batch. "),
+ REPL_PARALLEL_COPY_TASKS("hive.repl.parallel.copy.tasks",1000,
Review comment:
Do we also support the case where multiple parallel ReplCopyTasks are
launched during bootstrap (in the existing code, I mean)?
Another thing: how does 'hive.repl.parallel.copy.tasks' fare with
multiple concurrent policies? Say a customer sets this config to 5k
and we have 40 concurrent policies. That would result in up to 200K copy
threads running concurrently (excluding other threads). Do you see any issue with that?
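To make the arithmetic behind this concern concrete, here is a minimal sketch. The class and both helpers are hypothetical and not part of the PR; `cappedPoolSize` only illustrates one possible mitigation (a process-wide cap split across policies) and is not something the patch implements.

```java
// Hypothetical illustration only; none of these names exist in Hive.
class CopyThreadBudget {

    // Worst-case number of copy threads if every policy creates its own
    // fixed pool sized by hive.repl.parallel.copy.tasks.
    static long worstCaseThreads(int tasksPerPolicy, int concurrentPolicies) {
        return (long) tasksPerPolicy * concurrentPolicies;
    }

    // Assumed mitigation: give each policy an equal share of a process-wide
    // cap, and never hand out more threads than the policy asked for.
    static int cappedPoolSize(int requested, int processWideCap, int concurrentPolicies) {
        int share = Math.max(1, processWideCap / Math.max(1, concurrentPolicies));
        return Math.min(requested, share);
    }

    public static void main(String[] args) {
        // 5k threads per policy across 40 policies.
        System.out.println(worstCaseThreads(5000, 40));
        // Same request under an assumed process-wide cap of 2000 threads.
        System.out.println(cappedPoolSize(5000, 2000, 40));
    }
}
```

With the numbers from the comment, the first call gives 200000 and the second gives 50 threads per policy. Whether a per-process cap is the right design here is an open question for the PR author.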
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
##########
@@ -82,27 +87,31 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
}
FileSystem sourceFs = srcFiles.get(0).getSrcFs();
boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
+ ExecutorService executorService = null;
try {
if (useRegularCopy || readSrcAsFilesList) {
+ executorService = Executors.newFixedThreadPool(maxParallelCopyTask);
Review comment:
Add tests.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
##########
@@ -112,11 +121,34 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
}
+ } catch (InterruptedException e) {
+ LOG.error("Failed to copy ", e);
+ throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
} finally {
if (proxyUser != null) {
FileSystem.closeAllForUGI(proxyUser);
}
+ if (executorService != null) {
Review comment:
This is prone to thread leaks. Make sure this is always executed.
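One way the shutdown could be skipped, as far as I can tell from the hunk, is if an earlier statement in the same finally block throws before the executor check is reached. A minimal sketch of a safer shape follows. `PoolCleanupSketch`, `runAll`, and the `otherCleanup` parameter (standing in for the `closeAllForUGI` call) are hypothetical names, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical illustration only; not the CopyUtils implementation.
class PoolCleanupSketch {

    // Runs all tasks on the given pool and takes ownership of it: the pool is
    // shut down on every exit path, even if otherCleanup itself throws.
    static <T> List<T> runAll(ExecutorService pool,
                              List<Callable<T>> tasks,
                              Runnable otherCleanup)
            throws InterruptedException, ExecutionException {
        try {
            List<T> results = new ArrayList<>();
            // invokeAll blocks until every task has completed.
            for (Future<T> future : pool.invokeAll(tasks)) {
                // get() rethrows a task failure as ExecutionException.
                results.add(future.get());
            }
            return results;
        } finally {
            try {
                // Stand-in for the other cleanup in the same finally block.
                otherCleanup.run();
            } finally {
                // Nested finally: still runs if otherCleanup throws.
                pool.shutdownNow();
            }
        }
    }
}
```

The nested try/finally is the point of the sketch: the executor shutdown does not depend on the preceding cleanup succeeding.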