This is an automated email from the ASF dual-hosted git repository.
ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 80c6303b [LIVY-967][SERVER] Return session information with livy
sessions APIs (#383)
80c6303b is described below
commit 80c6303b5bc7ac5d07281b0366dc2c4a2d7f082a
Author: Asif Khatri <[email protected]>
AuthorDate: Tue Apr 18 17:45:55 2023 +0530
[LIVY-967][SERVER] Return session information with livy sessions APIs (#383)
## What changes were proposed in this pull request?
Currently, Livy's GET /sessions does not return fields such as driver/executor
memory, Spark configuration, etc. Ideally, a session response should return all
the values set in the session creation request (POST /sessions).
JIRA: https://issues.apache.org/jira/browse/LIVY-967
## How was this patch tested?
Verified manually by creating an interactive session via a REST API call on a
local YARN cluster. The unit tests have also been updated.
---
.../apache/livy/client/common/HttpMessages.java | 37 +++++++++++++--
.../apache/livy/client/http/HttpClientSpec.scala | 12 +++++
docs/rest-api.md | 55 ++++++++++++++++++++++
.../server/interactive/InteractiveSession.scala | 52 +++++++++++++++++++-
.../interactive/InteractiveSessionServlet.scala | 12 +++--
.../InteractiveSessionServletSpec.scala | 22 +++++++++
.../interactive/InteractiveSessionSpec.scala | 11 +++--
7 files changed, 189 insertions(+), 12 deletions(-)
diff --git
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index a88ed8ca..e0621a39 100644
---
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -62,9 +62,25 @@ public class HttpMessages {
public final Map<String, String> appInfo;
public final List<String> log;
public final String ttl;
-
- public SessionInfo(int id, String name, String appId, String owner, String
proxyUser,
- String state, String kind, Map<String, String> appInfo, List<String>
log, String ttl) {
+ public final String driverMemory;
+ public final int driverCores;
+ public final String executorMemory;
+ public final int executorCores;
+ public final Map<String, String> conf;
+ public final List<String> archives;
+ public final List<String> files;
+ public final int heartbeatTimeoutInSecond;
+ public final List<String> jars;
+ public final int numExecutors;
+ public final List<String> pyFiles;
+ public final String queue;
+
+ public SessionInfo(int id, String name, String appId, String owner, String
state,
+ String kind, Map<String, String> appInfo, List<String> log,
+ String ttl, String driverMemory,
+ int driverCores, String executorMemory, int executorCores,
Map<String, String> conf,
+ List<String> archives, List<String> files, int
heartbeatTimeoutInSecond, List<String> jars,
+ int numExecutors, String proxyUser, List<String> pyFiles, String
queue) {
this.id = id;
this.name = name;
this.appId = appId;
@@ -75,10 +91,23 @@ public class HttpMessages {
this.appInfo = appInfo;
this.log = log;
this.ttl = ttl;
+ this.driverMemory = driverMemory;
+ this.driverCores = driverCores;
+ this.executorMemory = executorMemory;
+ this.executorCores = executorCores;
+ this.conf = conf;
+ this.archives = archives;
+ this.files = files;
+ this.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond;
+ this.jars = jars;
+ this.numExecutors = numExecutors;
+ this.pyFiles = pyFiles;
+ this.queue = queue;
}
private SessionInfo() {
- this(-1, null, null, null, null, null, null, null, null, null);
+ this(-1, null, null, null, null, null, null, null, null, null, 0, null,
0, null, null,
+ null, 0, null, 0, null, null, null);
}
}
diff --git
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index 336ff8c8..d24ec92a 100644
---
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -279,6 +279,18 @@ private class HttpClientTestBootstrap extends LifeCycle {
when(session.state).thenReturn(SessionState.Idle)
when(session.proxyUser).thenReturn(None)
when(session.kind).thenReturn(Spark)
+ when(session.driverMemory).thenReturn(None)
+ when(session.driverCores).thenReturn(None)
+ when(session.executorMemory).thenReturn(None)
+ when(session.executorCores).thenReturn(None)
+ when(session.numExecutors).thenReturn(None)
+ when(session.proxyUser).thenReturn(None)
+ when(session.queue).thenReturn(None)
+ when(session.conf).thenReturn(Map("" -> ""))
+ when(session.archives).thenReturn(List())
+ when(session.files).thenReturn(List())
+ when(session.jars).thenReturn(List())
+ when(session.pyFiles).thenReturn(List())
when(session.stop()).thenReturn(Future.successful(()))
when(session.ttl).thenReturn(None)
require(HttpClientSpec.session == null, "Session already created?")
diff --git a/docs/rest-api.md b/docs/rest-api.md
index 903bb5b2..ec43e0f8 100644
--- a/docs/rest-api.md
+++ b/docs/rest-api.md
@@ -636,6 +636,61 @@ A session represents an interactive shell.
<td>The detailed application info</td>
<td>Map of key=val</td>
</tr>
+ <tr>
+ <td>jars</td>
+ <td>jars to be used in this session</td>
+ <td>list of strings</td>
+ </tr>
+ <tr>
+ <td>pyFiles</td>
+ <td>Python files to be used in this session</td>
+ <td>list of strings</td>
+ </tr>
+ <tr>
+ <td>files</td>
+ <td>files to be used in this session</td>
+ <td>list of strings</td>
+ </tr>
+ <tr>
+ <td>driverMemory</td>
+ <td>Amount of memory to use for the driver process</td>
+ <td>string</td>
+ </tr>
+ <tr>
+ <td>driverCores</td>
+ <td>Number of cores to use for the driver process</td>
+ <td>int</td>
+ </tr>
+ <tr>
+ <td>executorMemory</td>
+ <td>Amount of memory to use per executor process</td>
+ <td>string</td>
+ </tr>
+ <tr>
+ <td>executorCores</td>
+ <td>Number of cores to use for each executor</td>
+ <td>int</td>
+ </tr>
+ <tr>
+ <td>numExecutors</td>
+ <td>Number of executors to launch for this session</td>
+ <td>int</td>
+ </tr>
+ <tr>
+ <td>archives</td>
+ <td>Archives to be used in this session</td>
+ <td>list of strings</td>
+ </tr>
+ <tr>
+ <td>queue</td>
+ <td>The name of the YARN queue to which the session is submitted</td>
+ <td>string</td>
+ </tr>
+ <tr>
+ <td>conf</td>
+ <td>Spark configuration properties</td>
+ <td>Map of key=val</td>
+ </tr>
</table>
diff --git
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 6a1dba0f..59a593f6 100644
---
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -55,6 +55,18 @@ case class InteractiveRecoveryMetadata(
heartbeatTimeoutS: Int,
owner: String,
ttl: Option[String],
+ driverMemory: Option[String],
+ driverCores: Option[Int],
+ executorMemory: Option[String],
+ executorCores: Option[Int],
+ conf: Map[String, String],
+ archives: List[String],
+ files: List[String],
+ jars: List[String],
+ numExecutors: Option[Int],
+ pyFiles: List[String],
+ queue: Option[String],
+ // proxyUser is deprecated. It is available here only for backward
compatibility
proxyUser: Option[String],
rscDriverUri: Option[URI],
version: Int = 1)
@@ -127,6 +139,17 @@ object InteractiveSession extends Logging {
impersonatedUser,
ttl,
sessionStore,
+ request.driverMemory,
+ request.driverCores,
+ request.executorMemory,
+ request.executorCores,
+ request.conf,
+ request.archives,
+ request.files,
+ request.jars,
+ request.numExecutors,
+ request.pyFiles,
+ request.queue,
mockApp)
}
@@ -155,6 +178,17 @@ object InteractiveSession extends Logging {
metadata.proxyUser,
metadata.ttl,
sessionStore,
+ metadata.driverMemory,
+ metadata.driverCores,
+ metadata.executorMemory,
+ metadata.executorCores,
+ metadata.conf,
+ metadata.archives,
+ metadata.files,
+ metadata.jars,
+ metadata.numExecutors,
+ metadata.pyFiles,
+ metadata.queue,
mockApp)
}
@@ -377,12 +411,23 @@ class InteractiveSession(
val client: Option[RSCClient],
initialState: SessionState,
val kind: Kind,
- heartbeatTimeoutS: Int,
+ val heartbeatTimeoutS: Int,
livyConf: LivyConf,
owner: String,
override val proxyUser: Option[String],
ttl: Option[String],
sessionStore: SessionStore,
+ val driverMemory: Option[String],
+ val driverCores: Option[Int],
+ val executorMemory: Option[String],
+ val executorCores: Option[Int],
+ val conf: Map[String, String],
+ val archives: List[String],
+ val files: List[String],
+ val jars: List[String],
+ val numExecutors: Option[Int],
+ val pyFiles: List[String],
+ val queue: Option[String],
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, name, owner, ttl, livyConf)
with SessionHeartbeat
@@ -475,7 +520,10 @@ class InteractiveSession(
override def recoveryMetadata: RecoveryMetadata =
InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
- heartbeatTimeout.toSeconds.toInt, owner, None, proxyUser, rscDriverUri)
+ heartbeatTimeout.toSeconds.toInt, owner, None,
+ driverMemory, driverCores, executorMemory, executorCores, conf,
+ archives, files, jars, numExecutors, pyFiles, queue,
+ proxyUser, rscDriverUri)
override def state: SessionState = {
if (serverSideState == SessionState.Running) {
diff --git
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 85407b04..239936f3 100644
---
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -92,9 +92,15 @@ class InteractiveSessionServlet(
Nil
}
- new SessionInfo(session.id, session.name.orNull, session.appId.orNull,
session.owner,
- session.proxyUser.orNull, session.state.toString, session.kind.toString,
- session.appInfo.asJavaMap, logs.asJava, session.ttl.orNull)
+ new SessionInfo(session.id, session.name.orNull, session.appId.orNull,
+ session.owner, session.state.toString, session.kind.toString,
+ session.appInfo.asJavaMap, logs.asJava,
+ session.proxyUser.orNull, session.driverMemory.orNull,
+ session.driverCores.getOrElse(0), session.executorMemory.orNull,
+ session.executorCores.getOrElse(0), session.conf.asJava,
session.archives.asJava,
+ session.files.asJava, session.heartbeatTimeoutS, session.jars.asJava,
+ session.numExecutors.getOrElse(0), session.proxyUser.orNull,
session.pyFiles.asJava,
+ session.queue.orNull)
}
post("/:id/stop") {
diff --git
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index 0f1cdc7f..f7c7ad3a 100644
---
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -70,6 +70,17 @@ class InteractiveSessionServletSpec extends
BaseInteractiveServletSpec {
when(session.proxyUser).thenReturn(None)
when(session.heartbeatExpired).thenReturn(false)
when(session.ttl).thenReturn(None)
+ when(session.driverMemory).thenReturn(None)
+ when(session.driverCores).thenReturn(None)
+ when(session.executorMemory).thenReturn(None)
+ when(session.executorCores).thenReturn(None)
+ when(session.numExecutors).thenReturn(None)
+ when(session.queue).thenReturn(None)
+ when(session.conf).thenReturn(Map.empty[String, String])
+ when(session.archives).thenReturn(List())
+ when(session.files).thenReturn(List())
+ when(session.jars).thenReturn(List())
+ when(session.pyFiles).thenReturn(List())
when(session.statements).thenAnswer(
new Answer[IndexedSeq[Statement]]() {
override def answer(args: InvocationOnMock): IndexedSeq[Statement] =
statements
@@ -184,6 +195,17 @@ class InteractiveSessionServletSpec extends
BaseInteractiveServletSpec {
when(session.logLines()).thenReturn(log)
when(session.heartbeatExpired).thenReturn(false)
when(session.ttl).thenReturn(None)
+ when(session.driverMemory).thenReturn(None)
+ when(session.driverCores).thenReturn(None)
+ when(session.executorMemory).thenReturn(None)
+ when(session.executorCores).thenReturn(None)
+ when(session.numExecutors).thenReturn(None)
+ when(session.queue).thenReturn(None)
+ when(session.conf).thenReturn(Map.empty[String, String])
+ when(session.archives).thenReturn(List())
+ when(session.files).thenReturn(List())
+ when(session.jars).thenReturn(List())
+ when(session.pyFiles).thenReturn(List())
val req = mock[HttpServletRequest]
diff --git
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 02aca27a..af66ba95 100644
---
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -278,7 +278,8 @@ class InteractiveSessionSpec extends FunSpec
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
78, Some("Test session"), None, "appTag", Spark, 0, null, None, None,
- Some(URI.create("")))
+ None, None, None, Map.empty[String, String], List.empty[String],
List.empty[String],
+ List.empty[String], None, List.empty[String], None, None,
Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None,
Some(mockClient))
s.start()
@@ -295,7 +296,9 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
- 78, None, None, "appTag", Spark, 0, null, None, None,
Some(URI.create("")))
+ 78, None, None, "appTag", Spark, 0, null, None, None,
+ None, None, None, Map.empty[String, String], List.empty[String],
List.empty[String],
+ List.empty[String], None, List.empty[String], None, None,
Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None,
Some(mockClient))
s.start()
@@ -310,7 +313,9 @@ class InteractiveSessionSpec extends FunSpec
val conf = new LivyConf()
val sessionStore = mock[SessionStore]
val m = InteractiveRecoveryMetadata(
- 78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None)
+ 78, None, Some("appId"), "appTag", Spark, 0, null, None, None,
+ None, None, None, Map.empty[String, String], List.empty[String],
List.empty[String],
+ List.empty[String], None, List.empty[String], None, None, None)
val s = InteractiveSession.recover(m, conf, sessionStore, None)
s.start()
s.state shouldBe a[SessionState.Dead]