This is an automated email from the ASF dual-hosted git repository. yaniv pushed a commit to branch revert-27-RC4-RefactorFramework in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit 4e21ef88d17ab36554bf8f89977539f0b36addf9 Author: Yaniv Rodenski <[email protected]> AuthorDate: Sun Jul 1 10:18:46 2018 +1000 Revert "AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own project" --- build.gradle | 4 + .../common/execution/actions/Notifier.scala | 4 +- executor/build.gradle | 14 ++- .../runners/spark/PySpark}/PySparkEntryPoint.java | 11 +- .../src/main}/resources/codegen.py | 0 .../src/main}/resources/runtime.py | 0 .../main}/resources/spark-version-info.properties | 0 .../src/main/resources/spark_intp.py | 0 .../spark/PySpark}/PySparkExecutionQueue.scala | 2 +- .../spark/PySpark}/PySparkResultQueue.scala | 4 +- .../runners/spark/PySpark}/PySparkRunner.scala | 10 +- .../runners/spark/PySpark}/ResultQueue.scala | 2 +- .../actions/runners/spark}/SparkRRunner.scala | 28 ++--- .../runners/spark}/SparkRunnersProvider.scala | 10 +- .../runners/spark/SparkSql}/SparkSqlRunner.scala | 9 +- .../mesos/executors/MesosActionsExecutor.scala | 4 +- .../amaterasu/executor}/runtime/AmaContext.scala | 8 +- .../executor/yarn/executors/ActionsExecutor.scala | 8 +- .../spark/repl/amaterasu}/AmaSparkILoop.scala | 2 +- .../runners/spark}/SparkRunnerHelper.scala | 14 +-- .../runners/spark}/SparkScalaRunner.scala | 6 +- .../resources/SparkSql/csv/SparkSqlTestCsv.csv | 0 .../resources/SparkSql/json/SparkSqlTestData.json | 0 ...548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc | Bin ...548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc | Bin .../src/test/resources/SparkSql/parquet/_SUCCESS | 0 .../resources/SparkSql/parquet/_common_metadata | Bin .../src/test/resources/SparkSql/parquet/_metadata | Bin ...c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet | Bin ...c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet | Bin .../src/test/resources/amaterasu.properties | 0 .../src/test}/resources/codegen.py | 0 .../src/test/resources/py4j-0.10.4-src.zip | Bin .../src/test/resources/py4j.tar.gz | Bin .../src/test/resources/pyspark-with-amacontext.py | 0 .../src/test/resources/pyspark.tar.gz | Bin .../src/test/resources/pyspark.zip | Bin .../src/test}/resources/runtime.py | 0 .../src/test/resources/simple-pyspark.py | 0 .../src/test/resources/simple-python-err.py | 0 .../src/test/resources/simple-python.py | 0 .../src/test/resources/simple-spark.scala | 5 +- .../src/test/resources/spark_intp.py | 0 .../src/test/resources/step-2.scala | 3 +- ...c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet | Bin ...c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet | Bin .../RunnersTests}/RunnersLoadingTests.scala | 2 +- .../amaterasu/spark}/PySparkRunnerTests.scala | 3 +- .../amaterasu/spark}/SparkScalaRunnerTests.scala | 9 +- .../amaterasu/spark}/SparkSqlRunnerTests.scala | 9 +- .../apache/amaterasu}/spark/SparkTestsSuite.scala | 10 +- .../apache/amaterasu/utilities/TestNotifier.scala | 0 frameworks/spark/runner/build.gradle | 118 --------------------- .../main/resources/spark-version-info.properties | 11 -- frameworks/spark/runtime/build.gradle | 89 ---------------- gradle/wrapper/gradle-wrapper.properties | 4 +- gradlew | 16 +++ gradlew.bat | 18 ++++ .../apache/amaterasu/leader/yarn/ArgsParser.java | 6 +- .../org/apache/amaterasu/leader/yarn/Client.java | 23 +--- .../apache/amaterasu/leader/dsl/JobParser.scala | 4 +- .../frameworks/spark/SparkSetupProvider.scala | 1 + .../mesos/schedulers/AmaterasuScheduler.scala | 1 + .../leader/mesos/schedulers/JobScheduler.scala | 5 +- .../leader/utilities/ActiveReportListener.scala | 2 + .../amaterasu/leader/utilities/HttpServer.scala | 7 +- .../amaterasu/leader/yarn/ApplicationMaster.scala | 39 +++---- .../leader/yarn/YarnRMCallbackHandler.scala | 5 +- leader/src/main/scripts/ama-start-mesos.sh | 4 +- leader/src/main/scripts/ama-start-yarn.sh | 4 +- .../leader/mesos/ClusterSchedulerTests.scala | 2 +- .../amaterasu/utilities/HttpServerTests.scala | 6 ++ settings.gradle | 14 +-- 73 files changed, 192 insertions(+), 368 deletions(-) diff --git a/build.gradle b/build.gradle index 00e44ea..0f11347 100644 --- a/build.gradle +++ b/build.gradle @@ -25,6 +25,10 @@ allprojects { version '0.2.0-incubating-rc4' } +project(':leader') +project(':common') +project(':executor') + task copyLeagalFiles(type: Copy) { from "./DISCLAIMER", "./LICENSE", "./NOTICE" into "${buildDir}/amaterasu" diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala index fe69260..8a44019 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala @@ -16,9 +16,9 @@ */ package org.apache.amaterasu.common.execution.actions +import NotificationLevel.NotificationLevel +import NotificationType.NotificationType import com.fasterxml.jackson.annotation.JsonProperty -import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel -import org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType abstract class Notifier { diff --git a/executor/build.gradle b/executor/build.gradle index 09e269c..21bc2b0 100644 --- a/executor/build.gradle +++ b/executor/build.gradle @@ -54,6 +54,7 @@ dependencies { compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8' compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8' + compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8' compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final' compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5' @@ -74,7 +75,18 @@ dependencies { compile project(':common') compile project(':amaterasu-sdk') - + //runtime dependency for spark + provided('org.apache.spark:spark-repl_2.11:2.2.1') + provided('org.apache.spark:spark-core_2.11:2.2.1') + + testCompile project(':common') + testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" + testRuntime 'org.pegdown:pegdown:1.1.0' + testCompile 'junit:junit:4.11' + testCompile 'org.scalatest:scalatest_2.11:3.0.2' + testCompile 'org.scala-lang:scala-library:2.11.8' + testCompile('org.apache.spark:spark-repl_2.11:2.2.1') + testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9' } diff --git a/frameworks/spark/runner/src/main/java/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkEntryPoint.java b/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java similarity index 92% rename from frameworks/spark/runner/src/main/java/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkEntryPoint.java rename to executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java index 6b79b2f..a521fce 100755 --- a/frameworks/spark/runner/src/main/java/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkEntryPoint.java +++ b/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java @@ -14,14 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.pyspark; +package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark; +import org.apache.amaterasu.executor.runtime.AmaContext; import org.apache.amaterasu.common.runtime.Environment; -import org.apache.amaterasu.framework.spark.runtime.AmaContext; -import org.apache.spark.SparkConf; + import org.apache.spark.SparkEnv; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; + import org.apache.spark.sql.SparkSession; import py4j.GatewayServer; @@ -32,7 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; public class PySparkEntryPoint { //private static Boolean started = false; - private static PySparkExecutionQueue queue = new PySparkExecutionQueue(); + private static PySparkExecutionQueue queue = new PySparkExecutionQueue(); private static ConcurrentHashMap<String, ResultQueue> resultQueues = new ConcurrentHashMap<>(); private static int port = 0; diff --git a/frameworks/spark/runner/src/test/resources/codegen.py b/executor/src/main/resources/codegen.py similarity index 100% rename from frameworks/spark/runner/src/test/resources/codegen.py rename to executor/src/main/resources/codegen.py diff --git a/frameworks/spark/runner/src/test/resources/runtime.py b/executor/src/main/resources/runtime.py similarity index 100% rename from frameworks/spark/runner/src/test/resources/runtime.py rename to executor/src/main/resources/runtime.py diff --git a/frameworks/spark/runner/src/test/resources/spark-version-info.properties b/executor/src/main/resources/spark-version-info.properties similarity index 100% rename from frameworks/spark/runner/src/test/resources/spark-version-info.properties rename to executor/src/main/resources/spark-version-info.properties diff --git a/frameworks/spark/runner/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py similarity index 100% rename from frameworks/spark/runner/src/main/resources/spark_intp.py rename to executor/src/main/resources/spark_intp.py diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkExecutionQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala similarity index 94% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkExecutionQueue.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala index ddcf66c..411069a 100755 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkExecutionQueue.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.pyspark +package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark import java.util import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkResultQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala similarity index 85% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkResultQueue.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala index 16abbe3..6dbd445 100755 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkResultQueue.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.pyspark +package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark -import org.apache.amaterasu.framework.spark.runner.pyspark.ResultType.ResultType +import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.ResultType.ResultType object ResultType extends Enumeration { type ResultType = Value diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala old mode 100644 new mode 100755 similarity index 95% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunner.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala index c015ec5..79fe18a --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunner.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.pyspark +package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark import java.io.File import java.util @@ -128,7 +128,7 @@ object PySparkRunner { } else { sparkCmd = Seq(pysparkPath, intpPath, port.toString) - } + } var pysparkPython = "/usr/bin/python" if (pyDeps != null && @@ -136,9 +136,9 @@ object PySparkRunner { pysparkPython = "./miniconda/bin/python" } val proc = Process(sparkCmd, None, - "PYTHONPATH" -> pypath, - "PYSPARK_PYTHON" -> pysparkPython, - "PYTHONHASHSEED" -> 0.toString) + "PYTHONPATH" -> pypath, + "PYSPARK_PYTHON" -> pysparkPython, + "PYTHONHASHSEED" -> 0.toString) proc.run(shellLoger) } diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/ResultQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala similarity index 94% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/ResultQueue.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala index d0cb4ae..3ac7bd7 100755 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/ResultQueue.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.pyspark +package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparkr/SparkRRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala similarity index 69% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparkr/SparkRRunner.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala index 390b06a..d111cfb 100644 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparkr/SparkRRunner.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.sparkr +package org.apache.amaterasu.executor.execution.actions.runners.spark import java.io.ByteArrayOutputStream import java.util @@ -28,21 +28,21 @@ import org.apache.spark.SparkContext class SparkRRunner extends Logging with AmaterasuRunner { - override def getIdentifier = "spark-r" + override def getIdentifier = "spark-r" - override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = { - } + override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = { + } } object SparkRRunner { - def apply( - env: Environment, - jobId: String, - sparkContext: SparkContext, - outStream: ByteArrayOutputStream, - notifier: Notifier, - jars: Seq[String] - ): SparkRRunner = { - new SparkRRunner() - } + def apply( + env: Environment, + jobId: String, + sparkContext: SparkContext, + outStream: ByteArrayOutputStream, + notifier: Notifier, + jars: Seq[String] + ): SparkRRunner = { + new SparkRRunner() + } } \ No newline at end of file diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala similarity index 93% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/SparkRunnersProvider.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala index 652f32b..ba7ff03 100644 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/SparkRunnersProvider.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner +package org.apache.amaterasu.executor.execution.actions.runners.spark import java.io._ @@ -24,10 +24,10 @@ import org.apache.amaterasu.common.dataobjects.ExecData import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage} import org.apache.amaterasu.common.logging.Logging -import org.apache.amaterasu.framework.spark.runner.pyspark.PySparkRunner -import org.apache.amaterasu.framework.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner} -import org.apache.amaterasu.framework.spark.runner.sparksql.SparkSqlRunner +import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner +import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider} +import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner} import org.eclipse.aether.util.artifact.JavaScopes import org.sonatype.aether.repository.RemoteRepository import org.sonatype.aether.util.artifact.DefaultArtifact @@ -113,7 +113,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging { this.clusterConfig.mode match { case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger - case "mesos" => Seq("sh", "miniconda.sh", "-b", "-p", "miniconda") ! shellLoger + case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger } Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala similarity index 96% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunner.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala index 62af197..350ddb4 100644 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunner.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.sparksql +package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql import java.io.File import java.util @@ -22,11 +22,10 @@ import java.util import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.common.runtime.Environment -import org.apache.amaterasu.framework.spark.runtime.AmaContext +import org.apache.amaterasu.executor.runtime.AmaContext import org.apache.amaterasu.sdk.AmaterasuRunner import org.apache.commons.io.FilenameUtils import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} - import scala.collection.JavaConverters._ /** @@ -102,8 +101,8 @@ class SparkSqlRunner extends Logging with AmaterasuRunner { try{ - result = spark.sql(parsedQuery) - notifier.success(parsedQuery) + result = spark.sql(parsedQuery) + notifier.success(parsedQuery) } catch { case e: Exception => notifier.error(parsedQuery, e.getMessage) } diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala index 90c2001..9ab75be 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala @@ -26,6 +26,7 @@ import org.apache.amaterasu.executor.common.executors.ProvidersFactory import org.apache.mesos.Protos._ import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver} +import org.apache.spark.SparkContext import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global @@ -36,6 +37,7 @@ class MesosActionsExecutor extends Executor with Logging { var master: String = _ var executorDriver: ExecutorDriver = _ + var sc: SparkContext = _ var jobId: String = _ var actionName: String = _ // var sparkScalaRunner: SparkScalaRunner = _ @@ -81,7 +83,7 @@ class MesosActionsExecutor extends Executor with Logging { notifier = new MesosNotifier(driver) notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered") val outStream = new ByteArrayOutputStream() - providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, "./amaterasu.properties") + providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, propFile = "./amaterasu.properties") } diff --git a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/framework/spark/runtime/AmaContext.scala b/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala old mode 100644 new mode 100755 similarity index 93% rename from frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/framework/spark/runtime/AmaContext.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala index cb2eccc..a61cd5a --- a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/framework/spark/runtime/AmaContext.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runtime +package org.apache.amaterasu.executor.runtime import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.common.runtime.Environment import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession} +import org.apache.spark.sql._ object AmaContext extends Logging { @@ -40,11 +40,15 @@ object AmaContext extends Logging { } def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = { + spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName") + } def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = { + getDataFrame(actionName, dfName, format).as[T] + } } diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala index b5f8700..f4f553c 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala @@ -19,18 +19,24 @@ package org.apache.amaterasu.executor.yarn.executors import java.io.ByteArrayOutputStream import java.net.{InetAddress, URLDecoder} +import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData} import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.executor.common.executors.{ActiveNotifier, ProvidersFactory} +import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.spark.SparkContext -import scala.collection.JavaConverters._ +import scala.reflect.internal.util.ScalaClassLoader +import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader class ActionsExecutor extends Logging { var master: String = _ + var sc: SparkContext = _ var jobId: String = _ var actionName: String = _ var taskData: TaskData = _ diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/AmaSparkILoop.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala similarity index 95% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/AmaSparkILoop.scala rename to executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala index ec874b6..19ef3de 100755 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/AmaSparkILoop.scala +++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.repl +package org.apache.spark.repl.amaterasu import java.io.PrintWriter diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala similarity index 94% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkRunnerHelper.scala rename to executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala index 18658ec..f2c2afa 100644 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkRunnerHelper.scala +++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala @@ -14,18 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.repl +package org.apache.spark.repl.amaterasu.runners.spark import java.io.{ByteArrayOutputStream, File, PrintWriter} -import java.nio.file.{Files, Paths} import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.common.runtime.Environment import org.apache.amaterasu.common.utils.FileUtils -import org.apache.spark.SparkConf +import org.apache.spark.repl.amaterasu.AmaSparkILoop import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Utils +import org.apache.spark.SparkConf import scala.tools.nsc.GenericRunnerSettings import scala.tools.nsc.interpreter.IMain @@ -33,9 +34,8 @@ import scala.tools.nsc.interpreter.IMain object SparkRunnerHelper extends Logging { private val conf = new SparkConf() - private val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir")) - private val outputDir = Files.createTempDirectory(Paths.get(rootDir), "repl").toFile - outputDir.deleteOnExit() + private val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) + private val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") private var sparkSession: SparkSession = _ @@ -145,7 +145,7 @@ object SparkRunnerHelper extends Logging { case "yarn" => conf.set("spark.home", config.spark.home) // TODO: parameterize those - .setJars(Seq("executor.jar", "spark-runner.jar", "spark-runtime.jar") ++ jars) + .setJars(s"executor.jar" +: jars) .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab") .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64") .set("spark.yarn.queue", "default") diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunner.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala similarity index 97% rename from frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunner.scala rename to executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala index 46d3077..56a04cf 100755 --- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunner.scala +++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.repl +package org.apache.spark.repl.amaterasu.runners.spark import java.io.ByteArrayOutputStream import java.util @@ -22,7 +22,7 @@ import java.util import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.common.runtime.Environment -import org.apache.amaterasu.framework.spark.runtime.AmaContext +import org.apache.amaterasu.executor.runtime.AmaContext import org.apache.amaterasu.sdk.AmaterasuRunner import org.apache.spark.sql.{Dataset, SparkSession} @@ -142,7 +142,7 @@ class SparkScalaRunner(var env: Environment, interpreter.interpret("import org.apache.spark.sql.SQLContext") interpreter.interpret("import org.apache.spark.sql.{ Dataset, SparkSession }") interpreter.interpret("import org.apache.spark.sql.SaveMode") - interpreter.interpret("import org.apache.amaterasu.framework.spark.runtime.AmaContext") + interpreter.interpret("import org.apache.amaterasu.executor.runtime.AmaContext") interpreter.interpret("import org.apache.amaterasu.common.runtime.Environment") // creating a map (_contextStore) to hold the different spark contexts diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv rename to executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json rename to executor/src/test/resources/SparkSql/json/SparkSqlTestData.json diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc rename to executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc rename to executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS b/executor/src/test/resources/SparkSql/parquet/_SUCCESS similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS rename to executor/src/test/resources/SparkSql/parquet/_SUCCESS diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata b/executor/src/test/resources/SparkSql/parquet/_common_metadata similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata rename to executor/src/test/resources/SparkSql/parquet/_common_metadata diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata b/executor/src/test/resources/SparkSql/parquet/_metadata similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata rename to executor/src/test/resources/SparkSql/parquet/_metadata diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet similarity index 100% rename from frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet rename to executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet similarity index 100% rename from frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet rename to executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet diff --git a/frameworks/spark/runner/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties similarity index 100% rename from frameworks/spark/runner/src/test/resources/amaterasu.properties rename to executor/src/test/resources/amaterasu.properties diff --git a/frameworks/spark/runner/src/main/resources/codegen.py b/executor/src/test/resources/codegen.py similarity index 100% rename from frameworks/spark/runner/src/main/resources/codegen.py rename to executor/src/test/resources/codegen.py diff --git a/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip b/executor/src/test/resources/py4j-0.10.4-src.zip similarity index 100% rename from frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip rename to executor/src/test/resources/py4j-0.10.4-src.zip diff --git a/frameworks/spark/runner/src/test/resources/py4j.tar.gz b/executor/src/test/resources/py4j.tar.gz similarity index 100% rename from frameworks/spark/runner/src/test/resources/py4j.tar.gz rename to executor/src/test/resources/py4j.tar.gz diff --git a/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py b/executor/src/test/resources/pyspark-with-amacontext.py similarity index 100% rename from frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py rename to executor/src/test/resources/pyspark-with-amacontext.py diff --git a/frameworks/spark/runner/src/test/resources/pyspark.tar.gz b/executor/src/test/resources/pyspark.tar.gz similarity index 100% rename from frameworks/spark/runner/src/test/resources/pyspark.tar.gz rename to executor/src/test/resources/pyspark.tar.gz diff --git a/frameworks/spark/runner/src/test/resources/pyspark.zip b/executor/src/test/resources/pyspark.zip similarity index 100% rename from frameworks/spark/runner/src/test/resources/pyspark.zip rename to executor/src/test/resources/pyspark.zip diff --git a/frameworks/spark/runner/src/main/resources/runtime.py b/executor/src/test/resources/runtime.py similarity index 100% rename from frameworks/spark/runner/src/main/resources/runtime.py rename to executor/src/test/resources/runtime.py diff --git a/frameworks/spark/runner/src/test/resources/simple-pyspark.py b/executor/src/test/resources/simple-pyspark.py similarity index 100% rename from frameworks/spark/runner/src/test/resources/simple-pyspark.py rename to executor/src/test/resources/simple-pyspark.py diff --git a/frameworks/spark/runner/src/test/resources/simple-python-err.py b/executor/src/test/resources/simple-python-err.py similarity index 100% rename from frameworks/spark/runner/src/test/resources/simple-python-err.py rename to executor/src/test/resources/simple-python-err.py diff --git a/frameworks/spark/runner/src/test/resources/simple-python.py b/executor/src/test/resources/simple-python.py similarity index 100% rename from frameworks/spark/runner/src/test/resources/simple-python.py rename to executor/src/test/resources/simple-python.py diff --git a/frameworks/spark/runner/src/test/resources/simple-spark.scala b/executor/src/test/resources/simple-spark.scala similarity index 83% rename from frameworks/spark/runner/src/test/resources/simple-spark.scala rename to executor/src/test/resources/simple-spark.scala index f2e49fd..a11a458 100755 --- a/frameworks/spark/runner/src/test/resources/simple-spark.scala +++ b/executor/src/test/resources/simple-spark.scala @@ -14,7 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.amaterasu.executor.runtime.AmaContext +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} val data = Seq(1,3,4,5,6) @@ -22,6 +23,8 @@ val data = Seq(1,3,4,5,6) val sc = AmaContext.sc val rdd = sc.parallelize(data) val sqlContext = AmaContext.spark + +import sqlContext.implicits._ val x: DataFrame = rdd.toDF() x.write.mode(SaveMode.Overwrite) \ No newline at end of file diff --git a/frameworks/spark/runner/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py similarity index 100% rename from frameworks/spark/runner/src/test/resources/spark_intp.py rename to executor/src/test/resources/spark_intp.py diff --git a/frameworks/spark/runner/src/test/resources/step-2.scala b/executor/src/test/resources/step-2.scala similarity index 94% rename from frameworks/spark/runner/src/test/resources/step-2.scala rename to executor/src/test/resources/step-2.scala index 86fd048..a3d034c 100755 --- a/frameworks/spark/runner/src/test/resources/step-2.scala +++ b/executor/src/test/resources/step-2.scala @@ -1,5 +1,3 @@ - - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -16,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import org.apache.amaterasu.executor.runtime.AmaContext val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20") diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet rename to executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet similarity index 100% rename from frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet rename to executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/RunnersLoadingTests.scala b/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala similarity index 96% rename from frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/RunnersLoadingTests.scala rename to executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala index 3629674..2decb9c 100644 --- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/RunnersLoadingTests.scala +++ b/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner +package org.apache.amaterasu.RunnersTests import org.apache.amaterasu.common.runtime.Environment import org.apache.amaterasu.executor.common.executors.ProvidersFactory diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala similarity index 96% rename from frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunnerTests.scala rename to executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala index a320e56..f12d676 100755 --- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunnerTests.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.pyspark +package org.apache.amaterasu.spark import java.io.File import org.apache.amaterasu.executor.common.executors.ProvidersFactory +import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner import org.apache.log4j.{Level, Logger} import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers} diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala similarity index 92% rename from frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunnerTests.scala rename to executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala index 26f2ceb..1d79fc9 100755 --- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunnerTests.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala @@ -14,13 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.repl +package org.apache.amaterasu.spark + +import scala.collection.JavaConverters._ import org.apache.amaterasu.executor.common.executors.ProvidersFactory -import org.apache.amaterasu.framework.spark.runtime.AmaContext +import org.apache.amaterasu.executor.runtime.AmaContext +import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers} -import scala.collection.JavaConverters._ import scala.io.Source @DoNotDiscover @@ -31,7 +33,6 @@ class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAl "SparkScalaRunner" should "execute the simple-spark.scala" in { - val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner] val script = getClass.getResource("/simple-spark.scala").getPath val sourceCode = Source.fromFile(script).getLines().mkString("\n") diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala similarity index 96% rename from frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunnerTests.scala rename to executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala index abb5745..90cf73b 100644 --- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunnerTests.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala @@ -14,17 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark.runner.sparksql +package org.apache.amaterasu.spark import org.apache.amaterasu.common.runtime.Environment import org.apache.amaterasu.executor.common.executors.ProvidersFactory +import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner import org.apache.amaterasu.utilities.TestNotifier -import org.apache.log4j.{Level, Logger} +import org.apache.log4j.Logger +import org.apache.log4j.Level import org.apache.spark.sql.{SaveMode, SparkSession} import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers} import scala.collection.JavaConverters._ +/** + * Created by kirupa on 10/12/16. + */ @DoNotDiscover class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll { diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/SparkTestsSuite.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala similarity index 90% rename from frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/SparkTestsSuite.scala rename to executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala index 0214568..b11a4f9 100644 --- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/SparkTestsSuite.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala @@ -14,22 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.framework.spark +package org.apache.amaterasu.spark import java.io.{ByteArrayOutputStream, File} +import org.apache.amaterasu.RunnersTests.RunnersLoadingTests import org.apache.amaterasu.common.dataobjects.ExecData import org.apache.amaterasu.common.execution.dependencies._ import org.apache.amaterasu.common.runtime.Environment import org.apache.amaterasu.executor.common.executors.ProvidersFactory -import org.apache.amaterasu.framework.spark.runner.RunnersLoadingTests -import org.apache.amaterasu.framework.spark.runner.pyspark.PySparkRunnerTests -import org.apache.amaterasu.framework.spark.runner.repl.{SparkScalaRunner, SparkScalaRunnerTests} -import org.apache.amaterasu.framework.spark.runner.sparksql.SparkSqlRunnerTests import org.apache.amaterasu.utilities.TestNotifier +import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner import org.apache.spark.sql.SparkSession import org.scalatest._ + + import scala.collection.mutable.ListBuffer diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala similarity index 100% rename from frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala rename to executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala diff --git a/frameworks/spark/runner/build.gradle b/frameworks/spark/runner/build.gradle deleted file mode 100644 index cc6c902..0000000 --- a/frameworks/spark/runner/build.gradle +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -plugins { - id 'com.github.johnrengelman.shadow' version '1.2.4' - id 'com.github.maiflai.scalatest' version '0.6-5-g9065d91' - id 'scala' - id 'java' -} - -shadowJar { - zip64 true -} - -repositories { - maven { - url "https://plugins.gradle.org/m2/" - } - mavenCentral() -} - -test { - maxParallelForks = 1 - forkEvery = 1 -} - -configurations { - provided -} - -sourceSets { - main.compileClasspath += configurations.provided - test.compileClasspath += configurations.provided - test.runtimeClasspath += configurations.provided -} - -dependencies { - - compile project(':executor') - compile project(':spark-runtime') - compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8' - compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8' - compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8' - compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final' - compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' - compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5' - compile group: 'org.reflections', name: 'reflections', version: '0.9.10' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.5' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5' - compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5' - compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0' - - compile('com.jcabi:jcabi-aether:0.10.1') { - exclude group: 'org.jboss.netty' - } - compile('org.apache.activemq:activemq-client:5.15.2') { - exclude group: 'org.jboss.netty' - } - - //compile project(':common') - //compile project(':amaterasu-sdk') - - //runtime dependency for spark - provided('org.apache.spark:spark-repl_2.11:2.2.1') - provided('org.apache.spark:spark-core_2.11:2.2.1') - - testCompile project(':common') - testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" - testRuntime 'org.pegdown:pegdown:1.1.0' - testCompile 'junit:junit:4.11' - testCompile 'org.scalatest:scalatest_2.11:3.0.2' - testCompile 'org.scala-lang:scala-library:2.11.8' - testCompile('org.apache.spark:spark-repl_2.11:2.2.1') - testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9' - -} - -sourceSets { - test { - resources.srcDirs += [file('src/test/resources')] - } - - main { - scala { - srcDirs = ['src/main/scala', 'src/main/java'] - } - java { - srcDirs = [] - } - } -} - -test { - - maxParallelForks = 1 -} - -task copyToHome(type: Copy) { - dependsOn shadowJar - from 'build/libs' - into '../../../build/amaterasu/dist' - from 'build/resources/main' - into '../../../build/amaterasu/dist' -} diff --git a/frameworks/spark/runner/src/main/resources/spark-version-info.properties b/frameworks/spark/runner/src/main/resources/spark-version-info.properties deleted file mode 100644 index ce0b312..0000000 --- a/frameworks/spark/runner/src/main/resources/spark-version-info.properties +++ /dev/null @@ -1,11 +0,0 @@ -version=2.1.0-SNAPSHOT - -user=root - -revision=738b4cc548ca48c010b682b8bc19a2f7e1947cfe - -branch=master - -date=2016-07-27T11:23:21Z - -url=https://github.com/apache/spark.git diff --git a/frameworks/spark/runtime/build.gradle b/frameworks/spark/runtime/build.gradle deleted file mode 100644 index 9bba2e4..0000000 --- a/frameworks/spark/runtime/build.gradle +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -plugins { - id 'com.github.johnrengelman.shadow' version '1.2.4' - id 'com.github.maiflai.scalatest' version '0.6-5-g9065d91' - id 'scala' - id 'java' -} - -shadowJar { - zip64 true -} - -repositories { - maven { - url "https://plugins.gradle.org/m2/" - } - mavenCentral() -} - -test { - maxParallelForks = 1 - forkEvery = 1 -} - -configurations { - provided - runtime.exclude module: 'hadoop-common' - runtime.exclude module: 'hadoop-yarn-api' - runtime.exclude module: 'hadoop-yarn-client' - runtime.exclude module: 'hadoop-hdfs' - runtime.exclude module: 'mesos' - runtime.exclude module: 'scala-compiler' -} - -sourceSets { - main.compileClasspath += configurations.provided - test.compileClasspath += configurations.provided - test.runtimeClasspath += configurations.provided -} - -dependencies { - - compile project(':executor') - provided('org.apache.spark:spark-repl_2.11:2.2.1') - provided('org.apache.spark:spark-core_2.11:2.2.1') - -} - -sourceSets { - test { - resources.srcDirs += [file('src/test/resources')] - } - - main { - scala { - srcDirs = ['src/main/scala', 'src/main/java'] - } - java { - srcDirs = [] - } - } -} - -test { - - maxParallelForks = 1 -} - -task copyToHome(type: Copy) { - from 'build/libs' - into '../../../build/amaterasu/dist' - from 'build/resources/main' - into '../../../build/amaterasu/dist' -} diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 9fdd83c..4b125b8 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Thu Jun 28 13:07:02 SGT 2018 +#Fri Jan 27 12:21:51 AEDT 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-all.zip diff --git a/gradlew b/gradlew index 9aa616c..3efb0e9 100755 --- a/gradlew +++ b/gradlew @@ -1,4 +1,20 @@ #!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ############################################################################## ## diff --git a/gradlew.bat b/gradlew.bat index f955316..718266c 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,3 +1,21 @@ +rem +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + @if "%DEBUG%" == "" @echo off @rem ########################################################################## @rem diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java index 38a9c38..be0fc05 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java @@ -16,11 +16,7 @@ */ package org.apache.amaterasu.leader.yarn; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.*; public class ArgsParser { private static Options getOptions() { diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java index 8f16ee7..e3c2812 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java @@ -31,16 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -52,19 +43,11 @@ import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.Topic; +import javax.jms.*; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import static java.lang.System.exit; diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala index aba6210..8ef1c7a 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala @@ -18,12 +18,12 @@ package org.apache.amaterasu.leader.dsl import java.util.concurrent.BlockingQueue -import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import org.apache.amaterasu.common.dataobjects.ActionData -import org.apache.amaterasu.leader.execution.JobManager import org.apache.amaterasu.leader.execution.actions.{Action, ErrorAction, SequentialAction} +import org.apache.amaterasu.leader.execution.JobManager import org.apache.curator.framework.CuratorFramework import scala.collection.JavaConverters._ diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala index f6dea22..8c487c1 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala @@ -19,6 +19,7 @@ package org.apache.amaterasu.leader.frameworks.spark import java.io.File import org.apache.amaterasu.common.configuration.ClusterConfig +import org.apache.amaterasu.common.dataobjects.ExecData import org.apache.amaterasu.leader.utilities.{DataLoader, MemoryFormatParser} import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala index 4b1a74c..f2f2c00 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala @@ -17,6 +17,7 @@ package org.apache.amaterasu.leader.mesos.schedulers import org.apache.amaterasu.common.logging.Logging + import org.apache.mesos.Protos.{Resource, Value} import org.apache.mesos.Scheduler diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index 2c2e8af..87a8f5d 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -16,10 +16,11 @@ */ package org.apache.amaterasu.leader.mesos.schedulers +import java.io.File import java.util import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} -import java.util.{Collections, UUID} +import java.util.{Collections, Properties, UUID} import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule @@ -184,7 +185,7 @@ class JobScheduler extends AmaterasuScheduler { .setExtract(true) .build()) .addUris(URI.newBuilder() - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/Miniconda2-latest-Linux-x86_64.sh") .setExecutable(false) .setExtract(false) .build()) diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala index b3ffaad..2664665 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala @@ -17,7 +17,9 @@ package org.apache.amaterasu.leader.utilities import javax.jms.{Message, MessageListener, TextMessage} + import net.liftweb.json._ +import net.liftweb.json.JsonDSL._ import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType} class ActiveReportListener extends MessageListener { diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala index 5c48329..2e01963 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala @@ -18,14 +18,19 @@ package org.apache.amaterasu.leader.utilities import org.apache.amaterasu.common.logging.Logging import org.apache.log4j.{BasicConfigurator, Level, Logger} +import org.eclipse.jetty.server.{Handler, Server, ServerConnector} import org.eclipse.jetty.server.handler._ -import org.eclipse.jetty.server.{Server, ServerConnector} +import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder} +import org.eclipse.jetty.toolchain.test.MavenTestingUtils +import org.eclipse.jetty.util.thread.QueuedThreadPool import org.eclipse.jetty.util.log.StdErrLog +import org.eclipse.jetty.util.resource.Resource import org.jsoup.Jsoup import org.jsoup.select.Elements import scala.collection.JavaConverters._ import scala.io.{BufferedSource, Source} +import scala.text.Document /** * Created by kirupa diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index 406c150..1828100 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -250,7 +250,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val commands: List[String] = List( "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ", s"/bin/bash spark/bin/load-spark-env.sh && ", - s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " + + s"java -cp spark/jars/*:executor.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " + "-Xmx1G " + "-Dscala.usejavacp=true " + "-Dhdp.version=2.6.1.0-129 " + @@ -266,37 +266,22 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { ctx.setCommands(commands) ctx.setTokens(allTokens) - val yarnJarPath = new Path(config.YARN.hdfsJarsPath) - - //TODO Arun - Remove the hardcoding of the dist path - /* val resources = mutable.Map[String, LocalResource]() - val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false) - while (binaryFileIter.hasNext) { - val eachFile = binaryFileIter.next().getPath - resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile)) - } - resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties"))) - resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/ - val resources = mutable.Map[String, LocalResource]( - "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))), - "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))), - "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))), - "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))), - "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))), + "executor.jar" -> executorJar, + "amaterasu.properties" -> propFile, // TODO: Nadav/Eyal all of these should move to the executor resource setup - "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))), - "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))), - "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))), - "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))), - "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py")))) + "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/Miniconda2-latest-Linux-x86_64.sh"))), + "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/codegen.py"))), + "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/runtime.py"))), + "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark-version-info.properties"))), + "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py")))) val frameworkFactory = FrameworkProvidersFactory(env, config) val framework = frameworkFactory.getFramework(actionData.groupId) //adding the framework and executor resources - setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier) - setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}") + setupResources(framework.getGroupIdentifier, resources, framework.getGroupIdentifier) + setupResources(s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}") ctx.setLocalResources(resources) @@ -342,9 +327,9 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { ByteBuffer.wrap(dob.getData, 0, dob.getLength) } - private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = { + private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = { - val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath")) + val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath")) if (fs.exists(sourcePath)) { diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala index b178f52..70da38e 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala @@ -32,9 +32,10 @@ import org.apache.hadoop.yarn.util.Records import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.concurrent -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.{Future, _} +import scala.concurrent.Future import scala.util.{Failure, Success} +import scala.concurrent._ +import ExecutionContext.Implicits.global class YarnRMCallbackHandler(nmClient: NMClientAsync, jobManager: JobManager, diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader/src/main/scripts/ama-start-mesos.sh index e01ea42..18dbed9 100755 --- a/leader/src/main/scripts/ama-start-mesos.sh +++ b/leader/src/main/scripts/ama-start-mesos.sh @@ -126,9 +126,9 @@ if ! ls ${BASEDIR}/dist/spark*.tgz 1> /dev/null 2>&1; then #wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist wget http://apache.mirror.digitalpacific.com.au/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist fi -if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then +if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then echo "${bold}Fetching miniconda distributable ${NC}" - wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O miniconda.sh -P ${BASEDIR}/dist + wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist fi cp ${BASEDIR}/amaterasu.properties ${BASEDIR}/dist eval $CMD | grep "===>" diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh index f6af18f..8aa58f1 100755 --- a/leader/src/main/scripts/ama-start-yarn.sh +++ b/leader/src/main/scripts/ama-start-yarn.sh @@ -136,9 +136,9 @@ fi echo $CMD -if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then +if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then echo "${bold}Fetching miniconda distributable ${NC}" - wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O ${BASEDIR}/dist/miniconda.sh + wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist fi diff --git a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/ClusterSchedulerTests.scala b/leader/src/test/scala/org/apache/amaterasu/leader/mesos/ClusterSchedulerTests.scala index af42677..ac5af36 100755 --- a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/ClusterSchedulerTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/leader/mesos/ClusterSchedulerTests.scala @@ -17,8 +17,8 @@ package org.apache.amaterasu.leader.mesos import org.apache.amaterasu.common.configuration.ClusterConfig -import org.apache.amaterasu.leader.Kami import org.apache.amaterasu.leader.mesos.schedulers.ClusterScheduler +import org.apache.amaterasu.leader.Kami import org.scalatest._ class ClusterSchedulerTests extends FlatSpec with Matchers { diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala index 0e321f0..25769b6 100644 --- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala @@ -19,8 +19,14 @@ package org.apache.amaterasu.utilities import java.io.File +import org.apache.amaterasu.leader.utilities.HttpServer +import org.jsoup.Jsoup +import org.jsoup.select.Elements import org.scalatest.{FlatSpec, Matchers} +import scala.collection.JavaConverters._ +import scala.io.Source + class HttpServerTests extends FlatSpec with Matchers { diff --git a/settings.gradle b/settings.gradle index c222795..1056e01 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,20 +15,8 @@ * limitations under the License. */ include 'leader' -project(':leader') - -include 'common' -project(':common') - include 'executor' -project(':executor') - +include 'common' include 'sdk' findProject(':sdk')?.name = 'amaterasu-sdk' -//Spark -include 'spark-runner' -project(':spark-runner').projectDir=file("frameworks/spark/runner") -include 'spark-runtime' -project(':spark-runtime').projectDir=file("frameworks/spark/runtime") -
