Updated Branches:
  refs/heads/sqoop2 adef39bbb -> e5ab9a4f3

SQOOP-690: Fix threading issues in SqoopOutputFormatLoadExecutor

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e5ab9a4f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e5ab9a4f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e5ab9a4f

Branch: refs/heads/sqoop2
Commit: e5ab9a4f3456f74092c1ac7335a236cbd8103f69
Parents: adef39b
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Fri Nov 16 12:09:28 2012 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Fri Nov 16 12:09:28 2012 -0800

----------------------------------------------------------------------
 .../connector/jdbc/GenericJdbcExportLoader.java    |    2 +-
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |    2 +-
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |    2 +-
 .../main/java/org/apache/sqoop/job/io/Data.java    |    2 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |  164 +++++----
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    2 +-
 .../main/java/org/apache/sqoop/job/etl/Loader.java |    2 +-
 .../java/org/apache/sqoop/job/io/DataReader.java   |    6 +-
 8 files changed, 65 insertions(+), 117 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
index ff7384c..13574b2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
@@ -29,7 +29,7 @@ public class GenericJdbcExportLoader extends Loader {
   private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) {
+  public void run(ImmutableContext context, DataReader reader) throws Exception{
     String driver = context.getString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
     String url = context.getString(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index 29a73b0..7c0ef08 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -46,7 +46,7 @@ public class HdfsSequenceImportLoader extends Loader {
   }
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) {
+  public void run(ImmutableContext context, DataReader reader) throws Exception{
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index 711df0f..55eb389 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -46,7 +46,7 @@ public class HdfsTextImportLoader extends Loader {
   }
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) {
+  public void run(ImmutableContext context, DataReader reader) throws Exception{
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
index 41fceb8..83c670c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -37,7 +37,7 @@ public class Data implements WritableComparable<Data> {
   // For example, it can be:
   // - Object[] for an array of object record
   // - String for a text of CSV record
-  private Object content = null;
+  private volatile Object content = null;
 
   public static final int EMPTY_DATA = 0;
   public static final int CSV_RECORD = 1;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 0d636ae..5a3a04e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.job.mr;
 
+import java.util.concurrent.Semaphore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,12 +40,14 @@ public class SqoopOutputFormatLoadExecutor {
   public static final Log LOG =
       LogFactory.getLog(SqoopOutputFormatLoadExecutor.class.getName());
 
-  private boolean readerFinished;
-  private boolean writerFinished;
-  private Data data;
+  private volatile boolean readerFinished = false;
+  private volatile boolean writerFinished = false;
+  private volatile Data data;
   private JobContext context;
   private SqoopRecordWriter producer;
   private ConsumerThread consumer;
+  private Semaphore filled = new Semaphore(0, true);
+  private Semaphore free = new Semaphore(1, true);
 
   public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
     data = new Data();
@@ -59,69 +62,37 @@ public class SqoopOutputFormatLoadExecutor {
     return producer;
   }
 
+  /*
+   * This is a producer-consumer problem and can be solved
+   * with two semaphores.
+   */
   public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
-    @Override
-    public void write(Data key, NullWritable value) {
-      synchronized (data) {
-        if (readerFinished) {
-          consumer.checkException();
-          return;
-        }
-
-        try {
-          if (!data.isEmpty()) {
-            // wait for reader to consume data
-            data.wait();
-          }
-
-          int type = key.getType();
-          data.setContent(key.getContent(type), type);
-          // notify reader that the data is ready
-          data.notify();
-
-        } catch (InterruptedException e) {
-          // inform reader that writer is finished
-          writerFinished = true;
-
-          // unlock reader so it can continue
-          data.notify();
+    @Override
+    public void write(Data key, NullWritable value) throws InterruptedException {
 
-          // throw exception
-          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0015, e);
-        }
+      if(readerFinished) {
+        consumer.checkException();
       }
+      free.acquire();
+      int type = key.getType();
+      data.setContent(key.getContent(type), type);
+      filled.release();
     }
 
     @Override
-    public void close(TaskAttemptContext context) {
-      synchronized (data) {
-        if (readerFinished) {
-          consumer.checkException();
-          return;
-        }
-
-        try {
-          if (!data.isEmpty()) {
-            // wait for reader to consume data
-            data.wait();
-          }
-
-          writerFinished = true;
-
-          data.notify();
-        } catch (InterruptedException e) {
-          // inform reader that writer is finished
-          writerFinished = true;
-
-          // unlock reader so it can continue
-          data.notify();
-
-          // throw exception
-          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0015, e);
-        }
+    public void close(TaskAttemptContext context) throws InterruptedException {
+      if(readerFinished) {
+        // Reader finished before writer - something went wrong?
+        consumer.checkException();
       }
+      free.acquire();
+      writerFinished = true;
+      // This will interrupt only the acquire call in the consumer class,
+      // since we have acquired the free semaphore, and close is called from
+      // the same thread that writes - so filled has not been released since then
+      // so the consumer is definitely blocked on the filled semaphore.
+      consumer.interrupt();
     }
   }
@@ -132,52 +103,38 @@ public class SqoopOutputFormatLoadExecutor {
     }
 
     @Override
-    public Object[] readArrayRecord() {
+    public Object[] readArrayRecord() throws InterruptedException {
       return (Object[])readContent(Data.ARRAY_RECORD);
     }
 
     @Override
-    public String readCsvRecord() {
+    public String readCsvRecord() throws InterruptedException {
       return (String)readContent(Data.CSV_RECORD);
     }
 
     @Override
-    public Object readContent(int type) {
-      synchronized (data) {
-        if (writerFinished) {
+    public Object readContent(int type) throws InterruptedException {
+      // Has any more data been produced after I last consumed.
+      // If no, wait for the producer to produce.
+      if (writerFinished && (filled.availablePermits() == 0)) {
+        return null;
+      }
+      try {
+        filled.acquire();
+      } catch (InterruptedException ex) {
+        if(writerFinished) {
           return null;
         }
-
-        try {
-          if (data.isEmpty()) {
-            // wait for writer to produce data
-            data.wait();
-          }
-
-          Object content = data.getContent(type);
-          data.setContent(null, Data.EMPTY_DATA);
-
-          // notify writer that data is consumed
-          data.notify();
-
-          return content;
-
-        } catch (InterruptedException e) {
-          // inform writer that reader is finished
-          readerFinished = true;
-
-          // unlock writer so it can continue
-          data.notify();
-
-          // throw exception
-          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0016, e);
-        }
+        throw ex;
       }
+      Object content = data.getContent(type);
+      free.release();
+      return content;
     }
   }
 
   public class ConsumerThread extends Thread {
-    private SqoopException exception = null;
+    private volatile SqoopException exception = null;
 
     public void checkException() {
       if (exception != null) {
@@ -201,28 +158,19 @@
       try {
         loader.run(frameworkContext, reader);
       } catch (Throwable t) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        LOG.error("Error while loading data out of MR job.", t);
       }
 
-      synchronized (data) {
-        // inform writer that reader is finished
-        readerFinished = true;
-
-        // unlock writer so it can continue
-        data.notify();
-
-        // if no exception happens yet
-        if (exception == null && !writerFinished) {
-          // create exception if data are not all consumed
-          exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
-        }
-
-        // throw deferred exception if exist
-        if (exception != null) {
-          throw exception;
-        }
+      // if no exception happens yet and reader finished before writer,
+      // something went wrong
+      if (exception == null && !writerFinished) {
+        // create exception if data are not all consumed
+        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
+        LOG.error("Reader terminated, but writer is still running!", exception);
      }
+      // inform writer that reader is finished
+      readerFinished = true;
    }
  }
-
 }
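
The rewritten writer/reader pair above replaces wait/notify on the shared Data object with a single-slot handoff guarded by two fair semaphores: free starts with one permit (the slot is empty), filled starts with none (nothing to consume). As an illustration only, the self-contained sketch below shows the same pattern with hypothetical names (TwoSemaphoreHandoff, put, take, closeProducer are not part of this commit); its shutdown path releases filled instead of interrupting the consumer, which simplifies what the committed close()/readContent() actually do.

import java.util.concurrent.Semaphore;

// Standalone illustration of a single-slot, two-semaphore handoff between
// one producer thread and one consumer thread. Names are hypothetical.
public class TwoSemaphoreHandoff {

  // One shared slot, playing the role of the single Data instance.
  private volatile String slot;

  // 'free' lets the producer fill the slot; 'filled' lets the consumer drain it.
  private final Semaphore free = new Semaphore(1, true);
  private final Semaphore filled = new Semaphore(0, true);
  private volatile boolean writerFinished = false;

  // Producer side: roughly what write() does.
  void put(String record) throws InterruptedException {
    free.acquire();          // wait until the slot is empty
    slot = record;
    filled.release();        // hand the record to the consumer
  }

  // Producer side: no further records will arrive.
  void closeProducer() throws InterruptedException {
    free.acquire();          // wait until the last record was consumed
    writerFinished = true;
    filled.release();        // wake the consumer so it can observe writerFinished
  }

  // Consumer side: roughly what readContent() does; null means end of data.
  String take() throws InterruptedException {
    filled.acquire();        // wait for a record or the shutdown signal
    if (writerFinished && slot == null) {
      return null;
    }
    String record = slot;
    slot = null;
    free.release();          // let the producer fill the slot again
    return record;
  }

  public static void main(String[] args) throws Exception {
    TwoSemaphoreHandoff handoff = new TwoSemaphoreHandoff();

    Thread consumer = new Thread(() -> {
      try {
        String record;
        while ((record = handoff.take()) != null) {
          System.out.println("consumed: " + record);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    for (int i = 0; i < 3; i++) {
      handoff.put("record-" + i);
    }
    handoff.closeProducer();
    consumer.join();
  }
}

The committed version handles shutdown differently: close() keeps the free permit and calls consumer.interrupt(), and readContent() treats an InterruptedException arriving while writerFinished is set as end of data.
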
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 6dcf784..3e498ec 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -216,7 +216,7 @@ public class TestMapReduce extends TestCase {
     private Data actual = new Data();
 
     @Override
-    public void run(ImmutableContext context, DataReader reader) {
+    public void run(ImmutableContext context, DataReader reader) throws Exception{
       Object[] array;
       while ((array = reader.readArrayRecord()) != null) {
         actual.setContent(array, Data.ARRAY_RECORD);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index 3a708df..046b939 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -25,6 +25,6 @@ import org.apache.sqoop.job.io.DataReader;
  */
 public abstract class Loader {
 
-  public abstract void run(ImmutableContext context, DataReader reader);
+  public abstract void run(ImmutableContext context, DataReader reader) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
index 18e2fb7..a50f591 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
@@ -23,11 +23,11 @@ package org.apache.sqoop.job.io;
  */
 public abstract class DataReader {
 
-  public abstract Object[] readArrayRecord();
+  public abstract Object[] readArrayRecord() throws Exception;
 
-  public abstract String readCsvRecord();
+  public abstract String readCsvRecord() throws Exception;
 
-  public abstract Object readContent(int type);
+  public abstract Object readContent(int type) throws Exception;
 
   public abstract void setFieldDelimiter(char fieldDelimiter);
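
The other threading fix in this commit is in ConsumerThread.run(): instead of throwing from the background thread, where the exception would only reach the thread's uncaught-exception handler and be lost to the writer, the patch records it in a volatile field and lets the writer re-throw it through checkException(). A self-contained sketch of that deferred-exception pattern, with hypothetical names (DeferredExceptionWorker, doWork) and under the assumption that the calling thread checks for a failure before continuing, might look like this:

// Standalone sketch of the deferred-exception pattern: the worker thread
// records its failure, and the foreground thread re-throws it later.
public class DeferredExceptionWorker extends Thread {

  // volatile so a failure recorded here is visible to callers of checkException()
  private volatile RuntimeException exception;

  @Override
  public void run() {
    try {
      doWork();
    } catch (Throwable t) {
      // Throwing here would only hit this thread's uncaught-exception handler,
      // so remember the failure instead of losing it.
      exception = new RuntimeException("Worker failed", t);
    }
  }

  private void doWork() {
    throw new IllegalStateException("simulated loader failure");
  }

  // Called from the foreground thread, e.g. before handing over the next record.
  public void checkException() {
    if (exception != null) {
      throw exception;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DeferredExceptionWorker worker = new DeferredExceptionWorker();
    worker.start();
    worker.join();
    worker.checkException();   // re-throws the failure on the caller's thread
  }
}

Running this ends with the simulated failure surfacing on the main thread rather than dying silently in the worker, which is the behaviour the patch gives loader errors; the throws Exception added to Loader.run() and the DataReader methods above is what lets such failures and InterruptedException propagate out of connector loaders in the first place.
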
