This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 87b2296  SUBMARINE-442. Support get job's log in submarine-server REST 
API
87b2296 is described below

commit 87b2296da9eeadd80765cfa49e3d262c7d32b4e1
Author: JohnTing <[email protected]>
AuthorDate: Thu Apr 23 23:51:22 2020 +0800

    SUBMARINE-442. Support get job's log in submarine-server REST API
    
    ### What is this PR for?
    
    Now we have the "jobs" resource in REST which can do CRUD. We also need a 
"logs" API to get the job's log output. The URI could be "api/v1/logs"
    It should accept parameters like "jobid". Initially, the logs could be 
aggregated logs of all containers.
    Streaming is preferred so that the python client can enable a fancy way for 
the end-user to check logs
    
    ### What type of PR is it?
    Feature
    
    ### Todos
    * [x] - get logs so far
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-442
    
    ### How should this be tested?
    Create a job
    Visit /api/v1/logs or /api/v1/logs/{jobid} with a browser
    
    ### Screenshots (if appropriate)
    http://127.0.0.1:8080/api/v1/jobs/logs
    
![image](https://user-images.githubusercontent.com/19265751/79961593-8a294c80-84b9-11ea-85ef-9367e17fecc9.png)
    
    http://127.0.0.1:8080/api/v1/jobs/logs/job_1587481945001_0001
    
![image](https://user-images.githubusercontent.com/19265751/79961674-a0370d00-84b9-11ea-908f-b6bdeceeb6eb.png)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: JohnTing <[email protected]>
    
    Closes #263 from JohnTing/SUBMARINE-442 and squashes the following commits:
    
    7195a77 [JohnTing] test12
    1bb806f [JohnTing] test12
    319222c [JohnTing] SUBMARINE-442
---
 .gitignore                                         |  6 ++
 .../apache/submarine/server/api/job/JobLog.java}   | 46 ++++++++++----
 .../submarine/server/api/job/JobSubmitter.java     | 16 +++++
 .../apache/submarine/server/job/JobManager.java    | 38 +++++++++++
 .../submarine/server/rest/JobManagerRestApi.java   | 27 ++++++++
 .../submarine/server/rest/RestConstants.java       |  2 +
 .../server/submitter/k8s/K8sJobSubmitter.java      | 74 +++++++++++++++++++++-
 .../apache/submarine/rest/JobManagerRestApiIT.java | 15 +++++
 8 files changed, 210 insertions(+), 14 deletions(-)

diff --git a/.gitignore b/.gitignore
index ebf34fc..af83984 100644
--- a/.gitignore
+++ b/.gitignore
@@ -88,3 +88,9 @@ submarine-cloud/bin/*
 
 submarine-security/spark-security/dependency-reduced-pom.xml
 submarine-security/spark-security/derby.log
+
+# vscode file
+.project
+.classpath
+.settings
+.factorypath
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobLog.java
similarity index 54%
copy from 
submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
copy to 
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobLog.java
index e0b2ede..66784b0 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobLog.java
@@ -17,21 +17,41 @@
  * under the License.
  */
 
-package org.apache.submarine.server.rest;
+package org.apache.submarine.server.api.job;
 
-public class RestConstants {
-  public static final String V1 = "v1";
-  public static final String JOBS = "jobs";
-  public static final String JOB_ID = "id";
-  public static final String PING = "ping";
-  public static final String MEDIA_TYPE_YAML = "application/yaml";
-  public static final String CHARSET_UTF8 = "charset=utf-8";
+import java.util.ArrayList;
+import java.util.List;
 
-  public static final String METASTORE = "metastore";
+public class JobLog {
+  private String jobId;
+  private List<podLog> logContent;
 
-  public static final String CLUSTER = "cluster";
-  public static final String ADDRESS = "address";
+  class podLog {
+    String podName;
+    String podLog;
+    podLog(String podName, String podLog) {
+      this.podName = podName;
+      this.podLog = podLog;
+    }
+  }
 
-  public static final String NODES = "nodes";
-  public static final String NODE = "node";
+  public JobLog() {
+    logContent = new ArrayList<podLog>();
+  }
+  
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+  
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void addPodLog(String name, String log) {
+    logContent.add(new podLog(name, log));
+  }
+
+  public void clearPodLog() {
+    logContent.clear();
+  }
 }
diff --git 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
index 46bef31..a9d455f 100644
--- 
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
+++ 
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
@@ -70,4 +70,20 @@ public interface JobSubmitter {
    * @throws SubmarineRuntimeException running error
    */
   Job deleteJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+  /**
+   * Get the pod log list in the job
+   * @param Job job
+   * @return object
+   * @throws SubmarineRuntimeException running error
+   */
+  JobLog getJobLog(JobSpec jobSpec, String jobId) throws 
SubmarineRuntimeException;
+
+  /**
+   * Get the pod name list in the job
+   * @param Job job
+   * @return object
+   * @throws SubmarineRuntimeException running error
+   */
+  JobLog getJobLogName(JobSpec jobSpec, String jobId) throws 
SubmarineRuntimeException;
 }
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
index df95c68..ad4261c 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
@@ -34,6 +34,7 @@ import org.apache.submarine.server.SubmitterManager;
 import org.apache.submarine.server.api.job.JobSubmitter;
 import org.apache.submarine.server.api.job.Job;
 import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.api.job.JobLog;
 import org.apache.submarine.server.api.spec.JobSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -164,6 +165,43 @@ public class JobManager {
     return job;
   }
 
+  /**
+   * List job logs
+   * @param status job status, if null will return all job logs
+   * @return job log list
+   * @throws SubmarineRuntimeException the service error
+   */
+  public List<JobLog> listJobLogsByStatus(String status) throws 
SubmarineRuntimeException {
+    List<JobLog> jobLogList = new ArrayList<JobLog>();
+    for (Map.Entry<String, Job> entry : cachedJobMap.entrySet()) {
+      String jobId = entry.getKey();
+      Job job = entry.getValue();
+      JobSpec spec = job.getSpec();
+      Job patchJob = submitter.findJob(spec);
+      LOG.info("Found job: {}", patchJob.getStatus());
+      if (status == null || 
status.toLowerCase().equals(patchJob.getStatus().toLowerCase())) {
+        job.rebuild(patchJob);
+        jobLogList.add(submitter.getJobLogName(spec, jobId));
+      }
+    }
+    return jobLogList;
+  }
+
+  /**
+   * Get job log
+   * @param id job id
+   * @return object
+   * @throws SubmarineRuntimeException the service error
+   */
+  public JobLog getJobLog(String id) throws SubmarineRuntimeException {
+    checkJobId(id);
+    Job job = cachedJobMap.get(id);
+    JobSpec spec = job.getSpec();
+    Job patchJob = submitter.findJob(spec);
+    job.rebuild(patchJob);
+    return submitter.getJobLog(spec, id);
+  }
+
   private void checkSpec(JobSpec spec) throws SubmarineRuntimeException {
     if (spec == null) {
       throw new SubmarineRuntimeException(Status.OK.getStatusCode(), "Invalid 
job spec.");
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
index e3511ec..f468717 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
@@ -35,6 +35,7 @@ import java.util.List;
 import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
 import org.apache.submarine.server.job.JobManager;
 import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobLog;
 import org.apache.submarine.server.api.spec.JobSpec;
 import org.apache.submarine.server.response.JsonResponse;
 
@@ -132,6 +133,32 @@ public class JobManagerRestApi {
       return parseJobServiceException(e);
     }
   }
+  
+  @GET
+  @Path("/logs")
+  public Response listLog(@QueryParam("status") String status) {
+    try {
+      List<JobLog> jobLogList = jobManager.listJobLogsByStatus(status);
+      return new JsonResponse.Builder<List<JobLog>>(Response.Status.OK).
+          result(jobLogList).build();
+
+    } catch (SubmarineRuntimeException e) {
+      return parseJobServiceException(e);
+    }
+  }
+
+  @GET
+  @Path("/logs/{id}")
+  public Response getLog(@PathParam(RestConstants.JOB_ID) String id) {
+    try {
+      JobLog jobLog = jobManager.getJobLog(id);
+      return new JsonResponse.Builder<JobLog>(Response.Status.OK).
+          result(jobLog).build();
+
+    } catch (SubmarineRuntimeException e) {
+      return parseJobServiceException(e);
+    }
+  }
 
   private Response parseJobServiceException(SubmarineRuntimeException e) {
     return new 
JsonResponse.Builder<String>(e.getCode()).message(e.getMessage()).build();
diff --git 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
index e0b2ede..5979a2f 100644
--- 
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
+++ 
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
@@ -34,4 +34,6 @@ public class RestConstants {
 
   public static final String NODES = "nodes";
   public static final String NODE = "node";
+
+  public static final String LOGS = "logs";
 }
diff --git 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
index 557ebaa..d2efed5 100644
--- 
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
+++ 
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
@@ -28,7 +28,10 @@ import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.ApiException;
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.JSON;
+import io.kubernetes.client.apis.CoreV1Api;
 import io.kubernetes.client.apis.CustomObjectsApi;
+import io.kubernetes.client.models.V1Pod;
+import io.kubernetes.client.models.V1PodList;
 import io.kubernetes.client.models.V1Status;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
@@ -37,6 +40,8 @@ import 
org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
 import org.apache.submarine.server.api.exception.InvalidSpecException;
 import org.apache.submarine.server.api.job.JobSubmitter;
 import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobLog;
+import org.apache.submarine.server.api.spec.JobLibrarySpec;
 import org.apache.submarine.server.api.spec.JobSpec;
 import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
 import org.apache.submarine.server.submitter.k8s.model.MLJob;
@@ -52,9 +57,14 @@ public class K8sJobSubmitter implements JobSubmitter {
 
   private static final String KUBECONFIG_ENV = "KUBECONFIG";
 
+  private static final String TF_JOB_SELECTOR_KEY = "tf-job-name=";
+  private static final String PYTORCH_JOB_SELECTOR_KEY = "pytorch-job-name=";
+
   // K8s API client for CRD
   private CustomObjectsApi api;
 
+  private CoreV1Api coreApi;
+
   public K8sJobSubmitter() {}
 
   @Override
@@ -79,6 +89,9 @@ public class K8sJobSubmitter implements JobSubmitter {
     if (api == null) {
       api = new CustomObjectsApi();
     }
+    if (coreApi == null) {
+      coreApi = new CoreV1Api(client);
+    }
   }
 
   @Override
@@ -95,8 +108,10 @@ public class K8sJobSubmitter implements JobSubmitter {
           mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob, 
"true");
       job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
     } catch (InvalidSpecException e) {
+      LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(), 
e);
       throw new SubmarineRuntimeException(200, e.getMessage());
     } catch (ApiException e) {
+      LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(), 
e);
       throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
     }
     return job;
@@ -165,11 +180,68 @@ public class K8sJobSubmitter implements JobSubmitter {
         return MLJobConverter.toJobFromStatus(status);
       }
     } catch (JsonSyntaxException e) {
-      LOG.warn("K8s submitter: parse response object failed by " + 
e.getMessage(), e);
+      LOG.error("K8s submitter: parse response object failed by " + 
e.getMessage(), e);
     }
     throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream 
response failed.");
   }
 
+  @Override
+  public JobLog getJobLogName(JobSpec jobSpec, String jobId) {
+    JobLog jobLog = new JobLog();
+    jobLog.setJobId(jobId);
+    try {
+      final V1PodList podList = coreApi.listNamespacedPod(
+          jobSpec.getNamespace(),
+          "false", null, null,
+          getJobLabelSelector(jobSpec), null, null,
+          null, null);
+      for (V1Pod pod: podList.getItems()) {
+        String podName = pod.getMetadata().getName();
+        jobLog.addPodLog(podName, null);
+      }
+    } catch (final ApiException e) {
+      LOG.error("Error when listing pod for job:" + jobSpec.getName(), 
e.getMessage());
+    }
+    return jobLog;
+  }
+
+  @Override
+  public JobLog getJobLog(JobSpec jobSpec, String jobId) {
+    JobLog jobLog = new JobLog();
+    jobLog.setJobId(jobId);
+    try {
+      final V1PodList podList = coreApi.listNamespacedPod(
+          jobSpec.getNamespace(),
+          "false", null, null,
+          getJobLabelSelector(jobSpec), null, null,
+          null, null);
+      
+      for (V1Pod pod : podList.getItems()) {
+        String podName = pod.getMetadata().getName();
+        String namespace = pod.getMetadata().getNamespace();
+        String podLog = coreApi.readNamespacedPodLog(
+            podName, namespace, null, Boolean.FALSE,
+            Integer.MAX_VALUE, null, Boolean.FALSE, 
+            Integer.MAX_VALUE, null, Boolean.FALSE);
+
+        jobLog.addPodLog(podName, podLog);
+      }
+    } catch (final ApiException e) {
+      LOG.error("Error when listing pod for job:" + jobSpec.getName(), 
e.getMessage());
+    }
+    return jobLog;
+  }
+  
+  private String getJobLabelSelector(JobSpec jobSpec) {
+    // TODO(JohnTing): SELECTOR_KEY should be obtained from individual models 
in MLJOB
+    if (jobSpec.getLibrarySpec()
+        
.getName().equalsIgnoreCase(JobLibrarySpec.SupportedMLFramework.TENSORFLOW.getName()))
 {
+      return TF_JOB_SELECTOR_KEY + jobSpec.getName();
+    } else {
+      return PYTORCH_JOB_SELECTOR_KEY + jobSpec.getName();
+    }
+  }
+
   private enum ParseOp {
     PARSE_OP_RESULT,
     PARSE_OP_DELETE
diff --git 
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
 
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
index 35aff5b..7501df5 100644
--- 
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
+++ 
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
+
 import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.ApiException;
 import io.kubernetes.client.Configuration;
@@ -66,6 +67,7 @@ public class JobManagerRestApiIT extends 
AbstractSubmarineServerTest {
   /** Key is the ml framework name, the value is the operator */
   private static Map<String, KfOperator> kfOperatorMap;
   private static String JOB_PATH = "/api/" + RestConstants.V1 + "/" + 
RestConstants.JOBS;
+  private static String JOB_LOG_PATH = JOB_PATH + "/" + RestConstants.LOGS;
 
   private Gson gson = new GsonBuilder()
       .registerTypeAdapter(JobId.class, new JobIdSerializer())
@@ -151,6 +153,9 @@ public class JobManagerRestApiIT extends 
AbstractSubmarineServerTest {
     Job foundJob = gson.fromJson(gson.toJson(jsonResponse.getResult()), 
Job.class);
     verifyGetJobApiResult(createdJob, foundJob);
 
+    // get log list
+    // TODO(JohnTing): Test the job log after creating the job
+
     // patch
     // TODO(jiwq): the commons-httpclient not support patch method
     // https://tools.ietf.org/html/rfc5789
@@ -252,6 +257,16 @@ public class JobManagerRestApiIT extends 
AbstractSubmarineServerTest {
     Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
jsonResponse.getCode());
   }
 
+  @Test
+  public void testListJobLog() throws Exception {
+    GetMethod getMethod = httpGet(JOB_LOG_PATH);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
getMethod.getStatusCode());
+
+    String json = getMethod.getResponseBodyAsString();
+    JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
jsonResponse.getCode());
+  }
+
   String loadContent(String resourceName) throws Exception {
     URL fileUrl = this.getClass().getResource("/" + resourceName);
     LOG.info("Resource file: " + fileUrl);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to