Repository: sqoop Updated Branches: refs/heads/sqoop2 615defb10 -> 5d4dd08e2
SQOOP-2581: Sqoop2: Drop the jobId parameter in JobManager (Dian Fu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5d4dd08e Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5d4dd08e Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5d4dd08e Branch: refs/heads/sqoop2 Commit: 5d4dd08e2c62a89148d144aad212c3990d4d1264 Parents: 615defb Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Sep 22 07:21:10 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Sep 22 07:21:10 2015 -0700 ---------------------------------------------------------------------- .../apache/sqoop/error/code/DriverError.java | 2 +- .../org/apache/sqoop/utils/UrlSafeUtils.java | 2 -- .../org/apache/sqoop/driver/JobManager.java | 38 ++++++++++---------- .../org/apache/sqoop/driver/TestJobManager.java | 8 ++--- .../apache/sqoop/handler/JobRequestHandler.java | 11 ++++-- 5 files changed, 33 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/common/src/main/java/org/apache/sqoop/error/code/DriverError.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/error/code/DriverError.java b/common/src/main/java/org/apache/sqoop/error/code/DriverError.java index 0ec9310..8be408c 100644 --- a/common/src/main/java/org/apache/sqoop/error/code/DriverError.java +++ b/common/src/main/java/org/apache/sqoop/error/code/DriverError.java @@ -32,7 +32,7 @@ public enum DriverError implements ErrorCode { DRIVER_0003("Given job is not running"), - DRIVER_0004("Unknown job id"), + DRIVER_0004("Unknown job"), DRIVER_0005("Unsupported job type"), http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java b/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java index 75defd7..1460124 100644 --- a/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java +++ b/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java @@ -51,7 +51,6 @@ public final class UrlSafeUtils { public static String urlPathEncode(String path) { try { - //return URLEncoder.encode(path, ENCODING_UTF8).replaceAll("\\+", "%20"); return URLEncoder.encode(URLEncoder.encode(path, ENCODING_UTF8), ENCODING_UTF8); } catch (UnsupportedEncodingException uee) { throw new RuntimeException(uee); @@ -60,7 +59,6 @@ public final class UrlSafeUtils { public static String urlPathDecode(String path) { try { - //return URLDecoder.decode(path.replaceAll("%20", "\\+"), ENCODING_UTF8); return URLDecoder.decode(URLDecoder.decode(path, ENCODING_UTF8), ENCODING_UTF8); } catch (UnsupportedEncodingException uee) { throw new RuntimeException(uee); http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 923df0d..de1b976 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -50,6 +50,7 @@ import org.apache.sqoop.request.HttpEventContext; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.utils.ClassUtils; +import org.apache.sqoop.utils.UrlSafeUtils; @edu.umd.cs.findbugs.annotations.SuppressWarnings("IS2_INCONSISTENT_SYNC") public class JobManager implements Reconfigurable { @@ -277,10 +278,14 @@ public class JobManager implements Reconfigurable { LOG.info("Submission manager initialized: OK"); } - public MSubmission start(long jobId, String jobName, HttpEventContext ctx) { - - MSubmission mSubmission = createJobSubmission(ctx, jobId); - JobRequest jobRequest = createJobRequest(jobId, mSubmission); + public MSubmission start(String jobName, HttpEventContext ctx) { + MJob job = RepositoryManager.getInstance().getRepository() + .findJob(jobName); + if (!job.getEnabled()) { + throw new SqoopException(DriverError.DRIVER_0009, "Job: " + jobName); + } + MSubmission mSubmission = createJobSubmission(ctx, job.getPersistenceId()); + JobRequest jobRequest = createJobRequest(mSubmission, job); // Bootstrap job to execute in the configured execution engine prepareJob(jobRequest); // Make sure that this job id is not currently running and submit the job @@ -289,7 +294,7 @@ public class JobManager implements Reconfigurable { MSubmission lastSubmission = RepositoryManager.getInstance().getRepository() .findLastSubmissionForJob(jobName); if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { - throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId); + throw new SqoopException(DriverError.DRIVER_0002, "Job with name " + jobName); } // NOTE: the following is a blocking call boolean success = submissionEngine.submit(jobRequest); @@ -305,10 +310,7 @@ public class JobManager implements Reconfigurable { return mSubmission; } - private JobRequest createJobRequest(long jobId, MSubmission submission) { - // get job - MJob job = getJob(jobId); - + private JobRequest createJobRequest(MSubmission submission, MJob job) { // get from/to connections for the job MLink fromConnection = getLink(job.getFromLinkId()); MLink toConnection = getLink(job.getToLinkId()); @@ -361,7 +363,7 @@ public class JobManager implements Reconfigurable { jobRequest.setDriverConfig(driverConfig); jobRequest.setJobName(job.getName()); jobRequest.setJobId(job.getPersistenceId()); - jobRequest.setNotificationUrl(notificationBaseUrl + jobId + "/status"); + jobRequest.setNotificationUrl(notificationBaseUrl + UrlSafeUtils.urlPathEncode(job.getName()) + "/status"); jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat(), Direction.FROM); jobRequest.setIntermediateDataFormat(toConnector.getIntermediateDataFormat(), Direction.TO); @@ -440,8 +442,8 @@ public class JobManager implements Reconfigurable { MLink link = RepositoryManager.getInstance().getRepository() .findLink(linkId); if (!link.getEnabled()) { - throw new SqoopException(DriverError.DRIVER_0010, "Connection id: " - + link.getPersistenceId()); + throw new SqoopException(DriverError.DRIVER_0010, "Connection: " + + link.getName()); } return link; } @@ -453,7 +455,7 @@ public class JobManager implements Reconfigurable { } if (!job.getEnabled()) { - throw new SqoopException(DriverError.DRIVER_0009, "Job id: " + job.getPersistenceId()); + throw new SqoopException(DriverError.DRIVER_0009, "Job: " + job.getName()); } return job; } @@ -606,13 +608,13 @@ public class JobManager implements Reconfigurable { request.getJobConfig(Direction.TO)); } - public MSubmission stop(long jobId, String jobName, HttpEventContext ctx) { + public MSubmission stop(String jobName, HttpEventContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); MSubmission mSubmission = repository.findLastSubmissionForJob(jobName); if (mSubmission == null || !mSubmission.getStatus().isRunning()) { - throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId + throw new SqoopException(DriverError.DRIVER_0003, "Job with name " + jobName + " is not running hence cannot stop"); } submissionEngine.stop(mSubmission.getExternalJobId()); @@ -626,12 +628,12 @@ public class JobManager implements Reconfigurable { return mSubmission; } - public MSubmission status(long jobId, String jobName) { + public MSubmission status(String jobName) { Repository repository = RepositoryManager.getInstance().getRepository(); MSubmission mSubmission = repository.findLastSubmissionForJob(jobName); if (mSubmission == null) { - return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); + return null; } // If the submission is in running state, let's update it if (mSubmission.getStatus().isRunning()) { @@ -647,7 +649,7 @@ public class JobManager implements Reconfigurable { * * @param submission Submission to update */ - public void updateSubmission(MSubmission submission) { + private void updateSubmission(MSubmission submission) { // We're expecting that this method will be called only if we think that the submission is still running assert submission.getStatus().isRunning(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java index 414d3e9..2d7dfef 100644 --- a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java +++ b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java @@ -115,8 +115,8 @@ public class TestJobManager { MLink testConnection = new MLink(123l, null); testConnection.setPersistenceId(1234); testConnection.setEnabled(false); - SqoopException exception = new SqoopException(DriverError.DRIVER_0010, "Connection id: " - + testConnection.getPersistenceId()); + SqoopException exception = new SqoopException(DriverError.DRIVER_0010, "Connection: " + + testConnection.getName()); MLink mConnectionSpy = org.mockito.Mockito.spy(testConnection); when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock); @@ -147,8 +147,8 @@ public class TestJobManager { MJob testJob = job(123l, 456l); testJob.setEnabled(false); testJob.setPersistenceId(1111); - SqoopException exception = new SqoopException(DriverError.DRIVER_0009, "Job id: " - + testJob.getPersistenceId()); + SqoopException exception = new SqoopException(DriverError.DRIVER_0009, "Job: " + + testJob.getName()); MJob mJobSpy = org.mockito.Mockito.spy(testJob); when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock); http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index f65cb81..6face94 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -19,6 +19,7 @@ package org.apache.sqoop.handler; import java.io.IOException; import java.util.Arrays; +import java.util.Date; import java.util.List; import java.util.Locale; @@ -52,6 +53,7 @@ import org.apache.sqoop.security.AuthorizationManager; import org.apache.sqoop.server.RequestContext; import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.common.ServerError; +import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.validation.ConfigValidationResult; import org.apache.sqoop.validation.Status; import org.json.simple.JSONObject; @@ -383,7 +385,7 @@ public class JobRequestHandler implements RequestHandler { } MSubmission submission = JobManager.getInstance() - .start(jobId, jobName, prepareRequestEventContext(ctx)); + .start(jobName, prepareRequestEventContext(ctx)); return new SubmissionBean(submission); } @@ -398,7 +400,7 @@ public class JobRequestHandler implements RequestHandler { AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId)); - MSubmission submission = JobManager.getInstance().stop(jobId, jobName, prepareRequestEventContext(ctx)); + MSubmission submission = JobManager.getInstance().stop(jobName, prepareRequestEventContext(ctx)); return new SubmissionBean(submission); } @@ -413,7 +415,10 @@ public class JobRequestHandler implements RequestHandler { AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(), ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId)); - MSubmission submission = JobManager.getInstance().status(jobId, jobName); + MSubmission submission = JobManager.getInstance().status(jobName); + if (submission == null) { + submission = new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); + } return new SubmissionBean(submission); }
