roadan closed pull request #36: Amaterasu-56: Create a Kotlin logger implementation. URL: https://github.com/apache/incubator-amaterasu/pull/36
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 06a07e82..c7b08dbe 100644 --- a/build.gradle +++ b/build.gradle @@ -14,11 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +buildscript { + ext.kotlin_version = '1.3.0' + + repositories { + mavenCentral() + } + + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + } +} + plugins { id "org.nosphere.apache.rat" version "0.3.1" + id "org.jetbrains.kotlin.jvm" version "1.3.0" + id "distribution" } apply plugin: 'distribution' +apply plugin: 'kotlin' apply plugin: 'project-report' htmlDependencyReport { @@ -63,4 +78,12 @@ rat { tasks.withType(Test) { maxParallelForks = 1 +} + +repositories { + mavenCentral() +} + +dependencies { + compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" } \ No newline at end of file diff --git a/common/build.gradle b/common/build.gradle index 5a0a211c..9a456ced 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -16,6 +16,7 @@ */ plugins { id 'com.github.johnrengelman.shadow' version '1.2.4' + id "org.jetbrains.kotlin.jvm" id 'scala' } @@ -45,6 +46,10 @@ dependencies { compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.9' compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4' + + compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8" + compile "org.jetbrains.kotlin:kotlin-reflect" + // currently we have to use this specific mesos version to prevent from // clashing with spark compile('org.apache.mesos:mesos:0.22.2:shaded-protobuf') { @@ -60,9 +65,50 @@ dependencies { testCompile 'junit:junit:4.11' testCompile 'org.scalatest:scalatest_2.11:3.0.1' testCompile 
'org.scala-lang:scala-library:2.11.8' + testCompile 'org.jetbrains.spek:spek-api:1.1.5' + testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version" + testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5' + + // spek requires kotlin-reflect, can be omitted if already in the classpath + testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" } task copyToHome() { } +sourceSets { + test { + resources.srcDirs += [file('src/test/resources')] + } + + // this is done so Scala will compile before Kotlin + main { + kotlin { + srcDirs = ['src/main/kotlin'] + } + scala { + srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala'] + } + java { + srcDirs = ['src/main/java'] + } + } +} + +compileKotlin{ + kotlinOptions.jvmTarget = "1.8" +} +compileTestKotlin { + kotlinOptions.jvmTarget = "1.8" +} + +compileScala { + dependsOn compileJava + classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir) +} + +compileJava { + dependsOn compileKotlin +} + diff --git a/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java b/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java new file mode 100644 index 00000000..3f3413fa --- /dev/null +++ b/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java @@ -0,0 +1,10 @@ +package org.apache.amaterasu.common.logging; + +import org.slf4j.Logger; + +/** + * Created by Eran Bartenstein (p765790) on 5/11/18. 
+ */ +public abstract class Logging extends KLogging { + protected Logger log = getLog(); +} diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt new file mode 100644 index 00000000..24853c53 --- /dev/null +++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt @@ -0,0 +1,13 @@ +package org.apache.amaterasu.common.configuration.enums + +/** + * Created by Eran Bartenstein on 21/10/18. + */ +enum class ActionStatus (val value: String) { + pending("pending"), + queued("queued"), + started("started"), + complete("complete"), + failed("failed"), + canceled("canceled") +} diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt old mode 100755 new mode 100644 similarity index 63% rename from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionData.scala rename to common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt index 78528c4d..7e19db2c --- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionData.scala +++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt @@ -16,17 +16,21 @@ */ package org.apache.amaterasu.common.dataobjects -import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus +import org.apache.amaterasu.common.configuration.enums.ActionStatus -import scala.collection.mutable.ListBuffer -case class ActionData(var status: ActionStatus, - name: String, - src: String, - groupId: String, - typeId: String, - id: String, - exports: Map[String, String], - nextActionIds: ListBuffer[String]) { - var errorActionId: String = _ + +/* + Adding default values just for the sake of Scala + */ +data class ActionData(var status: ActionStatus = ActionStatus.pending, + var name: String= "", 
+ var src: String= "", + var groupId: String= "", + var typeId: String= "", + var id: String= "", + var exports: Map<String, String> = mutableMapOf(), + var nextActionIds: List<String> = listOf()) { + lateinit var errorActionId: String + } diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt b/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt new file mode 100644 index 00000000..2b4e4112 --- /dev/null +++ b/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt @@ -0,0 +1,10 @@ +package org.apache.amaterasu.common.logging + +import org.slf4j.LoggerFactory + +/** + * Created by Eran Bartenstein on 5/11/18. + */ +abstract class KLogging { + protected var log = LoggerFactory.getLogger(this.javaClass.name) +} diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala deleted file mode 100755 index 4d2afa3a..00000000 --- a/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.amaterasu.common.configuration.enums - -object ActionStatus extends Enumeration { - type ActionStatus = Value - val pending = Value("pending") - val queued = Value("queued") - val started = Value("started") - val complete = Value("complete") - val failed = Value("failed") - val canceled = Value("canceled") -} \ No newline at end of file diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala index 5fa2d749..75be6e75 100644 --- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala @@ -18,6 +18,7 @@ package org.apache.amaterasu.common.dataobjects import com.google.gson.Gson +/* object ActionDataHelper { private val gson = new Gson def toJsonString(actionData: ActionData): String = { @@ -25,6 +26,7 @@ object ActionDataHelper { } def fromJsonString(jsonString: String) : ActionData = { - gson.fromJson[ActionData](jsonString, ActionData.getClass) + gson.fromJson[ActionData](jsonString, new ActionData().getClass) } -} \ No newline at end of file +} +*/ \ No newline at end of file diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala index fe692606..ee71f854 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala @@ -19,8 +19,9 @@ package org.apache.amaterasu.common.execution.actions import com.fasterxml.jackson.annotation.JsonProperty import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel import org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType +import 
org.apache.amaterasu.common.logging.Logging -abstract class Notifier { +abstract class Notifier extends Logging { def info(msg: String) diff --git a/common/src/main/scala/org/apache/amaterasu/common/logging/Logging.scala b/common/src/main/scala/org/apache/amaterasu/common/logging/Logging.scala deleted file mode 100755 index 5c9f6560..00000000 --- a/common/src/main/scala/org/apache/amaterasu/common/logging/Logging.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.amaterasu.common.logging - -import org.slf4j.LoggerFactory - -trait Logging { - protected lazy val log = LoggerFactory.getLogger(getClass.getName) -} - diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala index 0c2edf80..90e624b5 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala @@ -25,7 +25,7 @@ import org.apache.activemq.ActiveMQConnectionFactory import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType, Notifier} import org.apache.amaterasu.common.logging.Logging -class ActiveNotifier extends Notifier with Logging { +class ActiveNotifier extends Notifier { var producer: MessageProducer = _ var session: Session = _ diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala index fff2a81d..2c087935 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala @@ -32,7 +32,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.util.{Failure, Success} -class MesosActionsExecutor extends Executor with Logging { +class MesosActionsExecutor extends Logging with Executor { var master: String = _ var executorDriver: ExecutorDriver = _ diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala index a091c1b7..fcb453a3 100755 --- 
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala @@ -23,7 +23,7 @@ import org.apache.amaterasu.common.logging.Logging import org.apache.mesos.ExecutorDriver -class MesosNotifier(driver: ExecutorDriver) extends Notifier with Logging { +class MesosNotifier(driver: ExecutorDriver) extends Notifier { private val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala index b5f8700c..282de685 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala @@ -61,7 +61,7 @@ class ActionsExecutor extends Logging { // launched with args: //s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(gson.toJson(taskData), "UTF-8")}' '${URLEncoder.encode(gson.toJson(execData), "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}'" -object ActionsExecutorLauncher extends App with Logging { +object ActionsExecutorLauncher extends Logging with App { val hostName = InetAddress.getLocalHost.getHostName diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala index 841fe425..831cfc8e 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala @@ -21,7 +21,7 @@ import org.apache.amaterasu.common.logging.Logging import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC 
-class YarnNotifier(conf: YarnConfiguration) extends Notifier with Logging { +class YarnNotifier(conf: YarnConfiguration) extends Notifier { var rpc: YarnRPC = YarnRPC.create(conf) diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala index 7f911420..ce3edb9a 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala @@ -17,7 +17,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider { case "mesos" => s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " + s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " + - s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}.stripMargin" + s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}.stripMargin" case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " + s"/bin/bash spark/bin/load-spark-env.sh && " + s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " + @@ -25,7 +25,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider { "-Dscala.usejavacp=true " + "-Dhdp.version=2.6.1.0-129 " + 
"org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + - s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + + s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " + s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr " case _ => "" diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala index 8dbc36d6..5a8bf37b 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala @@ -32,16 +32,16 @@ class SparkScalaRunnerProvider extends RunnerSetupProvider { override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match { case "mesos" => s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " + - s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " + - s"-Dscala.usejavacp=true 
-Djava.library.path=$libPath " + - s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}".stripMargin + s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " + + s"-Dscala.usejavacp=true -Djava.library.path=$libPath " + + s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " + s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " + "-Xmx2G " + "-Dscala.usejavacp=true " + "-Dhdp.version=2.6.1.0-129 " + "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + - s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + + s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " + s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr " case _ => "" diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala index 525c4f5d..5d566a0b 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala +++ 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala @@ -9,13 +9,13 @@ class SparkShellScalaRunnerProvider extends RunnerSetupProvider { private var conf: ClusterConfig = _ override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = - s"$$SPARK_HOME/bin/spark-shell ${actionData.src} --jars spark-runtime-${conf.version}.jar" + s"$$SPARK_HOME/bin/spark-shell ${actionData.getSrc} --jars spark-runtime-${conf.version}.jar" override def getRunnerResources: Array[String] = Array[String]() def getActionResources(jobId: String, actionData: ActionData): Array[String] = - Array[String](s"$jobId/${actionData.name}/${actionData.src}") + Array[String](s"$jobId/${actionData.getName}/${actionData.getSrc}") override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = Array[String]() diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala index e6c0a7d0..a48aaa06 100644 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala +++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala @@ -38,7 +38,7 @@ import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap import scala.sys.process._ -class SparkRunnersProvider extends RunnersProvider with Logging { +class SparkRunnersProvider extends Logging with RunnersProvider { private val runners = new TrieMap[String, AmaterasuRunner] private var shellLoger = ProcessLogger( diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala 
b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala index 90f8c680..a60c827d 100644 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala +++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala @@ -33,7 +33,7 @@ import scala.sys.process.{Process, ProcessLogger} -class PySparkRunner extends AmaterasuRunner with Logging { +class PySparkRunner extends Logging with AmaterasuRunner { var proc: Process = _ var notifier: Notifier = _ diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala index 16cb97bb..430e75ab 100644 --- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala +++ b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala @@ -20,7 +20,7 @@ import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.logging.Logging -class TestNotifier extends Notifier with Logging { +class TestNotifier extends Notifier { override def info(msg: String): Unit = { log.info(msg) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a95009c3..7dc503f1 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/leader-common/build.gradle b/leader-common/build.gradle index 61b93092..6ca9513b 100644 --- a/leader-common/build.gradle +++ b/leader-common/build.gradle @@ 
-15,7 +15,6 @@ * limitations under the License. */ buildscript { - ext.kotlin_version = '1.2.60' repositories { mavenCentral() @@ -104,12 +103,3 @@ compileKotlin { compileTestKotlin { kotlinOptions.jvmTarget = "1.8" } -// -//kotlin { -// experimental { -// coroutines 'enable' -// } -//} - -//task copyToHome() { -//} \ No newline at end of file diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt new file mode 100644 index 00000000..e4f0e3ec --- /dev/null +++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.amaterasu.leader.common.execution.actions + +import org.apache.amaterasu.common.dataobjects.ActionData +import org.apache.amaterasu.common.configuration.enums.ActionStatus +import org.apache.amaterasu.common.logging.KLogging +import org.apache.amaterasu.common.logging.Logging +import org.apache.curator.framework.CuratorFramework + +/** + * Created by Eran Bartenstein on 19/10/18. 
+ */ +abstract class Action : KLogging() { + lateinit var actionPath: String + lateinit var actionId: String + lateinit var client: CuratorFramework + lateinit var data: ActionData + abstract fun execute() + abstract fun handleFailure(message: String) : String + + fun announceStart() { + log.debug("Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}") + client.setData().forPath(actionPath, ActionStatus.started.value.toByteArray()) + data.status = ActionStatus.started + } + + fun announceQueued() { + log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution") + client.setData().forPath(actionPath, ActionStatus.queued.value.toByteArray()) + data.status = ActionStatus.queued + } + + fun announceComplete() { + log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} complete") + client.setData().forPath(actionPath, ActionStatus.complete.value.toByteArray()) + data.status = ActionStatus.complete + } + + fun announceCanceled() { + log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} was canceled") + client.setData().forPath(actionPath, ActionStatus.canceled.value.toByteArray()) + data.status = ActionStatus.canceled + } + + protected fun announceFailure() {} + +} diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala index 7a1bc6c9..e4179055 100755 --- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala +++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ package org.apache.amaterasu.leader.common.utilities - +import scala.collection.JavaConverters._ import java.io.{File, FileInputStream} import java.nio.file.{Files, Paths} @@ -43,12 +43,15 @@ object DataLoader extends Logging { ymlMapper.registerModule(DefaultScalaModule) def getTaskData(actionData: ActionData, env: String): TaskData = { - val srcFile = actionData.src + val srcFile = actionData.getSrc val src = Source.fromFile(s"repo/src/$srcFile").mkString val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString val envData = ymlMapper.readValue(envValue, classOf[Environment]) - TaskData(src, envData, actionData.groupId, actionData.typeId, actionData.exports) + + val exports = actionData.getExports.asScala.toMap // Kotlin to Scala TODO: Remove me as fast as you can + + TaskData(src, envData, actionData.getGroupId, actionData.getTypeId, exports) } def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = { diff --git a/leader/build.gradle b/leader/build.gradle index 114bbd32..dc244fc1 100644 --- a/leader/build.gradle +++ b/leader/build.gradle @@ -18,6 +18,7 @@ plugins { id "com.github.johnrengelman.shadow" version "1.2.4" id 'com.github.maiflai.scalatest' version '0.22' id 'scala' + id 'org.jetbrains.kotlin.jvm' id 'java' } @@ -62,6 +63,8 @@ dependencies { compile group: 'org.reflections', name: 'reflections', version: '0.9.11' compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3' compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0' + compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8" + compile "org.jetbrains.kotlin:kotlin-reflect" runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3' testCompile project(':common') @@ -82,8 +85,11 @@ sourceSets { // this is done so Scala will compile before Java main { + kotlin { + srcDirs = ['src/main/kotlin'] + } scala { - srcDirs = ['src/main/scala', 'src/main/java'] + srcDirs = ['src/main/kotlin','src/main/java', 
'src/main/scala'] } java { srcDirs = [] @@ -109,4 +115,11 @@ task copyToHomeBin(type: Copy) { task copyToHome() { dependsOn copyToHomeRoot dependsOn copyToHomeBin -} \ No newline at end of file +} + +compileKotlin{ + kotlinOptions.jvmTarget = "1.8" +} +compileTestKotlin { + kotlinOptions.jvmTarget = "1.8" +} diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala b/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala similarity index 82% rename from leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala rename to leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala index ca29f0cb..d9be4dd7 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala @@ -14,15 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.amaterasu.leader.execution.actions +package org.apache.amaterasu.leader.common.actions +import java.util import java.util.concurrent.BlockingQueue import org.apache.amaterasu.common.configuration.enums.ActionStatus import org.apache.amaterasu.common.dataobjects.ActionData +import org.apache.amaterasu.leader.common.execution.actions.Action import org.apache.curator.framework.CuratorFramework import org.apache.zookeeper.CreateMode +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer class SequentialAction extends Action { @@ -51,16 +54,16 @@ class SequentialAction extends Action { override def handleFailure(message: String): String = { - println(s"Part ${data.name} of group ${data.groupId} and of type ${data.typeId} failed on attempt $attempt with message: $message") + println(s"Part ${data.getName} of group ${data.getGroupId} and of type ${data.getTypeId} failed on attempt $attempt with message: $message") attempt += 1 if (attempt <= attempts) { - data.id + data.getId } else { announceFailure() println(s"===> moving to err action ${data.errorActionId}") - data.status = ActionStatus.failed + data.setStatus ( ActionStatus.failed ) data.errorActionId } @@ -91,7 +94,8 @@ object SequentialAction { action.attempts = attempts action.jobId = jobId - action.data = ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, exports, new ListBuffer[String]) + val javaExports = exports.asJava + action.data = new ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, javaExports, new util.ArrayList[String]()) action.jobsQueue = queue action.client = zkClient @@ -121,7 +125,7 @@ object ErrorAction { action.actionId = action.actionPath.substring(action.actionPath.indexOf('-') + 1).replace("/", "-") action.jobId = jobId - action.data = ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, Map.empty, new ListBuffer[String]) + action.data = new 
ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, new util.HashMap[String, String](), new util.ArrayList[String]()) action.jobsQueue = queue action.client = zkClient diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala index c271f50a..e08489cd 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala @@ -23,7 +23,8 @@ import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.leader.execution.JobManager -import org.apache.amaterasu.leader.execution.actions.{Action, ErrorAction, SequentialAction} +import org.apache.amaterasu.leader.common.actions.{ErrorAction, SequentialAction} +import org.apache.amaterasu.leader.common.execution.actions.Action import org.apache.curator.framework.CuratorFramework import scala.collection.JavaConverters._ @@ -103,17 +104,18 @@ object JobParser { ) //updating the list of frameworks setup - manager.frameworks.getOrElseUpdate(action.data.groupId, - new mutable.HashSet[String]()) - .add(action.data.typeId) + manager.frameworks.getOrElseUpdate(action.data.getGroupId, + new mutable.HashSet[String]()) + .add(action.data.getTypeId) - if (manager.head == null) + if (manager.head == null) { manager.head = action + } - if (previous != null) - previous.data.nextActionIds.append(action.actionId) - + if (previous != null) { + previous.data.getNextActionIds.add(action.actionId) + } manager.registerAction(action) val errorNode = actionData.path("error") @@ -123,18 +125,18 @@ object JobParser { val errorAction = parseErrorAction( errorNode, manager.jobId, - action.data.id, + action.data.getId, actionsQueue, manager.client ) - action.data.errorActionId = errorAction.data.id + 
action.data.errorActionId = errorAction.data.getId manager.registerAction(errorAction) //updating the list of frameworks setup - manager.frameworks.getOrElseUpdate(errorAction.data.groupId, + manager.frameworks.getOrElseUpdate(errorAction.data.getGroupId, new mutable.HashSet[String]()) - .add(errorAction.data.typeId) + .add(errorAction.data.getTypeId) } parseActions(actions.tail, manager, actionsQueue, attempts, action) diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala index 38f4b7c5..70642dbf 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala @@ -21,7 +21,7 @@ import java.util.concurrent.BlockingQueue import org.apache.amaterasu.common.configuration.enums.ActionStatus import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.logging.Logging -import org.apache.amaterasu.leader.execution.actions.Action +import org.apache.amaterasu.leader.common.execution.actions.Action import org.apache.curator.framework.CuratorFramework import scala.collection.concurrent.TrieMap @@ -54,9 +54,9 @@ class JobManager extends Logging { } - def outOfActions: Boolean = !registeredActions.values.exists(a => a.data.status == ActionStatus.pending || - a.data.status == ActionStatus.queued || - a.data.status == ActionStatus.started) + def outOfActions: Boolean = !registeredActions.values.exists(a => a.data.getStatus == ActionStatus.pending || + a.data.getStatus == ActionStatus.queued || + a.data.getStatus == ActionStatus.started) /** * getNextActionData returns the data of the next action to be executed if such action * exists @@ -68,7 +68,7 @@ class JobManager extends Logging { val nextAction: ActionData = executionQueue.poll() if (nextAction != null) { - registeredActions(nextAction.id).announceStart + 
registeredActions(nextAction.getId).announceStart } nextAction @@ -102,8 +102,7 @@ class JobManager extends Logging { val action = registeredActions.get(actionId).get action.announceComplete - action.data.nextActionIds.foreach(id => - registeredActions.get(id).get.execute()) + action.data.getNextActionIds.toArray.foreach(id => registeredActions.get(id.toString).get.execute()) // we don't need the error action anymore if (action.data.errorActionId != null) @@ -131,11 +130,11 @@ class JobManager extends Logging { def cancelFutureActions(action: Action): Unit = { - if (action.data.status != ActionStatus.failed) + if (action.data.getStatus != ActionStatus.failed) action.announceCanceled - action.data.nextActionIds.foreach(id => - cancelFutureActions(registeredActions.get(id).get)) + action.data.getNextActionIds.toArray.foreach(id => + cancelFutureActions(registeredActions.get(id.toString).get)) } /** @@ -185,4 +184,4 @@ object JobManager { } -} \ No newline at end of file +} diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala deleted file mode 100755 index f5409972..00000000 --- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.amaterasu.leader.execution.actions - -import org.apache.amaterasu.common.configuration.enums.ActionStatus -import org.apache.amaterasu.common.dataobjects.ActionData -import org.apache.amaterasu.common.logging.Logging -import org.apache.curator.framework.CuratorFramework - -trait Action extends Logging { - - // this is the znode path for the action - var actionPath: String = _ - var actionId: String = _ - - var data: ActionData = _ - var client: CuratorFramework = _ - - def execute(): Unit - - def handleFailure(message: String): String - - /** - * The announceStart register the beginning of the of the task with ZooKeper - */ - def announceStart: Unit = { - - log.debug(s"Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}") - client.setData().forPath(actionPath, ActionStatus.started.toString.getBytes) - data.status = ActionStatus.started - } - - def announceQueued: Unit = { - - log.debug(s"Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution") - client.setData().forPath(actionPath, ActionStatus.queued.toString.getBytes) - data.status = ActionStatus.queued - } - - def announceComplete: Unit = { - - log.debug(s"Action ${data.name} of group ${data.groupId} and of type ${data.typeId} completed") - client.setData().forPath(actionPath, ActionStatus.complete.toString.getBytes) - data.status = ActionStatus.complete - } - - def announceCanceled: Unit = { - - log.debug(s"Action ${data.name} of group ${data.groupId} and of type ${data.typeId} was canceled") - 
client.setData().forPath(actionPath, ActionStatus.canceled.toString.getBytes) - data.status = ActionStatus.canceled - } - protected def announceFailure(): Unit = {} - -} \ No newline at end of file diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala index 737f59d9..0f234380 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala @@ -24,7 +24,7 @@ import org.apache.amaterasu.leader.Kami import org.apache.amaterasu.leader.mesos.schedulers.ClusterScheduler import org.apache.mesos.{MesosSchedulerDriver, Protos} -object Launcher extends App with Logging { +object Launcher extends Logging with App { println( """ diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala index 0ffdb7a3..2adff07a 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala @@ -20,7 +20,7 @@ import org.apache.amaterasu.common.logging.Logging import org.apache.mesos.Protos._ import org.apache.mesos.{Executor, ExecutorDriver} -object JobExecutor extends Executor with Logging { +object JobExecutor extends Logging with Executor { override def shutdown(driver: ExecutorDriver): Unit = {} diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala index 4b1a74c1..68c8f85a 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala @@ -20,7 +20,7 @@ import 
org.apache.amaterasu.common.logging.Logging import org.apache.mesos.Protos.{Resource, Value} import org.apache.mesos.Scheduler -trait AmaterasuScheduler extends Scheduler with Logging { +trait AmaterasuScheduler extends Logging with Scheduler { def createScalarResource(name: String, value: Double): Resource = { Resource.newBuilder diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index a407a0db..bcd7923d 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -30,7 +30,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.configuration.enums.ActionStatus -import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType} @@ -156,19 +155,19 @@ class JobScheduler extends AmaterasuScheduler { try { val actionData = jobManager.getNextActionData if (actionData != null) { - val taskId = Protos.TaskID.newBuilder().setValue(actionData.id).build() + val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build() // setting up the configuration files for the container - val envYaml = configManager.getActionConfigContent(actionData.name, "") //TODO: replace with the value in actionData.config - writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml") + val envYaml = configManager.getActionConfigContent(actionData.getName, "") //TODO: replace with the value 
in actionData.config + writeConfigFile(envYaml, jobManager.jobId, actionData.getName, "env.yaml") val dataStores = DataLoader.getTaskData(actionData, env).exports val writer = new StringWriter() yamlMapper.writeValue(writer, dataStores) val dataStoresYaml = writer.toString - writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml") + writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.getName, "datastores.yaml") - writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml") + writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.getName}", jobManager.jobId, actionData.getName, "runtime.yaml") offersToTaskIds.put(offer.getId.getValue, taskId.getValue) @@ -180,8 +179,8 @@ class JobScheduler extends AmaterasuScheduler { slaveActions.put(taskId.getValue, ActionStatus.started) - val frameworkProvider = frameworkFactory.providers(actionData.groupId) - val runnerProvider = frameworkProvider.getRunnerProvider(actionData.typeId) + val frameworkProvider = frameworkFactory.providers(actionData.getGroupId) + val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId) // searching for an executor that already exist on the slave, if non exist // we create a new one @@ -198,7 +197,7 @@ class JobScheduler extends AmaterasuScheduler { //creating the command // TODO: move this into the runner provider somehow - copy(get(s"repo/src/${actionData.src}"), get(s"dist/${jobManager.jobId}/${actionData.name}/${actionData.src}"), REPLACE_EXISTING) + copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.jobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING) println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}") val command = CommandInfo @@ -210,26 +209,26 @@ class JobScheduler extends AmaterasuScheduler { .setExtract(false) .build()) - // Getting env.yaml - 
command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml") - .setExecutable(false) - .setExtract(true) - .build()) + // Getting env.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/env.yaml") + .setExecutable(false) + .setExtract(true) + .build()) - // Getting datastores.yaml - command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml") - .setExecutable(false) - .setExtract(true) - .build()) + // Getting datastores.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/datastores.yaml") + .setExecutable(false) + .setExtract(true) + .build()) - // Getting runtime.yaml - command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml") - .setExecutable(false) - .setExtract(true) - .build()) + // Getting runtime.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/runtime.yaml") + .setExecutable(false) + .setExtract(true) + .build()) // Getting framework resources frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala index d1d0c531..38c90c7e 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala @@ -22,7 +22,7 @@ import org.apache.amaterasu.common.configuration.ClusterConfig import 
org.apache.amaterasu.common.logging.Logging -abstract class BaseJobLauncher extends App with Logging { +abstract class BaseJobLauncher extends Logging with App { def run(args: Args, config: ClusterConfig, resume: Boolean): Unit = ??? diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index 1bbaa157..23700f80 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -53,7 +53,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.util.{Failure, Success} -class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { +class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler { var capability: Resource = _ @@ -170,7 +170,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val actionData = jobManager.getNextActionData if (actionData != null) { - val frameworkProvider = frameworkFactory.providers(actionData.groupId) + val frameworkProvider = frameworkFactory.providers(actionData.getGroupId) val driverConfiguration = frameworkProvider.getDriverConfiguration var mem: Int = driverConfiguration.getMemory @@ -221,14 +221,14 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { private def askContainer(actionData: ActionData): Unit = { actionsBuffer.add(actionData) - log.info(s"About to ask container for action ${actionData.id}. Action buffer size is: ${actionsBuffer.size()}") + log.info(s"About to ask container for action ${actionData.getId}. 
Action buffer size is: ${actionsBuffer.size()}") // we have an action to schedule, let's request a container val priority: Priority = Records.newRecord(classOf[Priority]) priority.setPriority(1) val containerReq = new ContainerRequest(capability, null, null, priority) rmClient.addContainerRequest(containerReq) - log.info(s"Asked container for action ${actionData.id}") + log.info(s"Asked container for action ${actionData.getId}") } @@ -245,10 +245,10 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val containerTask = Future[ActionData] { val frameworkFactory = FrameworkProvidersFactory(env, config) - val framework = frameworkFactory.getFramework(actionData.groupId) - val runnerProvider = framework.getRunnerProvider(actionData.typeId) + val framework = frameworkFactory.getFramework(actionData.getGroupId) + val runnerProvider = framework.getRunnerProvider(actionData.getTypeId) val ctx = Records.newRecord(classOf[ContainerLaunchContext]) - val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.id}-${container.getId.getContainerId}", address)) + val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address)) log.info("Running container id {}.", container.getId.getContainerId) log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last) @@ -283,7 +283,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { //adding the framework and executor resources setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier) - setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}") + setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, 
s"${framework.getGroupIdentifier}-${actionData.getTypeId}") ctx.setLocalResources(resources) @@ -305,9 +305,9 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { askContainer(actionData) case Success(requestedActionData) => - jobManager.actionStarted(requestedActionData.id) + jobManager.actionStarted(requestedActionData.getId) containersIdsToTask.put(container.getId.getContainerId, requestedActionData) - log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.id}") + log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}") } } @@ -371,15 +371,16 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val task = containersIdsToTask(containerId) rmClient.releaseAssignedContainer(status.getContainerId) + val taskId = task.getId if (status.getExitStatus == 0) { //completedContainersAndTaskIds.put(containerId, task.id) - jobManager.actionComplete(task.id) - log.info(s"Container $containerId completed with task ${task.id} with success.") + jobManager.actionComplete(taskId) + log.info(s"Container $containerId complete with task ${taskId} with success.") } else { // TODO: Check the getDiagnostics value and see if appropriate - jobManager.actionFailed(task.id, status.getDiagnostics) - log.warn(s"Container $containerId completed with task ${task.id} with failed status code (${status.getExitStatus})") + jobManager.actionFailed(taskId, status.getDiagnostics) + log.warn(s"Container $containerId complete with task ${taskId} with failed status code (${status.getExitStatus})") } } } @@ -456,7 +457,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { } } -object ApplicationMaster extends App with Logging { +object ApplicationMaster extends Logging with App { val parser = Args.getParser diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala 
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala index 14c4f439..23f4af68 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId, ContainerStatus} import org.apache.hadoop.yarn.client.api.async.NMClientAsync -class YarnNMCallbackHandler extends NMClientAsync.CallbackHandler with Logging { +class YarnNMCallbackHandler extends Logging with NMClientAsync.CallbackHandler { override def onStartContainerError(containerId: ContainerId, t: Throwable): Unit = { log.error(s"Container ${containerId.getContainerId} couldn't start.", t) diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala index 24f28ccc..379dd1b7 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala @@ -41,7 +41,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync, env: String, awsEnv: String, config: ClusterConfig, - executorJar: LocalResource) extends AMRMClientAsync.CallbackHandler with Logging { + executorJar: LocalResource) extends Logging with AMRMClientAsync.CallbackHandler { val gson:Gson = new Gson() @@ -67,9 +67,9 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync, val taskId = containersIdsToTaskIds(containerId) if (status.getExitStatus == 0) { completedContainersAndTaskIds.put(containerId, taskId) - log.info(s"Container $containerId completed with task $taskId with success.") + log.info(s"Container $containerId complete with task $taskId with success.") } else { - log.warn(s"Container $containerId completed with task $taskId with failed status code (${status.getExitStatus}.") + log.warn(s"Container 
$containerId complete with task $taskId with failed status code (${status.getExitStatus}.") val failedTries = failedTasksCounter.getOrElse(taskId, 0) if (failedTries < MAX_ATTEMPTS_PER_TASK) { // TODO: notify and ask for a new container @@ -108,7 +108,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync, | java -cp executor.jar:spark-${config.Webserver.sparkVersion}/lib/* | -Dscala.usejavacp=true | -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher - | ${jobManager.jobId} ${config.master} ${actionData.name} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin + | ${jobManager.jobId} ${config.master} ${actionData.getName} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin ctx.setCommands(Collections.singletonList(command)) ctx.setLocalResources(Map[String, LocalResource] ( @@ -116,7 +116,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync, )) nmClient.startContainerAsync(container, ctx) - actionData.id + actionData.getId } containerTask onComplete { diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala similarity index 79% rename from leader/src/test/scala/org/apache/amaterasu/common/execution/ActionTests.scala rename to leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala index c89fd2a8..197c703d 100755 --- a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala @@ -16,24 +16,27 @@ */ package org.apache.amaterasu.common.execution +import java.util import java.util.concurrent.LinkedBlockingQueue import org.apache.amaterasu.common.configuration.enums.ActionStatus import org.apache.amaterasu.common.dataobjects.ActionData -import org.apache.amaterasu.leader.execution.actions.SequentialAction +import 
org.apache.amaterasu.leader.common.actions.SequentialAction import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.curator.test.TestingServer import org.apache.zookeeper.CreateMode import org.scalatest.{FlatSpec, Matchers} -class ActionTests extends FlatSpec with Matchers { +import scala.collection.JavaConverters._ + +class ActionStatusTests extends FlatSpec with Matchers { // setting up a testing zookeeper server (curator TestServer) val retryPolicy = new ExponentialBackoffRetry(1000, 3) val server = new TestingServer(2181, true) val jobId = s"job_${System.currentTimeMillis}" - val data = ActionData(ActionStatus.pending, "test_action", "start.scala", "spark","scala", null, Map.empty , null) + val data = new ActionData(ActionStatus.pending, "test_action", "start.scala", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava) "an Action" should "queue it's ActionData int the job queue when executed" in { @@ -44,11 +47,11 @@ class ActionTests extends FlatSpec with Matchers { client.start() client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId") - val action = SequentialAction(data.name, data.src, data.groupId, data.typeId, Map.empty, jobId, queue, client, 1) + val action = SequentialAction(data.getName, data.getSrc, data.getGroupId, data.getTypeId, Map.empty, jobId, queue, client, 1) action.execute() - queue.peek().name should be(data.name) - queue.peek().src should be(data.src) + queue.peek().getName should be(data.getName) + queue.peek().getSrc should be(data.getSrc) } diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala index b15f0bd6..ef47cc17 100755 --- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala @@ 
-49,21 +49,21 @@ class JobExecutionTests extends FlatSpec with Matchers { "a job" should "queue the first action when the JobManager.start method is called " in { job.start - queue.peek.name should be ("start") + queue.peek.getName should be ("start") // making sure that the status is reflected in zk - val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000000") + val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000") new String(actionStatus) should be("queued") } it should "return the start action when calling getNextAction and dequeue it" in { - job.getNextActionData.name should be ("start") + job.getNextActionData.getName should be ("start") queue.size should be (0) // making sure that the status is reflected in zk - val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000000") + val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000") new String(actionStatus) should be("started") } @@ -73,17 +73,17 @@ class JobExecutionTests extends FlatSpec with Matchers { job.actionComplete("0000000000") // making sure that the status is reflected in zk - val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000000") + val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000") new String(actionStatus) should be("complete") } "the next step2 job" should "be queued as a result of the completion" in { - queue.peek.name should be ("step2") + queue.peek.getName should be ("step2") // making sure that the status is reflected in zk - val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000001") + val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001") new String(actionStatus) should be("queued") } @@ -92,20 +92,20 @@ class JobExecutionTests extends FlatSpec with Matchers { val data = job.getNextActionData - data.name should be ("step2") + data.getName should be ("step2") // making sure that the status is reflected in zk - val actionStatus = 
client.getData.forPath(s"/${jobId}/task-0000000001") + val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001") new String(actionStatus) should be("started") } it should "be marked as failed when JobManager. is called" in { job.actionFailed("0000000001", "test failure") - queue.peek.name should be ("error-action") + queue.peek.getName should be ("error-action") // making sure that the status is reflected in zk - val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000001-error") + val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error") new String(actionStatus) should be("queued") // and returned by getNextActionData diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala index 3a347c1f..13685f91 100755 --- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala @@ -55,9 +55,9 @@ class JobParserTests extends FlatSpec with Matchers { job.registeredActions.size should be(3) - job.registeredActions.get("0000000000").get.data.name should be("start") - job.registeredActions.get("0000000001").get.data.name should be("step2") - job.registeredActions.get("0000000001-error").get.data.name should be("error-action") + job.registeredActions.get("0000000000").get.data.getName should be("start") + job.registeredActions.get("0000000001").get.data.getName should be("step2") + job.registeredActions.get("0000000001-error").get.data.getName should be("error-action") } diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala index eb08942f..64887ab6 100755 --- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala +++ 
b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala @@ -68,7 +68,7 @@ class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach { JobLoader.restoreJobState(manager, jobId, client) - queue.peek.name should be("start") + queue.peek.getName should be("start") } "a restored job" should "have all started actions in the executionQueue" in { @@ -78,6 +78,6 @@ class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach { JobLoader.restoreJobState(manager, jobId, client) - queue.peek.name should be("start") + queue.peek.getName should be("start") } } \ No newline at end of file diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala index 0e321f01..7eba1daf 100644 --- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala @@ -26,7 +26,7 @@ class HttpServerTests extends FlatSpec with Matchers { // this is an ugly hack, getClass.getResource("/").getPath should have worked but // stopped working when we moved to gradle :( - val resources = new File(getClass.getResource("/simple-maki.yml").getPath).getParent + val resources: String = new File(getClass.getResource("/simple-maki.yml").getPath).getParent // "Jetty Web server" should "start HTTP server, serve content and stop successfully" in { // diff --git a/sdk/build.gradle b/sdk/build.gradle index 581ea081..c5378b88 100644 --- a/sdk/build.gradle +++ b/sdk/build.gradle @@ -15,7 +15,7 @@ * limitations under the License. */ apply plugin: 'java' - +apply plugin: "kotlin" repositories { mavenCentral() } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services