Updated Branches: refs/heads/sqoop2 e5ab9a4f3 -> 73b5b38c5
SQOOP-702: Refactor OutputLoadExecutor

(Hari Shreedharan via Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/73b5b38c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/73b5b38c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/73b5b38c

Branch: refs/heads/sqoop2
Commit: 73b5b38c53d7dbc52d35fcf71fd9e9c1e1fb876f
Parents: e5ab9a4
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Fri Nov 16 15:59:37 2012 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Fri Nov 16 15:59:37 2012 -0800

----------------------------------------------------------------------
 .../job/mr/SqoopOutputFormatLoadExecutor.java | 74 +++++++++------
 1 files changed, 46 insertions(+), 28 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/sqoop/blob/73b5b38c/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 5a3a04e..3bd1e1b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -18,6 +18,10 @@

 package org.apache.sqoop.job.mr;

+import com.google.common.base.Throwables;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +49,7 @@ public class SqoopOutputFormatLoadExecutor {
   private volatile Data data;
   private JobContext context;
   private SqoopRecordWriter producer;
-  private ConsumerThread consumer;
+  private Future<?> consumerFuture;
   private Semaphore filled = new Semaphore(0, true);
   private Semaphore free = new Semaphore(1, true);

@@ -53,12 +57,11 @@ public class SqoopOutputFormatLoadExecutor {
     data = new Data();
     context = jobctx;
     producer = new SqoopRecordWriter();
-    consumer = new ConsumerThread();
   }

   public RecordWriter<Data, NullWritable> getRecordWriter() {
-    consumer.setDaemon(true);
-    consumer.start();
+    consumerFuture = Executors.newSingleThreadExecutor().submit(
+        new ConsumerThread());
     return producer;
   }

@@ -66,14 +69,11 @@ public class SqoopOutputFormatLoadExecutor {
    * This is a producer-consumer problem and can be solved
    * with two semaphores.
    */
-  public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
+  private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
     @Override
     public void write(Data key, NullWritable value)
         throws InterruptedException {
-
-      if(readerFinished) {
-        consumer.checkException();
-      }
+      checkConsumerCompletion();
       free.acquire();
       int type = key.getType();
       data.setContent(key.getContent(type), type);
@@ -82,21 +82,45 @@ public class SqoopOutputFormatLoadExecutor {
     @Override
     public void close(TaskAttemptContext context)
         throws InterruptedException {
-      if(readerFinished) {
-        // Reader finished before writer - something went wrong?
-        consumer.checkException();
-      }
+      checkConsumerCompletion();
       free.acquire();
       writerFinished = true;
       // This will interrupt only the acquire call in the consumer class,
       // since we have acquired the free semaphore, and close is called from
       // the same thread that writes - so filled has not been released since then
       // so the consumer is definitely blocked on the filled semaphore.
-      consumer.interrupt();
+      consumerFuture.cancel(true);
+    }
+  }
+
+  /**
+   * This method checks if the reader thread has finished, and re-throw
+   * any exceptions thrown by the reader thread.
+   *
+   * @throws SqoopException if the consumer thread threw it.
+   * @throws RuntimeException if some other exception was thrown.
+   */
+  private void checkConsumerCompletion() {
+    if (readerFinished) {
+      try {
+        consumerFuture.get();
+      } catch (ExecutionException ex) {
+        // In almost all cases, the exception will be SqoopException,
+        // because all exceptions are caught and propagated as
+        // SqoopExceptions
+        Throwable t = ex.getCause();
+        if(t instanceof SqoopException) {
+          throw (SqoopException)t;
+        }
+        //In the rare case, it was not a SqoopException
+        Throwables.propagate(t);
+      } catch (Exception ex) {
+        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
+      }
     }
   }

-  public class OutputFormatDataReader extends DataReader {
+  private class OutputFormatDataReader extends DataReader {
     @Override
     public void setFieldDelimiter(char fieldDelimiter) {
       data.setFieldDelimiter(fieldDelimiter);
@@ -133,14 +157,7 @@ public class SqoopOutputFormatLoadExecutor {
     }
   }

-  public class ConsumerThread extends Thread {
-    private volatile SqoopException exception = null;
-
-    public void checkException() {
-      if (exception != null) {
-        throw exception;
-      }
-    }
+  private class ConsumerThread implements Runnable {

     @Override
     public void run() {
@@ -158,16 +175,17 @@ public class SqoopOutputFormatLoadExecutor {
       try {
         loader.run(frameworkContext, reader);
       } catch (Throwable t) {
-        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
         LOG.error("Error while loading data out of MR job.", t);
+        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
       }

       // if no exception happens yet and reader finished before writer,
       // something went wrong
-      if (exception == null && !writerFinished) {
-        // create exception if data are not all consumed
-        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
-        LOG.error("Reader terminated, but writer is still running!", exception);
+      if (!writerFinished) {
+        // throw exception if data are not all consumed
+        LOG.error("Reader terminated, but writer is still running!");
+        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
+
       }
       // inform writer that reader is finished
       readerFinished = true;
