nadav-har-tzvi closed pull request #34: Amaterasu 49 - mesos implementation
URL: https://github.com/apache/incubator-amaterasu/pull/34
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 94b5dad..06a07e8 100644 --- a/build.gradle +++ b/build.gradle @@ -19,20 +19,25 @@ plugins { } apply plugin: 'distribution' +apply plugin: 'project-report' + +htmlDependencyReport { + projects = project.allprojects +} allprojects { group 'org.apache.amaterasu' version '0.2.0-incubating-rc4' } -task copyLeagalFiles(type: Copy) { +task copyLegalFiles(type: Copy) { from "./DISCLAIMER", "./LICENSE", "./NOTICE" into "${buildDir}/amaterasu" } task buildHomeDir() { dependsOn subprojects.collect { getTasksByName('copyToHome', true) } - dependsOn copyLeagalFiles + dependsOn copyLegalFiles } distributions { diff --git a/frameworks/spark/dispatcher/build.gradle b/frameworks/spark/dispatcher/build.gradle index a95d958..a2bf9fe 100644 --- a/frameworks/spark/dispatcher/build.gradle +++ b/frameworks/spark/dispatcher/build.gradle @@ -39,6 +39,7 @@ dependencies { compile project(':common') compile project(':leader-common') compile project(':amaterasu-sdk') + compile 'com.uchuhimo:konf:0.11' } task copyToHomeBin(type: Copy) { diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala index ac442d5..79bd080 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala @@ -103,4 +103,6 @@ class SparkSetupProvider extends FrameworkSetupProvider { override def getRunnerProvider(runnerId: String): 
RunnerSetupProvider = { runnerProviders(runnerId) } + + override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor") } \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 6ffa237..1948b90 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 38185b0..a95009c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -#Sun Jun 10 14:41:17 AEST 2018 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip diff --git a/gradlew b/gradlew index 3efb0e9..cccdd3d 100755 --- a/gradlew +++ b/gradlew @@ -1,20 +1,4 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# +#!/usr/bin/env sh ############################################################################## ## @@ -49,11 +33,11 @@ DEFAULT_JVM_OPTS="" # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -170,16 +154,19 @@ if $cygwin ; then esac fi -# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules -function splitJvmOpts() { - JVM_OPTS=("$@") +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " } -eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS -JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" # by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong -if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then cd "$(dirname "$0")" fi -exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index f664a3f..e95643d 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,21 +1,3 @@ -rem -rem -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. 
You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. -rem - @if "%DEBUG%" == "" @echo off @rem ########################################################################## @rem diff --git a/leader-common/build.gradle b/leader-common/build.gradle index 3f3ac98..61b9309 100644 --- a/leader-common/build.gradle +++ b/leader-common/build.gradle @@ -14,21 +14,53 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +buildscript { + ext.kotlin_version = '1.2.60' + + repositories { + mavenCentral() + maven { + url 'http://repository.jetbrains.com/all' + } + maven { + url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" + } + } + + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0' + } +} + plugins { - id "com.github.johnrengelman.shadow" version "1.2.4" - id 'com.github.maiflai.scalatest' version '0.22' + id "com.github.johnrengelman.shadow" version "2.0.4" id 'scala' - id 'java' +} + +apply plugin: 'kotlin' +apply plugin: 'org.junit.platform.gradle.plugin' + +junitPlatform { + filters { + engines { + include 'spek' + } + } } sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { - maven { - url "https://plugins.gradle.org/m2/" - } + maven { url "https://plugins.gradle.org/m2/" } + maven { url 'http://repository.jetbrains.com/all' } + maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" } + maven { url "http://dl.bintray.com/jetbrains/spek" } + maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" } + 
mavenCentral() + jcenter() } dependencies { @@ -41,4 +73,43 @@ dependencies { compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4' compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4' compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4' -} \ No newline at end of file + + compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r' + + compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" + compile "org.jetbrains.kotlin:kotlin-reflect" + compile('com.uchuhimo:konf:0.11') { + exclude group: 'org.eclipse.jgit' + } + + testCompile 'org.jetbrains.spek:spek-api:1.1.5' + testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version" + testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5' + + // spek requires kotlin-reflect, can be omitted if already in the classpath + testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" + +} + +sourceSets { + test { + resources.srcDirs += [file('src/test/resources')] + } + +} + +compileKotlin { + kotlinOptions.jvmTarget = "1.8" +} +compileTestKotlin { + kotlinOptions.jvmTarget = "1.8" +} +// +//kotlin { +// experimental { +// coroutines 'enable' +// } +//} + +//task copyToHome() { +//} \ No newline at end of file diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt new file mode 100644 index 0000000..2f98fa6 --- /dev/null +++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.amaterasu.leader.common.configuration + +import com.uchuhimo.konf.Config +import com.uchuhimo.konf.source.yaml.toYaml +import java.io.File + +class ConfigManager(private val env: String, private val repoPath: String, private val frameworkItems: List<String> = emptyList()) { + + private val envFolder = "$repoPath/env/$env" + + // this is currently public for testing reasons, need to reconsider + var config: Config = Config { + addSpec(Job) + for (item in frameworkItems) { + val frameworkSpec = GenericSpec(item) + addSpec(frameworkSpec.spec) + } + } + + init { + for (file in File(envFolder).listFiles()) { + config = config.from.yaml.file(file) + println(config.toYaml.toText()) + } + } + + fun getActionConfiguration(action: String, path: String = ""): Config { + + val actionPath = if (path.isEmpty()) { + "$repoPath/src/$action/env/$env" + } else { + "$repoPath/$path" + .replace("{env}", env) + .replace("{action_name}", action) + } + + var result = config + + val configLocation = File(actionPath) + if (configLocation.exists()) { + if (configLocation.isDirectory) { + for (file in File(actionPath).listFiles()) { + result = config.from.yaml.file(file) + } + } else { + result = config.from.yaml.file(configLocation) + } + } + return result + } + + fun getActionConfigContent(action: String, path: String = ""): String { + return getActionConfiguration(action, 
path).toYaml.toText() + } +} \ No newline at end of file diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt new file mode 100644 index 0000000..ed6fa9e --- /dev/null +++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.amaterasu.leader.common.configuration + +import com.uchuhimo.konf.ConfigSpec +import com.uchuhimo.konf.OptionalItem + +class GenericSpec(configurationItem: String) { + val spec = ConfigSpec() + val items = OptionalItem(spec, configurationItem, emptyMap<String, String>()) +} \ No newline at end of file diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt new file mode 100644 index 0000000..076b0af --- /dev/null +++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.amaterasu.leader.common.configuration + +import com.uchuhimo.konf.ConfigSpec + +object Job : ConfigSpec("") { + val name by required<String>() + val master by required<String>() + val inputRootPath by required<String>() + val outputRootPath by required<String>() + val workingDir by required<String>() + val configuration by optional(emptyMap<String, String>()) +} \ No newline at end of file diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt new file mode 100644 index 0000000..d479e5f --- /dev/null +++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.amaterasu.leader.common.dsl + +import org.eclipse.jgit.api.Git +import java.io.File + +object GitUtil { + @JvmStatic + fun cloneRepo(repoAddress: String, branch: String) { + Git.cloneRepository().apply { + setURI(repoAddress) + setDirectory(File("repo")) + setBranch(branch) + }.call().close() + } +} \ No newline at end of file diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt new file mode 100644 index 0000000..1aa729b --- /dev/null +++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.amaterasu.leader.common.configuration + + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.* +import kotlin.test.assertEquals +import java.io.File + +object ConfigManagerTests : Spek({ + + val marker = this.javaClass.getResource("/maki.yml").path + + given("a ConfigManager for a job ") { + + val repoPath = "${File(marker).parent}/test_repo" + val cfg = ConfigManager("test", repoPath) + + it("loads the job level environment"){ + assertEquals(cfg.config[Job.master] , "yarn") + } + + on("getting an env for an action with default path") { + val startConf = cfg.getActionConfiguration("start") + it("loads the specific configuration defined in the actions folder"){ + assertEquals(startConf[Job.master] , "mesos") + } + } + + on("getting an env for an action with a conf: property in the maki.yml"){ + val step2conf = cfg.getActionConfiguration("step2", "src/{action_name}/{env}/") + + it("loads the specific configuration defined in the actions folder"){ + assertEquals(step2conf[Job.name] , "test2") + } + } + + on("getting an env for an action with no action level config"){ + val step3conf = cfg.getActionConfiguration("step3") + + it("loads only the job level conf"){ + assertEquals(step3conf[Job.name] , "test") + } + } + + on("receiving a path to a specific file" ){ + val step4conf = cfg.getActionConfiguration("step4", "src/start/env/{env}/job.yml") + + it("loads the specific configuration from the file"){ + assertEquals(step4conf[Job.master] , "mesos") + } + } + + + } + + given("a ConfigManager for a job with spark framework") { + + val repoPath = "${File(marker).parent}/spark_repo" + val cfg = ConfigManager("test", repoPath, listOf("sparkConfiguration")) + + it("load the framework configuration for spark"){ + val spark: Map<String, String> = cfg.config["sparkConfiguration"] + assertEquals(spark["spark.executor.memory"], "1g") + } + } +}) \ No newline at end of file diff --git a/leader-common/src/test/resources/maki.yml 
b/leader-common/src/test/resources/maki.yml new file mode 100644 index 0000000..576b7dd --- /dev/null +++ b/leader-common/src/test/resources/maki.yml @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +job-config: env/{env}/job.yml, env/{env}/passwords.yml, env/{env}/spark.yml +job-name: amaterasu-test +flow: + - name: start + runner: + group: spark + type: scala + exports: + odd: parquet + - name: step2 + config: src/{action_name}/{env}/ + runner: + group: spark + type: scala + file: file2.scala +... diff --git a/leader-common/src/test/resources/spark_repo/env/test/job.yml b/leader-common/src/test/resources/spark_repo/env/test/job.yml new file mode 100644 index 0000000..1f75ec6 --- /dev/null +++ b/leader-common/src/test/resources/spark_repo/env/test/job.yml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +name: test +master: yarn +inputRootPath: /apps/amaterasu/input +outputRootPath: /apps/amaterasu/output +workingDir: /apps/amaterasu/work_dir +configuration: + spark.cassandra.connection.host: 127.0.0.1 + sourceTable: documents \ No newline at end of file diff --git a/leader-common/src/test/resources/spark_repo/env/test/spark.yml b/leader-common/src/test/resources/spark_repo/env/test/spark.yml new file mode 100644 index 0000000..85f1431 --- /dev/null +++ b/leader-common/src/test/resources/spark_repo/env/test/spark.yml @@ -0,0 +1,3 @@ +sparkConfiguration: + spark.executor.extraJavaOptions: -XX:+PrintGCDetails + spark.executor.memory: 1g \ No newline at end of file diff --git a/leader-common/src/test/resources/test_repo/env/test/job.yml b/leader-common/src/test/resources/test_repo/env/test/job.yml new file mode 100644 index 0000000..1f75ec6 --- /dev/null +++ b/leader-common/src/test/resources/test_repo/env/test/job.yml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +name: test +master: yarn +inputRootPath: /apps/amaterasu/input +outputRootPath: /apps/amaterasu/output +workingDir: /apps/amaterasu/work_dir +configuration: + spark.cassandra.connection.host: 127.0.0.1 + sourceTable: documents \ No newline at end of file diff --git a/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml b/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml new file mode 100644 index 0000000..26cfeeb --- /dev/null +++ b/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +master: mesos +configuration: + name: yaniv \ No newline at end of file diff --git a/leader-common/src/test/resources/test_repo/src/step2/test/job.yml b/leader-common/src/test/resources/test_repo/src/step2/test/job.yml new file mode 100644 index 0000000..9203514 --- /dev/null +++ b/leader-common/src/test/resources/test_repo/src/step2/test/job.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +name: test2 \ No newline at end of file diff --git a/leader/build.gradle b/leader/build.gradle index a0de6f5..114bbd3 100644 --- a/leader/build.gradle +++ b/leader/build.gradle @@ -19,6 +19,7 @@ plugins { id 'com.github.maiflai.scalatest' version '0.22' id 'scala' id 'java' + } sourceCompatibility = 1.8 @@ -42,19 +43,18 @@ dependencies { compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0' compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0' compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1' - compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4' - compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4' + compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.4' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.4' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.4' + compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.4' compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty.toolchain', name: 
'jetty-test-helper', version: '4.0' - compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r' - compile group: 'org.yaml', name: 'snakeyaml', version: '1.18' + compile group: 'org.yaml', name: 'snakeyaml', version: '1.23' compile group: 'commons-cli', name: 'commons-cli', version: '1.2' compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2' compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6' diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala index 73b9dc5..b2492bb 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala @@ -14,31 +14,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.leader.dsl - -import java.io.File - -import org.eclipse.jgit.api.Git - -import scala.reflect.io.Path +//package org.apache.amaterasu.leader.dsl +// +//import java.io.File +// +//import org.eclipse.jgit.api.Git +// +//import scala.reflect.io.Path /** * The GitUtil class handles getting the job git repository */ -object GitUtil { - - def cloneRepo(repoAddress: String, branch: String) = { - - val path = Path("repo") - path.deleteRecursively() - - //TODO: add authentication - Git.cloneRepository - .setURI(repoAddress) - .setDirectory(new File("repo")) - .setBranch(branch) - .call - - } - -} \ No newline at end of file +//object GitUtil { +// +// def cloneRepo(repoAddress: String, branch: String) = { +// +// val path = Path("repo") +// path.deleteRecursively() +// +// //TODO: add authentication +// val git = Git.cloneRepository +// .setURI(repoAddress) +// .setDirectory(new File("repo")) +// .setBranch(branch) +// .call +// +// git.close() +// } +// +//} \ No newline at end of file diff --git 
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala index 80cc1bc..234070d 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala @@ -21,7 +21,8 @@ import java.util.concurrent.BlockingQueue import org.apache.amaterasu.common.configuration.enums.ActionStatus import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.logging.Logging -import org.apache.amaterasu.leader.dsl.{GitUtil, JobParser} +import org.apache.amaterasu.leader.common.dsl.GitUtil +import org.apache.amaterasu.leader.dsl.JobParser import org.apache.curator.framework.CuratorFramework import org.apache.zookeeper.CreateMode diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala index 3e1a67b..7630221 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala @@ -33,6 +33,7 @@ class FrameworkProvidersFactory { def getFramework(groupId: String): FrameworkSetupProvider = { providers(groupId) } + } object FrameworkProvidersFactory extends Logging { diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index a6c8306..d68ae77 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -16,12 +16,14 @@ */ package org.apache.amaterasu.leader.mesos.schedulers +import 
java.io.{File, PrintWriter, StringWriter} import java.util import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import java.util.{Collections, UUID} import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.configuration.enums.ActionStatus @@ -29,6 +31,7 @@ import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType} +import org.apache.amaterasu.leader.common.configuration.ConfigManager import org.apache.amaterasu.leader.common.utilities.DataLoader import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory import org.apache.amaterasu.leader.execution.{JobLoader, JobManager} @@ -55,6 +58,8 @@ class JobScheduler extends AmaterasuScheduler { private val version = props.getProperty("version") println(s"===> version $version")*/ LogManager.resetConfiguration() + private var frameworkFactory: FrameworkProvidersFactory = _ + private var configManager: ConfigManager = _ private var jobManager: JobManager = _ private var client: CuratorFramework = _ private var config: ClusterConfig = _ @@ -78,6 +83,9 @@ class JobScheduler extends AmaterasuScheduler { private val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) + private val yamlMapper = new ObjectMapper(new YAMLFactory()) + yamlMapper.registerModule(DefaultScalaModule) + def error(driver: SchedulerDriver, message: String) {} def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {} @@ -130,6 +138,7 @@ 
class JobScheduler extends AmaterasuScheduler { } def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = { + for (offer <- offers.asScala) { if (validateOffer(offer)) { @@ -145,6 +154,18 @@ class JobScheduler extends AmaterasuScheduler { if (actionData != null) { val taskId = Protos.TaskID.newBuilder().setValue(actionData.id).build() + // setting up the configuration files for the container + val envYaml = configManager.getActionConfigContent(actionData.name, "") //TODO: replace with the value in actionData.config + writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml") + + val dataStores = DataLoader.getTaskData(actionData, env).exports + val writer = new StringWriter() + yamlMapper.writeValue(writer, dataStores) + val dataStoresYaml = writer.toString + writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml") + + writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml") + offersToTaskIds.put(offer.getId.getValue, taskId.getValue) // atomically adding a record for the slave, I'm storing all the actions @@ -154,7 +175,7 @@ class JobScheduler extends AmaterasuScheduler { val slaveActions = executionMap(offer.getSlaveId.toString) slaveActions.put(taskId.getValue, ActionStatus.started) - val frameworkFactory = FrameworkProvidersFactory.apply(env, config) + val frameworkProvider = frameworkFactory.providers(actionData.groupId) val runnerProvider = frameworkProvider.getRunnerProvider(actionData.typeId) @@ -182,6 +203,27 @@ class JobScheduler extends AmaterasuScheduler { .setExtract(false) .build()) + // Getting env.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml") + .setExecutable(false) + .setExtract(true) + .build()) + + // Getting datastores.yaml + command.addUris(URI.newBuilder + 
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml") + .setExecutable(false) + .setExtract(true) + .build()) + + // Getting runtime.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml") + .setExecutable(false) + .setExtract(true) + .build()) + // Getting framework resources frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}") @@ -198,7 +240,7 @@ class JobScheduler extends AmaterasuScheduler { command .addUris(URI.newBuilder() - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side .setExecutable(true) .setExtract(false) .build()) @@ -207,6 +249,7 @@ class JobScheduler extends AmaterasuScheduler { .setExecutable(false) .setExtract(false) .build()) + executor = ExecutorInfo .newBuilder .setData(ByteString.copyFrom(execData)) @@ -239,6 +282,9 @@ class JobScheduler extends AmaterasuScheduler { else if (jobManager.outOfActions) { log.info(s"framework ${jobManager.jobId} execution finished") + val repo = new File("repo/") + repo.delete() + HttpServer.stop() driver.declineOffer(offer.getId) driver.stop() @@ -284,8 +330,15 @@ class JobScheduler extends AmaterasuScheduler { ) } + + frameworkFactory = FrameworkProvidersFactory(env, config) + val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava + configManager = new ConfigManager(env, "repo", items) + jobManager.start() + createJobDir(jobManager.jobId) + } def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {} @@ -310,6 +363,42 @@ class JobScheduler extends AmaterasuScheduler { } } + + private def createJobDir(jobId: 
String): Unit = { + val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath) + val amaHome = new File(jarFile.getParent).getParent + val jobDir = s"$amaHome/dist/$jobId/" + + val dir = new File(jobDir) + if (!dir.exists()) { + dir.mkdir() + } + } + + /** + * This function creates an action specific env.yml file int the dist folder with the following path: + * dist/{jobId}/{actionName}/env.yml to be added to the container + * + * @param configuration A YAML string to be written to the env file + * @param jobId the jobId + * @param actionName the name of the action + */ + def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = { + val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath) + val amaHome = new File(jarFile.getParent).getParent + val envLocation = s"$amaHome/dist/$jobId/$actionName/" + + val dir = new File(envLocation) + if (!dir.exists()) { + dir.mkdir() + } + + + new PrintWriter(s"$envLocation/$fileName") { + write(configuration) + close + } + } } object JobScheduler { @@ -343,6 +432,7 @@ object JobScheduler { scheduler.client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy) scheduler.client.start() scheduler.config = config + scheduler } diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index e28d99f..1bbaa15 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -89,6 +89,8 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val stat = fs.getFileStatus(path) val fileResource = Records.newRecord(classOf[LocalResource]) + fileResource.setShouldBeUploadedToSharedCache(true) + fileResource.setVisibility(LocalResourceVisibility.PUBLIC) 
fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path)) fileResource.setSize(stat.getLen) fileResource.setTimestamp(stat.getModificationTime) diff --git a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala index cef7cb0..feccfcd 100755 --- a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala @@ -16,7 +16,7 @@ */ package org.apache.amaterasu.integration -import org.apache.amaterasu.leader.dsl.GitUtil +import org.apache.amaterasu.leader.common.dsl.GitUtil import org.scalatest.{FlatSpec, Matchers} import scala.reflect.io.Path diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java index 07a28b1..b676c1c 100644 --- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java +++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java @@ -33,4 +33,6 @@ RunnerSetupProvider getRunnerProvider(String runnerId); + String[] getConfigurationItems(); + } \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services