Updated Branches: refs/heads/sqoop2 612060139 -> 5be8eb680
SQOOP-813: LoaderExecutor might get into deadlock when exception is raised outside Loader itself (Jarek Jarcec Cecho via Cheolsoo Park)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5be8eb68
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5be8eb68
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5be8eb68

Branch: refs/heads/sqoop2
Commit: 5be8eb680867d26c5ec23a4501eca3d8bb031e54
Parents: 6120601
Author: Cheolsoo Park <[email protected]>
Authored: Tue Jan 22 16:06:29 2013 -0800
Committer: Cheolsoo Park <[email protected]>
Committed: Tue Jan 22 16:06:29 2013 -0800

----------------------------------------------------------------------
 .../job/mr/SqoopOutputFormatLoadExecutor.java | 65 +++++++--------
 1 files changed, 30 insertions(+), 35 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5be8eb68/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index d158327..c5f3abd 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -175,44 +175,39 @@ public class SqoopOutputFormatLoadExecutor {
     @Override
     public void run() {
-      DataReader reader = new OutputFormatDataReader();
-
-      Configuration conf = null;
-      if (!isTest) {
-        conf = context.getConfiguration();
-
+      LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
+      try {
+        DataReader reader = new OutputFormatDataReader();
-
-        loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
-      }
-      Loader loader = (Loader) ClassUtils.instantiate(loaderName);
-
-      // Objects that should be pass to the Executor execution
-      PrefixContext subContext = null;
-      Object configConnection = null;
-      Object configJob = null;
-
-      if (!isTest) {
-        switch (ConfigurationUtils.getJobType(conf)) {
-          case EXPORT:
-            subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-            configConnection = ConfigurationUtils.getConnectorConnection(conf);
-            configJob = ConfigurationUtils.getConnectorJob(conf);
-            break;
-          case IMPORT:
-            subContext = new PrefixContext(conf, "");
-            configConnection = ConfigurationUtils.getFrameworkConnection(conf);
-            configJob = ConfigurationUtils.getFrameworkJob(conf);
-            break;
-          default:
-            readerFinished = true;
-            // Release so that the writer can tell the framework something went
-            // wrong.
-            free.release();
-            throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
+        Configuration conf = null;
+        if (!isTest) {
+          conf = context.getConfiguration();
+          loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
+        }
+        Loader loader = (Loader) ClassUtils.instantiate(loaderName);
+
+        // Objects that should be pass to the Executor execution
+        PrefixContext subContext = null;
+        Object configConnection = null;
+        Object configJob = null;
+
+        if (!isTest) {
+          switch (ConfigurationUtils.getJobType(conf)) {
+            case EXPORT:
+              subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+              configConnection = ConfigurationUtils.getConnectorConnection(conf);
+              configJob = ConfigurationUtils.getConnectorJob(conf);
+              break;
+            case IMPORT:
+              subContext = new PrefixContext(conf, "");
+              configConnection = ConfigurationUtils.getFrameworkConnection(conf);
+              configJob = ConfigurationUtils.getFrameworkJob(conf);
+              break;
+            default:
+              throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
+          }
         }
-      }
-      try {
         LOG.info("Running loader class " + loaderName);
         loader.load(subContext, configConnection, configJob, reader);
         LOG.info("Loader has finished");
