liuxunorg commented on a change in pull request #249: SUBMARINE-455. Support 
find/patch/delete job in submarine-server REST
URL: https://github.com/apache/submarine/pull/249#discussion_r398272920
 
 

 ##########
 File path: 
submarine-server/server-core/src/main/java/org/apache/submarine/server/JobManager.java
 ##########
 @@ -69,36 +70,93 @@ public static JobManager getInstance() {
 
   private JobManager(SubmitterManager submitterManager) {
     this.submitterManager = submitterManager;
-    this.executorService = Executors.newFixedThreadPool(50);
   }
 
   @Override
-  public Job submitJob(JobSpec spec) throws UnsupportedJobTypeException {
-    if (!spec.validate()) {
-      return null;
+  public Job createJob(JobSpec spec) throws UnsupportedSubmitterException, 
InvalidSpecException {
+    Job job = getSubmitter(spec.getSubmitterSpec().getType()).createJob(spec);
+    if (job != null) {
+      JobId jobId = generateJobId();
+      job.setJobId(jobId);
+      job.setSpec(spec);
+      cachedJobSpecs.putIfAbsent(jobId.toString(), spec);
     }
+    return job;
+  }
 
-    JobSubmitter submitter = submitterManager.getSubmitterByType(
-        spec.getSubmitterSpec().getType());
-    if (submitter == null) {
-      throw new UnsupportedJobTypeException();
+  private JobId generateJobId() {
+    return JobId.newInstance(SubmarineServer.getServerTimeStamp(), 
jobCounter.incrementAndGet());
+  }
+
+  @Override
+  public Job getJob(String jobIdString) throws UnsupportedSubmitterException,
+      NotFoundJobException, InvalidSpecException {
+    JobId jobId = JobId.fromString(jobIdString);
+    if (jobId == null) {
+      throw new NotFoundJobException();
+    }
+    JobSpec spec = cachedJobSpecs.get(jobId.toString());
+    if (spec == null) {
+      throw new NotFoundJobException();
     }
+    Job job = getSubmitter(spec.getSubmitterSpec().getType()).findJob(spec);
+    return fillJob(job, jobId, spec);
+  }
 
-    Job job = new Job();
-    job.setJobId(generateJobId());
-    executorService.submit(() -> {
-      try {
-        jobs.putIfAbsent(job.getJobId(), submitter.submitJob(spec));
-      } catch (UnsupportedJobTypeException e) {
-        LOG.error(e.getMessage(), e);
-      } catch (InvalidSpecException e) {
-        LOG.error("Invalid job spec: " + spec + ", " + e.getMessage());
+  @Override
+  public List<Job> listJobs(String status) throws 
UnsupportedSubmitterException,
+      InvalidSpecException {
+    List<Job> jobList = new ArrayList<>();
+    for (Map.Entry<String, JobSpec> entry : cachedJobSpecs.entrySet()) {
+      Job job = getSubmitter(entry.getValue().getSubmitterSpec().getType())
+          .findJob(entry.getValue());
+      if (status == null || 
status.toLowerCase().equals(job.getStatus().toLowerCase())) {
+        jobList.add(fillJob(job, JobId.fromString(entry.getKey()), 
entry.getValue()));
       }
-    });
-    return job;
+    }
+    return jobList;
   }
 
-  private JobId generateJobId() {
-    return JobId.newInstance(SubmarineServer.getServerTimeStamp(), 
jobCounter.incrementAndGet());
+  @Override
+  public Job patchJob(String jobIdString, JobSpec spec) throws 
UnsupportedSubmitterException,
 
 Review comment:
   Please change var name `jobIdString` to `jobId`
   The same below

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to