Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 615defb10 -> 5d4dd08e2


SQOOP-2581: Sqoop2: Drop the jobId parameter in JobManager

(Dian Fu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5d4dd08e
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5d4dd08e
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5d4dd08e

Branch: refs/heads/sqoop2
Commit: 5d4dd08e2c62a89148d144aad212c3990d4d1264
Parents: 615defb
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Tue Sep 22 07:21:10 2015 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Tue Sep 22 07:21:10 2015 -0700

----------------------------------------------------------------------
 .../apache/sqoop/error/code/DriverError.java    |  2 +-
 .../org/apache/sqoop/utils/UrlSafeUtils.java    |  2 --
 .../org/apache/sqoop/driver/JobManager.java     | 38 ++++++++++----------
 .../org/apache/sqoop/driver/TestJobManager.java |  8 ++---
 .../apache/sqoop/handler/JobRequestHandler.java | 11 ++++--
 5 files changed, 33 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/common/src/main/java/org/apache/sqoop/error/code/DriverError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/DriverError.java b/common/src/main/java/org/apache/sqoop/error/code/DriverError.java
index 0ec9310..8be408c 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/DriverError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/DriverError.java
@@ -32,7 +32,7 @@ public enum DriverError implements ErrorCode {
 
   DRIVER_0003("Given job is not running"),
 
-  DRIVER_0004("Unknown job id"),
+  DRIVER_0004("Unknown job"),
 
   DRIVER_0005("Unsupported job type"),
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java b/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java
index 75defd7..1460124 100644
--- a/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java
+++ b/common/src/main/java/org/apache/sqoop/utils/UrlSafeUtils.java
@@ -51,7 +51,6 @@ public final class UrlSafeUtils {
 
   public static String urlPathEncode(String path) {
     try {
-      //return URLEncoder.encode(path, ENCODING_UTF8).replaceAll("\\+", "%20");
       return URLEncoder.encode(URLEncoder.encode(path, ENCODING_UTF8), ENCODING_UTF8);
     } catch (UnsupportedEncodingException uee) {
       throw new RuntimeException(uee);
@@ -60,7 +59,6 @@ public final class UrlSafeUtils {
 
   public static String urlPathDecode(String path) {
     try {
-      //return URLDecoder.decode(path.replaceAll("%20", "\\+"), ENCODING_UTF8);
       return URLDecoder.decode(URLDecoder.decode(path, ENCODING_UTF8), ENCODING_UTF8);
     } catch (UnsupportedEncodingException uee) {
       throw new RuntimeException(uee);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 923df0d..de1b976 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -50,6 +50,7 @@ import org.apache.sqoop.request.HttpEventContext;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.utils.ClassUtils;
+import org.apache.sqoop.utils.UrlSafeUtils;
 
 @edu.umd.cs.findbugs.annotations.SuppressWarnings("IS2_INCONSISTENT_SYNC")
 public class JobManager implements Reconfigurable {
@@ -277,10 +278,14 @@ public class JobManager implements Reconfigurable {
     LOG.info("Submission manager initialized: OK");
   }
 
-  public MSubmission start(long jobId, String jobName, HttpEventContext ctx) {
-
-    MSubmission mSubmission = createJobSubmission(ctx, jobId);
-    JobRequest jobRequest = createJobRequest(jobId, mSubmission);
+  public MSubmission start(String jobName, HttpEventContext ctx) {
+    MJob job = RepositoryManager.getInstance().getRepository()
+        .findJob(jobName);
+    if (!job.getEnabled()) {
+      throw new SqoopException(DriverError.DRIVER_0009, "Job: " + jobName);
+    }
+    MSubmission mSubmission = createJobSubmission(ctx, job.getPersistenceId());
+    JobRequest jobRequest = createJobRequest(mSubmission, job);
     // Bootstrap job to execute in the configured execution engine
     prepareJob(jobRequest);
     // Make sure that this job id is not currently running and submit the job
@@ -289,7 +294,7 @@ public class JobManager implements Reconfigurable {
       MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
           .findLastSubmissionForJob(jobName);
       if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
-        throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
+        throw new SqoopException(DriverError.DRIVER_0002, "Job with name " + jobName);
       }
       // NOTE: the following is a blocking call
       boolean success = submissionEngine.submit(jobRequest);
@@ -305,10 +310,7 @@ public class JobManager implements Reconfigurable {
     return mSubmission;
   }
 
-  private JobRequest createJobRequest(long jobId, MSubmission submission) {
-    // get job
-    MJob job = getJob(jobId);
-
+  private JobRequest createJobRequest(MSubmission submission, MJob job) {
     // get from/to connections for the job
     MLink fromConnection = getLink(job.getFromLinkId());
     MLink toConnection = getLink(job.getToLinkId());
@@ -361,7 +363,7 @@ public class JobManager implements Reconfigurable {
     jobRequest.setDriverConfig(driverConfig);
     jobRequest.setJobName(job.getName());
     jobRequest.setJobId(job.getPersistenceId());
-    jobRequest.setNotificationUrl(notificationBaseUrl + jobId + "/status");
+    jobRequest.setNotificationUrl(notificationBaseUrl + UrlSafeUtils.urlPathEncode(job.getName()) + "/status");
     jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat(), Direction.FROM);
     jobRequest.setIntermediateDataFormat(toConnector.getIntermediateDataFormat(), Direction.TO);
 
@@ -440,8 +442,8 @@ public class JobManager implements Reconfigurable {
     MLink link = RepositoryManager.getInstance().getRepository()
         .findLink(linkId);
     if (!link.getEnabled()) {
-      throw new SqoopException(DriverError.DRIVER_0010, "Connection id: "
-          + link.getPersistenceId());
+      throw new SqoopException(DriverError.DRIVER_0010, "Connection: "
+          + link.getName());
     }
     return link;
   }
@@ -453,7 +455,7 @@ public class JobManager implements Reconfigurable {
     }
 
     if (!job.getEnabled()) {
-      throw new SqoopException(DriverError.DRIVER_0009, "Job id: " + job.getPersistenceId());
+      throw new SqoopException(DriverError.DRIVER_0009, "Job: " + job.getName());
     }
     return job;
   }
@@ -606,13 +608,13 @@ public class JobManager implements Reconfigurable {
         request.getJobConfig(Direction.TO));
   }
 
-  public MSubmission stop(long jobId, String jobName, HttpEventContext ctx) {
+  public MSubmission stop(String jobName, HttpEventContext ctx) {
 
     Repository repository = RepositoryManager.getInstance().getRepository();
     MSubmission mSubmission = repository.findLastSubmissionForJob(jobName);
 
     if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
-      throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
+      throw new SqoopException(DriverError.DRIVER_0003, "Job with name " + jobName
           + " is not running hence cannot stop");
     }
     submissionEngine.stop(mSubmission.getExternalJobId());
@@ -626,12 +628,12 @@ public class JobManager implements Reconfigurable {
     return mSubmission;
   }
 
-  public MSubmission status(long jobId, String jobName) {
+  public MSubmission status(String jobName) {
     Repository repository = RepositoryManager.getInstance().getRepository();
     MSubmission mSubmission = repository.findLastSubmissionForJob(jobName);
 
     if (mSubmission == null) {
-      return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
+      return null;
     }
     // If the submission is in running state, let's update it
     if (mSubmission.getStatus().isRunning()) {
@@ -647,7 +649,7 @@ public class JobManager implements Reconfigurable {
    *
    * @param submission Submission to update
    */
-  public void updateSubmission(MSubmission submission) {
+  private void updateSubmission(MSubmission submission) {
     // We're expecting that this method will be called only if we think that the submission is still running
     assert submission.getStatus().isRunning();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
index 414d3e9..2d7dfef 100644
--- a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
+++ b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
@@ -115,8 +115,8 @@ public class TestJobManager {
     MLink testConnection = new MLink(123l, null);
     testConnection.setPersistenceId(1234);
     testConnection.setEnabled(false);
-    SqoopException exception = new SqoopException(DriverError.DRIVER_0010, "Connection id: "
-        + testConnection.getPersistenceId());
+    SqoopException exception = new SqoopException(DriverError.DRIVER_0010, "Connection: "
+        + testConnection.getName());
 
     MLink mConnectionSpy = org.mockito.Mockito.spy(testConnection);
     when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
@@ -147,8 +147,8 @@ public class TestJobManager {
     MJob testJob = job(123l, 456l);
     testJob.setEnabled(false);
     testJob.setPersistenceId(1111);
-    SqoopException exception = new SqoopException(DriverError.DRIVER_0009, "Job id: "
-        + testJob.getPersistenceId());
+    SqoopException exception = new SqoopException(DriverError.DRIVER_0009, "Job: "
+        + testJob.getName());
 
     MJob mJobSpy = org.mockito.Mockito.spy(testJob);
     when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5d4dd08e/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index f65cb81..6face94 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.handler;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
 import java.util.Locale;
 
@@ -52,6 +53,7 @@ import org.apache.sqoop.security.AuthorizationManager;
 import org.apache.sqoop.server.RequestContext;
 import org.apache.sqoop.server.RequestHandler;
 import org.apache.sqoop.server.common.ServerError;
+import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.validation.ConfigValidationResult;
 import org.apache.sqoop.validation.Status;
 import org.json.simple.JSONObject;
@@ -383,7 +385,7 @@ public class JobRequestHandler implements RequestHandler {
     }
 
     MSubmission submission = JobManager.getInstance()
-        .start(jobId, jobName, prepareRequestEventContext(ctx));
+        .start(jobName, prepareRequestEventContext(ctx));
     return new SubmissionBean(submission);
   }
 
@@ -398,7 +400,7 @@ public class JobRequestHandler implements RequestHandler {
 
     AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
         ctx.getRequest().getRemoteAddr(), "stop", "job", String.valueOf(jobId));
-    MSubmission submission = JobManager.getInstance().stop(jobId, jobName, prepareRequestEventContext(ctx));
+    MSubmission submission = JobManager.getInstance().stop(jobName, prepareRequestEventContext(ctx));
     return new SubmissionBean(submission);
   }
 
@@ -413,7 +415,10 @@ public class JobRequestHandler implements RequestHandler {
 
     AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
         ctx.getRequest().getRemoteAddr(), "status", "job", String.valueOf(jobId));
-    MSubmission submission = JobManager.getInstance().status(jobId, jobName);
+    MSubmission submission = JobManager.getInstance().status(jobName);
+    if (submission == null) {
+      submission = new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
+    }
+    }
 
     return new SubmissionBean(submission);
   }

Reply via email to