This is an automated email from the ASF dual-hosted git repository.

ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new b089dd68 [LIVY-702] Submit Spark apps to Kubernetes (#451)
b089dd68 is described below

commit b089dd68277f5b97bd939787f05a953694302fba
Author: Asif Khatri <[email protected]>
AuthorDate: Wed Jul 10 14:35:17 2024 +0530

    [LIVY-702] Submit Spark apps to Kubernetes (#451)
    
    This pull request (PR) is the foundational PR for adding Kubernetes support in Apache Livy, originally found in #249. This update includes a newer version of the Kubernetes client and adds code to display the Spark UI.
    
    ## Summary of the Proposed Changes
    
    This PR introduces a method to submit Spark applications to a Kubernetes cluster. The key points covered include:
    
     * Submitting batch sessions
     * Submitting interactive sessions
     * Monitoring sessions, collecting logs, and gathering diagnostic information
     * Restoring session monitoring after restarts
     * Garbage collection (GC) of created Kubernetes resources
    
    JIRA link: https://issues.apache.org/jira/browse/LIVY-702
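    
    As a usage illustration for the batch-session flow described above, the following sketch posts a batch to Livy's REST API (POST /batches) while Livy's Spark master points at Kubernetes. It is not part of this patch: the Livy URL (default port 8998), the container image, and the example jar path are placeholders, and it assumes livy.spark.master is set to a k8s:// master.
    
    ```scala
    // Minimal client-side sketch; assumes Java 11+ and a Livy server configured with
    // livy.spark.master = k8s://https://<kubernetes-api-server>.
    import java.net.URI
    import java.net.http.{HttpClient, HttpRequest, HttpResponse}
    
    object SubmitBatchToKubernetes {
      def main(args: Array[String]): Unit = {
        // The jar path and image below are placeholders; adjust to your Spark distribution.
        val payload =
          """{
            |  "file": "local:///opt/spark/examples/jars/spark-examples.jar",
            |  "className": "org.apache.spark.examples.SparkPi",
            |  "conf": { "spark.kubernetes.container.image": "apache/spark:latest" }
            |}""".stripMargin
    
        // Create the batch session via Livy's REST API.
        val request = HttpRequest.newBuilder(URI.create("http://localhost:8998/batches"))
          .header("Content-Type", "application/json")
          .POST(HttpRequest.BodyPublishers.ofString(payload))
          .build()
    
        val response = HttpClient.newHttpClient()
          .send(request, HttpResponse.BodyHandlers.ofString())
    
        // Livy replies with the created batch session, including its id and state.
        println(s"${response.statusCode()}: ${response.body()}")
      }
    }
    ```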
    
    ## How was this patch tested?
    
     * Unit Tests: The patch has been verified through comprehensive unit tests.
     * Manual Testing: Conducted manual testing using Kubernetes on Docker Desktop.
        *  Environment: Helm charts.
    
    For detailed instructions on testing using Helm charts, please refer to the documentation available at https://github.com/askhatri/livycluster
    
    Co-authored-by: Asif Khatri <[email protected]>
    Co-authored-by: Alex Sasnouskikh <[email protected]>
---
 .github/workflows/integration-tests.yaml           |   1 +
 .github/workflows/unit-tests.yaml                  |   1 +
 conf/livy.conf.template                            |  55 ++
 pom.xml                                            |  13 +
 .../java/org/apache/livy/rsc/driver/RSCDriver.java |   7 +
 server/pom.xml                                     |   5 +
 .../apache/livy/server/ui/static/css/livy-ui.css   |   6 +
 .../livy/server/ui/static/js/all-sessions.js       |   4 +-
 .../org/apache/livy/server/ui/static/js/livy-ui.js |  27 +-
 .../org/apache/livy/server/ui/static/js/session.js |  14 +-
 .../src/main/scala/org/apache/livy/LivyConf.scala  |  60 ++
 .../scala/org/apache/livy/server/LivyServer.scala  |  12 +-
 .../server/interactive/InteractiveSession.scala    |  10 +-
 .../org/apache/livy/sessions/SessionManager.scala  |   4 +-
 .../scala/org/apache/livy/utils/SparkApp.scala     |  27 +-
 .../org/apache/livy/utils/SparkKubernetesApp.scala | 739 +++++++++++++++++++++
 .../apache/livy/utils/SparkKubernetesAppSpec.scala | 242 +++++++
 17 files changed, 1208 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/integration-tests.yaml 
b/.github/workflows/integration-tests.yaml
index e52df4d8..36162dbb 100644
--- a/.github/workflows/integration-tests.yaml
+++ b/.github/workflows/integration-tests.yaml
@@ -19,6 +19,7 @@ on:
   pull_request:
     types: [opened, reopened, synchronize]
 env:
+  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
   MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false 
-Dmaven.wagon.http.retryHandler.class=standard 
-Dmaven.wagon.http.retryHandler.count=3
 jobs:
   build:
diff --git a/.github/workflows/unit-tests.yaml 
b/.github/workflows/unit-tests.yaml
index 552642ff..f3b10cc7 100644
--- a/.github/workflows/unit-tests.yaml
+++ b/.github/workflows/unit-tests.yaml
@@ -17,6 +17,7 @@
 name: Unit Tests
 on: [push]
 env:
+  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
   MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false 
-Dmaven.wagon.http.retryHandler.class=standard 
-Dmaven.wagon.http.retryHandler.count=3
 jobs:
   build:
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index e99251d0..a5e47a9d 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -200,3 +200,58 @@
 # livy.server.hdfs.safe-mode.interval = 5
 # value specifies max attempts to retry when safe mode is ON in hdfs filesystem
 # livy.server.hdfs.safe-mode.max.retry.attempts = 10
+
+# Manual authentication to KubeApiServer (by default configured with the Kubernetes
+# ServiceAccount if Livy is deployed to a Kubernetes cluster as a Pod)
+# Kubernetes oauth token file path
+# livy.server.kubernetes.oauthTokenFile =
+# Kubernetes oauth token string value
+# livy.server.kubernetes.oauthTokenValue =
+# Kubernetes CA cert file path
+# livy.server.kubernetes.caCertFile =
+# Kubernetes client key file path
+# livy.server.kubernetes.clientKeyFile =
+# Kubernetes client cert file path
+# livy.server.kubernetes.clientCertFile =
+
+# If Livy can't find the Kubernetes app within this time, consider it lost.
+# livy.server.kubernetes.app-lookup-timeout = 600s
+# When the cluster is busy, the Kubernetes app may fail to launch within app-lookup-timeout,
+# which would leak the session, so Livy periodically checks for session leakage.
+# How long to check livy session leakage
+# livy.server.kubernetes.app-leakage.check-timeout = 600s
+# How often to check livy session leakage
+# livy.server.kubernetes.app-leakage.check-interval = 60s
+
+# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods 
state, logs, description
+# details, routes, etc...)
+# livy.server.kubernetes.poll-interval = 15s
+
+# Whether to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the
+# desired options below
+# livy.server.kubernetes.ingress.create = false
+# Kubernetes Nginx Ingress protocol. If set to https, refer to the Ingress TLS section below
+# livy.server.kubernetes.ingress.protocol = http
+# Kubernetes Nginx Ingress host. Be sure to set it to the FQDN of your Nginx 
Ingress Controller
+# proxy server
+# livy.server.kubernetes.ingress.host = localhost
+# Kubernetes secret name for Nginx Ingress TLS. Ignored if
+# 'livy.server.kubernetes.ingress.protocol' is not https
+# livy.server.kubernetes.ingress.tls.secretName = spark-cluster-tls
+# Kubernetes Nginx Ingress additional configuration snippet for specific 
config options
+# livy.server.kubernetes.ingress.additionalConfSnippet =
+# Kubernetes Nginx Ingress additional annotations for specific config options, 
eg. for configuring
+# basic auth of external oauth2 proxy. Format: 
annotation1=value1;annotation2=value2;...
+# livy.server.kubernetes.ingress.additionalAnnotations =
+
+# Set to true to enable Grafana Loki integration and configure options below
+livy.server.kubernetes.grafana.loki.enabled = false
+# Grafana UI root endpoint to build links based on
+# livy.server.kubernetes.grafana.url = http://localhost:3000
+# Grafana Datasource name for Loki
+# livy.server.kubernetes.grafana.loki.datasource = loki
+# Time range from now to past to get logs for
+# livy.server.kubernetes.grafana.timeRange = 6h
+
+# Used to build links to Spark History Server pages on Spark App completion 
(Kubernetes only)
+# livy.ui.history-server-url = http://spark-history-server
diff --git a/pom.xml b/pom.xml
index 23561962..8c5e1050 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
     <spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
     <spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
     <spark.version>${spark.scala-2.11.version}</spark.version>
+    <kubernetes.client.version>5.6.0</kubernetes.client.version>
     <hive.version>3.0.0</hive.version>
     <commons-codec.version>1.9</commons-codec.version>
     <httpclient.version>4.5.13</httpclient.version>
@@ -318,6 +319,18 @@
         <version>${metrics.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>io.fabric8</groupId>
+        <artifactId>kubernetes-client</artifactId>
+        <version>${kubernetes.client.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
index b93c5cc7..b5b99f62 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -169,6 +169,13 @@ public class RSCDriver extends BaseProtocol {
     // on the cluster, it would be tricky to solve that problem in a generic 
way.
     livyConf.set(RPC_SERVER_ADDRESS, null);
 
+    // If we are running on Kubernetes, get RPC_SERVER_ADDRESS from 
"spark.driver.host" option
+    // this option is set in class 
org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
+    // line 61: val driverHostname = 
s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
+    if (conf.get("spark.master").startsWith("k8s")) {
+      livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
+    }
+
     if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
       // Test flag is turned on so we will just infinite loop here. It should 
cause
       // timeout and we should still see yarn application being cleaned up.
diff --git a/server/pom.xml b/server/pom.xml
index c2a8ef4f..f9c296e5 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -84,6 +84,11 @@
       <artifactId>metrics-healthchecks</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>javax.servlet</groupId>
       <artifactId>javax.servlet-api</artifactId>
diff --git 
a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css 
b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css
index fc2ca307..a7df0ec2 100644
--- a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css
+++ b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css
@@ -41,6 +41,12 @@ td .progress {
   margin: 0;
 }
 
+.with-scroll-bar {
+  display: block;
+  overflow-y: scroll;
+  max-height: 200px;
+}
+
 #session-summary {
   margin: 20px 0;
 }
\ No newline at end of file
diff --git 
a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js 
b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js
index d8a84a76..fd68ff71 100644
--- 
a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js
+++ 
b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js
@@ -30,7 +30,7 @@ function loadSessionsTable(sessions) {
         tdWrap(session.proxyUser) +
         tdWrap(session.kind) +
         tdWrap(session.state) +
-        tdWrap(logLinks(session, "session")) +
+        tdWrapWithClass(logLinks(session, "session"), "with-scroll-bar") +
         "</tr>"
     );
   });
@@ -46,7 +46,7 @@ function loadBatchesTable(sessions) {
         tdWrap(session.owner) +
         tdWrap(session.proxyUser) +
         tdWrap(session.state) +
-        tdWrap(logLinks(session, "batch")) +
+        tdWrapWithClass(logLinks(session, "batch"), "with-scroll-bar") +
         "</tr>"
     );
   });
diff --git 
a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js 
b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js
index f2d743ae..af935251 100644
--- a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js
+++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js
@@ -52,10 +52,23 @@ function driverLogLink(session) {
   }
 }
 
+function executorsLogLinks(session) {
+  var executorLogUrls = session.appInfo.executorLogUrls;
+  if (executorLogUrls != null) {
+    return executorLogUrls.split(";").map(function (pair) {
+      var nameAndLink = pair.split("#");
+      return divWrap(anchorLink(nameAndLink[1], nameAndLink[0]));
+    }).join("");
+  } else {
+    return "";
+  }
+}
+
 function logLinks(session, kind) {
   var sessionLog = divWrap(uiLink(kind + "/" + session.id + "/log", 
"session"));
   var driverLog =  divWrap(driverLogLink(session));
-  return sessionLog + driverLog;
+  var executorsLogs = executorsLogLinks(session);
+  return sessionLog + driverLog + executorsLogs;
 }
 
 function appIdLink(session) {
@@ -75,6 +88,18 @@ function tdWrap(val) {
   return "<td>" + inner + "</td>";
 }
 
+function tdWrapWithClass(val, cl) {
+  var inner = "";
+  if (val != null) {
+    inner = val;
+  }
+  var clVal = "";
+  if (cl != null) {
+      clVal = " class=\"" + cl + "\"";
+  }
+  return "<td" + clVal + ">" + inner + "</td>";
+}
+
 function preWrap(inner) {
   return "<pre>" + escapeHtml(inner) + "</pre>";
 }
diff --git 
a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js 
b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js
index c87e5ca4..3a23dc98 100644
--- a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js
+++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js
@@ -23,6 +23,18 @@ function sumWrap(name, val) {
   }
 }
 
+function sumWrapWithClass(name, val, cl) {
+  var clVal = "";
+  if (cl != null) {
+    clVal = " class=\"" + cl + "\"";
+  }
+  if (val != null) {
+    return "<li" + clVal + "><strong>" + name + ": </strong>" + val + "</li>";
+  } else {
+    return "";
+  }
+}
+
 function formatError(output) {
   var errStr = output.evalue + "\n";
   var trace = output.traceback;
@@ -93,7 +105,7 @@ function appendSummary(session) {
       sumWrap("Proxy User", session.proxyUser) +
       sumWrap("Session Kind", session.kind) +
       sumWrap("State", session.state) +
-      sumWrap("Logs", logLinks(session, "session")) +
+      sumWrapWithClass("Logs", logLinks(session, "session"), 
"with-scroll-bar") +
     "</ul>"
   );
 }
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala 
b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 720aa4e1..6bef0974 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -258,6 +258,63 @@ object LivyConf {
   // value specifies max attempts to retry when safe mode is ON in hdfs 
filesystem
   val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = 
Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)
 
+  // Kubernetes oauth token file path.
+  val KUBERNETES_OAUTH_TOKEN_FILE = 
Entry("livy.server.kubernetes.oauthTokenFile", "")
+  // Kubernetes oauth token string value.
+  val KUBERNETES_OAUTH_TOKEN_VALUE = 
Entry("livy.server.kubernetes.oauthTokenValue", "")
+  // Kubernetes CA cert file path.
+  val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
+  // Kubernetes client key file path.
+  val KUBERNETES_CLIENT_KEY_FILE = 
Entry("livy.server.kubernetes.clientKeyFile", "")
+  // Kubernetes client cert file path.
+  val KUBERNETES_CLIENT_CERT_FILE = 
Entry("livy.server.kubernetes.clientCertFile", "")
+
+  // If Livy can't find the Kubernetes app within this time, consider it lost.
+  val KUBERNETES_APP_LOOKUP_TIMEOUT = 
Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
+  // How often Livy polls Kubernetes to refresh Kubernetes app state.
+  val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", 
"15s")
+
+  // How long to check livy session leakage.
+  val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
+    Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
+  // How often to check livy session leakage.
+  val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
+    Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")
+
+  // Whether to create Kubernetes Nginx Ingress for Spark UI.
+  val KUBERNETES_INGRESS_CREATE = 
Entry("livy.server.kubernetes.ingress.create", false)
+  // Kubernetes Ingress class name.
+  val KUBERNETES_INGRESS_CLASS_NAME = 
Entry("livy.server.kubernetes.ingress.className", "")
+  // Kubernetes Nginx Ingress protocol.
+  val KUBERNETES_INGRESS_PROTOCOL = 
Entry("livy.server.kubernetes.ingress.protocol", "http")
+  // Kubernetes Nginx Ingress host.
+  val KUBERNETES_INGRESS_HOST = Entry("livy.server.kubernetes.ingress.host", 
"localhost")
+  // Kubernetes Nginx Ingress additional configuration snippet.
+  val KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET =
+    Entry("livy.server.kubernetes.ingress.additionalConfSnippet", "")
+  // Kubernetes Nginx Ingress additional annotations: 
key1=value1;key2=value2;... .
+  val KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS =
+    Entry("livy.server.kubernetes.ingress.additionalAnnotations", "")
+  // Kubernetes secret name for Nginx Ingress TLS.
+  // Is omitted if 'livy.server.kubernetes.ingress.protocol' value doesn't end 
with 's'
+  val KUBERNETES_INGRESS_TLS_SECRET_NAME =
+    Entry("livy.server.kubernetes.ingress.tls.secretName", "spark-cluster-tls")
+
+  val KUBERNETES_GRAFANA_LOKI_ENABLED = 
Entry("livy.server.kubernetes.grafana.loki.enabled", false)
+  val KUBERNETES_GRAFANA_URL = Entry("livy.server.kubernetes.grafana.url", 
"http://localhost:3000";)
+  val KUBERNETES_GRAFANA_LOKI_DATASOURCE =
+    Entry("livy.server.kubernetes.grafana.loki.datasource", "loki")
+  val KUBERNETES_GRAFANA_TIME_RANGE = 
Entry("livy.server.kubernetes.grafana.timeRange", "6h")
+
+  // Whether Spark pods run with sidecar containers.
+  val KUBERNETES_SPARK_SIDECAR_ENABLED =
+    Entry("livy.server.kubernetes.spark.sidecar.enabled", true)
+  // Container name used to identify the Spark container when pods run with sidecar containers.
+  val KUBERNETES_SPARK_CONTAINER_NAME =
+    Entry("livy.server.kubernetes.spark.container.name", "spark-container")
+
+  val UI_HISTORY_SERVER_URL = Entry("livy.ui.history-server-url", 
"http://spark-history-server";)
+
   // Whether session timeout should be checked, by default it will be checked, 
which means inactive
   // session will be stopped after "livy.server.session.timeout"
   val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
@@ -371,6 +428,9 @@ class LivyConf(loadDefaults: Boolean) extends 
ClientConf[LivyConf](null) {
   /** Return true if spark master starts with yarn. */
   def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")
 
+  /** Return true if spark master starts with k8s. */
+  def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")
+
   /** Return the spark deploy mode Livy sessions should use. */
   def sparkDeployMode(): Option[String] = 
Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)
 
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala 
b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index 3e715bdf..c7c7fe75 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, 
StateStore, ZooKeeperManag
 import org.apache.livy.server.ui.UIServlet
 import org.apache.livy.sessions.{BatchSessionManager, 
InteractiveSessionManager}
 import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
+import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
 import org.apache.livy.utils.LivySparkUtils._
-import org.apache.livy.utils.SparkYarnApp
 
 class LivyServer extends Logging {
 
@@ -142,10 +142,12 @@ class LivyServer extends Logging {
 
     testRecovery(livyConf)
 
-    // Initialize YarnClient ASAP to save time.
+    // Initialize YarnClient/KubernetesClient ASAP to save time.
     if (livyConf.isRunningOnYarn()) {
       SparkYarnApp.init(livyConf)
       Future { SparkYarnApp.yarnClient }
+    } else if (livyConf.isRunningOnKubernetes()) {
+      SparkKubernetesApp.init(livyConf)
     }
 
     if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
@@ -415,10 +417,10 @@ class LivyServer extends Logging {
   }
 
   private[livy] def testRecovery(livyConf: LivyConf): Unit = {
-    if (!livyConf.isRunningOnYarn()) {
-      // If recovery is turned on but we are not running on YARN, quit.
+    if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
+      // If recovery is turned on, and we are not running on YARN or 
Kubernetes, quit.
       require(livyConf.get(LivyConf.RECOVERY_MODE) == 
SESSION_RECOVERY_MODE_OFF,
-        "Session recovery requires YARN.")
+        "Session recovery requires YARN or Kubernetes.")
     }
   }
 }
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 34499d34..d8c4a16b 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -462,11 +462,11 @@ class InteractiveSession(
     app = mockApp.orElse {
       val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
         .map(new LineBufferedProcess(_, 
livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
-
-      if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
-        Some(SparkApp.create(appTag, appId, driverProcess, livyConf, 
Some(this)))
+      if (!livyConf.isRunningOnKubernetes()) {
+        driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, 
livyConf, Some(this)))
       } else {
-        None
+        // Create SparkApp for Kubernetes anyway
+        Some(SparkApp.create(appTag, appId, driverProcess, livyConf, 
Some(this)))
       }
     }
 
@@ -547,6 +547,8 @@ class InteractiveSession(
       transition(SessionState.ShuttingDown)
       sessionStore.remove(RECOVERY_SESSION_TYPE, id)
       client.foreach { _.stop(true) }
+      // We need to call #kill here explicitly to delete Interactive pods from 
the cluster
+      if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
     } catch {
       case _: Exception =>
         app.foreach {
diff --git 
a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala 
b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index b81bbc03..8742f65e 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -192,10 +192,10 @@ class SessionManager[S <: Session, R <: RecoveryMetadata 
: ClassTag](
 
     Future.sequence(all().filter(expired).map { s =>
       s.state match {
-        case st: FinishedSessionState =>
+        case _: FinishedSessionState =>
           info(s"Deleting $s because it finished before 
${sessionStateRetainedInSec / 1e9} secs.")
         case _ =>
-          info(s"Deleting $s because it was inactive or the time to leave the 
period is over.")
+          info(s"Deleting $s because it was inactive for more than 
${sessionTimeout / 1e9} secs.")
       }
       delete(s)
     })
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala 
b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
index 9afe2816..e424f80f 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
@@ -24,12 +24,20 @@ import org.apache.livy.LivyConf
 object AppInfo {
   val DRIVER_LOG_URL_NAME = "driverLogUrl"
   val SPARK_UI_URL_NAME = "sparkUiUrl"
+  val EXECUTORS_LOG_URLS_NAME = "executorLogUrls"
 }
 
-case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: 
Option[String] = None) {
+case class AppInfo(
+  var driverLogUrl: Option[String] = None,
+  var sparkUiUrl: Option[String] = None,
+  var executorLogUrls: Option[String] = None) {
   import AppInfo._
   def asJavaMap: java.util.Map[String, String] =
-    Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> 
sparkUiUrl.orNull).asJava
+    Map(
+      DRIVER_LOG_URL_NAME -> driverLogUrl.orNull,
+      SPARK_UI_URL_NAME -> sparkUiUrl.orNull,
+      EXECUTORS_LOG_URLS_NAME -> executorLogUrls.orNull
+    ).asJava
 }
 
 trait SparkAppListener {
@@ -71,13 +79,21 @@ object SparkApp {
       sparkConf ++ Map(
         SPARK_YARN_TAG_KEY -> mergedYarnTags,
         "spark.yarn.submit.waitAppCompletion" -> "false")
+    } else if (livyConf.isRunningOnKubernetes()) {
+      import KubernetesConstants._
+      sparkConf ++ Map(
+        s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
+        s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> 
uniqueAppTag,
+        "spark.kubernetes.submission.waitAppCompletion" -> "false",
+        "spark.ui.proxyBase" -> s"/$uniqueAppTag")
     } else {
       sparkConf
     }
   }
 
   /**
-   * Return a SparkApp object to control the underlying Spark application via 
YARN or spark-submit.
+   * Return a SparkApp object to control the underlying Spark application via 
YARN, Kubernetes
+   * or spark-submit.
    *
    * @param uniqueAppTag A tag that can uniquely identify the application.
    */
@@ -89,8 +105,11 @@ object SparkApp {
       listener: Option[SparkAppListener]): SparkApp = {
     if (livyConf.isRunningOnYarn()) {
       new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
+    } else if (livyConf.isRunningOnKubernetes()) {
+      new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
     } else {
-      require(process.isDefined, "process must not be None when Livy master is 
not YARN.")
+      require(process.isDefined, "process must not be None when Livy master is not YARN or " +
+        "Kubernetes.")
       new SparkProcApp(process.get, listener)
     }
   }
diff --git 
a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala 
b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
new file mode 100644
index 00000000..b5160aaf
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.net.URLEncoder
+import java.util.Collections
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
+import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _}
+import org.apache.commons.lang.StringUtils
+
+import org.apache.livy.{LivyConf, Logging, Utils}
+
+object SparkKubernetesApp extends Logging {
+
+  private val leakedAppTags = new 
java.util.concurrent.ConcurrentHashMap[String, Long]()
+
+  private val leakedAppsGCThread = new Thread() {
+    override def run(): Unit = {
+      import KubernetesExtensions._
+      while (true) {
+        if (!leakedAppTags.isEmpty) {
+          // kill the app if it is found, and remove the tag if it exceeds a threshold
+          val iter = leakedAppTags.entrySet().iterator()
+          var isRemoved = false
+          val now = System.currentTimeMillis()
+          val apps = withRetry(kubernetesClient.getApplications())
+          while (iter.hasNext) {
+            val entry = iter.next()
+            apps.find(_.getApplicationTag.contains(entry.getKey))
+              .foreach({
+                app =>
+                  info(s"Kill leaked app ${app.getApplicationId}")
+                  withRetry(kubernetesClient.killApplication(app))
+                  iter.remove()
+                  isRemoved = true
+              })
+            if (!isRemoved) {
+              if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
+                iter.remove()
+                info(s"Remove leaked Kubernetes app tag ${entry.getKey}")
+              }
+            }
+          }
+        }
+        Thread.sleep(sessionLeakageCheckInterval)
+      }
+    }
+  }
+
+  val RefreshServiceAccountTokenThread = new Thread() {
+    override def run(): Unit = {
+      while (true) {
+        var currentContext = new Context()
+        var currentContextName = new String
+        val config = kubernetesClient.getConfiguration
+        if (config.getCurrentContext != null) {
+          currentContext = config.getCurrentContext.getContext
+          currentContextName = config.getCurrentContext.getName
+        }
+
+        val newestConfig = Config.autoConfigure(currentContextName)
+        val newAccessToken = newestConfig.getOauthToken
+        info("Refreshed the Kubernetes OAuth token")
+
+        config.setOauthToken(newAccessToken)
+        kubernetesClient = new DefaultKubernetesClient(config)
+
+        // Tokens expire after 1 hour by default; the community recommends refreshing
+        // every 5 minutes.
+        Thread.sleep(300000)
+      }
+    }
+  }
+
+  private var livyConf: LivyConf = _
+
+  private var cacheLogSize: Int = _
+  private var appLookupTimeout: FiniteDuration = _
+  private var pollInterval: FiniteDuration = _
+
+  private var sessionLeakageCheckTimeout: Long = _
+  private var sessionLeakageCheckInterval: Long = _
+
+  var kubernetesClient: DefaultKubernetesClient = _
+
+  def init(livyConf: LivyConf): Unit = {
+    this.livyConf = livyConf
+
+    // KubernetesClient is thread safe. Create once, share it across threads.
+    kubernetesClient =
+      KubernetesClientFactory.createKubernetesClient(livyConf)
+
+    cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)
+    appLookupTimeout = 
livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
+    pollInterval = 
livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds
+
+    sessionLeakageCheckInterval =
+      livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
+    sessionLeakageCheckTimeout = 
livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
+
+    leakedAppsGCThread.setDaemon(true)
+    leakedAppsGCThread.setName("LeakedAppsGCThread")
+    leakedAppsGCThread.start()
+
+    RefreshServiceAccountTokenThread.
+      setName("RefreshServiceAccountTokenThread")
+    RefreshServiceAccountTokenThread.setDaemon(true)
+    RefreshServiceAccountTokenThread.start()
+  }
+
+  // Returns T, throwing the exception on failure.
+  // When istio-proxy restarts, access to the K8s API from Livy could be down
+  // until envoy comes back, which could take up to 30 seconds.
+  @tailrec
+  private def withRetry[T](fn: => T, n: Int = 10, retryBackoff: Long = 3000): 
T = {
+    Try { fn } match {
+      case Success(x) => x
+      case _ if n > 1 =>
+        Thread.sleep(Math.max(retryBackoff, 3000))
+        withRetry(fn, n - 1)
+      case Failure(e) => throw e
+    }
+  }
+
+}
+
+class SparkKubernetesApp private[utils] (
+  appTag: String,
+  appIdOption: Option[String],
+  process: Option[LineBufferedProcess],
+  listener: Option[SparkAppListener],
+  livyConf: LivyConf,
+  kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) 
// For unit test.
+  extends SparkApp
+    with Logging {
+
+  import KubernetesExtensions._
+  import SparkKubernetesApp._
+
+  private val appPromise: Promise[KubernetesApplication] = Promise()
+  private[utils] var state: SparkApp.State = SparkApp.State.STARTING
+  private var kubernetesDiagnostics: IndexedSeq[String] = 
IndexedSeq.empty[String]
+  private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]
+
+  // Exposed for unit test.
+  // TODO Instead of spawning a thread for every session, create a centralized 
thread and
+  // batch Kubernetes queries.
+  private[utils] val kubernetesAppMonitorThread = Utils
+    .startDaemonThread(s"kubernetesAppMonitorThread-$this") {
+    try {
+      // Get KubernetesApplication by appTag.
+      val app: KubernetesApplication = try {
+        getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
+      } catch {
+        case e: Exception =>
+          appPromise.failure(e)
+          throw e
+      }
+      appPromise.success(app)
+      val appId = app.getApplicationId
+
+      Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
+      listener.foreach(_.appIdKnown(appId))
+
+      if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) {
+        withRetry(kubernetesClient.createSparkUIIngress(app, livyConf))
+      }
+
+      var appInfo = AppInfo()
+      while (isRunning) {
+        try {
+          Clock.sleep(pollInterval.toMillis)
+
+          // Refresh application state
+          val appReport = withRetry {
+            debug(s"getApplicationReport, applicationId: 
${app.getApplicationId}, " +
+              s"namespace: ${app.getApplicationNamespace} " +
+              s"applicationTag: ${app.getApplicationTag}")
+            val report = kubernetesClient.getApplicationReport(livyConf, app,
+              cacheLogSize = cacheLogSize)
+            report
+          }
+
+          kubernetesAppLog = appReport.getApplicationLog
+          kubernetesDiagnostics = appReport.getApplicationDiagnostics
+          changeState(mapKubernetesState(appReport.getApplicationState, 
appTag))
+
+          val latestAppInfo = AppInfo(
+            appReport.getDriverLogUrl,
+            appReport.getTrackingUrl,
+            appReport.getExecutorsLogUrls
+          )
+          if (appInfo != latestAppInfo) {
+            listener.foreach(_.infoChanged(latestAppInfo))
+            appInfo = latestAppInfo
+          }
+        } catch {
+          // TODO analyse available exceptions
+          case e: Throwable =>
+            throw e
+        }
+      }
+      debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
+    } catch {
+      case _: InterruptedException =>
+        kubernetesDiagnostics = ArrayBuffer("Application stopped by user.")
+        changeState(SparkApp.State.KILLED)
+      case NonFatal(e) =>
+        error(s"Error while refreshing Kubernetes state", e)
+        kubernetesDiagnostics = ArrayBuffer(e.getMessage)
+        changeState(SparkApp.State.FAILED)
+    } finally {
+      if (!isRunning) {
+        listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = 
Option(buildHistoryServerUiUrl(
+          livyConf, 
Try(appPromise.future.value.get.get.getApplicationId).getOrElse("unknown")
+        )))))
+      }
+    }
+  }
+
+  override def log(): IndexedSeq[String] =
+    ("stdout: " +: kubernetesAppLog) ++
+      ("\nstderr: " +: 
(process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++
+        process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++
+      ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
+
+  override def kill(): Unit = synchronized {
+    try {
+      
withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, 
appLookupTimeout)))
+    } catch {
+      // We cannot kill the Kubernetes app without the appTag.
+      // There's a chance the Kubernetes app hasn't been submitted during a 
livy-server failure.
+      // We don't want a stuck session that can't be deleted. Emit a warning 
and move on.
+      case _: TimeoutException | _: InterruptedException =>
+        warn("Deleting a session while its Kubernetes application is not 
found.")
+        kubernetesAppMonitorThread.interrupt()
+    } finally {
+      process.foreach(_.destroy())
+    }
+  }
+
+  private def isRunning: Boolean = {
+    state != SparkApp.State.FAILED &&
+      state != SparkApp.State.FINISHED &&
+      state != SparkApp.State.KILLED
+  }
+
+  private def changeState(newState: SparkApp.State.Value): Unit = {
+    if (state != newState) {
+      listener.foreach(_.stateChanged(state, newState))
+      state = newState
+    }
+  }
+
+  /**
+    * Find the corresponding KubernetesApplication from an application tag.
+    *
+    * @param appTag The application tag tagged on the target application.
+    *               If the tag is not unique, it returns the first application 
it found.
+    * @return KubernetesApplication or the failure.
+    */
+  @tailrec
+  private def getAppFromTag(
+    appTag: String,
+    pollInterval: duration.Duration,
+    deadline: Deadline): KubernetesApplication = {
+    import KubernetesExtensions._
+
+    
withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag)))
+    match {
+      case Some(app) => app
+      case None =>
+        if (deadline.isOverdue) {
+          process.foreach(_.destroy())
+          leakedAppTags.put(appTag, System.currentTimeMillis())
+          throw new IllegalStateException(s"No Kubernetes application is found 
with tag" +
+            s" $appTag in 
${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" +
+            " seconds. This may be because 1) spark-submit failed to submit 
application to " +
+            "Kubernetes; or 2) Kubernetes cluster doesn't have enough 
resources to start the " +
+            "application in time. Please check Livy log and Kubernetes log to 
know the details.")
+        } else if (process.exists(p => !p.isAlive && p.exitValue() != 0)) {
+          throw new IllegalStateException(s"Failed to submit Kubernetes 
application with tag" +
+            s" $appTag. 'spark-submit' exited with non-zero status. " +
+            s"Please check Livy log and Kubernetes log to know the details.")
+        } else {
+          Clock.sleep(pollInterval.toMillis)
+          getAppFromTag(appTag, pollInterval, deadline)
+        }
+    }
+  }
+
+  // Exposed for unit test.
+  private[utils] def mapKubernetesState(
+    kubernetesAppState: String,
+    appTag: String
+  ): SparkApp.State.Value = {
+    import KubernetesApplicationState._
+    kubernetesAppState.toLowerCase match {
+      case PENDING | CONTAINER_CREATING =>
+        SparkApp.State.STARTING
+      case RUNNING =>
+        SparkApp.State.RUNNING
+      case COMPLETED | SUCCEEDED =>
+        SparkApp.State.FINISHED
+      case FAILED | ERROR =>
+        SparkApp.State.FAILED
+      case other => // any other combination is invalid, so FAIL the 
application.
+        error(s"Unknown Kubernetes state $other for app with tag $appTag.")
+        SparkApp.State.FAILED
+    }
+  }
+
+  private def buildHistoryServerUiUrl(livyConf: LivyConf, appId: String): 
String =
+    s"${livyConf.get(LivyConf.UI_HISTORY_SERVER_URL)}/history/$appId/jobs/"
+
+}
+
+object KubernetesApplicationState {
+  val PENDING = "pending"
+  val CONTAINER_CREATING = "containercreating"
+  val RUNNING = "running"
+  val COMPLETED = "completed"
+  val SUCCEEDED = "succeeded"
+  val FAILED = "failed"
+  val ERROR = "error"
+}
+
+object KubernetesConstants {
+  val SPARK_APP_ID_LABEL = "spark-app-selector"
+  val SPARK_APP_TAG_LABEL = "spark-app-tag"
+  val SPARK_ROLE_LABEL = "spark-role"
+  val SPARK_EXEC_ID_LABEL = "spark-exec-id"
+  val SPARK_ROLE_DRIVER = "driver"
+  val SPARK_ROLE_EXECUTOR = "executor"
+  val SPARK_UI_PORT_NAME = "spark-ui"
+  val CREATED_BY_LIVY_LABEL = Map("created-by" -> "livy")
+}
+
+class KubernetesApplication(driverPod: Pod) {
+
+  import KubernetesConstants._
+
+  private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL)
+  private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)
+  private val namespace = driverPod.getMetadata.getNamespace
+
+  def getApplicationTag: String = appTag
+
+  def getApplicationId: String = appId
+
+  def getApplicationNamespace: String = namespace
+
+  def getApplicationPod: Pod = driverPod
+}
+
+private[utils] case class KubernetesAppReport(driver: Option[Pod], executors: 
Seq[Pod],
+  appLog: IndexedSeq[String], ingress: Option[Ingress], livyConf: LivyConf) {
+
+  import KubernetesConstants._
+
+  private val grafanaUrl = livyConf.get(LivyConf.KUBERNETES_GRAFANA_URL)
+  private val timeRange = livyConf.get(LivyConf.KUBERNETES_GRAFANA_TIME_RANGE)
+  private val lokiDatasource = 
livyConf.get(LivyConf.KUBERNETES_GRAFANA_LOKI_DATASOURCE)
+  private val sparkAppTagLogLabel = SPARK_APP_TAG_LABEL.replaceAll("-", "_")
+  private val sparkRoleLogLabel = SPARK_ROLE_LABEL.replaceAll("-", "_")
+  private val sparkExecIdLogLabel = SPARK_EXEC_ID_LABEL.replaceAll("-", "_")
+
+  def getApplicationState: String =
+    driver.map(getDriverState).getOrElse("unknown")
+
+  // if 'KUBERNETES_SPARK_SIDECAR_ENABLED' is set
+  // inspect the spark container status to figure out the termination status
+  // if spark container cannot be detected, default to pod phase.
+  def getDriverState(driverPod: Pod): String = {
+    val podStatus = driverPod.getStatus
+    val phase = podStatus.getPhase.toLowerCase
+    // if not running with sidecars, just return the pod phase
+    if (!livyConf.getBoolean(LivyConf.KUBERNETES_SPARK_SIDECAR_ENABLED)) {
+      return phase
+    }
+    if (phase != KubernetesApplicationState.RUNNING) {
+      return phase
+    }
+    // if the POD is still running, check spark container termination status
+    // default to pod phase if container state is indeterminate.
+    getTerminalState(podStatus).getOrElse(phase)
+  }
+
+  // if the spark container has terminated
+  // try to figure out status based on termination status
+  def getTerminalState(podStatus: PodStatus): Option[String] = {
+    import scala.collection.JavaConverters._
+    val sparkContainerName = 
livyConf.get(LivyConf.KUBERNETES_SPARK_CONTAINER_NAME)
+    for (c <- podStatus.getContainerStatuses.asScala) {
+      if (c.getName ==  sparkContainerName && c.getState.getTerminated != 
null) {
+        val exitCode = c.getState.getTerminated.getExitCode
+        if (exitCode == 0) {
+          return Some(KubernetesApplicationState.SUCCEEDED)
+        } else {
+          return Some(KubernetesApplicationState.FAILED)
+        }
+      }
+    }
+    None
+  }
+
+  def getApplicationLog: IndexedSeq[String] = appLog
+
+  def getDriverLogUrl: Option[String] = {
+    if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) {
+      val appTag = driver.map(_.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL))
+      if (appTag.isDefined && appTag.get != null) {
+        return Some(
+          s"""$grafanaUrl/explore?left=""" + URLEncoder.encode(
+            s"""["now-$timeRange","now","$lokiDatasource",""" +
+              s"""{"expr":"{$sparkAppTagLogLabel=\\"${appTag.get}\\",""" +
+              s"""$sparkRoleLogLabel=\\"$SPARK_ROLE_DRIVER\\"}"},""" +
+              s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8")
+        )
+      }
+    }
+    None
+  }
+
+  def getExecutorsLogUrls: Option[String] = {
+    if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) {
+      val urls = executors.map(_.getMetadata.getLabels).flatMap(labels => {
+        val sparkAppTag = labels.get(SPARK_APP_TAG_LABEL)
+        val sparkExecId = labels.get(SPARK_EXEC_ID_LABEL)
+        if (sparkAppTag != null && sparkExecId != null) {
+          val sparkRole = labels.getOrDefault(SPARK_ROLE_LABEL, 
SPARK_ROLE_EXECUTOR)
+          Some(s"executor-$sparkExecId#$grafanaUrl/explore?left=" + 
URLEncoder.encode(
+            s"""["now-$timeRange","now","$lokiDatasource",""" +
+              s"""{"expr":"{$sparkAppTagLogLabel=\\"$sparkAppTag\\",""" +
+              s"""$sparkRoleLogLabel=\\"$sparkRole\\",""" +
+              s"""$sparkExecIdLogLabel=\\"$sparkExecId\\"}"},""" +
+              s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8"))
+        } else {
+          None
+        }
+      })
+      if (urls.nonEmpty) return Some(urls.mkString(";"))
+    }
+    None
+  }
+
+  def getTrackingUrl: Option[String] = {
+    val host = ingress.flatMap(i => 
Try(i.getSpec.getRules.get(0).getHost).toOption)
+    val path = driver
+      .map(_.getMetadata.getLabels.getOrDefault(SPARK_APP_TAG_LABEL, 
"unknown"))
+    val protocol = livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL)
+    if (host.isDefined && path.isDefined) 
Some(s"$protocol://${host.get}/${path.get}")
+    else None
+  }
+
+  def getApplicationDiagnostics: IndexedSeq[String] = {
+    (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_)))
+      .filter(_.nonEmpty)
+      .map(opt => buildSparkPodDiagnosticsPrettyString(opt.get))
+      .flatMap(_.split("\n")).toIndexedSeq
+  }
+
+  private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = {
+    import scala.collection.JavaConverters._
+    def printMap(map: Map[_, _]): String = map.map {
+      case (key, value) => s"$key=$value"
+    }.mkString(", ")
+
+    if (pod == null) return "unknown"
+
+    s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" +
+      s"\n\tnode: ${pod.getSpec.getNodeName}" +
+      s"\n\thostname: ${pod.getSpec.getHostname}" +
+      s"\n\tpodIp: ${pod.getStatus.getPodIP}" +
+      s"\n\tstartTime: ${pod.getStatus.getStartTime}" +
+      s"\n\tphase: ${pod.getStatus.getPhase}" +
+      s"\n\treason: ${pod.getStatus.getReason}" +
+      s"\n\tmessage: ${pod.getStatus.getMessage}" +
+      s"\n\tlabels: ${printMap(pod.getMetadata.getLabels.asScala.toMap)}" +
+      s"\n\tcontainers:" +
+      s"\n\t\t${
+        pod.getSpec.getContainers.asScala.map(container =>
+          s"${container.getName}:" +
+            s"\n\t\t\timage: ${container.getImage}" +
+            s"\n\t\t\trequests: 
${printMap(container.getResources.getRequests.asScala.toMap)}" +
+            s"\n\t\t\tlimits: 
${printMap(container.getResources.getLimits.asScala.toMap)}" +
+            s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}"
+        ).mkString("\n\t\t")
+      }" +
+      s"\n\tconditions:" +
+      s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}"
+  }
+
+}
+
+private[utils] object KubernetesExtensions {
+  import KubernetesConstants._
+
+  implicit class KubernetesClientExtensions(client: KubernetesClient) {
+    import scala.collection.JavaConverters._
+
+    private val NGINX_CONFIG_SNIPPET: String =
+      """
+        |proxy_set_header Accept-Encoding "";
+        |sub_filter_last_modified off;
+        |sub_filter_once off;
+        |sub_filter_types text/html text/css text/javascript 
application/javascript;
+      """.stripMargin
+
+    def getApplications(
+      labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER),
+      appTagLabel: String = SPARK_APP_TAG_LABEL,
+      appIdLabel: String = SPARK_APP_ID_LABEL
+    ): Seq[KubernetesApplication] = {
+      client.pods.inAnyNamespace
+        .withLabels(labels.asJava)
+        .withLabel(appTagLabel)
+        .withLabel(appIdLabel)
+        .list.getItems.asScala.map(new KubernetesApplication(_))
+    }
+
+    def killApplication(app: KubernetesApplication): Boolean = {
+      client.pods.inAnyNamespace.delete(app.getApplicationPod)
+    }
+
+    def getApplicationReport(
+      livyConf: LivyConf,
+      app: KubernetesApplication,
+      cacheLogSize: Int,
+      appTagLabel: String = SPARK_APP_TAG_LABEL
+    ): KubernetesAppReport = {
+      val pods = client.pods.inNamespace(app.getApplicationNamespace)
+        .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava)
+        .list.getItems.asScala
+      val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == 
SPARK_ROLE_DRIVER)
+      val executors =
+        pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == 
SPARK_ROLE_EXECUTOR)
+      val appLog = Try(
+        client.pods.inNamespace(app.getApplicationNamespace)
+          .withName(app.getApplicationPod.getMetadata.getName)
+          .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq
+      ).getOrElse(IndexedSeq.empty)
+      val ingress = 
client.network.v1.ingresses.inNamespace(app.getApplicationNamespace)
+        .withLabel(SPARK_APP_TAG_LABEL, app.getApplicationTag)
+        .list.getItems.asScala.headOption
+      KubernetesAppReport(driver, executors, appLog, ingress, livyConf)
+    }
+
+    def createSparkUIIngress(app: KubernetesApplication, livyConf: LivyConf): 
Unit = {
+      val annotationsString = 
livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS)
+      var annotations: Seq[(String, String)] = Seq.empty
+      if (annotationsString != null && annotationsString.trim.nonEmpty) {
+        annotations = annotationsString
+          .split(";").map(_.split("="))
+          .map(array => array.head -> array.tail.mkString("=")).toSeq
+      }
+
+      val sparkUIIngress = buildSparkUIIngress(
+        app,
+        livyConf.get(LivyConf.KUBERNETES_INGRESS_CLASS_NAME),
+        livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL),
+        livyConf.get(LivyConf.KUBERNETES_INGRESS_HOST),
+        livyConf.get(LivyConf.KUBERNETES_INGRESS_TLS_SECRET_NAME),
+        livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET),
+        annotations: _*
+      )
+      val resources: Seq[HasMetadata] = Seq(sparkUIIngress)
+      addOwnerReference(app.getApplicationPod, resources: _*)
+      client.network.v1.ingresses.inNamespace(app.getApplicationNamespace).
+        createOrReplace(sparkUIIngress)
+    }
+
+    private[utils] def buildSparkUIIngress(
+      app: KubernetesApplication, className: String, protocol: String, host: 
String,
+      tlsSecretName: String, additionalConfSnippet: String, 
additionalAnnotations: (String, String)*
+    ): Ingress = {
+      val appTag = app.getApplicationTag
+      val serviceHost = 
s"${getServiceName(app)}.${app.getApplicationNamespace}.svc.cluster.local"
+
+      // Common annotations
+      val annotations = Map(
+        "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1",
+        "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/$appTag/",
+        "nginx.ingress.kubernetes.io/proxy-redirect-from" -> 
s"http://$serviceHost/";,
+        "nginx.ingress.kubernetes.io/upstream-vhost" -> s"$serviceHost",
+        "nginx.ingress.kubernetes.io/service-upstream" -> "true",
+        "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/$appTag",
+        "nginx.ingress.kubernetes.io/configuration-snippet" ->
+          NGINX_CONFIG_SNIPPET.concat(additionalConfSnippet)
+      ) ++ additionalAnnotations
+
+      val builder = new IngressBuilder()
+        .withApiVersion("networking.k8s.io/v1")
+        .withNewMetadata()
+        .withName(getServiceName(app))
+        .withNamespace(app.getApplicationNamespace)
+        .addToAnnotations(annotations.asJava)
+        .addToLabels(SPARK_APP_TAG_LABEL, appTag)
+        .addToLabels(CREATED_BY_LIVY_LABEL.asJava)
+        .endMetadata()
+        .withNewSpec()
+        .withIngressClassName(className)
+        .addNewRule()
+        .withHost(host)
+        .withNewHttp()
+        .addNewPath()
+        .withPath(s"/$appTag/?(.*)")
+        .withPathType("ImplementationSpecific")
+        .withNewBackend()
+        .withNewService()
+        .withName(getServiceName(app))
+        .withNewPort()
+        .withName(SPARK_UI_PORT_NAME).endPort()
+        .endService()
+        .endBackend()
+        .endPath()
+        .endHttp()
+        .endRule()
+      if (protocol.endsWith("s") && tlsSecretName != null && 
tlsSecretName.nonEmpty) {
+        
builder.addNewTl().withSecretName(tlsSecretName).addToHosts(host).endTl()
+      }
+      builder.endSpec().build()
+    }
+
+    private def getServiceName(app: KubernetesApplication): String =
+      StringUtils.stripEnd(
+        StringUtils.left(s"${app.getApplicationPod.getMetadata.getName}-svc", 
63), "-"
+      ).toLowerCase
+
+    // Add an OwnerReference to the given resources making the driver pod an owner of them so when
+    // the driver pod is deleted, the resources are garbage collected.
+    private def addOwnerReference(owner: Pod, resources: HasMetadata*): Unit = 
{
+      val driverPodOwnerReference = new OwnerReferenceBuilder()
+        .withName(owner.getMetadata.getName)
+        .withApiVersion(owner.getApiVersion)
+        .withUid(owner.getMetadata.getUid)
+        .withKind(owner.getKind)
+        .withController(true)
+        .build()
+      resources.foreach {
+        resource =>
+          val originalMetadata = resource.getMetadata
+          
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
+      }
+    }
+
+  }
+
+}
+
+private[utils] object KubernetesClientFactory {
+  import java.io.File
+  import com.google.common.base.Charsets
+  import com.google.common.io.Files
+
+  private implicit class OptionString(val string: String) extends AnyVal {
+    def toOption: Option[String] = if (string == null || string.isEmpty) None 
else Option(string)
+  }
+
+  def createKubernetesClient(livyConf: LivyConf): DefaultKubernetesClient = {
+    val masterUrl = sparkMasterToKubernetesApi(livyConf.sparkMaster())
+
+    val oauthTokenFile = 
livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption
+    val oauthTokenValue = 
livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption
+    require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty,
+      s"Cannot specify OAuth token through both " +
+        s"a file $oauthTokenFile and a value $oauthTokenValue.")
+
+    val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption
+    val clientKeyFile = 
livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption
+    val clientCertFile = 
livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption
+
+    val config = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(masterUrl)
+      .withOption(oauthTokenValue) {
+        (token, configBuilder) => configBuilder.withOauthToken(token)
+      }
+      .withOption(oauthTokenFile) {
+        (file, configBuilder) =>
+          configBuilder
+            .withOauthToken(Files.toString(new File(file), Charsets.UTF_8))
+      }
+      .withOption(caCertFile) {
+        (file, configBuilder) => configBuilder.withCaCertFile(file)
+      }
+      .withOption(clientKeyFile) {
+        (file, configBuilder) => configBuilder.withClientKeyFile(file)
+      }
+      .withOption(clientCertFile) {
+        (file, configBuilder) => configBuilder.withClientCertFile(file)
+      }
+      .build()
+    new DefaultKubernetesClient(config)
+  }
+
+  def sparkMasterToKubernetesApi(sparkMaster: String): String = {
+    val replaced = sparkMaster.replaceFirst("k8s://", "")
+    if (!replaced.startsWith("http")) s"https://$replaced"
+    else replaced
+  }
+
+  private implicit class OptionConfigurableConfigBuilder(
+    val configBuilder: ConfigBuilder) extends AnyVal {
+    def withOption[T]
+    (option: Option[T])
+      (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = {
+      option.map {
+        opt => configurator(opt, configBuilder)
+      }.getOrElse(configBuilder)
+    }
+  }
+
+}
diff --git 
a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala 
b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
new file mode 100644
index 00000000..00257acd
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.util.Objects._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec}
+import org.mockito.Mockito.when
+import org.scalatest.FunSpec
+import org.scalatestplus.mockito.MockitoSugar._
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.utils.KubernetesConstants.SPARK_APP_TAG_LABEL
+
+class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+
+  describe("KubernetesAppReport") {
+    import scala.collection.JavaConverters._
+
+    it("should return application state") {
+      val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus]
+      val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod]
+      assertResult("status") {
+        KubernetesAppReport(Option(driver), Seq.empty, IndexedSeq.empty, None, new LivyConf(false))
+          .getApplicationState
+      }
+      assertResult("unknown") {
+        KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, new LivyConf(false))
+          .getApplicationState
+      }
+    }
+
+    def livyConf(lokiEnabled: Boolean): LivyConf = new LivyConf(false)
+      .set(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED, lokiEnabled)
+
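+    // Mocks a Pod whose metadata carries the given labels, standing in for driver and
+    // executor pods in the assertions below.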
+    def podMockWithLabels(labelMap: Map[String, String]): Pod = {
+      val metaWithLabel = when(mock[ObjectMeta].getLabels).thenReturn(labelMap.asJava)
+        .getMock[ObjectMeta]
+      when(mock[Pod].getMetadata).thenReturn(metaWithLabel).getMock[Pod]
+    }
+
+    def driverMock(labelExists: Boolean): Option[Pod] = {
+      val labels = if (labelExists) Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag")
+      else Map.empty[String, String]
+      Some(podMockWithLabels(labels))
+    }
+
+    it("should return driver log url") {
+
+      def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit =
+        assertResult(shouldBeDefined) {
+          KubernetesAppReport(
+            driverMock(labelExists), Seq.empty, IndexedSeq.empty, None, livyConf(lokiEnabled)
+          ).getDriverLogUrl.isDefined
+        }
+
+      test(labelExists = false, lokiEnabled = false, shouldBeDefined = false)
+      test(labelExists = false, lokiEnabled = true, shouldBeDefined = false)
+      test(labelExists = true, lokiEnabled = false, shouldBeDefined = false)
+      test(labelExists = true, lokiEnabled = true, shouldBeDefined = true)
+      assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true))
+        .getDriverLogUrl.isEmpty)
+    }
+
+    it("should return executors log urls") {
+      def executorMock(labelsExist: Boolean): Option[Pod] = {
+        val labels = if (labelsExist) {
+          Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag",
+            KubernetesConstants.SPARK_EXEC_ID_LABEL -> "exec-1")
+        } else {
+          Map.empty[String, String]
+        }
+        Some(podMockWithLabels(labels))
+      }
+
+      def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit =
+        assertResult(shouldBeDefined) {
+          KubernetesAppReport(
+            None, Seq(executorMock(labelExists).get), IndexedSeq.empty, None, livyConf(lokiEnabled)
+          ).getExecutorsLogUrls.isDefined
+        }
+
+      test(labelExists = false, lokiEnabled = false, shouldBeDefined = false)
+      test(labelExists = false, lokiEnabled = true, shouldBeDefined = false)
+      test(labelExists = true, lokiEnabled = false, shouldBeDefined = false)
+      test(labelExists = true, lokiEnabled = true, shouldBeDefined = true)
+      assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true))
+        .getExecutorsLogUrls.isEmpty)
+    }
+
+    it("should return driver ingress url") {
+
+      def livyConf(protocol: Option[String]): LivyConf = {
+        val conf = new LivyConf()
+        protocol.map(conf.set(LivyConf.KUBERNETES_INGRESS_PROTOCOL, _)).getOrElse(conf)
+      }
+
+      def ingressMock(host: Option[String]): Ingress = {
+        val ingressRules = host.map(h =>
+          List(when(mock[IngressRule].getHost).thenReturn(h).getMock[IngressRule]))
+          .getOrElse(List.empty).asJava
+        val ingressSpec = when(mock[IngressSpec].getRules)
+          .thenReturn(ingressRules).getMock[IngressSpec]
+        when(mock[Ingress].getSpec).thenReturn(ingressSpec).getMock[Ingress]
+      }
+
+      def test(driver: Option[Pod], ingress: Option[Ingress],
+               protocol: Option[String], shouldBeDefined: Boolean): Unit = {
+        assertResult(shouldBeDefined) {
+          KubernetesAppReport(driver, Seq.empty, IndexedSeq.empty, ingress, livyConf(protocol))
+            .getTrackingUrl.isDefined
+        }
+      }
+
+      val hostname = Some("hostname")
+      val protocol = Some("protocol")
+
+      test(None, None, None, shouldBeDefined = false)
+      test(None, None, protocol, shouldBeDefined = false)
+      test(None, Some(ingressMock(None)), None, shouldBeDefined = false)
+      test(None, Some(ingressMock(None)), protocol, shouldBeDefined = false)
+      test(None, Some(ingressMock(hostname)), None, shouldBeDefined = false)
+      test(None, Some(ingressMock(hostname)), protocol, shouldBeDefined = false)
+
+      test(driverMock(true), None, None, shouldBeDefined = false)
+      test(driverMock(true), None, protocol, shouldBeDefined = false)
+      test(driverMock(true), Some(ingressMock(None)), None, shouldBeDefined = false)
+      test(driverMock(true), Some(ingressMock(None)), protocol, shouldBeDefined = false)
+      test(driverMock(true), Some(ingressMock(hostname)), None, shouldBeDefined = true)
+      test(driverMock(true), Some(ingressMock(hostname)), protocol, shouldBeDefined = true)
+
+      test(driverMock(false), None, None, shouldBeDefined = false)
+      test(driverMock(false), None, protocol, shouldBeDefined = false)
+      test(driverMock(false), Some(ingressMock(None)), None, shouldBeDefined = false)
+      test(driverMock(false), Some(ingressMock(None)), protocol, shouldBeDefined = false)
+      test(driverMock(false), Some(ingressMock(hostname)), None, shouldBeDefined = true)
+      test(driverMock(false), Some(ingressMock(hostname)), protocol, shouldBeDefined = true)
+
+      assertResult(s"${protocol.get}://${hostname.get}/app_tag") {
+        KubernetesAppReport(driverMock(true), Seq.empty, IndexedSeq.empty,
+          Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get
+      }
+      assertResult(s"${protocol.get}://${hostname.get}/unknown") {
+        KubernetesAppReport(driverMock(false), Seq.empty, IndexedSeq.empty,
+          Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get
+      }
+    }
+
+  }
+
+  describe("KubernetesClientFactory") {
+    it("should build KubernetesApi url from LivyConf masterUrl") {
+      def actual(sparkMaster: String): String =
+        KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster)
+
+      val masterUrl = "kubernetes.default.svc:443"
+
+      assertResult(s"https://local";)(actual(s"https://local";))
+      assertResult(s"https://$masterUrl";)(actual(s"k8s://$masterUrl"))
+      assertResult(s"http://$masterUrl";)(actual(s"k8s://http://$masterUrl";))
+      assertResult(s"https://$masterUrl";)(actual(s"k8s://https://$masterUrl";))
+      assertResult(s"http://$masterUrl";)(actual(s"http://$masterUrl";))
+      assertResult(s"https://$masterUrl";)(actual(s"https://$masterUrl";))
+    }
+
+    it("should create KubernetesClient with default configs") {
+      assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false))))
+    }
+
+    it("should throw IllegalArgumentException in both oauth file and token 
provided") {
+      val conf = new LivyConf(false)
+        .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path")
+        .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value")
+      intercept[IllegalArgumentException] {
+        KubernetesClientFactory.createKubernetesClient(conf)
+      }
+    }
+  }
+
+  describe("KubernetesClientExtensions") {
+    it("should build an ingress from the supplied KubernetesApplication") {
+      def test(app: KubernetesApplication, expectedAnnotations: Map[String, String]): Unit = {
+        import scala.collection.JavaConverters._
+        val livyConf = new LivyConf(false)
+        val client = KubernetesClientFactory.createKubernetesClient(livyConf)
+        val clientExtensions = KubernetesExtensions.KubernetesClientExtensions(client)
+        val ingress = clientExtensions.buildSparkUIIngress(app, "ingress-class", "https",
+          "cluster.example.com", "tlsSecret", "")
+        val diff = expectedAnnotations.toSet diff ingress.getMetadata.getAnnotations.asScala.toSet
+        assert(ingress.getMetadata.getName == s"${app.getApplicationPod.getMetadata.getName}-svc")
+        assert(diff.isEmpty)
+      }
+
+      def mockPod(name: String, namespace: String, tag: String): Pod = {
+        new PodBuilder().withNewMetadata().withName(name).withNamespace(namespace).
+          addToLabels(SPARK_APP_TAG_LABEL, tag).endMetadata().withNewSpec().endSpec().build()
+      }
+
+      def app(name: String, namespace: String, tag: String): KubernetesApplication = {
+        new KubernetesApplication(mockPod(name, namespace, tag))
+      }
+
+      test(app("app1", "ns-1", "tag-1"), Map(
+        "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1",
+        "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/tag-1/",
+        "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/tag-1",
+        "nginx.ingress.kubernetes.io/proxy-redirect-from" ->
+          s"http://app1-svc.ns-1.svc.cluster.local/";,
+        "nginx.ingress.kubernetes.io/upstream-vhost" ->
+          s"app1-svc.ns-1.svc.cluster.local",
+        "nginx.ingress.kubernetes.io/service-upstream" -> "true"
+      ))
+
+      test(app("app2", "ns-2", "tag-2"), Map(
+        "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1",
+        "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/tag-2/",
+        "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/tag-2",
+        "nginx.ingress.kubernetes.io/proxy-redirect-from" ->
+          s"http://app2-svc.ns-2.svc.cluster.local/";,
+        "nginx.ingress.kubernetes.io/upstream-vhost" ->
+          s"app2-svc.ns-2.svc.cluster.local",
+        "nginx.ingress.kubernetes.io/service-upstream" -> "true"
+      ))
+    }
+  }
+
+}
