Repository: sqoop Updated Branches: refs/heads/sqoop2 4f0e28625 -> 190b78fcb
SQOOP-2396: Sqoop2: Race condition in purge/update threads on Server shutdown (Dian Fu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/190b78fc Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/190b78fc Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/190b78fc Branch: refs/heads/sqoop2 Commit: 190b78fcb83c4870d7dcb7f8af289047f44fefb8 Parents: 4f0e286 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Nov 18 08:06:49 2015 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Nov 18 08:06:49 2015 -0800 ---------------------------------------------------------------------- .../org/apache/sqoop/driver/JobManager.java | 65 +++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/190b78fc/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 15ca796..90ee541 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -140,6 +140,16 @@ public class JobManager implements Reconfigurable { private UpdateThread updateThread = null; /** + * Lock for purge thread. + */ + private Object purgeThreadLock = new Object(); + + /** + * Lock for update thread. + */ + private Object updateThreadLock = new Object(); + + /** * Synchronization variable between threads. */ private boolean running; @@ -196,20 +206,24 @@ public class JobManager implements Reconfigurable { running = false; - try { - purgeThread.interrupt(); - purgeThread.join(); - } catch (InterruptedException e) { - // TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining purgeThread"); + synchronized(purgeThreadLock) { + try { + purgeThread.interrupt(); + purgeThread.join(); + } catch (InterruptedException e) { + // TODO(jarcec): Do I want to wait until it actually finish here? + LOG.error("Interrupted joining purgeThread"); + } } - try { - updateThread.interrupt(); - updateThread.join(); - } catch (InterruptedException e) { - // TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining updateThread"); + synchronized(updateThreadLock) { + try { + updateThread.interrupt(); + updateThread.join(); + } catch (InterruptedException e) { + // TODO(jarcec): Do I want to wait until it actually finish here? + LOG.error("Interrupted joining updateThread"); + } } if (submissionEngine != null) { @@ -763,11 +777,15 @@ public class JobManager implements Reconfigurable { try { LOG.info("Purging old submissions"); Date threshold = new Date((new Date()).getTime() - purgeThreshold); - RepositoryManager.getInstance().getRepository() - .purgeSubmissions(threshold); + synchronized(purgeThreadLock) { + RepositoryManager.getInstance().getRepository() + .purgeSubmissions(threshold); + } Thread.sleep(purgeSleep); } catch (InterruptedException e) { LOG.debug("Purge thread interrupted", e); + } catch (SqoopException ex) { + LOG.error("Purge thread encountered exception", ex); } } @@ -787,18 +805,21 @@ public class JobManager implements Reconfigurable { try { LOG.debug("Updating running submissions"); - // Let's get all running submissions from repository to check them out - List<MSubmission> unfinishedSubmissions = - RepositoryManager.getInstance().getRepository() - .findUnfinishedSubmissions(); + synchronized(updateThreadLock) { + // Let's get all running submissions from repository to check them out + List<MSubmission> unfinishedSubmissions = + RepositoryManager.getInstance().getRepository() + .findUnfinishedSubmissions(); - for (MSubmission submission : unfinishedSubmissions) { - updateSubmission(submission); + for (MSubmission submission : unfinishedSubmissions) { + updateSubmission(submission); + } } - Thread.sleep(updateSleep); } catch (InterruptedException e) { - LOG.debug("Purge thread interrupted", e); + LOG.debug("Update thread interrupted", e); + } catch (SqoopException ex) { + LOG.error("Update thread encountered exception", ex); } }
