http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/LivyConf.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala deleted file mode 100644 index e4eb118..0000000 --- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy - -import java.io.File -import java.lang.{Boolean => JBoolean, Long => JLong} -import java.util.{Map => JMap} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration - -import com.cloudera.livy.client.common.ClientConf -import com.cloudera.livy.client.common.ClientConf.ConfEntry -import com.cloudera.livy.client.common.ClientConf.DeprecatedConf - -object LivyConf { - - case class Entry(override val key: String, override val dflt: AnyRef) extends ConfEntry - - object Entry { - def apply(key: String, dflt: Boolean): Entry = Entry(key, dflt: JBoolean) - def apply(key: String, dflt: Int): Entry = Entry(key, dflt: Integer) - def apply(key: String, dflt: Long): Entry = Entry(key, dflt: JLong) - } - - val TEST_MODE = ClientConf.TEST_MODE - - val SPARK_HOME = Entry("livy.server.spark-home", null) - val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local") - val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null) - - // Two configurations to specify Spark and related Scala version. These are internal - // configurations will be set by LivyServer and used in session creation. 
It is not required to - // set usually unless running with unofficial Spark + Scala versions - // (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11) - val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scala-version", null) - val LIVY_SPARK_VERSION = Entry("livy.spark.version", null) - - val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null) - val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024) - val LOCAL_FS_WHITELIST = Entry("livy.file.local-dir-whitelist", null) - val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enable-hive-context", false) - - val ENVIRONMENT = Entry("livy.environment", "production") - - val SERVER_HOST = Entry("livy.server.host", "0.0.0.0") - val SERVER_PORT = Entry("livy.server.port", 8998) - - val UI_ENABLED = Entry("livy.ui.enabled", true) - - val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072) - val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072) - - val CSRF_PROTECTION = Entry("livy.server.csrf-protection.enabled", false) - - val IMPERSONATION_ENABLED = Entry("livy.impersonation.enabled", false) - val SUPERUSERS = Entry("livy.superusers", null) - - val ACCESS_CONTROL_ENABLED = Entry("livy.server.access-control.enabled", false) - val ACCESS_CONTROL_USERS = Entry("livy.server.access-control.users", null) - - val SSL_KEYSTORE = Entry("livy.keystore", null) - val SSL_KEYSTORE_PASSWORD = Entry("livy.keystore.password", null) - val SSL_KEY_PASSWORD = Entry("livy.key-password", null) - - val AUTH_TYPE = Entry("livy.server.auth.type", null) - val AUTH_KERBEROS_PRINCIPAL = Entry("livy.server.auth.kerberos.principal", null) - val AUTH_KERBEROS_KEYTAB = Entry("livy.server.auth.kerberos.keytab", null) - val AUTH_KERBEROS_NAME_RULES = Entry("livy.server.auth.kerberos.name-rules", "DEFAULT") - - val HEARTBEAT_WATCHDOG_INTERVAL = Entry("livy.server.heartbeat-watchdog.interval", "1m") - - val LAUNCH_KERBEROS_PRINCIPAL = Entry("livy.server.launch.kerberos.principal", null) 
- val LAUNCH_KERBEROS_KEYTAB = Entry("livy.server.launch.kerberos.keytab", null) - val LAUNCH_KERBEROS_REFRESH_INTERVAL = Entry("livy.server.launch.kerberos.refresh-interval", "1h") - val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5) - - /** - * Recovery mode of Livy. Possible values: - * off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. - * recovery: Livy persists session info to the state store. When Livy restarts, it recovers - * previous sessions from the state store. - * Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to - * configure the state store. - */ - val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off") - /** - * Where Livy should store state to for recovery. Possible values: - * <empty>: Default. State store disabled. - * filesystem: Store state on a file system. - * zookeeper: Store state in a Zookeeper instance. - */ - val RECOVERY_STATE_STORE = Entry("livy.server.recovery.state-store", null) - /** - * For filesystem state store, the path of the state store directory. Please don't use a - * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///. - * For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2 - */ - val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "") - - // If Livy can't find the yarn app within this time, consider it lost. - val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s") - - // How often Livy polls YARN to refresh YARN app state. - val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s") - - // Days to keep Livy server request logs. - val REQUEST_LOG_RETAIN_DAYS = Entry("livy.server.request-log-retain.days", 5) - - // REPL related jars separated with comma. - val REPL_JARS = Entry("livy.repl.jars", null) - // RSC related jars separated with comma. 
- val RSC_JARS = Entry("livy.rsc.jars", null) - - // How long to check livy session leakage - val YARN_APP_LEAKAGE_CHECK_TIMEOUT = Entry("livy.server.yarn.app-leakage.check-timeout", "600s") - // how often to check livy session leakage - val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s") - - // Whether session timeout should be checked, by default it will be checked, which means inactive - // session will be stopped after "livy.server.session.timeout" - val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) - // How long will an inactive session be gc-ed. - val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h") - // How long a finished session state will be kept in memory - val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s") - - val SPARK_MASTER = "spark.master" - val SPARK_DEPLOY_MODE = "spark.submit.deployMode" - val SPARK_JARS = "spark.jars" - val SPARK_FILES = "spark.files" - val SPARK_ARCHIVES = "spark.yarn.dist.archives" - val SPARK_PY_FILES = "spark.submit.pyFiles" - - /** - * These are Spark configurations that contain lists of files that the user can add to - * their jobs in one way or another. Livy needs to pre-process these to make sure the - * user can read them (in case they reference local files), and to provide correct URIs - * to Spark based on the Livy config. - * - * The configuration allows adding new configurations in case we either forget something in - * the hardcoded list, or new versions of Spark add new configs. 
- */ - val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null) - - private val HARDCODED_SPARK_FILE_LISTS = Seq( - SPARK_JARS, - SPARK_FILES, - SPARK_ARCHIVES, - SPARK_PY_FILES, - "spark.yarn.archive", - "spark.yarn.dist.files", - "spark.yarn.dist.jars", - "spark.yarn.jar", - "spark.yarn.jars" - ) - - case class DepConf( - override val key: String, - override val version: String, - override val deprecationMessage: String = "") - extends DeprecatedConf - - private val configsWithAlternatives: Map[String, DeprecatedConf] = Map[String, DepConf]( - LIVY_SPARK_DEPLOY_MODE.key -> DepConf("livy.spark.deployMode", "0.4"), - LIVY_SPARK_SCALA_VERSION.key -> DepConf("livy.spark.scalaVersion", "0.4"), - ENABLE_HIVE_CONTEXT.key -> DepConf("livy.repl.enableHiveContext", "0.4"), - CSRF_PROTECTION.key -> DepConf("livy.server.csrf_protection.enabled", "0.4"), - ACCESS_CONTROL_ENABLED.key -> DepConf("livy.server.access_control.enabled", "0.4"), - ACCESS_CONTROL_USERS.key -> DepConf("livy.server.access_control.users", "0.4"), - AUTH_KERBEROS_NAME_RULES.key -> DepConf("livy.server.auth.kerberos.name_rules", "0.4"), - LAUNCH_KERBEROS_REFRESH_INTERVAL.key -> - DepConf("livy.server.launch.kerberos.refresh_interval", "0.4"), - KINIT_FAIL_THRESHOLD.key -> DepConf("livy.server.launch.kerberos.kinit_fail_threshold", "0.4"), - YARN_APP_LEAKAGE_CHECK_TIMEOUT.key -> - DepConf("livy.server.yarn.app-leakage.check_timeout", "0.4"), - YARN_APP_LEAKAGE_CHECK_INTERVAL.key -> - DepConf("livy.server.yarn.app-leakage.check_interval", "0.4") - ) - - private val deprecatedConfigs: Map[String, DeprecatedConf] = { - val configs: Seq[DepConf] = Seq( - // There are no deprecated configs without alternatives currently. 
- ) - - Map(configs.map { cfg => (cfg.key -> cfg) }: _*) - } - -} - -/** - * - * @param loadDefaults whether to also load values from the Java system properties - */ -class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { - - import LivyConf._ - - private lazy val _superusers = configToSeq(SUPERUSERS) - private lazy val _allowedUsers = configToSeq(ACCESS_CONTROL_USERS).toSet - - lazy val hadoopConf = new Configuration() - lazy val localFsWhitelist = configToSeq(LOCAL_FS_WHITELIST).map { path => - // Make sure the path ends with a single separator. - path.stripSuffix("/") + "/" - } - - lazy val sparkFileLists = HARDCODED_SPARK_FILE_LISTS ++ configToSeq(SPARK_FILE_LISTS) - - /** - * Create a LivyConf that loads defaults from the system properties and the classpath. - * @return - */ - def this() = this(true) - - if (loadDefaults) { - loadFromMap(sys.props) - } - - def loadFromFile(name: String): LivyConf = { - getConfigFile(name) - .map(Utils.getPropertiesFromFile) - .foreach(loadFromMap) - this - } - - /** Return true if spark master starts with yarn. */ - def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn") - - /** Return the spark deploy mode Livy sessions should use. */ - def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) - - /** Return the location of the spark home directory */ - def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME")) - - /** Return the spark master Livy sessions should use. */ - def sparkMaster(): String = get(LIVY_SPARK_MASTER) - - /** Return the path to the spark-submit executable. */ - def sparkSubmit(): String = { - sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get - } - - /** Return the list of superusers. */ - def superusers(): Seq[String] = _superusers - - /** Return the set of users allowed to use Livy via SPNEGO. 
*/ - def allowedUsers(): Set[String] = _allowedUsers - - private val configDir: Option[File] = { - sys.env.get("LIVY_CONF_DIR") - .orElse(sys.env.get("LIVY_HOME").map(path => s"$path${File.separator}conf")) - .map(new File(_)) - .filter(_.exists()) - } - - private def getConfigFile(name: String): Option[File] = { - configDir.map(new File(_, name)).filter(_.exists()) - } - - private def loadFromMap(map: Iterable[(String, String)]): Unit = { - map.foreach { case (k, v) => - if (k.startsWith("livy.")) { - set(k, v) - } - } - } - - private def configToSeq(entry: LivyConf.Entry): Seq[String] = { - Option(get(entry)).map(_.split("[, ]+").toSeq).getOrElse(Nil) - } - - override def getConfigsWithAlternatives: JMap[String, DeprecatedConf] = { - configsWithAlternatives.asJava - } - - override def getDeprecatedConfigs: JMap[String, DeprecatedConf] = { - deprecatedConfigs.asJava - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/package.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/package.scala b/server/src/main/scala/com/cloudera/livy/package.scala deleted file mode 100644 index c1661a9..0000000 --- a/server/src/main/scala/com/cloudera/livy/package.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera - -import java.util.Properties - -import scala.util.control.NonFatal - -package object livy { - - private object LivyBuildInfo { - val ( - livyVersion: String, - livyBuildUser: String, - livyRevision: String, - livyBranch: String, - livyBuildDate: String, - livyRepo: String - ) = { - val unknown = "<unknown>" - val defaultValue = (unknown, unknown, unknown, unknown, unknown, unknown) - val resource = Option(Thread.currentThread().getContextClassLoader - .getResourceAsStream("livy-version-info.properties")) - - try { - resource.map { r => - val properties = new Properties() - properties.load(r) - ( - properties.getProperty("version", unknown), - properties.getProperty("user", unknown), - properties.getProperty("revision", unknown), - properties.getProperty("branch", unknown), - properties.getProperty("date", unknown), - properties.getProperty("url", unknown) - ) - }.getOrElse(defaultValue) - } catch { - case NonFatal(e) => - // swallow the exception - defaultValue - } finally { - try { - resource.foreach(_.close()) - } catch { - case NonFatal(e) => // swallow the exception in closing the stream - } - } - } - } - - val LIVY_VERSION = LivyBuildInfo.livyVersion - val LIVY_BUILD_USER = LivyBuildInfo.livyBuildUser - val LIVY_REVISION = LivyBuildInfo.livyRevision - val LIVY_BRANCH = LivyBuildInfo.livyBranch - val LIVY_BUILD_DATE = LivyBuildInfo.livyBuildDate - val LIVY_REPO_URL = LivyBuildInfo.livyRepo -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/AccessFilter.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/server/AccessFilter.scala b/server/src/main/scala/com/cloudera/livy/server/AccessFilter.scala deleted file mode 100644 index 1138cc6..0000000 --- a/server/src/main/scala/com/cloudera/livy/server/AccessFilter.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server - -import javax.servlet._ -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - -import com.cloudera.livy.LivyConf - -class AccessFilter(livyConf: LivyConf) extends Filter { - - override def init(filterConfig: FilterConfig): Unit = {} - - override def doFilter(request: ServletRequest, - response: ServletResponse, - chain: FilterChain): Unit = { - val httpRequest = request.asInstanceOf[HttpServletRequest] - val remoteUser = httpRequest.getRemoteUser - if (livyConf.allowedUsers.contains(remoteUser)) { - chain.doFilter(request, response) - } else { - val httpServletResponse = response.asInstanceOf[HttpServletResponse] - httpServletResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, - "User not authorised to use Livy.") - } - } - - override def destroy(): Unit = {} -} - http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/ApiVersioningSupport.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/server/ApiVersioningSupport.scala b/server/src/main/scala/com/cloudera/livy/server/ApiVersioningSupport.scala deleted file mode 
100644 index ff2f05f..0000000 --- a/server/src/main/scala/com/cloudera/livy/server/ApiVersioningSupport.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server - -import javax.servlet.http.HttpServletRequest - -import org.scalatra.{NotAcceptable, ScalatraBase} - -/** - * Livy's servlets can mix-in this trait to get API version support. - * - * Example: {{{ - * import ApiVersions._ - * class FooServlet - * ... - * with ApiVersioningSupport - * ... - * { - * get("/test") { - * ... - * } - * get("/test", apiVersion <= v0_2) { - * ... - * } - * get("/test", apiVersion <= v0_1) { - * ... - * } - * } - * }}} - */ -trait ApiVersioningSupport extends AbstractApiVersioningSupport { - this: ScalatraBase => - // Link the abstract trait to Livy's version enum. - override val apiVersions = ApiVersions - override type ApiVersionType = ApiVersions.Value -} - -trait AbstractApiVersioningSupport { - this: ScalatraBase => - protected val apiVersions: Enumeration - protected type ApiVersionType - - /** - * Before proceeding with routing, validate the specified API version in the request. - * If validation passes, cache the parsed API version as a per-request attribute. 
- */ - before() { - request(AbstractApiVersioningSupport.ApiVersionKey) = request.getHeader("Accept") match { - case acceptHeader @ AbstractApiVersioningSupport.AcceptHeaderRegex(apiVersion) => - try { - apiVersions.withName(apiVersion).asInstanceOf[ApiVersionType] - } catch { - case e: NoSuchElementException => - halt(NotAcceptable(e.getMessage)) - } - case _ => - // Return the latest version. - apiVersions.apply(apiVersions.maxId - 1).asInstanceOf[ApiVersionType] - } - } - - /** - * @return The specified API version in the request. - */ - def apiVersion: ApiVersionType = { - request(AbstractApiVersioningSupport.ApiVersionKey).asInstanceOf[ApiVersionType] - } - -} - -object AbstractApiVersioningSupport { - // Get every character after "application/vnd.livy.v" until hitting a + sign. - private final val AcceptHeaderRegex = """application/vnd\.livy\.v([^\+]*).*""".r - - // AbstractApiVersioningSupport uses a per-request attribute to store the parsed API version. - // This is the key name for the attribute. - private final val ApiVersionKey = "apiVersion" -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/ApiVersions.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/server/ApiVersions.scala b/server/src/main/scala/com/cloudera/livy/server/ApiVersions.scala deleted file mode 100644 index 34275a4..0000000 --- a/server/src/main/scala/com/cloudera/livy/server/ApiVersions.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server - -/** - * This enum defines Livy's API versions. - * [[com.cloudera.livy.server.AbstractApiVersioningSupport]] uses this for API version checking. - * - * Version is defined as <major version>.<minor version>. - * When making backward compatible change (adding methods/fields), bump minor version. - * When making backward incompatible change (renaming/removing methods/fields), bump major version. - * This ensures our users can safely migrate to a newer API version if major version is unchanged. - */ -object ApiVersions extends Enumeration { - type ApiVersions = Value - // ApiVersions are ordered and the ordering is defined by the order of Value() calls. - // Please make sure API version is defined in ascending order (Older API before newer). - // AbstractApiVersioningSupport relies on the ordering. - val v0_1 = Value("0.1") -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/CsrfFilter.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/server/CsrfFilter.scala b/server/src/main/scala/com/cloudera/livy/server/CsrfFilter.scala deleted file mode 100644 index d2aa011..0000000 --- a/server/src/main/scala/com/cloudera/livy/server/CsrfFilter.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package com.cloudera.livy.server - -import javax.servlet._ -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - - -class CsrfFilter extends Filter { - - val METHODS_TO_IGNORE = Set("GET", "OPTIONS", "HEAD"); - - val HEADER_NAME = "X-Requested-By"; - - override def init(filterConfig: FilterConfig): Unit = {} - - override def doFilter(request: ServletRequest, - response: ServletResponse, - chain: FilterChain): Unit = { - val httpRequest = request.asInstanceOf[HttpServletRequest] - - if (!METHODS_TO_IGNORE.contains(httpRequest.getMethod) - && httpRequest.getHeader(HEADER_NAME) == null) { - response.asInstanceOf[HttpServletResponse].sendError(HttpServletResponse.SC_BAD_REQUEST, - "Missing Required Header for CSRF protection.") - } else { - chain.doFilter(request, response) - } - } - - override def destroy(): Unit = {} -} - http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/JsonServlet.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/server/JsonServlet.scala b/server/src/main/scala/com/cloudera/livy/server/JsonServlet.scala deleted file mode 100644 index 7af6cce..0000000 --- 
a/server/src/main/scala/com/cloudera/livy/server/JsonServlet.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server - -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - -import scala.concurrent.ExecutionContext -import scala.reflect.ClassTag - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.databind.{JsonMappingException, ObjectMapper} -import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException -import org.scalatra._ - -/** - * An abstract servlet that provides overridden implementations for "post", "put" and "patch" - * that can deserialize JSON data directly into user-defined types, without having to go through - * a json4s intermediate. Results are also automatically serialized into JSON if the content type - * says so. - * - * Serialization and deserialization are done through Jackson directly, so all Jackson features - * are available. 
- */ -abstract class JsonServlet extends ScalatraServlet with ApiFormats with FutureSupport { - - override protected implicit def executor: ExecutionContext = ExecutionContext.global - - private lazy val _defaultMapper = new ObjectMapper() - .registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule) - - /** - * Override this method if you need a custom Jackson object mapper; the default mapper - * has the default configuration, plus the Scala module. - */ - protected def createMapper(): ObjectMapper = _defaultMapper - - protected final val mapper = createMapper() - - before() { - contentType = formats("json") - } - - error { - case e: JsonParseException => BadRequest(e.getMessage) - case e: UnrecognizedPropertyException => BadRequest(e.getMessage) - case e: JsonMappingException => BadRequest(e.getMessage) - case e => - SessionServlet.error("internal error", e) - InternalServerError(e.toString) - } - - protected def jpatch[T: ClassTag](t: RouteTransformer*)(action: T => Any): Route = { - patch(t: _*) { - doAction(request, action) - } - } - - protected def jpost[T: ClassTag](t: RouteTransformer*)(action: T => Any): Route = { - post(t: _*) { - doAction(request, action) - } - } - - protected def jput[T: ClassTag](t: RouteTransformer*)(action: T => Any): Route = { - put(t: _*) { - doAction(request, action) - } - } - - override protected def renderResponseBody(actionResult: Any): Unit = { - val result = actionResult match { - case ActionResult(status, body, headers) if format == "json" => - ActionResult(status, toJson(body), headers) - case str: String if format == "json" => - // This should be changed when we implement LIVY-54. For now, just create a dummy - // JSON object when a raw string is being returned. 
- toJson(Map("msg" -> str)) - case other if format == "json" => - toJson(other) - case other => - other - } - super.renderResponseBody(result) - } - - protected def bodyAs[T: ClassTag](req: HttpServletRequest) - (implicit klass: ClassTag[T]): T = { - bodyAs(req, klass.runtimeClass) - } - - private def bodyAs[T](req: HttpServletRequest, klass: Class[_]): T = { - mapper.readValue(req.getInputStream(), klass).asInstanceOf[T] - } - - private def doAction[T: ClassTag]( - req: HttpServletRequest, - action: T => Any)(implicit klass: ClassTag[T]): Any = { - action(bodyAs[T](req, klass.runtimeClass)) - } - - private def isJson(res: HttpServletResponse, headers: Map[String, String] = Map()): Boolean = { - val ctypeHeader = "Content-Type" - headers.get(ctypeHeader).orElse(Option(res.getHeader(ctypeHeader))) - .map(_.startsWith("application/json")).getOrElse(false) - } - - private def toResult(obj: Any, res: HttpServletResponse): Any = obj match { - case async: AsyncResult => - new AsyncResult { - val is = async.is.map(toResult(_, res)) - } - case ActionResult(status, body, headers) if isJson(res, headers) => - ActionResult(status, toJson(body), headers) - case body if isJson(res) => - Ok(toJson(body)) - case other => - other - } - - private def toJson(obj: Any): Any = { - if (obj != null && obj != ()) { - mapper.writeValueAsBytes(obj) - } else { - null - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala ---------------------------------------------------------------------- diff --git a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala deleted file mode 100644 index 54f00ca..0000000 --- a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.server

import java.io.{BufferedInputStream, InputStream}
import java.util.concurrent._
import java.util.EnumSet
import javax.servlet._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.authentication.server._
import org.eclipse.jetty.servlet.FilterHolder
import org.scalatra.{NotFound, ScalatraServlet}
import org.scalatra.metrics.MetricsBootstrap
import org.scalatra.metrics.MetricsSupportExtensions._
import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits}

import com.cloudera.livy._
import com.cloudera.livy.server.batch.BatchSessionServlet
import com.cloudera.livy.server.interactive.InteractiveSessionServlet
import com.cloudera.livy.server.recovery.{SessionStore, StateStore}
import com.cloudera.livy.server.ui.UIServlet
import com.cloudera.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import com.cloudera.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import com.cloudera.livy.utils.LivySparkUtils._
import com.cloudera.livy.utils.SparkYarnApp

/**
 * The Livy REST server. `start()` loads the configuration, validates the Spark installation,
 * optionally sets up periodic `kinit`, creates the session managers, and mounts the servlets
 * and filters on an embedded [[WebServer]].
 */
class LivyServer extends Logging {

  import LivyConf._

  private var server: WebServer = _
  private var _serverUrl: Option[String] = None
  // make livyConf accessible for testing
  private[livy] var livyConf: LivyConf = _

  // Number of consecutive kinit failures; reset to 0 by every successful run.
  private var kinitFailCount: Int = 0
  // Only created when Hadoop security is enabled; stays null otherwise.
  private var executor: ScheduledExecutorService = _

  /**
   * Configures and starts the server.
   *
   * The order of the steps matters: when Hadoop security is enabled, kinit has to succeed
   * before any Hadoop operation is attempted. The process exits with status 1 if the initial
   * kinit fails or if servlet initialization throws.
   */
  def start(): Unit = {
    livyConf = new LivyConf().loadFromFile("livy.conf")

    val host = livyConf.get(SERVER_HOST)
    val port = livyConf.getInt(SERVER_PORT)
    val multipartConfig = MultipartConfig(
        maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE))
      ).toMultipartConfigElement

    // Make sure the `spark-submit` program exists, otherwise much of livy won't work.
    testSparkHome(livyConf)

    // Test spark-submit and get Spark Scala version accordingly.
    val (sparkVersion, scalaVersionFromSparkSubmit) = sparkSubmitVersion(livyConf)
    testSparkVersion(sparkVersion)

    // If Spark and Scala version is set manually, should verify if they're consistent with
    // ones parsed from "spark-submit --version"
    val formattedSparkVersion = formatSparkVersion(sparkVersion)
    Option(livyConf.get(LIVY_SPARK_VERSION)).map(formatSparkVersion).foreach { version =>
      require(formattedSparkVersion == version,
        s"Configured Spark version $version is not equal to Spark version $formattedSparkVersion " +
          "got from spark-submit --version")
    }

    // Set formatted Spark and Scala version into livy configuration, this will be used by
    // session creation.
    // TODO Create a new class to pass variables from LivyServer to sessions and remove these
    // internal LivyConfs.
    livyConf.set(LIVY_SPARK_VERSION.key, formattedSparkVersion.productIterator.mkString("."))
    livyConf.set(LIVY_SPARK_SCALA_VERSION.key,
      sparkScalaVersion(formattedSparkVersion, scalaVersionFromSparkSubmit, livyConf))

    if (UserGroupInformation.isSecurityEnabled) {
      // If Hadoop security is enabled, run kinit periodically. runKinit() should be called
      // before any Hadoop operation, otherwise Kerberos exception will be thrown.
      executor = Executors.newScheduledThreadPool(1,
        new ThreadFactory() {
          override def newThread(r: Runnable): Thread = {
            val thread = new Thread(r)
            thread.setName("kinit-thread")
            thread.setDaemon(true)
            thread
          }
        }
      )
      val launch_keytab = livyConf.get(LAUNCH_KERBEROS_KEYTAB)
      val launch_principal = SecurityUtil.getServerPrincipal(
        livyConf.get(LAUNCH_KERBEROS_PRINCIPAL), host)
      require(launch_keytab != null,
        s"Kerberos requires ${LAUNCH_KERBEROS_KEYTAB.key} to be provided.")
      require(launch_principal != null,
        s"Kerberos requires ${LAUNCH_KERBEROS_PRINCIPAL.key} to be provided.")
      if (!runKinit(launch_keytab, launch_principal)) {
        error("Failed to run kinit, stopping the server.")
        sys.exit(1)
      }
      startKinitThread(launch_keytab, launch_principal)
    }

    testRecovery(livyConf)

    // Initialize YarnClient ASAP to save time.
    if (livyConf.isRunningOnYarn()) {
      SparkYarnApp.init(livyConf)
      Future { SparkYarnApp.yarnClient }
    }

    StateStore.init(livyConf)
    val sessionStore = new SessionStore(livyConf)
    val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
    val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)

    server = new WebServer(livyConf, host, port)
    server.context.setResourceBase("src/main/com/cloudera/livy/server")

    // Serves the build information of this Livy instance as JSON.
    val livyVersionServlet = new JsonServlet {
      before() { contentType = "application/json" }

      get("/") {
        Map("version" -> LIVY_VERSION,
          "user" -> LIVY_BUILD_USER,
          "revision" -> LIVY_REVISION,
          "branch" -> LIVY_BRANCH,
          "date" -> LIVY_BUILD_DATE,
          "url" -> LIVY_REPO_URL)
      }
    }

    // Servlet for hosting static files such as html, css, and js
    // Necessary since Jetty cannot set it's resource base inside a jar
    // Returns 404 if the file does not exist
    val staticResourceServlet = new ScalatraServlet {
      get("/*") {
        val fileName = params("splat")
        val notFoundMsg = "File not found"

        if (!fileName.isEmpty) {
          getClass.getResourceAsStream(s"ui/static/$fileName") match {
            case is: InputStream => new BufferedInputStream(is)
            case null => NotFound(notFoundMsg)
          }
        } else {
          NotFound(notFoundMsg)
        }
      }
    }

    // Builds a servlet whose root path redirects to `path`.
    def uiRedirectServlet(path: String) = new ScalatraServlet {
      get("/") {
        redirect(path)
      }
    }

    server.context.addEventListener(
      new ServletContextListener() with MetricsBootstrap with ServletApiImplicits {

        // Registers `servlet` under the given URL mappings with the shared multipart config.
        private def mount(sc: ServletContext, servlet: Servlet, mappings: String*): Unit = {
          val registration = sc.addServlet(servlet.getClass().getName(), servlet)
          registration.addMapping(mappings: _*)
          registration.setMultipartConfig(multipartConfig)
        }

        override def contextDestroyed(sce: ServletContextEvent): Unit = {

        }

        override def contextInitialized(sce: ServletContextEvent): Unit = {
          try {
            val context = sce.getServletContext()
            context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT)

            val interactiveServlet =
              new InteractiveSessionServlet(interactiveSessionManager, sessionStore, livyConf)
            mount(context, interactiveServlet, "/sessions/*")

            val batchServlet = new BatchSessionServlet(batchSessionManager, sessionStore, livyConf)
            mount(context, batchServlet, "/batches/*")

            if (livyConf.getBoolean(UI_ENABLED)) {
              val uiServlet = new UIServlet
              mount(context, uiServlet, "/ui/*")
              mount(context, staticResourceServlet, "/static/*")
              mount(context, uiRedirectServlet("/ui/"), "/*")
            } else {
              mount(context, uiRedirectServlet("/metrics"), "/*")
            }

            context.mountMetricsAdminServlet("/metrics")

            mount(context, livyVersionServlet, "/version/*")
          } catch {
            // Deliberately broad: any failure while mounting servlets terminates the process.
            case e: Throwable =>
              error("Exception thrown when initializing server", e)
              sys.exit(1)
          }
        }

      })

    livyConf.get(AUTH_TYPE) match {
      case authType @ KerberosAuthenticationHandler.TYPE =>
        val principal = SecurityUtil.getServerPrincipal(livyConf.get(AUTH_KERBEROS_PRINCIPAL),
          server.host)
        val keytab = livyConf.get(AUTH_KERBEROS_KEYTAB)
        require(principal != null,
          s"Kerberos auth requires ${AUTH_KERBEROS_PRINCIPAL.key} to be provided.")
        require(keytab != null,
          s"Kerberos auth requires ${AUTH_KERBEROS_KEYTAB.key} to be provided.")

        val holder = new FilterHolder(new AuthenticationFilter())
        holder.setInitParameter(AuthenticationFilter.AUTH_TYPE, authType)
        holder.setInitParameter(KerberosAuthenticationHandler.PRINCIPAL, principal)
        holder.setInitParameter(KerberosAuthenticationHandler.KEYTAB, keytab)
        holder.setInitParameter(KerberosAuthenticationHandler.NAME_RULES,
          livyConf.get(AUTH_KERBEROS_NAME_RULES))
        server.context.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
        info(s"SPNEGO auth enabled (principal = $principal)")

      case null =>
        // Nothing to do.

      case other =>
        throw new IllegalArgumentException(s"Invalid auth type: $other")
    }

    if (livyConf.getBoolean(CSRF_PROTECTION)) {
      info("CSRF protection is enabled.")
      val csrfHolder = new FilterHolder(new CsrfFilter())
      server.context.addFilter(csrfHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
    }

    if (livyConf.getBoolean(ACCESS_CONTROL_ENABLED)) {
      if (livyConf.get(AUTH_TYPE) != null) {
        info("Access control is enabled.")
        val accessHolder = new FilterHolder(new AccessFilter(livyConf))
        server.context.addFilter(accessHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
      } else {
        throw new IllegalArgumentException("Access control was requested but could " +
          "not be enabled, since authentication is disabled.")
      }
    }

    server.start()

    Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") {
      override def run(): Unit = {
        info("Shutting down Livy server.")
        server.stop()
      }
    })

    _serverUrl = Some(s"${server.protocol}://${server.host}:${server.port}")
    sys.props("livy.server.server-url") = _serverUrl.get
  }

  /**
   * Runs `kinit -kt <keytab> <principal>` and waits for it to finish.
   *
   * Updates `kinitFailCount`: reset to 0 on exit code 0, incremented otherwise.
   *
   * @return true if the command exited with status 0.
   */
  def runKinit(keytab: String, principal: String): Boolean = {
    val commands = Seq("kinit", "-kt", keytab, principal)
    val proc = new ProcessBuilder(commands: _*).inheritIO().start()
    proc.waitFor() match {
      case 0 =>
        debug("Ran kinit command successfully.")
        kinitFailCount = 0
        true
      case _ =>
        warn("Fail to run kinit command.")
        kinitFailCount += 1
        false
    }
  }

  /**
   * Schedules the recurring kinit task on `executor`.
   *
   * After a successful run the task re-schedules itself after the configured refresh interval.
   * After a failed run it is resubmitted immediately, until `kinitFailCount` reaches the
   * configured threshold; then the server is stopped, or the process exits if the web server
   * has not started.
   */
  def startKinitThread(keytab: String, principal: String): Unit = {
    val refreshInterval = livyConf.getTimeAsMs(LAUNCH_KERBEROS_REFRESH_INTERVAL)
    val kinitFailThreshold = livyConf.getInt(KINIT_FAIL_THRESHOLD)
    executor.schedule(
      new Runnable() {
        override def run(): Unit = {
          if (runKinit(keytab, principal)) {
            // schedule another kinit run with a fixed delay.
            executor.schedule(this, refreshInterval, TimeUnit.MILLISECONDS)
          } else {
            // schedule another retry at once or fail the livy server if too many times kinit fail
            if (kinitFailCount >= kinitFailThreshold) {
              error(s"Exit LivyServer after ${kinitFailThreshold} times failures running kinit.")
              if (server.server.isStarted()) {
                stop()
              } else {
                sys.exit(1)
              }
            } else {
              executor.submit(this)
            }
          }
        }
      }, refreshInterval, TimeUnit.MILLISECONDS)
  }

  /** Blocks until the underlying web server terminates. */
  def join(): Unit = server.join()

  /**
   * Stops the web server (if it was created) and the kinit scheduler (if it was created).
   * Safe to call more than once.
   */
  def stop(): Unit = {
    if (server != null) {
      server.stop()
    }
    // Without this the kinit task keeps re-scheduling itself after the server is gone.
    // The web server is stopped first because stop() may run on the kinit thread itself,
    // which shutdownNow() interrupts.
    if (executor != null) {
      executor.shutdownNow()
    }
  }

  /**
   * @return the URL recorded by `start()`.
   * @throws IllegalStateException if the server has not been started yet.
   */
  def serverUrl(): String = {
    _serverUrl.getOrElse(throw new IllegalStateException("Server not yet started."))
  }

  /** Fails with `IllegalArgumentException` when recovery is enabled without YARN. */
  private[livy] def testRecovery(livyConf: LivyConf): Unit = {
    if (!livyConf.isRunningOnYarn()) {
      // If recovery is turned on but we are not running on YARN, quit.
      require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
        "Session recovery requires YARN.")
    }
  }
}

object LivyServer {

  /** Entry point: starts the server, waits for it to terminate, and always calls `stop()`. */
  def main(args: Array[String]): Unit = {
    val server = new LivyServer()
    try {
      server.start()
      server.join()
    } finally {
      server.stop()
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala
deleted file mode 100644
index b08aafa..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala
+++ /dev/null
@@ -1,215 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package com.cloudera.livy.server

import javax.servlet.http.HttpServletRequest

import org.scalatra._
import scala.concurrent._
import scala.concurrent.duration._

import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.sessions.{Session, SessionManager}
import com.cloudera.livy.sessions.Session.RecoveryMetadata

object SessionServlet extends Logging

/**
 * Base servlet for session management. All helper methods in this class assume that the session
 * id parameter in the handler's URI is "id".
 *
 * Type parameters:
 *  S: the session type
 *  R: the recovery metadata type handled by the session manager
 */
abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
    private[livy] val sessionManager: SessionManager[S, R],
    livyConf: LivyConf)
  extends JsonServlet
  with ApiVersioningSupport
  with MethodOverride
  with UrlGeneratorSupport
  with GZipSupport
{
  /**
   * Creates a new session based on the current request. The implementation is responsible for
   * parsing the body of the request.
   */
  protected def createSession(req: HttpServletRequest): S

  /**
   * Returns a object representing the session data to be sent back to the client.
   */
  protected def clientSessionView(session: S, req: HttpServletRequest): Any = session

  override def shutdown(): Unit = {
    sessionManager.shutdown()
  }

  before() {
    contentType = "application/json"
  }

  // Lists sessions, paginated by the optional "from" (default 0) and "size" (default 100).
  get("/") {
    val from = params.get("from").map(_.toInt).getOrElse(0)
    val size = params.get("size").map(_.toInt).getOrElse(100)

    val sessions = sessionManager.all()

    Map(
      "from" -> from,
      "total" -> sessionManager.size(),
      "sessions" -> sessions.view(from, from + size).map(clientSessionView(_, request))
    )
  }

  // Kept in a val so that post("/") can build the Location header from this route.
  val getSession = get("/:id") {
    withUnprotectedSession { session =>
      clientSessionView(session, request)
    }
  }

  get("/:id/state") {
    withUnprotectedSession { session =>
      Map("id" -> session.id, "state" -> session.state.toString)
    }
  }

  get("/:id/log") {
    withSession { session =>
      val from = params.get("from").map(_.toInt)
      val size = params.get("size").map(_.toInt)
      val (from_, total, logLines) = serializeLogs(session, from, size)

      Map(
        "id" -> session.id,
        "from" -> from_,
        "total" -> total,
        "log" -> logLines)
    }
  }

  delete("/:id") {
    withSession { session =>
      sessionManager.delete(session.id) match {
        case Some(future) =>
          // Blocks the request until the deletion future completes.
          Await.ready(future, Duration.Inf)
          Ok(Map("msg" -> "deleted"))

        case None =>
          NotFound(s"Session ${session.id} already stopped.")
      }
    }
  }

  post("/") {
    val session = sessionManager.register(createSession(request))
    // Because it may take some time to establish the session, update the last activity
    // time before returning the session info to the client.
    session.recordActivity()
    Created(clientSessionView(session, request),
      headers = Map("Location" ->
        (getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString)))
    )
  }

  /** Returns the request's path info, or "" when it is null or just "/". */
  private def getRequestPathInfo(request: HttpServletRequest): String = {
    if (request.getPathInfo != null && request.getPathInfo != "/") {
      request.getPathInfo
    } else {
      ""
    }
  }

  // NumberFormatException is a subclass of IllegalArgumentException, so a malformed numeric
  // parameter (e.g. "from", "size", "id") is answered with a 400 as well.
  error {
    case e: IllegalArgumentException => BadRequest(e.getMessage)
  }

  /**
   * Returns the remote user for the given request. Separate method so that tests can override it.
   */
  protected def remoteUser(req: HttpServletRequest): String = req.getRemoteUser()

  /**
   * Checks that the request's user can impersonate the target user.
   *
   * If the user does not have permission to impersonate, then halt execution.
   *
   * @return The user that should be impersonated. That can be the target user if defined, the
   *         request's user - which may not be defined - otherwise, or `None` if impersonation is
   *         disabled.
   */
  protected def checkImpersonation(
      target: Option[String],
      req: HttpServletRequest): Option[String] = {
    if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
      target.foreach { targetUser =>
        if (!hasAccess(targetUser, req)) {
          // Report the plain user name; interpolating the Option would print "Some(name)".
          halt(Forbidden(s"User '${remoteUser(req)}' not allowed to impersonate '$targetUser'."))
        }
      }
      target.orElse(Option(remoteUser(req)))
    } else {
      None
    }
  }

  /**
   * Check that the request's user has access to resources owned by the given target user.
   * Access is granted when the request has no remote user, when the remote user is the target,
   * or when the remote user is one of the configured superusers.
   */
  protected def hasAccess(target: String, req: HttpServletRequest): Boolean = {
    val user = remoteUser(req)
    user == null || user == target || livyConf.superusers().contains(user)
  }

  /**
   * Performs an operation on the session, without checking for ownership. Operations executed
   * via this method must not modify the session in any way, or return potentially sensitive
   * information.
   */
  protected def withUnprotectedSession(fn: (S => Any)): Any = doWithSession(fn, true)

  /**
   * Performs an operation on the session, verifying whether the caller is the owner of the
   * session.
   */
  protected def withSession(fn: (S => Any)): Any = doWithSession(fn, false)

  /**
   * Looks up the session named by the "id" parameter and applies `fn` to it. Answers 404 when
   * the session does not exist and 403 when `allowAll` is false and the caller has no access.
   */
  private def doWithSession(fn: (S => Any), allowAll: Boolean): Any = {
    val sessionId = params("id").toInt
    sessionManager.get(sessionId) match {
      case Some(session) =>
        if (allowAll || hasAccess(session.owner, request)) {
          fn(session)
        } else {
          Forbidden()
        }
      case None =>
        NotFound(s"Session '$sessionId' not found.")
    }
  }

  /**
   * Selects a window of the session's log lines.
   *
   * `sizeOpt` defaults to 100. When `fromOpt` is missing or negative, the window is the last
   * `size` lines.
   *
   * @return (effective start index, total number of log lines, the selected lines)
   */
  private def serializeLogs(session: S, fromOpt: Option[Int], sizeOpt: Option[Int]) = {
    val lines = session.logLines()

    val size = sizeOpt.getOrElse(100)
    var from = fromOpt.getOrElse(-1)
    if (from < 0) {
      from = math.max(0, lines.length - size)
    }
    val until = from + size

    (from, lines.length, lines.view(from, until))
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/WebServer.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/WebServer.scala b/server/src/main/scala/com/cloudera/livy/server/WebServer.scala
deleted file mode 100644
index 93e32e6..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/WebServer.scala
+++ /dev/null
@@ -1,113 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.server

import java.net.InetAddress
import javax.servlet.ServletContextListener

import org.eclipse.jetty.server._
import org.eclipse.jetty.server.handler.{HandlerCollection, RequestLogHandler}
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler}
import org.eclipse.jetty.util.ssl.SslContextFactory

import com.cloudera.livy.{LivyConf, Logging}

/**
 * Thin wrapper around an embedded Jetty server with a single connector, a root servlet context
 * and an NCSA access log.
 *
 * The connector speaks HTTPS when `LivyConf.SSL_KEYSTORE` is set and plain HTTP otherwise.
 *
 * @param livyConf configuration read for SSL, header sizes and access-log retention.
 * @param host     interface to bind to; rewritten by `start()` when it is "0.0.0.0".
 * @param port     port to bind to; rewritten by `start()` with the connector's local port.
 */
class WebServer(livyConf: LivyConf, var host: String, var port: Int) extends Logging {
  val server = new Server()

  server.setStopTimeout(1000)
  server.setStopAtShutdown(true)

  /** Builds the HTTP configuration shared by the plain and the SSL connector. */
  private def newHttpConfiguration(): HttpConfiguration = {
    val config = new HttpConfiguration()
    config.setRequestHeaderSize(livyConf.getInt(LivyConf.REQUEST_HEADER_SIZE))
    config.setResponseHeaderSize(livyConf.getInt(LivyConf.RESPONSE_HEADER_SIZE))
    config
  }

  val (connector, protocol) = Option(livyConf.get(LivyConf.SSL_KEYSTORE)) match {
    case None =>
      (new ServerConnector(server, new HttpConnectionFactory(newHttpConfiguration())), "http")

    case Some(keystore) =>
      val https = newHttpConfiguration()
      https.addCustomizer(new SecureRequestCustomizer())

      val sslContextFactory = new SslContextFactory()
      sslContextFactory.setKeyStorePath(keystore)
      // Both passwords are optional; they are only applied when configured.
      Option(livyConf.get(LivyConf.SSL_KEYSTORE_PASSWORD))
        .foreach(sslContextFactory.setKeyStorePassword)
      Option(livyConf.get(LivyConf.SSL_KEY_PASSWORD))
        .foreach(sslContextFactory.setKeyManagerPassword)

      (new ServerConnector(server,
        new SslConnectionFactory(sslContextFactory, "http/1.1"),
        new HttpConnectionFactory(https)), "https")
  }

  connector.setHost(host)
  connector.setPort(port)

  server.setConnectors(Array(connector))

  val context = new ServletContextHandler()

  context.setContextPath("/")
  context.addServlet(classOf[DefaultServlet], "/")

  val handlers = new HandlerCollection
  handlers.addHandler(context)

  // Configure the access log
  val requestLogHandler = new RequestLogHandler
  val requestLog = {
    // LIVY_LOG_DIR wins; otherwise fall back to $LIVY_HOME/logs. Fail with an explicit message
    // (same exception type as before) instead of "key not found: LIVY_HOME" when neither is set.
    val logDir = sys.env.getOrElse("LIVY_LOG_DIR",
      sys.env.get("LIVY_HOME").map(_ + "/logs").getOrElse(
        throw new NoSuchElementException(
          "Neither LIVY_LOG_DIR nor LIVY_HOME is set; cannot locate the request log directory.")))
    new NCSARequestLog(logDir + "/yyyy_mm_dd.request.log")
  }
  requestLog.setAppend(true)
  requestLog.setExtended(false)
  requestLog.setLogTimeZone("GMT")
  requestLog.setRetainDays(livyConf.getInt(LivyConf.REQUEST_LOG_RETAIN_DAYS))
  requestLogHandler.setRequestLog(requestLog)
  handlers.addHandler(requestLogHandler)

  server.setHandler(handlers)

  /** Registers a listener on the root servlet context. */
  def addEventListener(listener: ServletContextListener): Unit = {
    context.addEventListener(listener)
  }

  /**
   * Starts Jetty, then updates `host` and `port`: a "0.0.0.0" host is replaced by the local
   * canonical host name, and `port` by the port the connector is actually bound to.
   */
  def start(): Unit = {
    server.start()

    val connector = server.getConnectors()(0).asInstanceOf[NetworkConnector]

    if (host == "0.0.0.0") {
      host = InetAddress.getLocalHost.getCanonicalHostName
    }
    port = connector.getLocalPort

    info("Starting server on %s://%s:%d" format (protocol, host, port))
  }

  /** Blocks until the Jetty server terminates. */
  def join(): Unit = {
    server.join()
  }

  /** Stops the servlet context first, then the Jetty server. */
  def stop(): Unit = {
    context.stop()
    server.stop()
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala
deleted file mode 100644
index c58c99f..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala
+++ /dev/null
@@ -1,173 +0,0 @@
/*
 * Licensed to the
Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.server.batch

import java.lang.ProcessBuilder.Redirect

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.Random

import com.fasterxml.jackson.annotation.JsonIgnoreProperties

import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.{Session, SessionState}
import com.cloudera.livy.sessions.Session._
import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}

/** Data persisted in the session store for a batch session; unknown JSON fields are ignored. */
@JsonIgnoreProperties(ignoreUnknown = true)
case class BatchRecoveryMetadata(
    id: Int,
    appId: Option[String],
    appTag: String,
    owner: String,
    proxyUser: Option[String],
    version: Int = 1)
  extends RecoveryMetadata

object BatchSession extends Logging {
  val RECOVERY_SESSION_TYPE = "batch"

  /**
   * Creates a batch session in the `Starting` state.
   *
   * Unless `mockApp` is given, constructing the session saves its recovery metadata and
   * launches spark-submit for `request.file`.
   */
  def create(
      id: Int,
      request: CreateBatchRequest,
      livyConf: LivyConf,
      owner: String,
      proxyUser: Option[String],
      sessionStore: SessionStore,
      mockApp: Option[SparkApp] = None): BatchSession = {
    val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}"

    // Launches spark-submit for the request and wraps the resulting process in a SparkApp.
    def createSparkApp(s: BatchSession): SparkApp = {
      val sparkConf = SparkApp.prepareSparkConf(
        appTag,
        livyConf,
        prepareConf(
          request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
      require(request.file != null, "File is required.")

      val submit = new SparkProcessBuilder(livyConf)
      submit.conf(sparkConf)

      proxyUser.foreach(submit.proxyUser)
      request.className.foreach(submit.className)
      request.driverMemory.foreach(submit.driverMemory)
      request.driverCores.foreach(submit.driverCores)
      request.executorMemory.foreach(submit.executorMemory)
      request.executorCores.foreach(submit.executorCores)
      request.numExecutors.foreach(submit.numExecutors)
      request.queue.foreach(submit.queue)
      request.name.foreach(submit.name)

      // Spark 1.x does not support specifying deploy mode in conf and needs special handling.
      livyConf.sparkDeployMode().foreach(submit.deployMode)

      // The recovery metadata is saved before the process is launched.
      sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata)

      submit.redirectOutput(Redirect.PIPE)
      submit.redirectErrorStream(true)

      val resolvedFile = resolveURIs(Seq(request.file), livyConf)(0)
      val process = submit.start(Some(resolvedFile), request.args)

      SparkApp.create(appTag, None, Option(process), livyConf, Option(s))
    }

    info(s"Creating batch session $id: [owner: $owner, request: $request]")

    // A mock app, when supplied, replaces the real spark-submit launch.
    val appFactory: BatchSession => SparkApp = mockApp match {
      case Some(app) => (_: BatchSession) => app
      case None => createSparkApp
    }

    new BatchSession(
      id, appTag, SessionState.Starting(), livyConf, owner, proxyUser, sessionStore, appFactory)
  }

  /**
   * Rebuilds a batch session from stored metadata, in the `Recovering` state. No process is
   * launched; unless `mockApp` is given, the SparkApp is created from the stored tag and id.
   */
  def recover(
      m: BatchRecoveryMetadata,
      livyConf: LivyConf,
      sessionStore: SessionStore,
      mockApp: Option[SparkApp] = None): BatchSession = {
    val appFactory: BatchSession => SparkApp = mockApp match {
      case Some(app) => (_: BatchSession) => app
      case None =>
        (s: BatchSession) => SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
    }

    new BatchSession(
      m.id,
      m.appTag,
      SessionState.Recovering(),
      livyConf,
      m.owner,
      m.proxyUser,
      sessionStore,
      appFactory)
  }
}

/**
 * A session backed by a single Spark application. The application is obtained by calling
 * `sparkApp` with this session while the session is being constructed, and the session
 * receives the application's callbacks as its [[SparkAppListener]].
 */
class BatchSession(
    id: Int,
    appTag: String,
    initialState: SessionState,
    livyConf: LivyConf,
    owner: String,
    override val proxyUser: Option[String],
    sessionStore: SessionStore,
    sparkApp: BatchSession => SparkApp)
  extends Session(id, owner, livyConf) with SparkAppListener {
  import BatchSession._

  protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global

  private[this] var _state: SessionState = initialState
  // Created after `_state` is initialised; the factory receives `this`.
  private val app = sparkApp(this)

  override def state: SessionState = _state

  override def logLines(): IndexedSeq[String] = app.log()

  override def stopSession(): Unit = app.kill()

  /** Records the application id and saves the updated recovery metadata. */
  override def appIdKnown(appId: String): Unit = {
    _appId = Option(appId)
    sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
  }

  /** Maps a Spark application state to a session state; other app states are ignored. */
  override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit =
    synchronized {
      debug(s"$this state changed from $oldState to $newState")
      newState match {
        case SparkApp.State.RUNNING =>
          _state = SessionState.Running()
          info(s"Batch session $id created [appid: ${appId.orNull}, state: ${state.toString}, " +
            s"info: ${appInfo.asJavaMap}]")

        case SparkApp.State.FINISHED =>
          _state = SessionState.Success()

        case SparkApp.State.KILLED | SparkApp.State.FAILED =>
          _state = SessionState.Dead()

        case _ =>
      }
    }

  override def infoChanged(appInfo: AppInfo): Unit = {
    this.appInfo = appInfo
  }

  override def recoveryMetadata: RecoveryMetadata =
    BatchRecoveryMetadata(id, appId, appTag, owner, proxyUser)
}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala
deleted file mode 100644
index ac3a239..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala
+++
/dev/null
@@ -1,67 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.server.batch

import javax.servlet.http.HttpServletRequest

import com.cloudera.livy.LivyConf
import com.cloudera.livy.server.SessionServlet
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.BatchSessionManager
import com.cloudera.livy.utils.AppInfo

/** Client-facing representation of a batch session. */
case class BatchSessionView(
  id: Long,
  state: String,
  appId: Option[String],
  appInfo: AppInfo,
  log: Seq[String])

/** REST endpoints for batch sessions, built on the generic [[SessionServlet]] routes. */
class BatchSessionServlet(
    sessionManager: BatchSessionManager,
    sessionStore: SessionStore,
    livyConf: LivyConf)
  extends SessionServlet(sessionManager, livyConf)
{

  /** Parses the request body as a `CreateBatchRequest` and creates the batch session. */
  override protected def createSession(req: HttpServletRequest): BatchSession = {
    val batchRequest = bodyAs[CreateBatchRequest](req)
    val impersonatedUser = checkImpersonation(batchRequest.proxyUser, req)
    val sessionId = sessionManager.nextId()
    val owner = remoteUser(req)
    BatchSession.create(sessionId, batchRequest, livyConf, owner, impersonatedUser, sessionStore)
  }

  /**
   * Builds the view sent to clients. The last (at most) 10 log lines are included only when
   * the caller has access to the session owner's resources; otherwise the log is empty.
   */
  override protected[batch] def clientSessionView(
      session: BatchSession,
      req: HttpServletRequest): Any = {
    val tailSize = 10
    val logs: Seq[String] =
      if (hasAccess(session.owner, req)) session.logLines().takeRight(tailSize) else Nil

    BatchSessionView(session.id, session.state.toString, session.appId, session.appInfo, logs)
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala b/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala
deleted file mode 100644
index 3a9b49f..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala
+++ /dev/null
@@ -1,56 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package com.cloudera.livy.server.batch

/**
 * Mutable holder for the fields of a "create batch" request. Every field has a default, so a
 * freshly constructed instance is valid except for `file`, which starts out as null.
 */
class CreateBatchRequest {

  var file: String = _
  var proxyUser: Option[String] = None
  var args: List[String] = List()
  var className: Option[String] = None
  var jars: List[String] = List()
  var pyFiles: List[String] = List()
  var files: List[String] = List()
  var driverMemory: Option[String] = None
  var driverCores: Option[Int] = None
  var executorMemory: Option[String] = None
  var executorCores: Option[Int] = None
  var numExecutors: Option[Int] = None
  var archives: List[String] = List()
  var queue: Option[String] = None
  var name: Option[String] = None
  var conf: Map[String, String] = Map()

  /**
   * Renders the request as `[proxyUser: ..., file: ..., <set fields>]`.
   *
   * `proxyUser` and `file` are always printed; list and map fields are printed only when
   * non-empty and optional fields only when defined. `className` is not part of the output.
   * Fields are joined with ", ", so no dangling separator precedes the closing bracket when
   * `conf` is empty.
   */
  override def toString: String = {
    // Both helpers return None for a field that should be left out.
    def listField(label: String, values: Iterable[_]): Option[String] =
      if (values.nonEmpty) Some(s"$label: ${values.mkString(",")}") else None

    def optField(label: String, value: Option[_]): Option[String] =
      value.map(v => s"$label: $v")

    Seq(
      Some(s"proxyUser: $proxyUser"),
      Some(s"file: $file"),
      listField("args", args),
      listField("jars", jars),
      listField("pyFiles", pyFiles),
      listField("files", files),
      listField("archives", archives),
      optField("driverMemory", driverMemory),
      optField("driverCores", driverCores),
      optField("executorMemory", executorMemory),
      optField("executorCores", executorCores),
      optField("numExecutors", numExecutors),
      optField("queue", queue),
      optField("name", name),
      listField("conf", conf)
    ).flatten.mkString("[", ", ", "]")
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala
----------------------------------------------------------------------
diff --git
a/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala deleted file mode 100644 index 8f861cc..0000000 --- a/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
import com.cloudera.livy.sessions.{Kind, Spark}

/**
 * Mutable holder for the parameters of an interactive-session creation request.
 *
 * Every field is a `var` with a default value, so an instance can be created
 * empty and filled in afterwards (presumably by a JSON deserializer; confirm
 * against the servlet that consumes this class). `kind` defaults to `Spark()`.
 */
class CreateInteractiveRequest {
  var kind: Kind = Spark()
  var proxyUser: Option[String] = None
  var jars: List[String] = List()
  var pyFiles: List[String] = List()
  var files: List[String] = List()
  var driverMemory: Option[String] = None
  var driverCores: Option[Int] = None
  var executorMemory: Option[String] = None
  var executorCores: Option[Int] = None
  var numExecutors: Option[Int] = None
  var archives: List[String] = List()
  var queue: Option[String] = None
  var name: Option[String] = None
  var conf: Map[String, String] = Map()
  var heartbeatTimeoutInSecond: Int = 0

  /**
   * Renders the request as `[field: value, field: value, ...]`.
   *
   * `kind`, `proxyUser` and `heartbeatTimeoutInSecond` are always printed;
   * every other field is printed only when it is non-empty / defined.
   *
   * @return a single-line, human-readable description of this request
   */
  override def toString: String = {
    // A list-valued field: shown as a comma-joined list, omitted when empty.
    def listField(label: String, values: List[String]): Option[String] =
      if (values.nonEmpty) Some(s"$label: ${values.mkString(",")}") else None

    // An optional field: shown as its unwrapped value, omitted when None.
    def optField(label: String, value: Option[Any]): Option[String] =
      value.map(v => s"$label: $v")

    val rendered = Seq[Option[String]](
      Some(s"kind: $kind"),
      Some(s"proxyUser: $proxyUser"),
      listField("jars", jars),
      listField("pyFiles", pyFiles),
      listField("files", files),
      listField("archives", archives),
      optField("driverMemory", driverMemory),
      optField("driverCores", driverCores),
      optField("executorMemory", executorMemory),
      optField("executorCores", executorCores),
      optField("numExecutors", numExecutors),
      optField("queue", queue),
      optField("name", name),
      if (conf.nonEmpty) Some(s"conf: ${conf.mkString(",")}") else None,
      Some(s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond")
    ).flatten

    rendered.mkString("[", ", ", "]")
  }
}