http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala
new file mode 100644
index 0000000..edfc635
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.loader.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
+import org.apache.spark.streaming.api.java.JavaDStream
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+/**
+ * This is the Java wrapper over HBaseContext, which is written in
+ * Scala. This class will be used by developers who want to
+ * work with Spark or Spark Streaming in Java.
+ *
+ * @param jsc    This is the JavaSparkContext that we will wrap
+ * @param config This is the config information for our HBase cluster
+ */
+class JavaHBaseContext(@transient jsc: JavaSparkContext,
+                       @transient config: Configuration) extends Serializable {
+  val hbaseContext = new HBaseContext(jsc.sc, config)
+
+  /**
+   * A simple enrichment of the traditional Spark JavaRDD foreachPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object.
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param javaRdd Original JavaRDD with data to iterate over
+   * @param f       Function to be given an iterator to iterate through
+   *                the RDD values and a HConnection object to interact
+   *                with HBase
+   */
+  def foreachPartition[T](javaRdd: JavaRDD[T],
+                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
+
+    hbaseContext.foreachPartition(javaRdd.rdd,
+      (it: Iterator[T], conn: Connection) => {
+        f.call((it, conn))
+      })
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming dStream foreach
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object.
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param javaDstream Original DStream with data to iterate over
+   * @param f           Function to be given an iterator to iterate through
+   *                    the JavaDStream values and a HConnection object to
+   *                    interact with HBase
+   */
+  def foreachPartition[T](javaDstream: JavaDStream[T],
+                          f: VoidFunction[(Iterator[T], Connection)]) = {
+    hbaseContext.foreachPartition(javaDstream.dstream,
+      (it: Iterator[T], conn: Connection) => f.call(it, conn))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark JavaRDD mapPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object.
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issues when
+   * getting data from HBase
+   *
+   * @param javaRdd Original JavaRdd with data to iterate over
+   * @param f       Function to be given an iterator to iterate through
+   *                the RDD values and a HConnection object to interact
+   *                with HBase
+   * @return        Returns a new RDD generated by the user-defined
+   *                function, just like a normal mapPartitions
+   */
+  def mapPartitions[T, R](javaRdd: JavaRDD[T],
+                          f: FlatMapFunction[(java.util.Iterator[T],
+                            Connection), R]): JavaRDD[R] = {
+
+    def fn = (it: Iterator[T], conn: Connection) =>
+      asScalaIterator(
+        f.call((asJavaIterator(it), conn)).iterator()
+      )
+
+    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
+      (iterator: Iterator[T], connection: Connection) =>
+        fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming JavaDStream
+   * mapPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object.
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issues when
+   * getting data from HBase
+   *
+   * @param javaDstream Original JavaDStream with data to iterate over
+   * @param mp          Function to be given an iterator to iterate through
+   *                    the JavaDStream values and a HConnection object to
+   *                    interact with HBase
+   * @return            Returns a new JavaDStream generated by the
+   *                    user-defined function, just like a normal mapPartitions
+   */
+  def streamMap[T, U](javaDstream: JavaDStream[T],
+                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
+  JavaDStream[U] = {
+    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
+      (it: Iterator[T], conn: Connection) =>
+        mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allows additional support for a user to take a JavaRDD,
+   * generate Puts, and send them to HBase.
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaRdd   Original JavaRDD with data to iterate over
+   * @param tableName The name of the table to put into
+   * @param f         Function to convert a value in the JavaRDD
+   *                  to a HBase Put
+   */
+  def bulkPut[T](javaRdd: JavaRDD[T],
+                 tableName: TableName,
+                 f: Function[(T), Put]) {
+
+    hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMapPartition method.
+   *
+   * It allows additional support for a user to take a JavaDStream and
+   * generate Puts and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaDstream Original DStream with data to iterate over
+   * @param tableName   The name of the table to put into
+   * @param f           Function to convert a value in
+   *                    the JavaDStream to a HBase Put
+   */
+  def streamBulkPut[T](javaDstream: JavaDStream[T],
+                       tableName: TableName,
+                       f: Function[T, Put]) = {
+    hbaseContext.streamBulkPut(javaDstream.dstream,
+      tableName,
+      (t: T) => f.call(t))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allows additional support for a user to take a JavaRDD and
+   * generate Deletes and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaRdd   Original JavaRDD with data to iterate over
+   * @param tableName The name of the table to delete from
+   * @param f         Function to convert a value in the JavaRDD to an
+   *                  HBase Delete
+   * @param batchSize The number of deletes to batch before sending to HBase
+   */
+  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
+                    f: Function[T, Delete], batchSize: Integer) {
+    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkMutation method.
+   *
+   * It allows additional support for a user to take a JavaDStream and
+   * generate Deletes and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaDStream Original DStream with data to iterate over
+   * @param tableName   The name of the table to delete from
+   * @param f           Function to convert a value in the JavaDStream to a
+   *                    HBase Delete
+   * @param batchSize   The number of deletes to be sent at once
+   */
+  def streamBulkDelete[T](javaDStream: JavaDStream[T],
+                          tableName: TableName,
+                          f: Function[T, Delete],
+                          batchSize: Integer) = {
+    hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
+      (t: T) => f.call(t),
+      batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.mapPartition method.
+   *
+   * It allows additional support for a user to take a JavaRDD and generate a
+   * new RDD based on Gets and the results they bring back from HBase.
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     batch size of how many gets to retrieve in a single fetch
+   * @param javaRdd       Original JavaRDD with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaRDD to a
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaRDD
+   * @return              New JavaRDD that is created by the Get to HBase
+   */
+  def bulkGet[T, U](tableName: TableName,
+                    batchSize: Integer,
+                    javaRdd: JavaRDD[T],
+                    makeGet: Function[T, Get],
+                    convertResult: Function[Result, U]): JavaRDD[U] = {
+
+    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
+      batchSize,
+      javaRdd.rdd,
+      (t: T) => makeGet.call(t),
+      (r: Result) => {
+        convertResult.call(r)
+      })(fakeClassTag[U]))(fakeClassTag[U])
+
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMap method.
+   *
+   * It allows additional support for a user to take a DStream and
+   * generate a new DStream based on Gets and the results
+   * they bring back from HBase.
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     The number of gets to be batched together
+   * @param javaDStream   Original DStream with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaDStream to a
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaDStream
+   * @return              New JavaDStream that is created by the Get to HBase
+   */
+  def streamBulkGet[T, U](tableName: TableName,
+                          batchSize: Integer,
+                          javaDStream: JavaDStream[T],
+                          makeGet: Function[T, Get],
+                          convertResult: Function[Result, U]): JavaDStream[U] = {
+    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
+      batchSize,
+      javaDStream.dstream,
+      (t: T) => makeGet.call(t),
+      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * This function will use the native HBase TableInputFormat with the
+   * given scan object to generate a new JavaRDD
+   *
+   * @param tableName The name of the table to scan
+   * @param scans     The HBase scan object to use to read data from HBase
+   * @param f         Function to convert a Result object from HBase into
+   *                  what the user wants in the final generated JavaRDD
+   * @return          New JavaRDD with results from scan
+   */
+  def hbaseRDD[U](tableName: TableName,
+                  scans: Scan,
+                  f: Function[(ImmutableBytesWritable, Result), U]):
+  JavaRDD[U] = {
+    JavaRDD.fromRDD(
+      hbaseContext.hbaseRDD[U](tableName,
+        scans,
+        (v: (ImmutableBytesWritable, Result)) =>
+          f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * An overloaded version of HBaseContext hbaseRDD that defines the
+   * type of the resulting JavaRDD
+   *
+   * @param tableName The name of the table to scan
+   * @param scans     The HBase scan object to use to read data from HBase
+   * @return          New JavaRDD with results from scan
+   */
+  def hbaseRDD(tableName: TableName,
+               scans: Scan):
+  JavaRDD[(ImmutableBytesWritable, Result)] = {
+    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
+  }
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
+   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
+   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+   * just worse performance or security issues.
+   * For instance, an Array[AnyRef] can hold any type T, but may lose primitive
+   * specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
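
For context, a minimal sketch of how the wrapper above might be driven from a build that depends on this loader module. It is not part of this patch; the table name "test", column family "d", and tab-separated record layout are illustrative assumptions, and it assumes an HBase 1.x client with hbase-site.xml (or defaults) on the classpath.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.s2graph.loader.spark.JavaHBaseContext

object JavaHBaseContextBulkPutExample {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(new SparkConf().setAppName("bulkPut-example").setMaster("local[2]"))
    val hbaseConf = HBaseConfiguration.create()              // cluster settings come from the classpath
    val hbaseContext = new JavaHBaseContext(jsc, hbaseConf)

    // Hypothetical "rowKey\tvalue" records; table "test" and column family "d" are assumptions.
    val rdd = jsc.parallelize(java.util.Arrays.asList("r1\tv1", "r2\tv2"))
    hbaseContext.bulkPut(rdd, TableName.valueOf("test"), new JFunction[String, Put] {
      override def call(line: String): Put = {
        val Array(row, value) = line.split("\t", 2)
        new Put(Bytes.toBytes(row)).addColumn(Bytes.toBytes("d"), Bytes.toBytes("c"), Bytes.toBytes(value))
      }
    })

    jsc.stop()
  }
}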

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala
new file mode 100644
index 0000000..d7c6277
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.loader.spark
+
+import java.io.Serializable
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * This is the key to be used for sorting and shuffling.
+ *
+ * We will only partition on the rowKey but we will sort on all three
+ *
+ * @param rowKey    Record RowKey
+ * @param family    Record ColumnFamily
+ * @param qualifier Cell Qualifier
+ */
+class KeyFamilyQualifier(val rowKey: Array[Byte], val family: Array[Byte], val qualifier: Array[Byte])
+  extends Comparable[KeyFamilyQualifier] with Serializable {
+  override def compareTo(o: KeyFamilyQualifier): Int = {
+    var result = Bytes.compareTo(rowKey, o.rowKey)
+    if (result == 0) {
+      result = Bytes.compareTo(family, o.family)
+      if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
+    }
+    result
+  }
+  override def toString: String = {
+    Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+  }
+}
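
As an aside (not part of the commit), a tiny sketch of the ordering this key gives, with made-up row, family, and qualifier values:

import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.loader.spark.KeyFamilyQualifier

object KeyFamilyQualifierOrderingSketch extends App {
  // Same rowKey and family, so the qualifier breaks the tie.
  val a = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("e"), Bytes.toBytes("q1"))
  val b = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("e"), Bytes.toBytes("q2"))
  assert(a.compareTo(b) < 0)
  println(a)  // "row1:e:q1"
}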

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
new file mode 100644
index 0000000..7838593
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -0,0 +1,335 @@
+package org.apache.s2graph.loader.subscriber
+
+import com.typesafe.config.{Config, ConfigFactory}
+import kafka.javaapi.producer.Producer
+import kafka.producer.KeyedMessage
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.client._
+import org.apache.s2graph.core._
+import org.apache.s2graph.spark.spark.WithKafka
+import org.apache.spark.{Accumulable, SparkContext}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+import scala.concurrent.ExecutionContext
+
+object GraphConfig {
+  var database = ""
+  var zkQuorum = ""
+  var kafkaBrokers = ""
+  var cacheTTL = s"${60 * 60 * 1}"
+  def apply(phase: String, dbUrl: Option[String], zkAddr: Option[String], kafkaBrokerList: Option[String]): Config = {
+    database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph_dev")
+    zkQuorum = zkAddr.getOrElse("localhost")
+    kafkaBrokers = kafkaBrokerList.getOrElse(kafkaBrokers)
+
+//    val newConf = new util.HashMap[String, Object]()
+//    newConf.put("hbase.zookeeper.quorum", zkQuorum)
+//    newConf.put("db.default.url", database)
+//    newConf.put("kafka.metadata.broker.list", kafkaBrokers)
+    val newConf =
+      if (kafkaBrokerList.isEmpty) Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "cache.ttl.seconds" -> cacheTTL)
+      else Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "kafka.metadata.broker.list" -> kafkaBrokers, "cache.ttl.seconds" -> cacheTTL)
+
+    ConfigFactory.parseMap(newConf).withFallback(Graph.DefaultConfig)
+  }
+}
+
+object GraphSubscriberHelper extends WithKafka {
+
+
+  type HashMapAccumulable = Accumulable[HashMap[String, Long], (String, Long)]
+
+
+  lazy val producer = new Producer[String, String](kafkaConf(GraphConfig.kafkaBrokers))
+  var config: Config = _
+  private val writeBufferSize = 1024 * 1024 * 8
+  private val sleepPeriod = 10000
+  private val maxTryNum = 10
+
+  var g: Graph = null
+  var management: Management = null
+  val conns = new scala.collection.mutable.HashMap[String, Connection]()
+
+  def toOption(s: String) = {
+    s match {
+      case "" | "none" => None
+      case _ => Some(s)
+    }
+  }
+
+  def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String): Unit = {
+    config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
+
+    if (g == null) {
+      val ec = ExecutionContext.Implicits.global
+      g = new Graph(config)(ec)
+      management = new Management(g)
+    }
+  }
+
+  def getConn(zkQuorum: String): Connection = {
+    conns.getOrElseUpdate(zkQuorum, {
+      val hbaseConf = HBaseConfiguration.create()
+      hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
+      ConnectionFactory.createConnection(hbaseConf)
+    })
+  }
+  //  def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit  = {
+  //    Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(ExecutionContext.Implicits.global)
+  //  }
+  def report(key: String, value: Option[String], topic: String = "report") = {
+    val msg = Seq(Some(key), value).flatten.mkString("\t")
+    val kafkaMsg = new KeyedMessage[String, String](topic, msg)
+    producer.send(kafkaMsg)
+  }
+
+  def toGraphElements(msgs: Seq[String], labelMapping: Map[String, String] = Map.empty)
+                     (statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
+    (for (msg <- msgs) yield {
+      statFunc("total", 1)
+      Graph.toGraphElement(msg, labelMapping) match {
+        case Some(e: Edge) =>
+          statFunc("EdgeParseOk", 1)
+          e
+        case Some(v: Vertex) =>
+          statFunc("VertexParseOk", 1)
+          v
+        case Some(x) =>
+          throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: parsing failed. ${x.serviceName}")
+        case None =>
+          throw new RuntimeException(s"GraphSubscriber.toGraphElements: parsing failed. $msg")
+      }
+
+    }).toList
+  }
+
+//  private def storeRec(zkQuorum: String, tableName: String, puts: List[Put], elementsSize: Int, tryNum: Int)
+//                      (statFunc: (String, Int) => Unit, statPrefix: String = "edge"): Unit = {
+//    if (tryNum <= 0) {
+//      statFunc("errorStore", elementsSize)
+//      throw new RuntimeException(s"retry failed after $maxTryNum")
+//    }
+//    val conn = getConn(zkQuorum)
+//    val mutator = conn.getBufferedMutator(TableName.valueOf(tableName))
+//    //      val table = conn.getTable(TableName.valueOf(tableName))
+//    //      table.setAutoFlush(false, false)
+//
+//    try {
+//      puts.foreach { put => put.setDurability(Durability.ASYNC_WAL) }
+//      mutator.mutate(puts)
+//      //        table.put(puts)
+//      statFunc(s"$statPrefix:storeOk", elementsSize)
+//    } catch {
+//      case e: Throwable =>
+//        e.printStackTrace()
+//        Thread.sleep(sleepPeriod)
+//        storeRec(zkQuorum, tableName, puts, elementsSize, tryNum - 1)(statFunc)
+//    } finally {
+//      mutator.close()
+//      //        table.close()
+//    }
+//  }
+//
+//  def storeDegreeBulk(zkQuorum: String, tableName: String)
+//                     (degrees: Iterable[(String, String, String, Int)], labelMapping: Map[String, String] = Map.empty)
+//                     (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
+//    val counts = HashMap[String, Long]()
+//    val statFunc = storeStat(counts)(mapAccOpt) _
+//
+//    for {
+//      (vertexId, labelName, direction, degreeVal) <- degrees
+//      incrementRequests <- TransferToHFile.buildDegreePutRequests(vertexId, labelName, direction, degreeVal)
+//    } {
+//      storeRec(zkQuorum, tableName, incrementRequests, degrees.size, maxTryNum)(statFunc, "degree")
+//    }
+//    counts
+//  }
+//  def storeBulk(zkQuorum: String, tableName: String)
+//               (msgs: Seq[String], labelMapping: Map[String, String] = Map.empty, autoCreateEdge: Boolean = false)
+//               (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
+//
+//    val counts = HashMap[String, Long]()
+//    val statFunc = storeStat(counts)(mapAccOpt) _
+//    val elements = toGraphElements(msgs, labelMapping)(statFunc)
+//
+//    val puts = elements.flatMap { element =>
+//      element match {
+//        case v: Vertex if v.op == GraphUtil.operations("insert") || v.op == GraphUtil.operations("insertBulk") =>
+//          v.buildPuts()
+//        case e: Edge if e.op == GraphUtil.operations("insert") || e.op == GraphUtil.operations("insertBulk") =>
+//          EdgeWriter(e).insertBulkForLoader(autoCreateEdge)
+//        case _ => Nil
+//      }
+//    } toList
+//
+//    storeRec(zkQuorum, tableName, puts, msgs.size, maxTryNum)(statFunc)
+//    counts
+//  }
+
+  def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = {
+    counts.put(key, counts.getOrElse(key, 0L) + value)
+    mapAccOpt match {
+      case None =>
+      case Some(mapAcc) => mapAcc += (key -> value)
+    }
+  }
+
+  def toLabelMapping(labelMapping: String): Map[String, String] = {
+    (for {
+      token <- labelMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
+
+  def isValidQuorum(quorum: String) = {
+    quorum.split(",").size > 1
+  }
+}
+
+//object GraphSubscriber extends SparkApp with WithKafka {
+//  val sleepPeriod = 5000
+//  val usages =
+//    s"""
+//       |/**
+//       |* this job reads edge-format (TSV) data from HDFS and bulk loads the edges into s2graph. assumes that newLabelName is already created by API.
+//       |* params:
+//       |*  0. hdfsPath: where your data is in hdfs. requires a full path with the hdfs:// prefix
+//       |*  1. dbUrl: jdbc database connection string to specify database for meta.
+//       |*  2. labelMapping: oldLabel:newLabel delimited by ,
+//       |*  3. zkQuorum: target hbase zkQuorum where this job will publish data to.
+//       |*  4. hTableName: target hbase physical table name where this job will publish data to.
+//       |*  5. batchSize: how many edges will be batched per Put request to the target hbase.
+//       |*  6. kafkaBrokerList: using kafka as a fallback queue. when something goes wrong during the batch, data that needs to be replayed will be stored in kafka.
+//       |*  7. kafkaTopic: fallback queue topic.
+//       |*  8. edgeAutoCreate: true if the reversed edge needs to be created automatically.
+//       |*
+//       |* after this job finishes, s2graph will have data with a sequence corresponding to newLabelName.
+//       |* change this newLabelName to originalName if you want an online replacement of the label.
+//       |*
+//       |*/
+//     """.stripMargin
+//
+//  override def run() = {
+//    /**
+//     * Main function
+//     */
+//    println(args.toList)
+////    if (args.length != 10) {
+////      System.err.println(usages)
+////      System.exit(1)
+////    }
+//    val hdfsPath = args(0)
+//    val dbUrl = args(1)
+//    val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2))
+//
+//    val zkQuorum = args(3)
+//    val hTableName = args(4)
+//    val batchSize = args(5).toInt
+//    val kafkaBrokerList = args(6)
+//    val kafkaTopic = args(7)
+//    val edgeAutoCreate = args(8).toBoolean
+//    val vertexDegreePathOpt = if (args.length >= 10) GraphSubscriberHelper.toOption(args(9)) else None
+//
+//    val conf = sparkConf(s"$hdfsPath: GraphSubscriber")
+//    val sc = new SparkContext(conf)
+//    val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _))
+//
+//
+//    if (!GraphSubscriberHelper.isValidQuorum(zkQuorum)) throw new RuntimeException(s"$zkQuorum is not valid.")
+//
+//    /** this job expects only one hTableName. all labels in this job will be stored in the same physical hbase table */
+//    try {
+//
+//      import GraphSubscriberHelper._
+//      // set local driver setting.
+//      val phase = System.getProperty("phase")
+//      GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+//
+//      /** copy when oldLabel exists and newLabel does not exist. otherwise ignore. */
+//
+//      if (labelMapping.isEmpty) {
+//        // pass
+//      } else {
+//        for {
+//          (oldLabelName, newLabelName) <- labelMapping
+//        } {
+//          Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName))
+//        }
+//      }
+//
+//      vertexDegreePathOpt.foreach { vertexDegreePath =>
+//        val vertexDegrees = sc.textFile(vertexDegreePath).filter(line => line.split("\t").length == 4).map { line =>
+//          val tokens = line.split("\t")
+//          (tokens(0), tokens(1), tokens(2), tokens(3).toInt)
+//        }
+//        vertexDegrees.foreachPartition { partition =>
+//
+//          // init Graph
+//          val phase = System.getProperty("phase")
+//          GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+//
+//          partition.grouped(batchSize).foreach { msgs =>
+//            try {
+//              val start = System.currentTimeMillis()
+//              val counts = GraphSubscriberHelper.storeDegreeBulk(zkQuorum, hTableName)(msgs, labelMapping)(Some(mapAcc))
+//              for ((k, v) <- counts) {
+//                mapAcc +=(k, v)
+//              }
+//              val duration = System.currentTimeMillis() - start
+//              println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, 
$hTableName")
+//            } catch {
+//              case e: Throwable =>
+//                println(s"[Failed]: store $e")
+//
+//                msgs.foreach { msg =>
+//                  GraphSubscriberHelper.report(msg.toString(), Some(e.getMessage()), topic = kafkaTopic)
+//                }
+//            }
+//          }
+//        }
+//      }
+//
+//
+//      val msgs = sc.textFile(hdfsPath)
+//      msgs.foreachPartition(partition => {
+//        // set executor setting.
+//        val phase = System.getProperty("phase")
+//        GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+//
+//        partition.grouped(batchSize).foreach { msgs =>
+//          try {
+//            val start = System.currentTimeMillis()
+//            //            val counts =
+//            //              GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
+//            val counts =
+//              GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping, edgeAutoCreate)(Some(mapAcc))
+//
+//            for ((k, v) <- counts) {
+//              mapAcc +=(k, v)
+//            }
+//            val duration = System.currentTimeMillis() - start
+//            println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, 
$hTableName")
+//          } catch {
+//            case e: Throwable =>
+//              println(s"[Failed]: store $e")
+//
+//              msgs.foreach { msg =>
+//                GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = kafkaTopic)
+//              }
+//          }
+//        }
+//      })
+//
+//      logInfo(s"counter: $mapAcc")
+//      println(s"Stats: ${mapAcc}")
+//
+//    } catch {
+//      case e: Throwable =>
+//        println(s"job failed with exception: $e")
+//        throw e
+//    }
+//
+//  }
+//}
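
For reference, a small sketch (label names and URLs are made up) of the helper inputs used throughout this file: the labelMapping string is oldLabel:newLabel pairs delimited by ',', and empty or "none" arguments are treated as absent.

// Tokens without exactly one ':' are skipped by toLabelMapping.
val mapping = GraphSubscriberHelper.toLabelMapping("friend:friend_new,follow:follow_new")
// mapping == Map("friend" -> "friend_new", "follow" -> "follow_new")

// "" and "none" become None; anything else becomes Some(...).
val zkOpt = GraphSubscriberHelper.toOption("none")                                    // None
val dbOpt = GraphSubscriberHelper.toOption("jdbc:mysql://localhost:3306/graph_dev")   // Some(...)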

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
new file mode 100644
index 0000000..60b43ca
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -0,0 +1,194 @@
+package org.apache.s2graph.loader.subscriber
+
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId, LabelWithDirection}
+import org.apache.s2graph.loader.spark.{KeyFamilyQualifier, HBaseContext, FamilyHFileWriteOptions}
+import org.apache.s2graph.spark.spark.SparkApp
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.hbase.async.PutRequest
+import play.api.libs.json.Json
+import scala.collection.JavaConversions._
+
+
+object TransferToHFile extends SparkApp with JSONParser {
+
+  val usages =
+    s"""
+       |create HFiles for the hbase table on the specified zkQuorum.
+       |note that the hbase table must already be created and properly pre-split.
+       |
+       |params:
+       |0. input: hdfs path for the tsv file (bulk format).
+       |1. output: hdfs path for storing HFiles.
+       |2. zkQuorum: zkQuorum of the running hbase cluster.
+       |3. tableName: table name for this bulk upload.
+       |4. dbUrl: db url used for parsing lines into graph elements.
+       |5. maxHFilePerRegionServer: used when creating the hbase table.
+       |6. labelMapping (optional): oldLabel:newLabel pairs delimited by ','.
+       |7. autoEdgeCreate (optional): whether to build reversed edges as well. default false.
+       |8. buildDegree (optional): whether to build degree rows. default true.
+       |9. compressionAlgorithm (optional): compression used when creating the table. default lz4.
+     """.stripMargin
+
+  //TODO: Process AtomicIncrementRequest too.
+  /** build key values */
+  case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
+
+  private def insertBulkForLoaderAsync(edge: Edge, createRelEdges: Boolean = true): List[PutRequest] = {
+    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
+    buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
+      e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) }
+    }
+  }
+
+  def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = {
+    for {
+      msg <- msgs
+      tokens = GraphUtil.split(msg)
+      if tokens(2) == "e" || tokens(2) == "edge"
+      tempDirection = if (tokens.length == 7) "out" else tokens(7)
+      direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
+      reverseDirection = if (direction == "out") "in" else "out"
+      convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5))
+      (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4))
+      degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction)
+      degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection)
+      extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil
+      output <- List(degreeKey -> 1L) ++ extra
+    } yield output
+  }
+  def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
+    val kvs = GraphSubscriberHelper.g.storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
+    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
+  }
+  def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
+    val kvs = GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.toList
+    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
+  }
+  def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
+    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
+    val dir = GraphUtil.directions(direction)
+    val innerVal = jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
+      throw new RuntimeException(s"$vertexId can not be converted into innerval")
+    }
+    val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal))
+
+    val ts = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
+    val labelWithDir = LabelWithDirection(label.id.get, dir)
+    val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs)
+
+    edge.edgesWithIndex.flatMap { indexEdge =>
+      GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
+        new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
+      }
+    }
+  }
+
+  def toKeyValues(degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = {
+    val kvs = for {
+      (key, value) <- degreeKeyVals
+      p <- buildDegreePutRequests(key.vertexIdStr, key.labelName, key.direction, value)
+    } yield {
+      new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+    }
+    kvs.toIterator
+  }
+
+  def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
+    val kvs = for {
+      s <- strs
+      element <- Graph.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge]
+      edge = element.asInstanceOf[Edge]
+      p <- insertBulkForLoaderAsync(edge, autoEdgeCreate)
+    } yield {
+      val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+      //        println(s"[Edge]: $edge\n[Put]: $p\n[KeyValue]: ${kv.getRow.toList}, ${kv.getQualifier.toList}, ${kv.getValue.toList}, ${kv.getTimestamp}")
+      kv
+    }
+    kvs.toIterator
+  }
+
+
+  override def run() = {
+    val input = args(0)
+    val tmpPath = args(1)
+    val zkQuorum = args(2)
+    val tableName = args(3)
+    val dbUrl = args(4)
+    val maxHFilePerRegionServer = args(5).toInt
+    val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
+    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
+    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
+    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
+    val conf = sparkConf(s"$input: TransferToHFile")
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "24")
+
+    val sc = new SparkContext(conf)
+
+    GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerRegionServer, None, compressionAlgorithm)
+
+    /** set up hbase init */
+    val hbaseConf = HBaseConfiguration.create()
+    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
+    hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")
+
+
+    val rdd = sc.textFile(input)
+
+
+    val kvs = rdd.mapPartitions { iter =>
+      val phase = System.getProperty("phase")
+      GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
+      toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
+    }
+    //
+    //    val newRDD = if (!buildDegree) new HFileRDD(kvs)
+    //    else {
+    //      val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
+    //        agg + current
+    //      }.mapPartitions { iter =>
+    //        val phase = System.getProperty("phase")
+    //        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
+    //        toKeyValues(iter.toSeq)
+    //      }
+    //      new HFileRDD(kvs ++ degreeKVs)
+    //    }
+    //
+    //    newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerRegionServer, tmpPath)
+    val merged = if (!buildDegree) kvs
+    else {
+      kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
+        agg + current
+      }.mapPartitions { iter =>
+        val phase = System.getProperty("phase")
+        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
+        toKeyValues(iter.toSeq)
+      }
+    }
+
+    val hbaseSc = new HBaseContext(sc, hbaseConf)
+    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
+      val k = new KeyFamilyQualifier(kv.getRow(), kv.getFamily(), kv.getQualifier())
+      val v = kv.getValue()
+      Seq((k -> v)).toIterator
+    }
+    val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
+      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
+    val familyOptionsMap = Map("e".getBytes() -> familyOptions, "v".getBytes() -> familyOptions)
+
+    hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap)
+  }
+
+}
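
The job above only writes HFiles under the output path; they still have to be handed to the region servers. A hedged sketch of the usual completion step follows: LoadIncrementalHFiles is the stock HBase bulk-load tool (HBase 1.1+ client assumed), not something introduced by this patch, and the quorum, table, and path values are placeholders mirroring the job's arguments.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

object CompleteBulkLoadSketch extends App {
  val zkQuorum   = "localhost"            // placeholder: same quorum the job wrote against
  val tableNameS = "s2graph"              // placeholder: the job's tableName argument
  val hfileDir   = "/tmp/s2graph-hfiles"  // placeholder: the job's HFile output directory

  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", zkQuorum)

  val conn  = ConnectionFactory.createConnection(conf)
  val table = TableName.valueOf(tableNameS)
  try {
    // Moves the generated HFiles into the table's regions.
    new LoadIncrementalHFiles(conf).doBulkLoad(
      new Path(hfileDir), conn.getAdmin, conn.getTable(table), conn.getRegionLocator(table))
  } finally {
    conn.close()
  }
}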

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
new file mode 100644
index 0000000..3b54e1c
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
@@ -0,0 +1,90 @@
+package org.apache.s2graph.loader.subscriber
+
+import kafka.producer.KeyedMessage
+import kafka.serializer.StringDecoder
+import org.apache.s2graph.core.Graph
+import org.apache.s2graph.spark.spark.{WithKafka, SparkApp}
+import org.apache.spark.streaming.Durations._
+import org.apache.spark.streaming.kafka.HasOffsetRanges
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.language.postfixOps
+
+object WalLogStat extends SparkApp with WithKafka {
+
+  override def run() = {
+
+    validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", 
"dbUrl", "statTopic")
+
+    val kafkaZkQuorum = args(0)
+    val brokerList = args(1)
+    val topics = args(2)
+    val intervalInSec = seconds(args(3).toLong)
+    val dbUrl = args(4)
+    val statTopic = args(5)
+
+
+    val conf = sparkConf(s"$topics: ${getClass.getSimpleName}")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    val groupId = topics.replaceAll(",", "_") + "_stat"
+
+    val kafkaParams = Map(
+      "zookeeper.connect" -> kafkaZkQuorum,
+      "group.id" -> groupId,
+      "metadata.broker.list" -> brokerList,
+      "zookeeper.connection.timeout.ms" -> "10000",
+      "auto.offset.reset" -> "largest")
+
+    val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
+    val statProducer = getProducer[String, String](brokerList)
+
+    stream.foreachRDD { (rdd, time) =>
+
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+      val ts = time.milliseconds
+
+      val elements = rdd.mapPartitions { partition =>
+        // set executor setting.
+        val phase = System.getProperty("phase")
+        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
+        partition.map { case (key, msg) =>
+          Graph.toGraphElement(msg) match {
+            case Some(elem) =>
+              val serviceName = elem.serviceName
+              msg.split("\t", 7) match {
+                case Array(_, operation, log_type, _, _, label, _*) =>
+                  Seq(serviceName, label, operation, log_type).mkString("\t")
+                case _ =>
+                  Seq("no_service_name", "no_label", "no_operation", 
"parsing_error").mkString("\t")
+              }
+            case None =>
+              Seq("no_service_name", "no_label", "no_operation", 
"no_element_error").mkString("\t")
+          }
+        }
+      }
+
+      val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect()
+      val totalCount = countByKey.map(_._2).sum
+      val keyedMessage = countByKey.map { case (key, value) =>
+        new KeyedMessage[String, String](statTopic, s"$ts\t$key\t$value\t$totalCount")
+      }
+
+      statProducer.send(keyedMessage: _*)
+
+      elements.mapPartitionsWithIndex { (i, part) =>
+        // commit offset range
+        val osr = offsets(i)
+        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
+        Iterator.empty
+      }.foreach {
+        (_: Nothing) => ()
+      }
+
+    }
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
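
To make the output contract concrete, the shape of each record sent to statTopic is sketched below; the field values are made up for illustration only.

// Each stat record is: <batch ts>\t<service>\t<label>\t<operation>\t<log type>\t<count>\t<total in batch>
val ts = 1453939200000L
val key = Seq("myService", "friend", "insert", "e").mkString("\t")   // built the same way as in the stream above
val (count, total) = (120L, 4500L)
val record = s"$ts\t$key\t$count\t$total"
// => "1453939200000\tmyService\tfriend\tinsert\te\t120\t4500"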

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
new file mode 100644
index 0000000..f51a148
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -0,0 +1,148 @@
+package org.apache.s2graph.loader.subscriber
+
+import java.text.SimpleDateFormat
+import java.util.Date
+import kafka.serializer.StringDecoder
+import org.apache.s2graph.core.Graph
+import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.Durations._
+import org.apache.spark.streaming.kafka.HasOffsetRanges
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.language.postfixOps
+
+object WalLogToHDFS extends SparkApp with WithKafka {
+
+  override def run() = {
+
+    validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", 
"dbUrl", "outputPath", "hiveDatabase", "hiveTable", "splitListPath")
+
+    val kafkaZkQuorum = args(0)
+    val brokerList = args(1)
+    val topics = args(2)
+    val intervalInSec = seconds(args(3).toLong)
+    val dbUrl = args(4)
+    val outputPath = args(5)
+    val hiveDatabase = args(6)
+    val hiveTable = args(7)
+    val splitListPath = args(8)
+
+    val conf = sparkConf(s"$topics: WalLogToHDFS")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    val groupId = topics.replaceAll(",", "_") + "_stream"
+    val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed"
+
+    val kafkaParams = Map(
+      "zookeeper.connect" -> kafkaZkQuorum,
+      "group.id" -> groupId,
+      "metadata.broker.list" -> brokerList,
+      "zookeeper.connection.timeout.ms" -> "10000",
+      "auto.offset.reset" -> "largest")
+
+    val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
+
+    val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _))
+
+    val hdfsBlockSize = 134217728 // 128M
+    val hiveContext = new HiveContext(sc)
+    var splits = Array[String]()
+    var excludeLabels = Set[String]()
+    var excludeServices = Set[String]()
+    stream.foreachRDD { (rdd, time) =>
+      try {
+        val read = sc.textFile(splitListPath).collect().map(_.split("=")).flatMap {
+          case Array(value) => Some(("split", value))
+          case Array(key, value) => Some((key, value))
+          case _ => None
+        }
+        splits = read.filter(_._1 == "split").map(_._2)
+        excludeLabels = read.filter(_._1 == "exclude_label").map(_._2).toSet
+        excludeServices = read.filter(_._1 == "exclude_service").map(_._2).toSet
+      } catch {
+        case _: Throwable => // use previous information
+      }
+
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+      val elements = rdd.mapPartitions { partition =>
+        // set executor setting.
+        val phase = System.getProperty("phase")
+        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
+
+        partition.flatMap { case (key, msg) =>
+          val optMsg = Graph.toGraphElement(msg).flatMap { element =>
+            val arr = msg.split("\t", 7)
+            val service = element.serviceName
+            val label = arr(5)
+            val n = arr.length
+
+            if (excludeServices.contains(service) || excludeLabels.contains(label)) {
+              None
+            } else if (n == 6) {
+              Some(Seq(msg, "{}", service).mkString("\t"))
+            } else if (n == 7) {
+              Some(Seq(msg, service).mkString("\t"))
+            } else {
+              None
+            }
+          }
+          optMsg
+        }
+      }
+
+      val ts = time.milliseconds
+      val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
+
+      /** make sure that `elements` is not computed more than once at the same time */
+      val elementsWritten = {
+        elements.cache()
+        (Array("all") ++ splits).foreach {
+          case split if split == "all" =>
+            val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
+            elements.saveAsTextFile(path)
+          case split =>
+            val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
+            val strlen = split.length
+            val splitData = elements.filter(_.takeRight(strlen) == split).cache()
+            val totalSize = splitData
+                .mapPartitions { iterator =>
+                  val s = iterator.map(_.length.toLong).sum
+                  Iterator.single(s)
+                }
+                .sum
+                .toLong
+            val numPartitions = math.max(1, (totalSize / hdfsBlockSize.toDouble).toInt)
+            splitData.coalesce(math.min(splitData.partitions.length, numPartitions)).saveAsTextFile(path)
+            splitData.unpersist()
+        }
+        elements.unpersist()
+        elements
+      }
+
+      elementsWritten.mapPartitionsWithIndex { (i, part) =>
+        // commit offset range
+        val osr = offsets(i)
+        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
+        Iterator.empty
+      }.foreach {
+        (_: Nothing) => ()
+      }
+
+      (Array("all") ++ splits).foreach { split =>
+        val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
+        hiveContext.sql(s"use $hiveDatabase")
+        hiveContext.sql(s"alter table $hiveTable add partition 
(split='$split', date_id='$dateId', ts='$ts') location '$path'")
+      }
+    }
+
+    logInfo(s"counter: ${mapAcc.value}")
+    println(s"counter: ${mapAcc.value}")
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
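
To make the splitListPath contract concrete, here is a hypothetical split-list file together with the tuples the parsing logic above derives from it; the entry values are illustrative only.

// Hypothetical contents of the file at splitListPath, one entry per line:
//   mobile                          (a bare value is treated as a "split")
//   split=web
//   exclude_label=debug_label
//   exclude_service=test_service
val lines = Seq("mobile", "split=web", "exclude_label=debug_label", "exclude_service=test_service")
val read = lines.map(_.split("=")).flatMap {
  case Array(value)      => Some(("split", value))
  case Array(key, value) => Some((key, value))
  case _                 => None
}
val splits          = read.filter(_._1 == "split").map(_._2)                 // Seq("mobile", "web")
val excludeLabels   = read.filter(_._1 == "exclude_label").map(_._2).toSet   // Set("debug_label")
val excludeServices = read.filter(_._1 == "exclude_service").map(_._2).toSet // Set("test_service")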

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/BulkLoadPartitioner.scala b/loader/src/main/scala/spark/BulkLoadPartitioner.scala
deleted file mode 100644
index ed1587a..0000000
--- a/loader/src/main/scala/spark/BulkLoadPartitioner.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util
-import java.util.Comparator
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Partitioner
-
-/**
- * A Partitioner implementation that will separate records to different
- * HBase Regions based on region splits
- *
- * @param startKeys   The start keys for the given table
- */
-class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
-  extends Partitioner {
-
-  override def numPartitions: Int = startKeys.length
-
-  override def getPartition(key: Any): Int = {
-
-    val rowKey:Array[Byte] =
-      key match {
-        case qualifier: KeyFamilyQualifier =>
-          qualifier.rowKey
-        case _ =>
-          key.asInstanceOf[Array[Byte]]
-      }
-
-    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
-      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
-        Bytes.compareTo(o1, o2)
-      }
-    }
-    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
-    if (partition < 0) partition * -1 + -2
-    else partition
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala b/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala
deleted file mode 100644
index ab34b76..0000000
--- a/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package spark
-//
-//import org.apache.hadoop.hbase.util.Bytes
-//
-///**
-// * A wrapper class that will allow both columnFamily and qualifier to
-// * be the key of a hashMap.  Also allow for finding the value in a hashmap
-// * with out cloning the HBase value from the HBase Cell object
-// * @param columnFamily       ColumnFamily byte array
-// * @param columnFamilyOffSet Offset of columnFamily value in the array
-// * @param columnFamilyLength Length of the columnFamily value in the columnFamily array
-// * @param qualifier          Qualifier byte array
-// * @param qualifierOffSet    Offset of qualifier value in the array
-// * @param qualifierLength    Length of the qualifier value with in the array
-// */
-//class ColumnFamilyQualifierMapKeyWrapper(val columnFamily:Array[Byte],
-//                                         val columnFamilyOffSet:Int,
-//                                         val columnFamilyLength:Int,
-//                                         val qualifier:Array[Byte],
-//                                         val qualifierOffSet:Int,
-//                                         val qualifierLength:Int)
-//  extends Serializable{
-//
-//  override def equals(other:Any): Boolean = {
-//    val otherWrapper = other.asInstanceOf[ColumnFamilyQualifierMapKeyWrapper]
-//
-//    Bytes.compareTo(columnFamily,
-//      columnFamilyOffSet,
-//      columnFamilyLength,
-//      otherWrapper.columnFamily,
-//      otherWrapper.columnFamilyOffSet,
-//      otherWrapper.columnFamilyLength) == 0 && Bytes.compareTo(qualifier,
-//        qualifierOffSet,
-//        qualifierLength,
-//        otherWrapper.qualifier,
-//        otherWrapper.qualifierOffSet,
-//        otherWrapper.qualifierLength) == 0
-//  }
-//
-//  override def hashCode():Int = {
-//    Bytes.hashCode(columnFamily, columnFamilyOffSet, columnFamilyLength) +
-//      Bytes.hashCode(qualifier, qualifierOffSet, qualifierLength)
-//  }
-//
-//  def cloneColumnFamily():Array[Byte] = {
-//    val resultArray = new Array[Byte](columnFamilyLength)
-//    System.arraycopy(columnFamily, columnFamilyOffSet, resultArray, 0, columnFamilyLength)
-//    resultArray
-//  }
-//
-//  def cloneQualifier():Array[Byte] = {
-//    val resultArray = new Array[Byte](qualifierLength)
-//    System.arraycopy(qualifier, qualifierOffSet, resultArray, 0, qualifierLength)
-//    resultArray
-//  }
-//}
