Repository: incubator-gearpump Updated Branches: refs/heads/master ac8ac0392 -> fa3f892d7
[GEARPUMP-331] Allow applications to be run in IDE Author: huafengw <[email protected]> Closes #202 from huafengw/GP331. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fa3f892d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fa3f892d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fa3f892d Branch: refs/heads/master Commit: fa3f892d76ca932fae44587f7860030b32f401bf Parents: ac8ac03 Author: huafengw <[email protected]> Authored: Mon Jul 31 14:36:38 2017 +0800 Committer: manuzhang <[email protected]> Committed: Mon Jul 31 14:36:48 2017 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/client/ClientContext.scala | 21 ++------ .../cluster/client/RuntimeEnvironment.scala | 55 ++++++++++++++++++++ .../cluster/embedded/EmbeddedCluster.scala | 44 ++++++---------- .../embedded/EmbeddedRuntimeEnvironemnt.scala | 48 +++++++++++++++++ .../gearpump/cluster/main/AppSubmitter.scala | 2 + .../apache/gearpump/cluster/MasterHarness.scala | 6 ++- docs/contents/dev/dev-write-1st-app.md | 4 +- .../examples/wordcountjava/WordCount.java | 30 +---------- .../examples/wordcount/WordCount.scala | 30 +---------- .../examples/wordcount/dsl/WordCount.scala | 6 +-- .../gearpump/akkastream/graph/RemoteGraph.scala | 6 +-- .../gearpump/services/MasterService.scala | 2 +- 12 files changed, 137 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala index 48b95d8..fc8af59 100755 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala @@ -45,17 +45,12 @@ import scala.util.{Failure, Success, Try} * TODO: add interface to query master here */ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { - def this(system: ActorSystem) = { - this(system.settings.config, system, null) - } - def this(config: Config) = { this(config, null, null) } private val LOG: Logger = LogUtil.getLogger(getClass) implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config)) - LOG.info(s"Starting system ${system.name}") private val jarStoreClient = new JarStoreClient(config, system) private val masterClientTimeout = { val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90) @@ -183,19 +178,9 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { object ClientContext { - def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null) - - def apply(system: ActorSystem): ClientContext = { - new ClientContext(ClusterConfig.default(), system, null) - } - - def apply(system: ActorSystem, master: ActorRef): ClientContext = { - new ClientContext(ClusterConfig.default(), system, master) - } - - def apply(config: Config): ClientContext = new ClientContext(config, null, null) + def apply(): ClientContext = apply(ClusterConfig.default()) - def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = { - new ClientContext(config, system, master) + def apply(config: Config): ClientContext = { + RuntimeEnvironment.newClientContext(config) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala new file mode 100644 index 0000000..e90e73b --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.client + +import com.typesafe.config.Config +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt + +/** + * The RuntimeEnvironment is the context decides where an application is submitted to. + */ +abstract class RuntimeEnvironment { + def newClientContext(akkaConf: Config): ClientContext +} + +/** + * Usually RemoteRuntimeEnvironment is the default enviroment when user using command line + * to submit application. It will connect to the right remote Master. + */ +class RemoteRuntimeEnvironment extends RuntimeEnvironment { + override def newClientContext(akkaConf: Config): ClientContext = { + new ClientContext(akkaConf) + } +} + +object RuntimeEnvironment { + private var envInstance: RuntimeEnvironment = _ + + def get() : RuntimeEnvironment = { + Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt) + } + + def newClientContext(akkaConf: Config): ClientContext = { + get().newClientContext(akkaConf) + } + + def setRuntimeEnv(env: RuntimeEnvironment): Unit = { + envInstance = env + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala index 9bde4d1..a3a3e39 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala @@ -36,34 +36,22 @@ import org.apache.gearpump.util.{LogUtil, Util} * Create a in-process cluster with single worker */ class EmbeddedCluster(inputConfig: Config) { - - private val workerCount: Int = 1 - private var _master: ActorRef = null - private var _system: ActorSystem = null - private var _config: Config = null - private val LOG = LogUtil.getLogger(getClass) + private val workerCount: Int = 1 + private val port = Util.findFreePort().get + private[embedded] val config: Config = getConfig(inputConfig, port) + private[embedded] val system: ActorSystem = ActorSystem(MASTER, config) + private[embedded] val master: ActorRef = system.actorOf(Props[MasterActor], MASTER) - def start(): Unit = { - val port = Util.findFreePort().get - val akkaConf = getConfig(inputConfig, port) - _config = akkaConf - val system = ActorSystem(MASTER, akkaConf) - - val master = system.actorOf(Props[MasterActor], MASTER) - - 0.until(workerCount).foreach { id => - system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) - } - this._master = master - this._system = system - - LOG.info("=================================") - LOG.info("Local Cluster is started at: ") - LOG.info(s" 127.0.0.1:$port") - LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") + 0.until(workerCount).foreach { id => + system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) } + LOG.info("=================================") + LOG.info("Local Cluster is started at: ") + LOG.info(s" 127.0.0.1:$port") + LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") + private def getConfig(inputConfig: Config, port: Int): Config = { val config = inputConfig. withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). @@ -78,13 +66,13 @@ class EmbeddedCluster(inputConfig: Config) { } def newClientContext: ClientContext = { - ClientContext(_config, _system, _master) + new ClientContext(config, system, master) } def stop(): Unit = { - _system.stop(_master) - _system.terminate() - Await.result(_system.whenTerminated, Duration.Inf) + system.stop(master) + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala new file mode 100644 index 0000000..246fabd --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.embedded + +import com.typesafe.config.Config +import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment} +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClientContext + +/** + * The EmbeddedRuntimeEnvironemnt is initiated when user trying to launch their application + * from IDE. It will create an embedded cluster and user's applcaition will run in a single + * local process. + */ +class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment { + override def newClientContext(akkaConf: Config): ClientContext = { + new LocalClientContext(akkaConf) + } +} + +object EmbeddedRuntimeEnvironemnt { + class LocalClientContext private (cluster: EmbeddedCluster) + extends ClientContext(cluster.config, cluster.system, cluster.master) { + + def this(akkaConf: Config) { + this(new EmbeddedCluster(akkaConf)) + } + + override def close(): Unit = { + super.close() + cluster.stop() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala index 79f31eb..c5ddcb0 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala @@ -21,6 +21,7 @@ import java.io.File import java.net.{URL, URLClassLoader} import java.util.jar.JarFile +import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment} import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util} import scala.util.{Failure, Success, Try} @@ -84,6 +85,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser { Thread.currentThread().setContextClassLoader(classLoader) val clazz = classLoader.loadClass(main) val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment) mainMethod.invoke(null, arguments) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala index 1ec5660..654da4b 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala @@ -22,15 +22,15 @@ import java.io.File import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException} import java.util.Properties import java.util.concurrent.{Executors, TimeUnit} + import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} - import akka.actor.{Actor, ActorSystem, Address, Props} import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory} - import org.apache.gearpump.cluster.MasterHarness.MockMaster +import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment} import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil} @@ -63,6 +63,8 @@ trait MasterHarness { masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost") LOG.info(s"Actor system is started, $host, $port") + // Make sure there will be no EmbeddedCluster created, otherwise mock master won't work + RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment) } def shutdownActorSystem(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/docs/contents/dev/dev-write-1st-app.md ---------------------------------------------------------------------- diff --git a/docs/contents/dev/dev-write-1st-app.md b/docs/contents/dev/dev-write-1st-app.md index 6234577..4ef1cc3 100644 --- a/docs/contents/dev/dev-write-1st-app.md +++ b/docs/contents/dev/dev-write-1st-app.md @@ -20,8 +20,8 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump // (word, count1), (word, count2) => (word, count1 + count2) groupByKey().sum.log - context.submit(app).waitUntilFinish() - context.close() + context.submit(app).waitUntilFinish() + context.close() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java index 5e3d472..cc6cd02 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java @@ -37,7 +37,6 @@ public class WordCount { } public static void main(Config akkaConf, String[] args) throws InterruptedException { - // For split task, we config to create two tasks int splitTaskNumber = 2; Processor split = new Processor(Split.class).withParallelism(splitTaskNumber); @@ -56,36 +55,9 @@ public class WordCount { UserConfig conf = UserConfig.empty(); StreamApplication app = new StreamApplication("wordcountJava", conf, graph); - - EmbeddedCluster localCluster = null; - - Boolean debugMode = System.getProperty("DEBUG") != null; - - if (debugMode) { - localCluster = new EmbeddedCluster(akkaConf); - localCluster.start(); - } - - ClientContext masterClient = null; - - if (localCluster != null) { - masterClient = localCluster.newClientContext(); - } else { - // create master client - // It will read the master settings under gearpump.cluster.masters - masterClient = new ClientContext(akkaConf); - } - + ClientContext masterClient = ClientContext.apply(akkaConf); masterClient.submit(app); - if (debugMode) { - Thread.sleep(30 * 1000); // sleep for 30 seconds. - } - masterClient.close(); - - if (localCluster != null) { - localCluster.stop(); - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala index 0e3d840..14965e7 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala @@ -21,7 +21,6 @@ package org.apache.gearpump.streaming.examples.wordcount import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.source.DataSourceProcessor @@ -38,10 +37,7 @@ object WordCount extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array( "split" -> CLIOption[Int]("<how many source tasks>", required = false, defaultValue = Some(1)), - "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), - "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), - "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, - defaultValue = Some(30)) + "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)) ) def application(config: ParseResult, system: ActorSystem): StreamApplication = { @@ -60,32 +56,10 @@ object WordCount extends AkkaApp with ArgumentsParser { override def main(akkaConf: Config, args: Array[String]): Unit = { val config = parse(args) - - val debugMode = config.getBoolean("debug") - val sleepSeconds = config.getInt("sleep") - - val localCluster = if (debugMode) { - val cluster = new EmbeddedCluster(akkaConf: Config) - cluster.start() - Some(cluster) - } else { - None - } - - val context: ClientContext = localCluster match { - case Some(local) => local.newClientContext - case None => ClientContext(akkaConf) - } - + val context: ClientContext = ClientContext(akkaConf) val app = application(config, context.system) context.submit(app) - - if (debugMode) { - Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging. - } - context.close() - localCluster.map(_.stop()) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala index 65f63d2..d10dbe7 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.examples.wordcount.dsl import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._ @@ -31,9 +30,7 @@ object WordCount extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array.empty override def main(akkaConf: Config, args: Array[String]): Unit = { - val cluster = new EmbeddedCluster(akkaConf) - cluster.start() - val context: ClientContext = cluster.newClientContext + val context: ClientContext = ClientContext(akkaConf) val app = StreamApp("dsl", context) val data = "This is a good start, bingo!! bingo!!" app.source(data.lines.toList, 1, "source"). @@ -44,6 +41,5 @@ object WordCount extends AkkaApp with ArgumentsParser { context.submit(app).waitUntilFinish() context.close() - cluster.stop() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala index 99ebe17..4d400ff 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala @@ -49,16 +49,14 @@ object RemoteGraph { class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer { private val local = if (useInProcessCluster) { - val cluster = EmbeddedCluster() - cluster.start() - Some(cluster) + Some(EmbeddedCluster()) } else { None } private val context: ClientContext = local match { case Some(l) => l.newClientContext - case None => ClientContext(system) + case None => ClientContext(null) } override def materialize(subGraph: SubGraph, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index be96577..5ba101b 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -160,7 +160,7 @@ class MasterService(val master: ActorRef, val msg = java.net.URLDecoder.decode(request, "UTF-8") val submitApplicationRequest = read[SubmitApplicationRequest](msg) import submitApplicationRequest.{appName, dag, processors, userConfig} - val context = ClientContext(system.settings.config, system, master) + val context = new ClientContext(system.settings.config, system, master) val graph = dag.mapVertex { processorId => processors(processorId)
