This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 87b2296 SUBMARINE-442. Support get job's log in submarine-server REST
API
87b2296 is described below
commit 87b2296da9eeadd80765cfa49e3d262c7d32b4e1
Author: JohnTing <[email protected]>
AuthorDate: Thu Apr 23 23:51:22 2020 +0800
SUBMARINE-442. Support get job's log in submarine-server REST API
### What is this PR for?
The REST API already has a "jobs" resource that supports CRUD operations. We also
need a "logs" API to retrieve a job's log output. The URI could be "api/v1/logs",
and it should accept parameters such as "jobid". Initially, the logs could be the
aggregated logs of all of the job's containers.
Streaming is preferred, so that the Python client can give end users a convenient,
live view of the logs.
### What type of PR is it?
Feature
### Todos
* [x] - Get the logs produced so far
### What is the Jira issue?
https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-442
### How should this be tested?
1. Create a job.
2. Visit /api/v1/jobs/logs or /api/v1/jobs/logs/{jobid} in a browser.
### Screenshots (if appropriate)
http://127.0.0.1:8080/api/v1/jobs/logs

http://127.0.0.1:8080/api/v1/jobs/logs/job_1587481945001_0001

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: JohnTing <[email protected]>
Closes #263 from JohnTing/SUBMARINE-442 and squashes the following commits:
7195a77 [JohnTing] test12
1bb806f [JohnTing] test12
319222c [JohnTing] SUBMARINE-442
---
.gitignore | 6 ++
.../apache/submarine/server/api/job/JobLog.java} | 46 ++++++++++----
.../submarine/server/api/job/JobSubmitter.java | 16 +++++
.../apache/submarine/server/job/JobManager.java | 38 +++++++++++
.../submarine/server/rest/JobManagerRestApi.java | 27 ++++++++
.../submarine/server/rest/RestConstants.java | 2 +
.../server/submitter/k8s/K8sJobSubmitter.java | 74 +++++++++++++++++++++-
.../apache/submarine/rest/JobManagerRestApiIT.java | 15 +++++
8 files changed, 210 insertions(+), 14 deletions(-)
diff --git a/.gitignore b/.gitignore
index ebf34fc..af83984 100644
--- a/.gitignore
+++ b/.gitignore
@@ -88,3 +88,9 @@ submarine-cloud/bin/*
submarine-security/spark-security/dependency-reduced-pom.xml
submarine-security/spark-security/derby.log
+
+# vscode file
+.project
+.classpath
+.settings
+.factorypath
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobLog.java
similarity index 54%
copy from
submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
copy to
submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobLog.java
index e0b2ede..66784b0 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobLog.java
@@ -17,21 +17,41 @@
* under the License.
*/
-package org.apache.submarine.server.rest;
+package org.apache.submarine.server.api.job;
-public class RestConstants {
- public static final String V1 = "v1";
- public static final String JOBS = "jobs";
- public static final String JOB_ID = "id";
- public static final String PING = "ping";
- public static final String MEDIA_TYPE_YAML = "application/yaml";
- public static final String CHARSET_UTF8 = "charset=utf-8";
+import java.util.ArrayList;
+import java.util.List;
- public static final String METASTORE = "metastore";
+public class JobLog {
+ private String jobId;
+ private List<podLog> logContent;
- public static final String CLUSTER = "cluster";
- public static final String ADDRESS = "address";
+ class podLog {
+ String podName;
+ String podLog;
+ podLog(String podName, String podLog) {
+ this.podName = podName;
+ this.podLog = podLog;
+ }
+ }
- public static final String NODES = "nodes";
- public static final String NODE = "node";
+ public JobLog() {
+ logContent = new ArrayList<podLog>();
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void addPodLog(String name, String log) {
+ logContent.add(new podLog(name, log));
+ }
+
+ public void clearPodLog() {
+ logContent.clear();
+ }
}
diff --git
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
index 46bef31..a9d455f 100644
---
a/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
+++
b/submarine-server/server-api/src/main/java/org/apache/submarine/server/api/job/JobSubmitter.java
@@ -70,4 +70,20 @@ public interface JobSubmitter {
* @throws SubmarineRuntimeException running error
*/
Job deleteJob(JobSpec jobSpec) throws SubmarineRuntimeException;
+
+ /**
+ * Get the pod log list in the job
+ * @param Job job
+ * @return object
+ * @throws SubmarineRuntimeException running error
+ */
+ JobLog getJobLog(JobSpec jobSpec, String jobId) throws
SubmarineRuntimeException;
+
+ /**
+ * Get the pod name list in the job
+ * @param Job job
+ * @return object
+ * @throws SubmarineRuntimeException running error
+ */
+ JobLog getJobLogName(JobSpec jobSpec, String jobId) throws
SubmarineRuntimeException;
}
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
index df95c68..ad4261c 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/job/JobManager.java
@@ -34,6 +34,7 @@ import org.apache.submarine.server.SubmitterManager;
import org.apache.submarine.server.api.job.JobSubmitter;
import org.apache.submarine.server.api.job.Job;
import org.apache.submarine.server.api.job.JobId;
+import org.apache.submarine.server.api.job.JobLog;
import org.apache.submarine.server.api.spec.JobSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,6 +165,43 @@ public class JobManager {
return job;
}
+ /**
+ * List job logs
+ * @param status job status, if null will return all job logs
+ * @return job log list
+ * @throws SubmarineRuntimeException the service error
+ */
+ public List<JobLog> listJobLogsByStatus(String status) throws
SubmarineRuntimeException {
+ List<JobLog> jobLogList = new ArrayList<JobLog>();
+ for (Map.Entry<String, Job> entry : cachedJobMap.entrySet()) {
+ String jobId = entry.getKey();
+ Job job = entry.getValue();
+ JobSpec spec = job.getSpec();
+ Job patchJob = submitter.findJob(spec);
+ LOG.info("Found job: {}", patchJob.getStatus());
+ if (status == null ||
status.toLowerCase().equals(patchJob.getStatus().toLowerCase())) {
+ job.rebuild(patchJob);
+ jobLogList.add(submitter.getJobLogName(spec, jobId));
+ }
+ }
+ return jobLogList;
+ }
+
+ /**
+ * Get job log
+ * @param id job id
+ * @return object
+ * @throws SubmarineRuntimeException the service error
+ */
+ public JobLog getJobLog(String id) throws SubmarineRuntimeException {
+ checkJobId(id);
+ Job job = cachedJobMap.get(id);
+ JobSpec spec = job.getSpec();
+ Job patchJob = submitter.findJob(spec);
+ job.rebuild(patchJob);
+ return submitter.getJobLog(spec, id);
+ }
+
private void checkSpec(JobSpec spec) throws SubmarineRuntimeException {
if (spec == null) {
throw new SubmarineRuntimeException(Status.OK.getStatusCode(), "Invalid
job spec.");
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
index e3511ec..f468717 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/JobManagerRestApi.java
@@ -35,6 +35,7 @@ import java.util.List;
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.job.JobManager;
import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobLog;
import org.apache.submarine.server.api.spec.JobSpec;
import org.apache.submarine.server.response.JsonResponse;
@@ -132,6 +133,32 @@ public class JobManagerRestApi {
return parseJobServiceException(e);
}
}
+
+ @GET
+ @Path("/logs")
+ public Response listLog(@QueryParam("status") String status) {
+ try {
+ List<JobLog> jobLogList = jobManager.listJobLogsByStatus(status);
+ return new JsonResponse.Builder<List<JobLog>>(Response.Status.OK).
+ result(jobLogList).build();
+
+ } catch (SubmarineRuntimeException e) {
+ return parseJobServiceException(e);
+ }
+ }
+
+ @GET
+ @Path("/logs/{id}")
+ public Response getLog(@PathParam(RestConstants.JOB_ID) String id) {
+ try {
+ JobLog jobLog = jobManager.getJobLog(id);
+ return new JsonResponse.Builder<JobLog>(Response.Status.OK).
+ result(jobLog).build();
+
+ } catch (SubmarineRuntimeException e) {
+ return parseJobServiceException(e);
+ }
+ }
private Response parseJobServiceException(SubmarineRuntimeException e) {
return new
JsonResponse.Builder<String>(e.getCode()).message(e.getMessage()).build();
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
index e0b2ede..5979a2f 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/rest/RestConstants.java
@@ -34,4 +34,6 @@ public class RestConstants {
public static final String NODES = "nodes";
public static final String NODE = "node";
+
+ public static final String LOGS = "logs";
}
diff --git
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
index 557ebaa..d2efed5 100644
---
a/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
+++
b/submarine-server/server-submitter/submitter-k8s/src/main/java/org/apache/submarine/server/submitter/k8s/K8sJobSubmitter.java
@@ -28,7 +28,10 @@ import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.JSON;
+import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.CustomObjectsApi;
+import io.kubernetes.client.models.V1Pod;
+import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.models.V1Status;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
@@ -37,6 +40,8 @@ import
org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.api.exception.InvalidSpecException;
import org.apache.submarine.server.api.job.JobSubmitter;
import org.apache.submarine.server.api.job.Job;
+import org.apache.submarine.server.api.job.JobLog;
+import org.apache.submarine.server.api.spec.JobLibrarySpec;
import org.apache.submarine.server.api.spec.JobSpec;
import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
import org.apache.submarine.server.submitter.k8s.model.MLJob;
@@ -52,9 +57,14 @@ public class K8sJobSubmitter implements JobSubmitter {
private static final String KUBECONFIG_ENV = "KUBECONFIG";
+ private static final String TF_JOB_SELECTOR_KEY = "tf-job-name=";
+ private static final String PYTORCH_JOB_SELECTOR_KEY = "pytorch-job-name=";
+
// K8s API client for CRD
private CustomObjectsApi api;
+ private CoreV1Api coreApi;
+
public K8sJobSubmitter() {}
@Override
@@ -79,6 +89,9 @@ public class K8sJobSubmitter implements JobSubmitter {
if (api == null) {
api = new CustomObjectsApi();
}
+ if (coreApi == null) {
+ coreApi = new CoreV1Api(client);
+ }
}
@Override
@@ -95,8 +108,10 @@ public class K8sJobSubmitter implements JobSubmitter {
mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob,
"true");
job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
} catch (InvalidSpecException e) {
+ LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(),
e);
throw new SubmarineRuntimeException(200, e.getMessage());
} catch (ApiException e) {
+ LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(),
e);
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
return job;
@@ -165,11 +180,68 @@ public class K8sJobSubmitter implements JobSubmitter {
return MLJobConverter.toJobFromStatus(status);
}
} catch (JsonSyntaxException e) {
- LOG.warn("K8s submitter: parse response object failed by " +
e.getMessage(), e);
+ LOG.error("K8s submitter: parse response object failed by " +
e.getMessage(), e);
}
throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream
response failed.");
}
+ @Override
+ public JobLog getJobLogName(JobSpec jobSpec, String jobId) {
+ JobLog jobLog = new JobLog();
+ jobLog.setJobId(jobId);
+ try {
+ final V1PodList podList = coreApi.listNamespacedPod(
+ jobSpec.getNamespace(),
+ "false", null, null,
+ getJobLabelSelector(jobSpec), null, null,
+ null, null);
+ for (V1Pod pod: podList.getItems()) {
+ String podName = pod.getMetadata().getName();
+ jobLog.addPodLog(podName, null);
+ }
+ } catch (final ApiException e) {
+ LOG.error("Error when listing pod for job:" + jobSpec.getName(),
e.getMessage());
+ }
+ return jobLog;
+ }
+
+ @Override
+ public JobLog getJobLog(JobSpec jobSpec, String jobId) {
+ JobLog jobLog = new JobLog();
+ jobLog.setJobId(jobId);
+ try {
+ final V1PodList podList = coreApi.listNamespacedPod(
+ jobSpec.getNamespace(),
+ "false", null, null,
+ getJobLabelSelector(jobSpec), null, null,
+ null, null);
+
+ for (V1Pod pod : podList.getItems()) {
+ String podName = pod.getMetadata().getName();
+ String namespace = pod.getMetadata().getNamespace();
+ String podLog = coreApi.readNamespacedPodLog(
+ podName, namespace, null, Boolean.FALSE,
+ Integer.MAX_VALUE, null, Boolean.FALSE,
+ Integer.MAX_VALUE, null, Boolean.FALSE);
+
+ jobLog.addPodLog(podName, podLog);
+ }
+ } catch (final ApiException e) {
+ LOG.error("Error when listing pod for job:" + jobSpec.getName(),
e.getMessage());
+ }
+ return jobLog;
+ }
+
+ private String getJobLabelSelector(JobSpec jobSpec) {
+ // TODO(JohnTing): SELECTOR_KEY should be obtained from individual models
in MLJOB
+ if (jobSpec.getLibrarySpec()
+
.getName().equalsIgnoreCase(JobLibrarySpec.SupportedMLFramework.TENSORFLOW.getName()))
{
+ return TF_JOB_SELECTOR_KEY + jobSpec.getName();
+ } else {
+ return PYTORCH_JOB_SELECTOR_KEY + jobSpec.getName();
+ }
+ }
+
private enum ParseOp {
PARSE_OP_RESULT,
PARSE_OP_DELETE
diff --git
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
index 35aff5b..7501df5 100644
---
a/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
+++
b/submarine-test/test-k8s/src/test/java/org/apache/submarine/rest/JobManagerRestApiIT.java
@@ -33,6 +33,7 @@ import java.util.Map;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
+
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
@@ -66,6 +67,7 @@ public class JobManagerRestApiIT extends
AbstractSubmarineServerTest {
/** Key is the ml framework name, the value is the operator */
private static Map<String, KfOperator> kfOperatorMap;
private static String JOB_PATH = "/api/" + RestConstants.V1 + "/" +
RestConstants.JOBS;
+ private static String JOB_LOG_PATH = JOB_PATH + "/" + RestConstants.LOGS;
private Gson gson = new GsonBuilder()
.registerTypeAdapter(JobId.class, new JobIdSerializer())
@@ -151,6 +153,9 @@ public class JobManagerRestApiIT extends
AbstractSubmarineServerTest {
Job foundJob = gson.fromJson(gson.toJson(jsonResponse.getResult()),
Job.class);
verifyGetJobApiResult(createdJob, foundJob);
+ // get log list
+ // TODO(JohnTing): Test the job log after creating the job
+
// patch
// TODO(jiwq): the commons-httpclient not support patch method
// https://tools.ietf.org/html/rfc5789
@@ -252,6 +257,16 @@ public class JobManagerRestApiIT extends
AbstractSubmarineServerTest {
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
jsonResponse.getCode());
}
+ @Test
+ public void testListJobLog() throws Exception {
+ GetMethod getMethod = httpGet(JOB_LOG_PATH);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
getMethod.getStatusCode());
+
+ String json = getMethod.getResponseBodyAsString();
+ JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
jsonResponse.getCode());
+ }
+
String loadContent(String resourceName) throws Exception {
URL fileUrl = this.getClass().getResource("/" + resourceName);
LOG.info("Resource file: " + fileUrl);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]