Repository: incubator-gearpump Updated Branches: refs/heads/sql e4c33004b -> e04df0ddd
Merge master into sql Author: manuzhang <[email protected]> Author: huafengw <[email protected]> Author: darionyaphet <[email protected]> Closes #206 from manuzhang/merge_master_into_sql. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e04df0dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e04df0dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e04df0dd Branch: refs/heads/sql Commit: e04df0dddb9d45cced5b3587ee61caf7e25bbcbd Parents: e4c3300 Author: manuzhang <[email protected]> Authored: Wed Aug 2 19:03:36 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Aug 2 19:03:46 2017 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/client/ClientContext.scala | 46 +++++----- .../cluster/client/RuntimeEnvironment.scala | 58 +++++++++++++ .../cluster/embedded/EmbeddedCluster.scala | 49 ++++------- .../embedded/EmbeddedRuntimeEnvironemnt.scala | 48 +++++++++++ .../gearpump/cluster/main/AppSubmitter.scala | 2 + .../apache/gearpump/cluster/MasterHarness.scala | 6 +- docs/contents/dev/dev-write-1st-app.md | 4 +- .../examples/wordcountjava/WordCount.java | 30 +------ .../examples/wordcountjava/dsl/WordCount.java | 14 ++- .../examples/wordcount/WordCount.scala | 30 +------ .../examples/wordcount/dsl/WordCount.scala | 6 +- .../gearpump/akkastream/graph/RemoteGraph.scala | 14 +-- .../experiments/storm/main/GearpumpNimbus.scala | 2 +- .../kafka/src/test/resources/log4j.properties | 35 ++++++++ project/BuildExamples.scala | 90 ++++++++------------ 15 files changed, 246 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala index 48b95d8..4840120 100755 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala @@ -40,22 +40,13 @@ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} /** - * ClientContext is a user facing util to submit/manage an application. - * - * TODO: add interface to query master here + * ClientContext is a user facing utility to interact with the master. + * (e.g. submit/manage an application). */ -class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { - def this(system: ActorSystem) = { - this(system.settings.config, system, null) - } - - def this(config: Config) = { - this(config, null, null) - } +class ClientContext protected(config: Config, sys: ActorSystem, _master: ActorRef) { private val LOG: Logger = LogUtil.getLogger(getClass) implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config)) - LOG.info(s"Starting system ${system.name}") private val jarStoreClient = new JarStoreClient(config, system) private val masterClientTimeout = { val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90) @@ -183,18 +174,35 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { object ClientContext { - def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null) + /** + * Create a [[ClientContext]] which will instantiate an actor system + * to interact with the master parsed from `gearpump.cluster.masters`. + * The config is loaded from classpath. 
+ */ + def apply(): ClientContext = apply(ClusterConfig.default()) - def apply(system: ActorSystem): ClientContext = { - new ClientContext(ClusterConfig.default(), system, null) + /** + * Create a [[ClientContext]] which will instantiate an actor system + * to interact with the master parsed from `gearpump.cluster.masters` + * through the given config. + */ + def apply(config: Config): ClientContext = { + RuntimeEnvironment.newClientContext(config) } - def apply(system: ActorSystem, master: ActorRef): ClientContext = { - new ClientContext(ClusterConfig.default(), system, master) + /** + * Create a [[ClientContext]] for the passed in actor system + * to interact with the master parsed from `gearpump.cluster.masters` + * through the given config. + */ + def apply(config: Config, system: ActorSystem): ClientContext = { + new ClientContext(config, system, null) } - def apply(config: Config): ClientContext = new ClientContext(config, null, null) - + /** + * Create a [[ClientContext]] for the passed in actor system + * to interact with the given master. + */ def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = { new ClientContext(config, system, master) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala new file mode 100644 index 0000000..cf5842f --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.client + +import com.typesafe.config.Config +import org.apache.gearpump.cluster.client.RuntimeEnvironment.RemoteClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt + +/** + * The RuntimeEnvironment is the context that decides where an application is submitted to. + */ +abstract class RuntimeEnvironment { + def newClientContext(akkaConf: Config): ClientContext +} + +/** + * Usually RemoteRuntimeEnvironment is the default environment when a user uses the command line + * to submit an application. It will connect to the right remote Master. 
+ */ +class RemoteRuntimeEnvironment extends RuntimeEnvironment { + override def newClientContext(akkaConf: Config): ClientContext = { + new RemoteClientContext(akkaConf) + } +} + +object RuntimeEnvironment { + private var envInstance: RuntimeEnvironment = _ + + class RemoteClientContext(akkaConf: Config) extends ClientContext(akkaConf, null, null) + + def get() : RuntimeEnvironment = { + Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt) + } + + def newClientContext(akkaConf: Config): ClientContext = { + get().newClientContext(akkaConf) + } + + def setRuntimeEnv(env: RuntimeEnvironment): Unit = { + envInstance = env + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala index 9bde4d1..8abcd96 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala @@ -21,12 +21,11 @@ package org.apache.gearpump.cluster.embedded import scala.collection.JavaConverters._ import scala.concurrent.Await import scala.concurrent.duration.Duration - import akka.actor.{ActorRef, ActorSystem, Props} import com.typesafe.config.{Config, ConfigValueFactory} - import org.apache.gearpump.cluster.ClusterConfig import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext import org.apache.gearpump.cluster.master.{Master => MasterActor} import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, 
GEARPUMP_METRIC_ENABLED, MASTER} @@ -36,34 +35,22 @@ import org.apache.gearpump.util.{LogUtil, Util} * Create a in-process cluster with single worker */ class EmbeddedCluster(inputConfig: Config) { - - private val workerCount: Int = 1 - private var _master: ActorRef = null - private var _system: ActorSystem = null - private var _config: Config = null - private val LOG = LogUtil.getLogger(getClass) + private val workerCount: Int = 1 + private val port = Util.findFreePort().get + private[embedded] val config: Config = getConfig(inputConfig, port) + private[embedded] val system: ActorSystem = ActorSystem(MASTER, config) + private[embedded] val master: ActorRef = system.actorOf(Props[MasterActor], MASTER) - def start(): Unit = { - val port = Util.findFreePort().get - val akkaConf = getConfig(inputConfig, port) - _config = akkaConf - val system = ActorSystem(MASTER, akkaConf) - - val master = system.actorOf(Props[MasterActor], MASTER) - - 0.until(workerCount).foreach { id => - system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) - } - this._master = master - this._system = system - - LOG.info("=================================") - LOG.info("Local Cluster is started at: ") - LOG.info(s" 127.0.0.1:$port") - LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") + 0.until(workerCount).foreach { id => + system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) } + LOG.info("=================================") + LOG.info("Local Cluster is started at: ") + LOG.info(s" 127.0.0.1:$port") + LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") + private def getConfig(inputConfig: Config, port: Int): Config = { val config = inputConfig. withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). 
@@ -77,14 +64,10 @@ class EmbeddedCluster(inputConfig: Config) { config } - def newClientContext: ClientContext = { - ClientContext(_config, _system, _master) - } - def stop(): Unit = { - _system.stop(_master) - _system.terminate() - Await.result(_system.whenTerminated, Duration.Inf) + system.stop(master) + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala new file mode 100644 index 0000000..fbea53f --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.cluster.embedded + +import com.typesafe.config.Config +import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment} +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext + +/** + * The EmbeddedRuntimeEnvironemnt is initiated when a user tries to launch their application + * from an IDE. It will create an embedded cluster and the user's application will run in a single + * local process. + */ +class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment { + override def newClientContext(akkaConf: Config): ClientContext = { + new EmbeddedClientContext(akkaConf) + } +} + +object EmbeddedRuntimeEnvironemnt { + class EmbeddedClientContext private(cluster: EmbeddedCluster) + extends ClientContext(cluster.config, cluster.system, cluster.master) { + + def this(akkaConf: Config) { + this(new EmbeddedCluster(akkaConf)) + } + + override def close(): Unit = { + super.close() + cluster.stop() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala index 79f31eb..c5ddcb0 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala @@ -21,6 +21,7 @@ import java.io.File import java.net.{URL, URLClassLoader} import java.util.jar.JarFile +import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment} import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util} import scala.util.{Failure, Success, Try} @@ -84,6 +85,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser { Thread.currentThread().setContextClassLoader(classLoader) val clazz = 
classLoader.loadClass(main) val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment) mainMethod.invoke(null, arguments) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala index 1ec5660..654da4b 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala @@ -22,15 +22,15 @@ import java.io.File import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException} import java.util.Properties import java.util.concurrent.{Executors, TimeUnit} + import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} - import akka.actor.{Actor, ActorSystem, Address, Props} import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory} - import org.apache.gearpump.cluster.MasterHarness.MockMaster +import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment} import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil} @@ -63,6 +63,8 @@ trait MasterHarness { masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost") LOG.info(s"Actor system is started, $host, $port") + // Make sure there will be no EmbeddedCluster created, otherwise mock master won't work + RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment) } def shutdownActorSystem(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/docs/contents/dev/dev-write-1st-app.md 
---------------------------------------------------------------------- diff --git a/docs/contents/dev/dev-write-1st-app.md b/docs/contents/dev/dev-write-1st-app.md index 6234577..4ef1cc3 100644 --- a/docs/contents/dev/dev-write-1st-app.md +++ b/docs/contents/dev/dev-write-1st-app.md @@ -20,8 +20,8 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump // (word, count1), (word, count2) => (word, count1 + count2) groupByKey().sum.log - context.submit(app).waitUntilFinish() - context.close() + context.submit(app).waitUntilFinish() + context.close() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java index 5e3d472..cc6cd02 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java @@ -37,7 +37,6 @@ public class WordCount { } public static void main(Config akkaConf, String[] args) throws InterruptedException { - // For split task, we config to create two tasks int splitTaskNumber = 2; Processor split = new Processor(Split.class).withParallelism(splitTaskNumber); @@ -56,36 +55,9 @@ public class WordCount { UserConfig conf = UserConfig.empty(); StreamApplication app = new StreamApplication("wordcountJava", conf, graph); - - EmbeddedCluster localCluster = null; - - Boolean debugMode = System.getProperty("DEBUG") != null; - - if (debugMode) { - localCluster = new EmbeddedCluster(akkaConf); - 
localCluster.start(); - } - - ClientContext masterClient = null; - - if (localCluster != null) { - masterClient = localCluster.newClientContext(); - } else { - // create master client - // It will read the master settings under gearpump.cluster.masters - masterClient = new ClientContext(akkaConf); - } - + ClientContext masterClient = ClientContext.apply(akkaConf); masterClient.submit(app); - if (debugMode) { - Thread.sleep(30 * 1000); // sleep for 30 seconds. - } - masterClient.close(); - - if (localCluster != null) { - localCluster.stop(); - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index 2830b16..e8467fa 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.source.Watermark; import org.apache.gearpump.streaming.task.TaskContext; import scala.Tuple2; @@ -46,7 +47,7 @@ public class WordCount { } public static void main(Config akkaConf, String[] args) throws InterruptedException { - ClientContext context = new ClientContext(akkaConf); + 
ClientContext context = ClientContext.apply(akkaConf); JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty()); JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"), @@ -69,6 +70,7 @@ public class WordCount { private static class StringSource implements DataSource { private final String str; + private boolean hasNext = true; StringSource(String str) { this.str = str; @@ -80,7 +82,9 @@ public class WordCount { @Override public Message read() { - return new DefaultMessage(str, Instant.now()); + Message msg = new DefaultMessage(str, Instant.now()); + hasNext = false; + return msg; } @Override @@ -89,7 +93,11 @@ public class WordCount { @Override public Instant getWatermark() { - return Instant.now(); + if (hasNext) { + return Instant.now(); + } else { + return Watermark.MAX(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala index 0e3d840..14965e7 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala @@ -21,7 +21,6 @@ package org.apache.gearpump.streaming.examples.wordcount import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import 
org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.source.DataSourceProcessor @@ -38,10 +37,7 @@ object WordCount extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array( "split" -> CLIOption[Int]("<how many source tasks>", required = false, defaultValue = Some(1)), - "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), - "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), - "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, - defaultValue = Some(30)) + "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)) ) def application(config: ParseResult, system: ActorSystem): StreamApplication = { @@ -60,32 +56,10 @@ object WordCount extends AkkaApp with ArgumentsParser { override def main(akkaConf: Config, args: Array[String]): Unit = { val config = parse(args) - - val debugMode = config.getBoolean("debug") - val sleepSeconds = config.getInt("sleep") - - val localCluster = if (debugMode) { - val cluster = new EmbeddedCluster(akkaConf: Config) - cluster.start() - Some(cluster) - } else { - None - } - - val context: ClientContext = localCluster match { - case Some(local) => local.newClientContext - case None => ClientContext(akkaConf) - } - + val context: ClientContext = ClientContext(akkaConf) val app = application(config, context.system) context.submit(app) - - if (debugMode) { - Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging. 
- } - context.close() - localCluster.map(_.stop()) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala index 65f63d2..d10dbe7 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.examples.wordcount.dsl import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._ @@ -31,9 +30,7 @@ object WordCount extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array.empty override def main(akkaConf: Config, args: Array[String]): Unit = { - val cluster = new EmbeddedCluster(akkaConf) - cluster.start() - val context: ClientContext = cluster.newClientContext + val context: ClientContext = ClientContext(akkaConf) val app = StreamApp("dsl", context) val data = "This is a good start, bingo!! bingo!!" app.source(data.lines.toList, 1, "source"). 
@@ -44,6 +41,5 @@ object WordCount extends AkkaApp with ArgumentsParser { context.submit(app).waitUntilFinish() context.close() - cluster.stop() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala index 99ebe17..f45cae0 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala @@ -26,7 +26,6 @@ import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient import akka.stream.impl.StreamLayout.Module import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.streaming.ProcessorId import org.apache.gearpump.util.Graph @@ -48,18 +47,8 @@ object RemoteGraph { */ class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer { - private val local = if (useInProcessCluster) { - val cluster = EmbeddedCluster() - cluster.start() - Some(cluster) - } else { - None - } - private val context: ClientContext = local match { - case Some(l) => l.newClientContext - case None => ClientContext(system) - } + private val context: ClientContext = ClientContext() override def materialize(subGraph: SubGraph, inputMatValues: scala.collection.mutable.Map[Module, Any]): @@ -107,7 +96,6 @@ object RemoteGraph { override def shutdown: Unit = { context.close() - local.foreach(_.stop()) } } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index e2d421c..987546c 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -60,7 +60,7 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { val akkaConf = updateClientConfig(inputAkkaConf) val system = ActorSystem("storm", akkaConf) - val clientContext = new ClientContext(akkaConf, system, null) + val clientContext = ClientContext(akkaConf, system, null) val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]] val thriftConf: JMap[AnyRef, AnyRef] = Map( Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/external/kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..66fbaf4 --- /dev/null +++ b/external/kafka/src/test/resources/log4j.properties @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# A kafka test could be run tens of times and generates tons of logs, +# which makes it harder to spot failures. +# Setting the log level to ERROR to reduce the logs. +log4j.rootLevel=ERROR +log4j.rootAppender=console +log4j.rootLogger=${log4j.rootLevel},${log4j.rootAppender} + +# Logging Threshold +log4j.threshhold=ALL + +# console appender +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%c{1}] %m%n + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/project/BuildExamples.scala ---------------------------------------------------------------------- diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala index 00affb0..afb7459 100644 --- a/project/BuildExamples.scala +++ b/project/BuildExamples.scala @@ -39,21 +39,9 @@ object BuildExamples extends sbt.Build { example_hbase ) - lazy val example_hbase = Project( - id = "gearpump-examples-hbase", - base = file("examples/streaming/hbase"), - settings = exampleSettings("org.apache.gearpump.streaming.examples.hbase.HBaseConn") ++ - Seq( - libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-common" % hadoopVersion - exclude("commons-beanutils", "commons-beanutils-core") - exclude("commons-beanutils", "commons-beanutils") - exclude("asm", "asm") - exclude("org.ow2.asm", "asm") - ) - ) ++ include("examples/streaming/hbase", "external/hbase") - ) dependsOn(core, streaming % "compile; 
test->test", external_hbase) - + /** + * The following examples can be run in an IDE or with `sbt run` + */ lazy val wordcountJava = Project( id = "gearpump-examples-wordcountjava", base = file("examples/streaming/wordcount-java"), @@ -82,6 +70,17 @@ object BuildExamples extends sbt.Build { include("examples/streaming/complexdag") ).dependsOn(core, streaming % "compile; test->test") + lazy val pagerank = Project( + id = "gearpump-examples-pagerank", + base = file("examples/pagerank"), + settings = + exampleSettings("org.apache.gearpump.experiments.pagerank.example.PageRankExample") ++ + include("examples/pagerank") + ).dependsOn(core % "provided", streaming % "provided; test->test") + + /** + * The following examples must be submitted to a deployed gearpump cluster + */ lazy val distributedshell = Project( id = "gearpump-examples-distributedshell", base = file("examples/distributedshell"), @@ -90,8 +89,8 @@ object BuildExamples extends sbt.Build { Some("org.apache.gearpump.examples.distributedshell.DistributedShell"), target in assembly := baseDirectory.value.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) - ) ++ include("examples/distributedshell") - ).dependsOn(core % "compile; test->test") + ) + ).dependsOn(core % "provided; test->test") lazy val distributeservice = Project( id = "gearpump-examples-distributeservice", @@ -109,7 +108,18 @@ object BuildExamples extends sbt.Build { "io.spray" %% "spray-routing-shapeless2" % sprayVersion ) ) ++ include("examples/distributeservice") - ).dependsOn(core % "compile; test->test") + ).dependsOn(core % "provided; test->test") + + lazy val example_hbase = Project( + id = "gearpump-examples-hbase", + base = file("examples/streaming/hbase"), + settings = exampleSettings("org.apache.gearpump.streaming.examples.hbase.HBaseConn") ++ + Seq( + libraryDependencies ++= Seq( + "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided" + ) + ) + ) dependsOn(core % "provided", streaming % "provided; 
test->test", external_hbase) lazy val fsio = Project( id = "gearpump-examples-fsio", @@ -117,26 +127,17 @@ object BuildExamples extends sbt.Build { settings = exampleSettings("org.apache.gearpump.streaming.examples.fsio.SequenceFileIO") ++ Seq( libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-common" % hadoopVersion - exclude("org.mortbay.jetty", "jetty-util") - exclude("org.mortbay.jetty", "jetty") - exclude("org.fusesource.leveldbjni", "leveldbjni-all") - exclude("tomcat", "jasper-runtime") - exclude("commons-beanutils", "commons-beanutils-core") - exclude("commons-beanutils", "commons-beanutils") - exclude("asm", "asm") - exclude("org.ow2.asm", "asm") + "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided" ) - ) ++ include("examples/streaming/fsio") - ).dependsOn(core, streaming % "compile; test->test") + ) + ).dependsOn(core % "provided", streaming % "provided; test->test") lazy val examples_kafka = Project( id = "gearpump-examples-kafka", base = file("examples/streaming/kafka"), settings = - exampleSettings("org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount") ++ - include("examples/streaming/kafka", "external/kafka") - ).dependsOn(core, streaming % "compile; test->test", external_kafka) + exampleSettings("org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount") + ).dependsOn(core % "provided", streaming % "provided; test->test", external_kafka) lazy val examples_state = Project( id = "gearpump-examples-state", @@ -144,30 +145,13 @@ object BuildExamples extends sbt.Build { settings = exampleSettings("org.apache.gearpump.streaming.examples.state.MessageCountApp") ++ Seq( libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-common" % hadoopVersion - exclude("org.mortbay.jetty", "jetty-util") - exclude("org.mortbay.jetty", "jetty") - exclude("org.fusesource.leveldbjni", "leveldbjni-all") - exclude("tomcat", "jasper-runtime") - exclude("commons-beanutils", "commons-beanutils-core") - 
exclude("commons-beanutils", "commons-beanutils") - exclude("asm", "asm") - exclude("org.ow2.asm", "asm"), - "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion + "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % "provided" ) - ) ++ include("examples/streaming/state", - "external/hadoopfs", "external/monoid", "external/serializer", "external/kafka") - ).dependsOn(core, streaming % "compile; test->test", + ) + ).dependsOn(core % "provided", streaming % "provided; test->test", external_hadoopfs, external_monoid, external_serializer, external_kafka) - lazy val pagerank = Project( - id = "gearpump-examples-pagerank", - base = file("examples/pagerank"), - settings = - exampleSettings("org.apache.gearpump.experiments.pagerank.example.PageRankExample") ++ - include("examples/pagerank") - ).dependsOn(core, streaming % "compile; test->test") - private def exampleSettings(className: String): Seq[Def.Setting[_]] = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) :=
