Repository: incubator-gearpump
Updated Branches:
  refs/heads/sql e4c33004b -> e04df0ddd


Merge master into sql

Author: manuzhang <[email protected]>
Author: huafengw <[email protected]>
Author: darionyaphet <[email protected]>

Closes #206 from manuzhang/merge_master_into_sql.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e04df0dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e04df0dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e04df0dd

Branch: refs/heads/sql
Commit: e04df0dddb9d45cced5b3587ee61caf7e25bbcbd
Parents: e4c3300
Author: manuzhang <[email protected]>
Authored: Wed Aug 2 19:03:36 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Aug 2 19:03:46 2017 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/client/ClientContext.scala | 46 +++++-----
 .../cluster/client/RuntimeEnvironment.scala     | 58 +++++++++++++
 .../cluster/embedded/EmbeddedCluster.scala      | 49 ++++-------
 .../embedded/EmbeddedRuntimeEnvironemnt.scala   | 48 +++++++++++
 .../gearpump/cluster/main/AppSubmitter.scala    |  2 +
 .../apache/gearpump/cluster/MasterHarness.scala |  6 +-
 docs/contents/dev/dev-write-1st-app.md          |  4 +-
 .../examples/wordcountjava/WordCount.java       | 30 +------
 .../examples/wordcountjava/dsl/WordCount.java   | 14 ++-
 .../examples/wordcount/WordCount.scala          | 30 +------
 .../examples/wordcount/dsl/WordCount.scala      |  6 +-
 .../gearpump/akkastream/graph/RemoteGraph.scala | 14 +--
 .../experiments/storm/main/GearpumpNimbus.scala |  2 +-
 .../kafka/src/test/resources/log4j.properties   | 35 ++++++++
 project/BuildExamples.scala                     | 90 ++++++++------------
 15 files changed, 246 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 48b95d8..4840120 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -40,22 +40,13 @@ import scala.concurrent.{Await, Future}
 import scala.util.{Failure, Success, Try}
 
 /**
- * ClientContext is a user facing util to submit/manage an application.
- *
- * TODO: add interface to query master here
+ * ClientContext is a user facing utility to interact with the master.
+ * (e.g. submit/manage an application).
  */
-class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
-  def this(system: ActorSystem) = {
-    this(system.settings.config, system, null)
-  }
-
-  def this(config: Config) = {
-    this(config, null, null)
-  }
+class ClientContext protected(config: Config, sys: ActorSystem, _master: 
ActorRef) {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
   implicit val system = 
Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
-  LOG.info(s"Starting system ${system.name}")
   private val jarStoreClient = new JarStoreClient(config, system)
   private val masterClientTimeout = {
     val timeout = 
Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
@@ -183,18 +174,35 @@ class ClientContext(config: Config, sys: ActorSystem, 
_master: ActorRef) {
 
 object ClientContext {
 
-  def apply(): ClientContext = new ClientContext(ClusterConfig.default(), 
null, null)
+  /**
+   * Create a [[ClientContext]] which will instantiate an actor system
+   * to interact with the master parsed from `gearpump.cluster.masters`.
+   * The config is loaded from classpath.
+   */
+  def apply(): ClientContext = apply(ClusterConfig.default())
 
-  def apply(system: ActorSystem): ClientContext = {
-    new ClientContext(ClusterConfig.default(), system, null)
+  /**
+   * Create a [[ClientContext]] which will instantiate an actor system
+   * to interact with the master parsed from `gearpump.cluster.masters`
+   * through the given config.
+   */
+  def apply(config: Config): ClientContext = {
+    RuntimeEnvironment.newClientContext(config)
   }
 
-  def apply(system: ActorSystem, master: ActorRef): ClientContext = {
-    new ClientContext(ClusterConfig.default(), system, master)
+  /**
+   * Create a [[ClientContext]] for the passed in actor system
+   * to interact with the master parsed from `gearpump.cluster.masters`
+   * through the given config.
+   */
+  def apply(config: Config, system: ActorSystem): ClientContext = {
+    new ClientContext(config, system, null)
   }
 
-  def apply(config: Config): ClientContext = new ClientContext(config, null, 
null)
-
+  /**
+   * Create a [[ClientContext]] for the passed in actor system
+   * to interact with the given master.
+   */
   def apply(config: Config, system: ActorSystem, master: ActorRef): 
ClientContext = {
     new ClientContext(config, system, master)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
new file mode 100644
index 0000000..cf5842f
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import com.typesafe.config.Config
+import 
org.apache.gearpump.cluster.client.RuntimeEnvironment.RemoteClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt
+
+/**
+ * The RuntimeEnvironment is the context that decides where an application is 
submitted to.
+ */
+abstract class RuntimeEnvironment {
+  def newClientContext(akkaConf: Config): ClientContext
+}
+
+/**
+ * Usually RemoteRuntimeEnvironment is the default environment when a user uses the 
command line
+ * to submit an application. It will connect to the right remote Master.
+ */
+class RemoteRuntimeEnvironment extends RuntimeEnvironment {
+  override def newClientContext(akkaConf: Config): ClientContext = {
+    new RemoteClientContext(akkaConf)
+  }
+}
+
+object RuntimeEnvironment {
+  private var envInstance: RuntimeEnvironment = _
+
+  class RemoteClientContext(akkaConf: Config) extends ClientContext(akkaConf, 
null, null)
+
+  def get() : RuntimeEnvironment = {
+    Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt)
+  }
+
+  def newClientContext(akkaConf: Config): ClientContext = {
+    get().newClientContext(akkaConf)
+  }
+
+  def setRuntimeEnv(env: RuntimeEnvironment): Unit = {
+    envInstance = env
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
index 9bde4d1..8abcd96 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -21,12 +21,11 @@ package org.apache.gearpump.cluster.embedded
 import scala.collection.JavaConverters._
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
-
 import akka.actor.{ActorRef, ActorSystem, Props}
 import com.typesafe.config.{Config, ConfigValueFactory}
-
 import org.apache.gearpump.cluster.ClusterConfig
 import org.apache.gearpump.cluster.client.ClientContext
+import 
org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext
 import org.apache.gearpump.cluster.master.{Master => MasterActor}
 import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
 import 
org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
 GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
@@ -36,34 +35,22 @@ import org.apache.gearpump.util.{LogUtil, Util}
  * Create a in-process cluster with single worker
  */
 class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
   private val LOG = LogUtil.getLogger(getClass)
+  private val workerCount: Int = 1
+  private val port = Util.findFreePort().get
+  private[embedded] val config: Config = getConfig(inputConfig, port)
+  private[embedded] val system: ActorSystem = ActorSystem(MASTER, config)
+  private[embedded] val master: ActorRef = system.actorOf(Props[MasterActor], 
MASTER)
 
-  def start(): Unit = {
-    val port = Util.findFreePort().get
-    val akkaConf = getConfig(inputConfig, port)
-    _config = akkaConf
-    val system = ActorSystem(MASTER, akkaConf)
-
-    val master = system.actorOf(Props[MasterActor], MASTER)
-
-    0.until(workerCount).foreach { id =>
-      system.actorOf(Props(classOf[WorkerActor], master), 
classOf[WorkerActor].getSimpleName + id)
-    }
-    this._master = master
-    this._system = system
-
-    LOG.info("=================================")
-    LOG.info("Local Cluster is started at: ")
-    LOG.info(s"                 127.0.0.1:$port")
-    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+  0.until(workerCount).foreach { id =>
+    system.actorOf(Props(classOf[WorkerActor], master), 
classOf[WorkerActor].getSimpleName + id)
   }
 
+  LOG.info("=================================")
+  LOG.info("Local Cluster is started at: ")
+  LOG.info(s"                 127.0.0.1:$port")
+  LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+
   private def getConfig(inputConfig: Config, port: Int): Config = {
     val config = inputConfig.
       withValue("akka.remote.netty.tcp.port", 
ConfigValueFactory.fromAnyRef(port)).
@@ -77,14 +64,10 @@ class EmbeddedCluster(inputConfig: Config) {
     config
   }
 
-  def newClientContext: ClientContext = {
-    ClientContext(_config, _system, _master)
-  }
-
   def stop(): Unit = {
-    _system.stop(_master)
-    _system.terminate()
-    Await.result(_system.whenTerminated, Duration.Inf)
+    system.stop(master)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
new file mode 100644
index 0000000..fbea53f
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.embedded
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment}
+import 
org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext
+
+/**
+ * The EmbeddedRuntimeEnvironemnt is initiated when a user tries to launch 
their application
+ * from an IDE. It will create an embedded cluster and the user's application will 
run in a single
+ * local process.
+ */
+class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment {
+  override def newClientContext(akkaConf: Config): ClientContext = {
+    new EmbeddedClientContext(akkaConf)
+  }
+}
+
+object EmbeddedRuntimeEnvironemnt {
+  class EmbeddedClientContext private(cluster: EmbeddedCluster)
+    extends ClientContext(cluster.config, cluster.system, cluster.master) {
+
+    def this(akkaConf: Config) {
+      this(new EmbeddedCluster(akkaConf))
+    }
+
+    override def close(): Unit = {
+      super.close()
+      cluster.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
index 79f31eb..c5ddcb0 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.util.jar.JarFile
 
+import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, 
RuntimeEnvironment}
 import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
 
 import scala.util.{Failure, Success, Try}
@@ -84,6 +85,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
       Thread.currentThread().setContextClassLoader(classLoader)
       val clazz = classLoader.loadClass(main)
       val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+      RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment)
       mainMethod.invoke(null, arguments)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala 
b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
index 1ec5660..654da4b 100644
--- a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
@@ -22,15 +22,15 @@ import java.io.File
 import java.net.{InetSocketAddress, Socket, SocketTimeoutException, 
URLClassLoader, UnknownHostException}
 import java.util.Properties
 import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext}
-
 import akka.actor.{Actor, ActorSystem, Address, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, 
ConfigValueFactory}
-
 import org.apache.gearpump.cluster.MasterHarness.MockMaster
+import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, 
RuntimeEnvironment}
 import org.apache.gearpump.util.Constants._
 import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil}
 
@@ -63,6 +63,8 @@ trait MasterHarness {
     masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost")
 
     LOG.info(s"Actor system is started, $host, $port")
+    // Make sure there will be no EmbeddedCluster created, otherwise mock 
master won't work
+    RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment)
   }
 
   def shutdownActorSystem(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/docs/contents/dev/dev-write-1st-app.md
----------------------------------------------------------------------
diff --git a/docs/contents/dev/dev-write-1st-app.md 
b/docs/contents/dev/dev-write-1st-app.md
index 6234577..4ef1cc3 100644
--- a/docs/contents/dev/dev-write-1st-app.md
+++ b/docs/contents/dev/dev-write-1st-app.md
@@ -20,8 +20,8 @@ We'll use the classical 
[wordcount](https://github.com/apache/incubator-gearpump
              // (word, count1), (word, count2) => (word, count1 + count2)
              groupByKey().sum.log
        
-         context.submit(app).waitUntilFinish()
-         context.close()
+        context.submit(app).waitUntilFinish()
+        context.close()
          }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
index 5e3d472..cc6cd02 100644
--- 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -37,7 +37,6 @@ public class WordCount {
   }
 
   public static void main(Config akkaConf, String[] args) throws 
InterruptedException {
-
     // For split task, we config to create two tasks
     int splitTaskNumber = 2;
     Processor split = new 
Processor(Split.class).withParallelism(splitTaskNumber);
@@ -56,36 +55,9 @@ public class WordCount {
 
     UserConfig conf = UserConfig.empty();
     StreamApplication app = new StreamApplication("wordcountJava", conf, 
graph);
-
-    EmbeddedCluster localCluster = null;
-
-    Boolean debugMode = System.getProperty("DEBUG") != null;
-
-    if (debugMode) {
-      localCluster = new EmbeddedCluster(akkaConf);
-      localCluster.start();
-    }
-
-    ClientContext masterClient = null;
-
-    if (localCluster != null) {
-      masterClient = localCluster.newClientContext();
-    } else {
-      // create master client
-      // It will read the master settings under gearpump.cluster.masters
-      masterClient = new ClientContext(akkaConf);
-    }
-
+    ClientContext masterClient = ClientContext.apply(akkaConf);
     masterClient.submit(app);
 
-    if (debugMode) {
-      Thread.sleep(30 * 1000); // sleep for 30 seconds.
-    }
-
     masterClient.close();
-
-    if (localCluster != null) {
-      localCluster.stop();
-    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 2830b16..e8467fa 100644
--- 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -31,6 +31,7 @@ import 
org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
 import org.apache.gearpump.streaming.task.TaskContext;
 import scala.Tuple2;
 
@@ -46,7 +47,7 @@ public class WordCount {
   }
 
   public static void main(Config akkaConf, String[] args) throws 
InterruptedException {
-    ClientContext context = new ClientContext(akkaConf);
+    ClientContext context = ClientContext.apply(akkaConf);
     JavaStreamApp app = new JavaStreamApp("JavaDSL", context, 
UserConfig.empty());
 
     JavaStream<String> sentence = app.source(new StringSource("This is a good 
start, bingo!! bingo!!"),
@@ -69,6 +70,7 @@ public class WordCount {
   private static class StringSource implements DataSource {
 
     private final String str;
+    private boolean hasNext = true;
 
     StringSource(String str) {
       this.str = str;
@@ -80,7 +82,9 @@ public class WordCount {
 
     @Override
     public Message read() {
-      return new DefaultMessage(str, Instant.now());
+      Message msg = new DefaultMessage(str, Instant.now());
+      hasNext = false;
+      return msg;
     }
 
     @Override
@@ -89,7 +93,11 @@ public class WordCount {
 
     @Override
     public Instant getWatermark() {
-      return Instant.now();
+      if (hasNext) {
+        return Instant.now();
+      } else {
+        return Watermark.MAX();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 0e3d840..14965e7 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -21,7 +21,6 @@ package org.apache.gearpump.streaming.examples.wordcount
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
 import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.source.DataSourceProcessor
@@ -38,10 +37,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array(
     "split" -> CLIOption[Int]("<how many source tasks>", required = false,
       defaultValue = Some(1)),
-    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1)),
-    "debug" -> CLIOption[Boolean]("<true|false>", required = false, 
defaultValue = Some(false)),
-    "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", 
required = false,
-      defaultValue = Some(30))
+    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1))
   )
 
   def application(config: ParseResult, system: ActorSystem): StreamApplication 
= {
@@ -60,32 +56,10 @@ object WordCount extends AkkaApp with ArgumentsParser {
 
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     val config = parse(args)
-
-    val debugMode = config.getBoolean("debug")
-    val sleepSeconds = config.getInt("sleep")
-
-    val localCluster = if (debugMode) {
-      val cluster = new EmbeddedCluster(akkaConf: Config)
-      cluster.start()
-      Some(cluster)
-    } else {
-      None
-    }
-
-    val context: ClientContext = localCluster match {
-      case Some(local) => local.newClientContext
-      case None => ClientContext(akkaConf)
-    }
-
+    val context: ClientContext = ClientContext(akkaConf)
     val app = application(config, context.system)
     context.submit(app)
-
-    if (debugMode) {
-      Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging.
-    }
-
     context.close()
-    localCluster.map(_.stop())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 65f63d2..d10dbe7 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -19,7 +19,6 @@
 package org.apache.gearpump.streaming.examples.wordcount.dsl
 
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
@@ -31,9 +30,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array.empty
 
   override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val cluster = new EmbeddedCluster(akkaConf)
-    cluster.start()
-    val context: ClientContext = cluster.newClientContext
+    val context: ClientContext = ClientContext(akkaConf)
     val app = StreamApp("dsl", context)
     val data = "This is a good start, bingo!! bingo!!"
     app.source(data.lines.toList, 1, "source").
@@ -44,6 +41,5 @@ object WordCount extends AkkaApp with ArgumentsParser {
 
     context.submit(app).waitUntilFinish()
     context.close()
-    cluster.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 99ebe17..f45cae0 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -26,7 +26,6 @@ import 
org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
 import 
org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
 import akka.stream.impl.StreamLayout.Module
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.util.Graph
 
@@ -48,18 +47,8 @@ object RemoteGraph {
    */
   class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: 
ActorSystem)
     extends SubGraphMaterializer {
-    private val local = if (useInProcessCluster) {
-      val cluster = EmbeddedCluster()
-      cluster.start()
-      Some(cluster)
-    } else {
-      None
-    }
 
-    private val context: ClientContext = local match {
-      case Some(l) => l.newClientContext
-      case None => ClientContext(system)
-    }
+    private val context: ClientContext = ClientContext()
 
     override def materialize(subGraph: SubGraph,
         inputMatValues: scala.collection.mutable.Map[Module, Any]):
@@ -107,7 +96,6 @@ object RemoteGraph {
 
     override def shutdown: Unit = {
       context.close()
-      local.foreach(_.stop())
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index e2d421c..987546c 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -60,7 +60,7 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
     val akkaConf = updateClientConfig(inputAkkaConf)
     val system = ActorSystem("storm", akkaConf)
 
-    val clientContext = new ClientContext(akkaConf, system, null)
+    val clientContext = ClientContext(akkaConf, system, null)
     val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
     val thriftConf: JMap[AnyRef, AnyRef] = Map(
       Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/external/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/resources/log4j.properties 
b/external/kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..66fbaf4
--- /dev/null
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A kafka test could be run tens of times and generates tons of logs,
+# which makes it harder to spot failures.
+# Setting the log level to ERROR to reduce the logs.
+log4j.rootLevel=ERROR
+log4j.rootAppender=console
+log4j.rootLogger=${log4j.rootLevel},${log4j.rootAppender}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+# console appender
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%c{1}] %m%n
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e04df0dd/project/BuildExamples.scala
----------------------------------------------------------------------
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index 00affb0..afb7459 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -39,21 +39,9 @@ object BuildExamples extends sbt.Build {
     example_hbase
   )
 
-  lazy val example_hbase = Project(
-    id = "gearpump-examples-hbase",
-    base = file("examples/streaming/hbase"),
-    settings = 
exampleSettings("org.apache.gearpump.streaming.examples.hbase.HBaseConn") ++
-      Seq(
-        libraryDependencies ++= Seq(
-          "org.apache.hadoop" % "hadoop-common" % hadoopVersion
-            exclude("commons-beanutils", "commons-beanutils-core")
-            exclude("commons-beanutils", "commons-beanutils")
-            exclude("asm", "asm")
-            exclude("org.ow2.asm", "asm")
-        )
-      ) ++ include("examples/streaming/hbase", "external/hbase")
-  ) dependsOn(core, streaming % "compile; test->test", external_hbase)
-
+  /**
+   * The following examples can be run in an IDE or with `sbt run`
+   */
   lazy val wordcountJava = Project(
     id = "gearpump-examples-wordcountjava",
     base = file("examples/streaming/wordcount-java"),
@@ -82,6 +70,17 @@ object BuildExamples extends sbt.Build {
       include("examples/streaming/complexdag")
   ).dependsOn(core, streaming % "compile; test->test")
 
+  lazy val pagerank = Project(
+    id = "gearpump-examples-pagerank",
+    base = file("examples/pagerank"),
+    settings =
+      
exampleSettings("org.apache.gearpump.experiments.pagerank.example.PageRankExample")
 ++
+        include("examples/pagerank")
+  ).dependsOn(core % "provided", streaming % "provided; test->test")
+
+  /**
+   * The following examples must be submitted to a deployed Gearpump cluster
+   */
   lazy val distributedshell = Project(
     id = "gearpump-examples-distributedshell",
     base = file("examples/distributedshell"),
@@ -90,8 +89,8 @@ object BuildExamples extends sbt.Build {
         Some("org.apache.gearpump.examples.distributedshell.DistributedShell"),
       target in assembly := baseDirectory.value.getParentFile / "target" /
         CrossVersion.binaryScalaVersion(scalaVersion.value)
-    ) ++ include("examples/distributedshell")
-  ).dependsOn(core % "compile; test->test")
+    )
+  ).dependsOn(core % "provided; test->test")
 
   lazy val distributeservice = Project(
     id = "gearpump-examples-distributeservice",
@@ -109,7 +108,18 @@ object BuildExamples extends sbt.Build {
         "io.spray" %% "spray-routing-shapeless2" % sprayVersion
         )
     ) ++ include("examples/distributeservice")
-  ).dependsOn(core % "compile; test->test")
+  ).dependsOn(core % "provided; test->test")
+
+  lazy val example_hbase = Project(
+    id = "gearpump-examples-hbase",
+    base = file("examples/streaming/hbase"),
+    settings = 
exampleSettings("org.apache.gearpump.streaming.examples.hbase.HBaseConn") ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
+        )
+      )
+  ) dependsOn(core % "provided", streaming % "provided; test->test", 
external_hbase)
 
   lazy val fsio = Project(
     id = "gearpump-examples-fsio",
@@ -117,26 +127,17 @@ object BuildExamples extends sbt.Build {
     settings = 
exampleSettings("org.apache.gearpump.streaming.examples.fsio.SequenceFileIO") ++
       Seq(
         libraryDependencies ++= Seq(
-          "org.apache.hadoop" % "hadoop-common" % hadoopVersion
-            exclude("org.mortbay.jetty", "jetty-util")
-            exclude("org.mortbay.jetty", "jetty")
-            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
-            exclude("tomcat", "jasper-runtime")
-            exclude("commons-beanutils", "commons-beanutils-core")
-            exclude("commons-beanutils", "commons-beanutils")
-            exclude("asm", "asm")
-            exclude("org.ow2.asm", "asm")
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
         )
-      ) ++ include("examples/streaming/fsio")
-  ).dependsOn(core, streaming % "compile; test->test")
+      )
+  ).dependsOn(core % "provided", streaming % "provided; test->test")
 
   lazy val examples_kafka = Project(
     id = "gearpump-examples-kafka",
     base = file("examples/streaming/kafka"),
     settings =
-      
exampleSettings("org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount")
 ++
-    include("examples/streaming/kafka", "external/kafka")
-  ).dependsOn(core, streaming % "compile; test->test", external_kafka)
+      
exampleSettings("org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount")
+  ).dependsOn(core % "provided", streaming % "provided; test->test", 
external_kafka)
 
   lazy val examples_state = Project(
     id = "gearpump-examples-state",
@@ -144,30 +145,13 @@ object BuildExamples extends sbt.Build {
     settings = 
exampleSettings("org.apache.gearpump.streaming.examples.state.MessageCountApp") 
++
       Seq(
         libraryDependencies ++= Seq(
-          "org.apache.hadoop" % "hadoop-common" % hadoopVersion
-            exclude("org.mortbay.jetty", "jetty-util")
-            exclude("org.mortbay.jetty", "jetty")
-            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
-            exclude("tomcat", "jasper-runtime")
-            exclude("commons-beanutils", "commons-beanutils-core")
-            exclude("commons-beanutils", "commons-beanutils")
-            exclude("asm", "asm")
-            exclude("org.ow2.asm", "asm"),
-          "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided",
+          "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % "provided"
         )
-      ) ++ include("examples/streaming/state",
-      "external/hadoopfs", "external/monoid", "external/serializer", 
"external/kafka")
-  ).dependsOn(core, streaming % "compile; test->test",
+      )
+  ).dependsOn(core % "provided", streaming % "provided; test->test",
     external_hadoopfs, external_monoid, external_serializer, external_kafka)
 
-  lazy val pagerank = Project(
-    id = "gearpump-examples-pagerank",
-    base = file("examples/pagerank"),
-    settings =
-      
exampleSettings("org.apache.gearpump.experiments.pagerank.example.PageRankExample")
 ++
-      include("examples/pagerank")
-  ).dependsOn(core, streaming % "compile; test->test")
-
   private def exampleSettings(className: String): Seq[Def.Setting[_]] =
     commonSettings ++ noPublish ++ myAssemblySettings ++ Seq(
       mainClass in(Compile, packageBin) :=


Reply via email to