Repository: sqoop Updated Branches: refs/heads/trunk fa4e90365 -> 89366b49b
SQOOP-2343: AsyncSqlRecordWriter stucks if any exception is thrown out in its close method (Yibing Shi via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/89366b49 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/89366b49 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/89366b49 Branch: refs/heads/trunk Commit: 89366b49b3d7227180b35726c5c7919a7c94e736 Parents: fa4e903 Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Apr 30 18:45:52 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Apr 30 18:45:52 2015 -0700 ---------------------------------------------------------------------- .../apache/sqoop/mapreduce/AsyncSqlRecordWriter.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/89366b49/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java b/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java index d0e1711..15a62a6 100644 --- a/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java +++ b/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java @@ -61,6 +61,8 @@ public abstract class AsyncSqlRecordWriter<K extends SqoopRecord, V> private AsyncSqlOutputFormat.AsyncSqlExecThread execThread; private boolean startedExecThread; + private boolean closed; + public AsyncSqlRecordWriter(TaskAttemptContext context) throws ClassNotFoundException, SQLException { this.conf = context.getConfiguration(); @@ -82,6 +84,8 @@ public abstract class AsyncSqlRecordWriter<K extends SqoopRecord, V> connection, stmtsPerTx); this.execThread.setDaemon(true); this.startedExecThread = false; + + this.closed = false; } /** @@ -176,6 +180,15 @@ public abstract 
class AsyncSqlRecordWriter<K extends SqoopRecord, V> /** {@inheritDoc} */ public void close(TaskAttemptContext context) throws IOException, InterruptedException { + // If any exception is thrown out in this method, mapreduce framework catches the exception and + // calls this method again in case the recorder hasn't been closed properly. Without the + // protection below, it can make the main thread stuck in execThread.put since there is no + // receiver for the synchronous queue any more. + if (closed) { + return; + } + closed = true; + try { try { execUpdate(true, true);
