http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/HBaseRDDFunctions.scala b/loader/src/main/scala/spark/HBaseRDDFunctions.scala
deleted file mode 100644
index 8ff8d58..0000000
--- a/loader/src/main/scala/spark/HBaseRDDFunctions.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hbase.{HConstants, TableName}
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.rdd.RDD
-
-import scala.collection.immutable.HashMap
-import scala.reflect.ClassTag
-
-/**
- * HBaseRDDFunctions contains a set of implicit functions that can be
- * applied to a Spark RDD so that we can easily interact with HBase
- */
-object HBaseRDDFunctions
-{
-
-  /**
-   * These are implicit methods for an RDD that contains any type of
-   * data.
-   *
-   * @param rdd This is for rdd of any type
-   * @tparam T  This is any type
-   */
-  implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * put.  This will not return a new RDD.  Think of it like a foreach
-     *
-     * @param hc         The hbaseContext object to identify which
-     *                   HBase cluster connection to use
-     * @param tableName  The tableName that the put will be sent to
-     * @param f          The function that will turn the RDD values
-     *                   into HBase Put objects.
-     */
-    def hbaseBulkPut(hc: HBaseContext,
-                     tableName: TableName,
-                     f: (T) => Put): Unit = {
-      hc.bulkPut(rdd, tableName, f)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new RDD.  Think of it as an RDD map
-     * function, in that every RDD value will get a new value out of
-     * HBase.  That new value will populate the newly generated RDD.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the gets will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the RDD values
-     *                       into HBase Get objects
-     * @param convertResult  The function that will convert a HBase
-     *                       Result object into a value that will go
-     *                       into the resulting RDD
-     * @tparam R             The type of Object that will be coming
-     *                       out of the resulting RDD
-     * @return               A resulting RDD with type R objects
-     */
-    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
-                            tableName: TableName, batchSize:Int,
-                            f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
-      hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new RDD.  Think of it as an RDD map
-     * function, in that every RDD value will get a new value out of
-     * HBase.  That new value will populate the newly generated RDD.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the gets will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the RDD values
-     *                       into HBase Get objects
-     * @return               A resulting RDD with type R objects
-     */
-    def hbaseBulkGet(hc: HBaseContext,
-                                  tableName: TableName, batchSize:Int,
-                                  f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
-      hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
-        batchSize, rdd, f,
-        result => if (result != null && result.getRow != null) {
-          (new ImmutableBytesWritable(result.getRow), result)
-        } else {
-          null
-        })
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * Delete.  This will not return a new RDD.
-     *
-     * @param hc         The hbaseContext object to identify which HBase
-     *                   cluster connection to use
-     * @param tableName  The tableName that the deletes will be sent to
-     * @param f          The function that will convert the RDD value into
-     *                   a HBase Delete Object
-     * @param batchSize  The number of Deletes to be sent in a single batch
-     */
-    def hbaseBulkDelete(hc: HBaseContext,
-                        tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
-      hc.bulkDelete(rdd, tableName, f, batchSize)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * foreachPartition method.  This will act very much like a normal RDD
-     * foreach method except that you will now have an HBase connection
-     * while iterating through the values.
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a Partition of an
-     *            RDD along with a connection object to HBase
-     */
-    def hbaseForeachPartition(hc: HBaseContext,
-                              f: (Iterator[T], Connection) => Unit): Unit = {
-      hc.foreachPartition(rdd, f)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * mapPartitions method.  This will act very much like a normal RDD
-     * mapPartitions method except that you will now have an
-     * HBase connection while iterating through the values
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a Partition of an
-     *            RDD along with a connection object to HBase
-     * @tparam R  This is the type of objects that will go into the resulting
-     *            RDD
-     * @return    A resulting RDD of type R
-     */
-    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
-                                        f: (Iterator[T], Connection) => Iterator[R]):
-    RDD[R] = {
-      hc.mapPartitions[T,R](rdd, f)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * bulkLoad method.
-     *
-     * A Spark Implementation of HBase Bulk load
-     *
-     * This will take the content from an existing RDD then sort and shuffle
-     * it with respect to region splits.  The result of that sort and shuffle
-     * will be written to HFiles.
-     *
-     * After this function is executed the user will have to call
-     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
-     *
-     * Also note this version of bulk load is different from past versions in
-     * that it includes the qualifier as part of the sort process. The
-     * reason for this is to be able to support rows with a very large number
-     * of columns.
-     *
-     * @param tableName                      The HBase table we are loading into
-     * @param flatMap                        A flatMap function that will make every row in the RDD
-     *                                       into N cells for the bulk load
-     * @param stagingDir                     The location on the FileSystem to bulk load into
-     * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
-     *                                       column family is written
-     * @param compactionExclude              Compaction excluded for the HFiles
-     * @param maxSize                        Max size for the HFiles before they roll
-     */
-    def hbaseBulkLoad(hc: HBaseContext,
-                         tableName: TableName,
-                         flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
-                         stagingDir:String,
-                         familyHFileWriteOptionsMap:
-                         util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                         new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
-                         compactionExclude: Boolean = false,
-                         maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
-      hc.bulkLoad(rdd, tableName,
-        flatMap, stagingDir, familyHFileWriteOptionsMap,
-        compactionExclude, maxSize)
-    }
-  }
-}
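
For anyone following this removal: a minimal sketch of how these implicits were typically pulled in and used. The SparkContext setup, table name, and column values below are illustrative and not taken from this patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkContext
    import spark.HBaseContext
    import spark.HBaseRDDFunctions._   // brings GenericHBaseRDDFunctions into scope

    object BulkPutSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "bulk-put-sketch")   // illustrative master/app name
        val hc = new HBaseContext(sc, new Configuration())         // assumes HBase config on the classpath

        val rdd = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

        // hbaseBulkPut behaves like a foreach: each RDD element is turned into a Put.
        rdd.hbaseBulkPut(hc, TableName.valueOf("test_table"), { case (row, value) =>
          new Put(Bytes.toBytes(row))
            .addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), Bytes.toBytes(value))
        })
      }
    }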

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/JavaHBaseContext.scala b/loader/src/main/scala/spark/JavaHBaseContext.scala
deleted file mode 100644
index 39ddf2a..0000000
--- a/loader/src/main/scala/spark/JavaHBaseContext.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
-import org.apache.spark.streaming.api.java.JavaDStream
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-/**
- * This is the Java Wrapper over HBaseContext which is written in
- * Scala.  This class will be used by developers that want to
- * work with Spark or Spark Streaming in Java
- *
- * @param jsc    This is the JavaSparkContext that we will wrap
- * @param config This is the config information for our HBase cluster
- */
-class JavaHBaseContext(@transient jsc: JavaSparkContext,
-                       @transient config: Configuration) extends Serializable {
-  val hbaseContext = new HBaseContext(jsc.sc, config)
-
-  /**
-   * A simple enrichment of the traditional Spark javaRdd foreachPartition.
-   * This function differs from the original in that it offers the
-   * developer access to an already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * @param javaRdd Original javaRdd with data to iterate over
-   * @param f       Function to be given an iterator to iterate through
-   *                the RDD values and an HConnection object to interact
-   *                with HBase
-   */
-  def foreachPartition[T](javaRdd: JavaRDD[T],
-                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
-
-    hbaseContext.foreachPartition(javaRdd.rdd,
-      (it: Iterator[T], conn: Connection) => {
-        f.call((it, conn))
-      })
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming dStream foreach
-   * This function differs from the original in that it offers the
-   * developer access to an already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * @param javaDstream Original DStream with data to iterate over
-   * @param f           Function to be given an iterator to iterate through
-   *                    the JavaDStream values and an HConnection object to
-   *                    interact with HBase
-   */
-  def foreachPartition[T](javaDstream: JavaDStream[T],
-                          f: VoidFunction[(Iterator[T], Connection)]) = {
-    hbaseContext.foreachPartition(javaDstream.dstream,
-      (it: Iterator[T], conn: Connection) => f.call(it, conn))
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark JavaRDD mapPartition.
-   * This function differs from the original in that it offers the
-   * developer access to an already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issue when
-   * getting data from HBase
-   *
-   * @param javaRdd Original JavaRdd with data to iterate over
-   * @param f       Function to be given an iterator to iterate through
-   *                the RDD values and an HConnection object to interact
-   *                with HBase
-   * @return        Returns a new RDD generated by the user-defined
-   *                function, just like a normal mapPartitions
-   */
-  def mapPartitions[T, R](javaRdd: JavaRDD[T],
-                          f: FlatMapFunction[(java.util.Iterator[T],
-                            Connection), R]): JavaRDD[R] = {
-
-    def fn = (it: Iterator[T], conn: Connection) =>
-      asScalaIterator(
-        f.call((asJavaIterator(it), conn)).iterator()
-      )
-
-    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
-      (iterator: Iterator[T], connection: Connection) =>
-        fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming JavaDStream
-   * mapPartition.
-   *
-   * This function differs from the original in that it offers the
-   * developer access to an already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issue when
-   * getting data from HBase
-   *
-   * @param javaDstream Original JavaDStream with data to iterate over
-   * @param mp          Function to be given an iterator to iterate through
-   *                    the JavaDStream values and an HConnection object to
-   *                    interact with HBase
-   * @return            Returns a new JavaDStream generated by the
-   *                    user-defined function, just like a normal mapPartitions
-   */
-  def streamMap[T, U](javaDstream: JavaDStream[T],
-                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
-  JavaDStream[U] = {
-    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
-      (it: Iterator[T], conn: Connection) =>
-        mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U])
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It allows a user to take a JavaRDD
-   * and generate Puts and send them to HBase.
-   * The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param javaRdd   Original JavaRDD with data to iterate over
-   * @param tableName The name of the table to put into
-   * @param f         Function to convert a value in the JavaRDD
-   *                  to a HBase Put
-   */
-  def bulkPut[T](javaRdd: JavaRDD[T],
-                 tableName: TableName,
-                 f: Function[(T), Put]) {
-
-    hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMapPartition method.
-   *
-   * It allows a user to take a JavaDStream and
-   * generate Puts and send them to HBase.
-   *
-   * The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param javaDstream Original DStream with data to iterate over
-   * @param tableName   The name of the table to put into
-   * @param f           Function to convert a value in
-   *                    the JavaDStream to a HBase Put
-   */
-  def streamBulkPut[T](javaDstream: JavaDStream[T],
-                       tableName: TableName,
-                       f: Function[T, Put]) = {
-    hbaseContext.streamBulkPut(javaDstream.dstream,
-      tableName,
-      (t: T) => f.call(t))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It allows a user to take a JavaRDD and
-   * generate Deletes and send them to HBase.
-   *
-   * The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param javaRdd   Original JavaRDD with data to iterate over
-   * @param tableName The name of the table to delete from
-   * @param f         Function to convert a value in the JavaRDD to a
-   *                  HBase Delete
-   * @param batchSize The number of deletes to batch before sending to HBase
-   */
-  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
-                    f: Function[T, Delete], batchSize: Integer) {
-    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamBulkMutation method.
-   *
-   * It allows a user to take a JavaDStream and
-   * generate Deletes and send them to HBase.
-   *
-   * The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param javaDStream Original DStream with data to iterate over
-   * @param tableName   The name of the table to delete from
-   * @param f           Function to convert a value in the JavaDStream to a
-   *                    HBase Delete
-   * @param batchSize   The number of deletes to be sent at once
-   */
-  def streamBulkDelete[T](javaDStream: JavaDStream[T],
-                          tableName: TableName,
-                          f: Function[T, Delete],
-                          batchSize: Integer) = {
-    hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
-      (t: T) => f.call(t),
-      batchSize)
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.mapPartition method.
-   *
-   * It allows a user to take a JavaRDD and generate a
-   * new RDD based on Gets and the results they bring back from HBase
-   *
-   * @param tableName     The name of the table to get from
-   * @param batchSize     batch size of how many gets to retrieve in a single fetch
-   * @param javaRdd       Original JavaRDD with data to iterate over
-   * @param makeGet       Function to convert a value in the JavaRDD to a
-   *                      HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                      whatever the user wants to put in the resulting
-   *                      JavaRDD
-   * @return              New JavaRDD that is created by the Get to HBase
-   */
-  def bulkGet[T, U](tableName: TableName,
-                    batchSize: Integer,
-                    javaRdd: JavaRDD[T],
-                    makeGet: Function[T, Get],
-                    convertResult: Function[Result, U]): JavaRDD[U] = {
-
-    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
-      batchSize,
-      javaRdd.rdd,
-      (t: T) => makeGet.call(t),
-      (r: Result) => {
-        convertResult.call(r)
-      })(fakeClassTag[U]))(fakeClassTag[U])
-
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMap method.
-   *
-   * It allows a user to take a DStream and
-   * generate a new DStream based on Gets and the results
-   * they bring back from HBase
-   *
-
-   * @param tableName     The name of the table to get from
-   * @param batchSize     The number of gets to be batched together
-   * @param javaDStream   Original DStream with data to iterate over
-   * @param makeGet       Function to convert a value in the JavaDStream to a
-   *                      HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                      whatever the user wants to put in the resulting
-   *                      JavaDStream
-   * @return              New JavaDStream that is created by the Get to HBase
-   */
-  def streamBulkGet[T, U](tableName: TableName,
-                          batchSize: Integer,
-                          javaDStream: JavaDStream[T],
-                          makeGet: Function[T, Get],
-                          convertResult: Function[Result, U]): JavaDStream[U] = {
-    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
-      batchSize,
-      javaDStream.dstream,
-      (t: T) => makeGet.call(t),
-      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
-  }
-
-  /**
-   * This function will use the native HBase TableInputFormat with the
-   * given scan object to generate a new JavaRDD
-   *
-   * @param tableName The name of the table to scan
-   * @param scans     The HBase scan object to use to read data from HBase
-   * @param f         Function to convert a Result object from HBase into
-   *                  What the user wants in the final generated JavaRDD
-   * @return          New JavaRDD with results from scan
-   */
-  def hbaseRDD[U](tableName: TableName,
-                  scans: Scan,
-                  f: Function[(ImmutableBytesWritable, Result), U]):
-  JavaRDD[U] = {
-    JavaRDD.fromRDD(
-      hbaseContext.hbaseRDD[U](tableName,
-        scans,
-        (v: (ImmutableBytesWritable, Result)) =>
-          f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
-  }
-
-  /**
-   * An overloaded version of HBaseContext hbaseRDD that defines the
-   * type of the resulting JavaRDD
-   *
-   * @param tableName The name of the table to scan
-   * @param scans     The HBase scan object to use to read data from HBase
-   * @return          New JavaRDD with results from scan
-   */
-  def hbaseRDD(tableName: TableName,
-               scans: Scan):
-  JavaRDD[(ImmutableBytesWritable, Result)] = {
-    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
-  }
-
-  /**
-   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
-   *
-   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
-   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
-   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
-   *
-   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
-   * just worse performance or security issues.
-   * For instance, an Array[AnyRef] can hold any type T,
-   * but may lose primitive
-   * specialization.
-   */
-  private[spark]
-  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-}
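
One piece of this wrapper worth a note for readers of the diff is the fakeClassTag trick at the bottom. Below is a stripped-down sketch of the same bridging pattern; transform and transformForJava are illustrative names, not part of this patch. The Scala side demands a ClassTag, the Java-facing side cannot supply one, so a ClassTag[AnyRef] is cast to whatever element type is requested.

    import org.apache.spark.api.java.function.{Function => JFunction}
    import scala.reflect.ClassTag

    object FakeClassTagSketch {
      // Stand-in for a Scala API, such as HBaseContext.bulkGet, that requires a ClassTag.
      def transform[T, R: ClassTag](xs: Seq[T])(f: T => R): Seq[R] = xs.map(f)

      // Same cast as JavaHBaseContext.fakeClassTag above.
      private def fakeClassTag[U]: ClassTag[U] = ClassTag.AnyRef.asInstanceOf[ClassTag[U]]

      // Java-facing overload: adapts Spark's Java Function and fakes the ClassTag.
      def transformForJava[T, R](xs: Seq[T], f: JFunction[T, R]): Seq[R] =
        transform(xs)(t => f.call(t))(fakeClassTag[R])

      def main(args: Array[String]): Unit = {
        val toLabel = new JFunction[Int, String] { override def call(i: Int): String = "v" + i }
        println(transformForJava(Seq(1, 2, 3), toLabel))   // List(v1, v2, v3)
      }
    }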

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/KeyFamilyQualifier.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/KeyFamilyQualifier.scala b/loader/src/main/scala/spark/KeyFamilyQualifier.scala
deleted file mode 100644
index 4a00b63..0000000
--- a/loader/src/main/scala/spark/KeyFamilyQualifier.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.Serializable
-
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * This is the key to be used for sorting and shuffling.
- *
- * We will only partition on the rowKey but we will sort on all three
- *
- * @param rowKey    Record RowKey
- * @param family    Record ColumnFamily
- * @param qualifier Cell Qualifier
- */
-class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte])
-  extends Comparable[KeyFamilyQualifier] with Serializable {
-  override def compareTo(o: KeyFamilyQualifier): Int = {
-    var result = Bytes.compareTo(rowKey, o.rowKey)
-    if (result == 0) {
-      result = Bytes.compareTo(family, o.family)
-      if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
-    }
-    result
-  }
-  override def toString: String = {
-    Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + 
Bytes.toString(qualifier)
-  }
-}
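
For readers skimming the diff, a tiny sketch of the ordering this key imposed (partition on rowKey, sort by rowKey, then family, then qualifier); the byte values below are illustrative:

    import org.apache.hadoop.hbase.util.Bytes
    import spark.KeyFamilyQualifier   // the class removed above

    object KeyOrderingSketch {
      def main(args: Array[String]): Unit = {
        val keys = Seq(
          new KeyFamilyQualifier(Bytes.toBytes("row2"), Bytes.toBytes("e"), Bytes.toBytes("q1")),
          new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("v"), Bytes.toBytes("q9")),
          new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("e"), Bytes.toBytes("q2"))
        )
        // compareTo falls through rowKey -> family -> qualifier, so this prints:
        // row1:e:q2, row1:v:q9, row2:e:q1
        keys.sorted.foreach(println)
      }
    }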

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala
deleted file mode 100644
index f4f7865..0000000
--- a/loader/src/main/scala/subscriber/GraphSubscriber.scala
+++ /dev/null
@@ -1,339 +0,0 @@
-package subscriber
-
-
-import java.util
-
-import com.kakao.s2graph.core.{Graph, _}
-import com.typesafe.config.{Config, ConfigFactory}
-import kafka.javaapi.producer.Producer
-import kafka.producer.KeyedMessage
-import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.hadoop.hbase.client._
-import org.apache.spark.{Accumulable, SparkContext}
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.concurrent.ExecutionContext
-
-object GraphConfig {
-  var database = ""
-  var zkQuorum = ""
-  var kafkaBrokers = ""
-  var cacheTTL = s"${60 * 60 * 1}"
-  def apply(phase: String, dbUrl: Option[String], zkAddr: Option[String], kafkaBrokerList: Option[String]): Config = {
-    database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph_dev")
-    zkQuorum = zkAddr.getOrElse("localhost")
-
-//    val newConf = new util.HashMap[String, Object]()
-//    newConf.put("hbase.zookeeper.quorum", zkQuorum)
-//    newConf.put("db.default.url", database)
-//    newConf.put("kafka.metadata.broker.list", kafkaBrokers)
-    val newConf =
-      if (kafkaBrokerList.isEmpty) Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "cache.ttl.seconds" -> cacheTTL)
-      else Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "kafka.metadata.broker.list" -> kafkaBrokers, "cache.ttl.seconds" -> cacheTTL)
-
-    ConfigFactory.parseMap(newConf).withFallback(Graph.DefaultConfig)
-  }
-}
-
-object GraphSubscriberHelper extends WithKafka {
-
-
-  type HashMapAccumulable = Accumulable[HashMap[String, Long], (String, Long)]
-
-
-  lazy val producer = new Producer[String, String](kafkaConf(GraphConfig.kafkaBrokers))
-  var config: Config = _
-  private val writeBufferSize = 1024 * 1024 * 8
-  private val sleepPeriod = 10000
-  private val maxTryNum = 10
-
-  var g: Graph = null
-  var management: Management = null
-  val conns = new scala.collection.mutable.HashMap[String, Connection]()
-
-  def toOption(s: String) = {
-    s match {
-      case "" | "none" => None
-      case _ => Some(s)
-    }
-  }
-
-  def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String): Unit = {
-    config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
-
-    if (g == null) {
-      val ec = ExecutionContext.Implicits.global
-      g = new Graph(config)(ec)
-      management = new Management(g)(ec)
-    }
-  }
-
-  def getConn(zkQuorum: String): Connection = {
-    conns.getOrElseUpdate(zkQuorum, {
-      val hbaseConf = HBaseConfiguration.create()
-      hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
-      ConnectionFactory.createConnection(hbaseConf)
-    })
-    conns(zkQuorum)
-  }
-  //  def apply(phase: String, dbUrl: Option[String], zkQuorum: 
Option[String], kafkaBrokerList: Option[String]): Unit  = {
-  //    Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, 
kafkaBrokerList))(ExecutionContext.Implicits.global)
-  //  }
-  def report(key: String, value: Option[String], topic: String = "report") = {
-    val msg = Seq(Some(key), value).flatten.mkString("\t")
-    val kafkaMsg = new KeyedMessage[String, String](topic, msg)
-    producer.send(kafkaMsg)
-  }
-
-  def toGraphElements(msgs: Seq[String], labelMapping: Map[String, String] = Map.empty)
-                     (statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
-    (for (msg <- msgs) yield {
-      statFunc("total", 1)
-      Graph.toGraphElement(msg, labelMapping) match {
-        case Some(e) if e.isInstanceOf[Edge] =>
-          statFunc("EdgeParseOk", 1)
-          e.asInstanceOf[Edge]
-        case Some(v) if v.isInstanceOf[Vertex] =>
-          statFunc("VertexParseOk", 1)
-          v.asInstanceOf[Vertex]
-        case Some(x) =>
-          throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: 
parsing failed. ${x.serviceName}")
-        case None =>
-          throw new RuntimeException(s"GraphSubscriber.toGraphElements: 
parsing failed. $msg")
-      }
-
-    }).toList
-  }
-
-//  private def storeRec(zkQuorum: String, tableName: String, puts: List[Put], 
elementsSize: Int, tryNum: Int)
-//                      (statFunc: (String, Int) => Unit, statPrefix: String = 
"edge"): Unit = {
-//    if (tryNum <= 0) {
-//      statFunc("errorStore", elementsSize)
-//      throw new RuntimeException(s"retry failed after $maxTryNum")
-//    }
-//    val conn = getConn(zkQuorum)
-//    val mutator = conn.getBufferedMutator(TableName.valueOf(tableName))
-//    //      val table = conn.getTable(TableName.valueOf(tableName))
-//    //      table.setAutoFlush(false, false)
-//
-//    try {
-//      puts.foreach { put => put.setDurability(Durability.ASYNC_WAL) }
-//      mutator.mutate(puts)
-//      //        table.put(puts)
-//      statFunc(s"$statPrefix:storeOk", elementsSize)
-//    } catch {
-//      case e: Throwable =>
-//        e.printStackTrace()
-//        Thread.sleep(sleepPeriod)
-//        storeRec(zkQuorum, tableName, puts, elementsSize, tryNum - 
1)(statFunc)
-//    } finally {
-//      mutator.close()
-//      //        table.close()
-//    }
-//  }
-//
-//  def storeDegreeBulk(zkQuorum: String, tableName: String)
-//                     (degrees: Iterable[(String, String, String, Int)], 
labelMapping: Map[String, String] = Map.empty)
-//                     (mapAccOpt: Option[HashMapAccumulable]): 
Iterable[(String, Long)] = {
-//    val counts = HashMap[String, Long]()
-//    val statFunc = storeStat(counts)(mapAccOpt) _
-//
-//    for {
-//      (vertexId, labelName, direction, degreeVal) <- degrees
-//      incrementRequests <- TransferToHFile.buildDegreePutRequests(vertexId, 
labelName, direction, degreeVal)
-//    } {
-//      storeRec(zkQuorum, tableName, incrementRequests, degrees.size, 
maxTryNum)(statFunc, "degree")
-//    }
-//    counts
-//  }
-//  def storeBulk(zkQuorum: String, tableName: String)
-//               (msgs: Seq[String], labelMapping: Map[String, String] = 
Map.empty, autoCreateEdge: Boolean = false)
-//               (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, 
Long)] = {
-//
-//    val counts = HashMap[String, Long]()
-//    val statFunc = storeStat(counts)(mapAccOpt) _
-//    val elements = toGraphElements(msgs, labelMapping)(statFunc)
-//
-//    val puts = elements.flatMap { element =>
-//      element match {
-//        case v: Vertex if v.op == GraphUtil.operations("insert") || v.op == 
GraphUtil.operations("insertBulk") =>
-//          v.buildPuts()
-//        case e: Edge if e.op == GraphUtil.operations("insert") || e.op == 
GraphUtil.operations("insertBulk") =>
-//          EdgeWriter(e).insertBulkForLoader(autoCreateEdge)
-//        case _ => Nil
-//      }
-//    } toList
-//
-//    storeRec(zkQuorum, tableName, puts, msgs.size, maxTryNum)(statFunc)
-//    counts
-//  }
-
-  def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = {
-    counts.put(key, counts.getOrElse(key, 0L) + value)
-    mapAccOpt match {
-      case None =>
-      case Some(mapAcc) => mapAcc += (key -> value)
-    }
-  }
-
-  def toLabelMapping(labelMapping: String): Map[String, String] = {
-    (for {
-      token <- labelMapping.split(",")
-      inner = token.split(":") if inner.length == 2
-    } yield {
-        (inner.head, inner.last)
-      }).toMap
-  }
-
-  def isValidQuorum(quorum: String) = {
-    quorum.split(",").size > 1
-  }
-}
-
-//object GraphSubscriber extends SparkApp with WithKafka {
-//  val sleepPeriod = 5000
-//  val usages =
-//    s"""
-//       |/**
-//       |* this job reads edge format(TSV) from the HDFS file system then bulk loads edges into s2graph. assumes that newLabelName is already created by API.
-//       |* params:
-//       |*  0. hdfsPath: where your data is in hdfs. requires a full path with the hdfs:// prefix
-//       |*  1. dbUrl: jdbc database connection string to specify the database for meta.
-//       |*  2. labelMapping: oldLabel:newLabel delimited by ,
-//       |*  3. zkQuorum: target hbase zkQuorum where this job will publish data to.
-//       |*  4. hTableName: target hbase physical table name where this job will publish data to.
-//       |*  5. batchSize: how many edges will be batched per Put request to the target hbase.
-//       |*  6. kafkaBrokerList: using kafka as a fallback queue. when something goes wrong during a batch, data that needs to be replayed will be stored in kafka.
-//       |*  7. kafkaTopic: fallback queue topic.
-//       |*  8. edgeAutoCreate: true if the reversed edge needs to be created automatically.
-//       |*
-//       |* after this job finishes, s2graph will have data corresponding to newLabelName.
-//       |* change this newLabelName to the original name if you want an online replacement of the label.
-//       |*
-//       |*/
-//     """.stripMargin
-//
-//  override def run() = {
-//    /**
-//     * Main function
-//     */
-//    println(args.toList)
-////    if (args.length != 10) {
-////      System.err.println(usages)
-////      System.exit(1)
-////    }
-//    val hdfsPath = args(0)
-//    val dbUrl = args(1)
-//    val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2))
-//
-//    val zkQuorum = args(3)
-//    val hTableName = args(4)
-//    val batchSize = args(5).toInt
-//    val kafkaBrokerList = args(6)
-//    val kafkaTopic = args(7)
-//    val edgeAutoCreate = args(8).toBoolean
-//    val vertexDegreePathOpt = if (args.length >= 10) 
GraphSubscriberHelper.toOption(args(9)) else None
-//
-//    val conf = sparkConf(s"$hdfsPath: GraphSubscriber")
-//    val sc = new SparkContext(conf)
-//    val mapAcc = sc.accumulable(HashMap.empty[String, Long], 
"counter")(HashMapParam[String, Long](_ + _))
-//
-//
-//    if (!GraphSubscriberHelper.isValidQuorum(zkQuorum)) throw new 
RuntimeException(s"$zkQuorum is not valid.")
-//
-//    /** this job expects only one hTableName. all labels in this job will be stored in the same physical hbase table */
-//    try {
-//
-//      import GraphSubscriberHelper._
-//      // set local driver setting.
-//      val phase = System.getProperty("phase")
-//      GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
-//
-//      /** copy when oldLabel exists and newLabel does not exist. otherwise ignore. */
-//
-//      if (labelMapping.isEmpty) {
-//        // pass
-//      } else {
-//        for {
-//          (oldLabelName, newLabelName) <- labelMapping
-//        } {
-//          Management.copyLabel(oldLabelName, newLabelName, 
toOption(hTableName))
-//        }
-//      }
-//
-//      vertexDegreePathOpt.foreach { vertexDegreePath =>
-//        val vertexDegrees = sc.textFile(vertexDegreePath).filter(line => 
line.split("\t").length == 4).map { line =>
-//          val tokens = line.split("\t")
-//          (tokens(0), tokens(1), tokens(2), tokens(3).toInt)
-//        }
-//        vertexDegrees.foreachPartition { partition =>
-//
-//          // init Graph
-//          val phase = System.getProperty("phase")
-//          GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, 
kafkaBrokerList)
-//
-//          partition.grouped(batchSize).foreach { msgs =>
-//            try {
-//              val start = System.currentTimeMillis()
-//              val counts = GraphSubscriberHelper.storeDegreeBulk(zkQuorum, 
hTableName)(msgs, labelMapping)(Some(mapAcc))
-//              for ((k, v) <- counts) {
-//                mapAcc +=(k, v)
-//              }
-//              val duration = System.currentTimeMillis() - start
-//              println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, 
$hTableName")
-//            } catch {
-//              case e: Throwable =>
-//                println(s"[Failed]: store $e")
-//
-//                msgs.foreach { msg =>
-//                  GraphSubscriberHelper.report(msg.toString(), 
Some(e.getMessage()), topic = kafkaTopic)
-//                }
-//            }
-//          }
-//        }
-//      }
-//
-//
-//      val msgs = sc.textFile(hdfsPath)
-//      msgs.foreachPartition(partition => {
-//        // set executor setting.
-//        val phase = System.getProperty("phase")
-//        GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
-//
-//        partition.grouped(batchSize).foreach { msgs =>
-//          try {
-//            val start = System.currentTimeMillis()
-//            //            val counts =
-//            //              GraphSubscriberHelper.store(msgs, 
GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
-//            val counts =
-//              GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, 
labelMapping, edgeAutoCreate)(Some(mapAcc))
-//
-//            for ((k, v) <- counts) {
-//              mapAcc +=(k, v)
-//            }
-//            val duration = System.currentTimeMillis() - start
-//            println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, 
$hTableName")
-//          } catch {
-//            case e: Throwable =>
-//              println(s"[Failed]: store $e")
-//
-//              msgs.foreach { msg =>
-//                GraphSubscriberHelper.report(msg, Some(e.getMessage()), 
topic = kafkaTopic)
-//              }
-//          }
-//        }
-//      })
-//
-//      logInfo(s"counter: $mapAcc")
-//      println(s"Stats: ${mapAcc}")
-//
-//    } catch {
-//      case e: Throwable =>
-//        println(s"job failed with exception: $e")
-//        throw e
-//    }
-//
-//  }
-//}
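
Since the labelMapping argument ("oldLabel:newLabel delimited by ,") appears in several of the deleted jobs, here is a standalone sketch of the parsing done by GraphSubscriberHelper.toLabelMapping; the label names below are illustrative:

    object LabelMappingSketch {
      // Mirrors GraphSubscriberHelper.toLabelMapping: tokens that do not split into exactly
      // two parts on ':' are silently dropped.
      def toLabelMapping(labelMapping: String): Map[String, String] =
        (for {
          token <- labelMapping.split(",")
          inner = token.split(":") if inner.length == 2
        } yield (inner.head, inner.last)).toMap

      def main(args: Array[String]): Unit = {
        println(toLabelMapping("friends_v1:friends_v2,likes:likes_new"))
        // Map(friends_v1 -> friends_v2, likes -> likes_new)
        println(toLabelMapping("malformed"))   // Map()
      }
    }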

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala b/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala
deleted file mode 100644
index 1063560..0000000
--- a/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-package subscriber
-
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.client.{ConnectionFactory}
-import org.apache.spark.streaming.Durations._
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.language.postfixOps
-
-//object GraphSubscriberStreaming extends SparkApp with WithKafka {
-//  val usages =
-//    s"""
-//       |/**
-//       |this job consumes edges/vertices from kafka topics then loads them into s2graph.
-//       |params:
-//       |  1. kafkaZkQuorum: kafka zk address to consume events
-//       |  2. brokerList: kafka cluster`s broker list.
-//       |  3. topics: , delimited list of topics to consume
-//       |  4. intervalInSec: batch  interval for this job.
-//       |  5. batchSize: how many edges/vertices will be grouped for bulk mutations.
-//       |  6. hbaseZkQuorum: s2graph zookeeper address.
-//       |  7. hTableName: physical hbase table name.
-//       |  8. labelMapping: oldLabel:newLabel delimited by ,
-//       |*/
-//   """.stripMargin
-//  override def run() = {
-//    if (args.length != 9) {
-//      System.err.println(usages)
-//      System.exit(1)
-//    }
-//    val kafkaZkQuorum = args(0)
-//    val brokerList = args(1)
-//    val topics = args(2)
-//    val intervalInSec = seconds(args(3).toLong)
-//    val dbUrl = args(4)
-//    val batchSize = args(5).toInt
-//    val hbaseZkQuorum = args(6)
-//    val hTableName = args(7)
-//    val labelMapping = GraphSubscriberHelper.toLabelMapping(args(8))
-//
-//
-//    if (!GraphSubscriberHelper.isValidQuorum(hbaseZkQuorum))
-//      throw new RuntimeException(s"$hbaseZkQuorum is not valid.")
-//
-//    val conf = sparkConf(s"$topics: GraphSubscriberStreaming")
-//    val ssc = streamingContext(conf, intervalInSec)
-//    val sc = ssc.sparkContext
-//
-//    val groupId = topics.replaceAll(",", "_") + "_stream"
-//    val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed"
-//
-//    val kafkaParams = Map(
-//      "zookeeper.connect" -> kafkaZkQuorum,
-//      "group.id" -> groupId,
-//      "zookeeper.connection.timeout.ms" -> "10000",
-//      "metadata.broker.list" -> brokerList,
-//      "auto.offset.reset" -> "largest")
-//
-//    val stream = createKafkaValueStreamMulti(ssc, kafkaParams, topics, 8, 
None).flatMap(s => s.split("\n"))
-//
-//    val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), 
"Throughput")(HashMapParam[String, Long](_ + _))
-//
-//
-//    stream.foreachRDD(rdd => {
-//
-//      rdd.foreachPartition(partition => {
-//        // set executor setting.
-//        val phase = System.getProperty("phase")
-//        GraphSubscriberHelper.apply(phase, dbUrl, hbaseZkQuorum, brokerList)
-//
-//        partition.grouped(batchSize).foreach { msgs =>
-//          try {
-//            val start = System.currentTimeMillis()
-//            //            val counts =
-//            //              GraphSubscriberHelper.store(msgs, 
GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
-//            val counts =
-//              GraphSubscriberHelper.storeBulk(hbaseZkQuorum, 
hTableName)(msgs, labelMapping)(Some(mapAcc))
-//
-//            for ((k, v) <- counts) {
-//              mapAcc +=(k, v)
-//            }
-//            val duration = System.currentTimeMillis() - start
-//            println(s"[Success]: store, $mapAcc, $duration, $hbaseZkQuorum, 
$hTableName")
-//          } catch {
-//            case e: Throwable =>
-//              println(s"[Failed]: store $e")
-//
-//              msgs.foreach { msg =>
-//                GraphSubscriberHelper.report(msg, Some(e.getMessage()), 
topic = fallbackTopic)
-//              }
-//          }
-//        }
-//      })
-//    })
-//
-//
-//    logInfo(s"counter: ${mapAcc.value}")
-//    println(s"counter: ${mapAcc.value}")
-//    ssc.start()
-//    ssc.awaitTermination()
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/KafkaToHdfs.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/KafkaToHdfs.scala b/loader/src/main/scala/subscriber/KafkaToHdfs.scala
deleted file mode 100644
index 785cb69..0000000
--- a/loader/src/main/scala/subscriber/KafkaToHdfs.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-package subscriber
-
-import org.apache.spark.streaming.Durations._
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.language.postfixOps
-
-/**
- * Created by shon on 9/2/15.
- */
-//object KafkaToHdfs extends SparkApp with WithKafka {
-//  val usages =
-//    s"""
-//       |/**
-//       |this job consumes edges/vertices from kafka topics then loads them into s2graph.
-//       |params:
-//       |  1. kafkaZkQuorum: kafka zk address to consume events
-//       |  2. brokerList: kafka cluster`s broker list.
-//       |  3. topics: , delimited list of topics to consume
-//       |  4. intervalInSec: batch  interval for this job.
-//       |  5. batchSize: how many edges/vertices will be grouped for bulk mutations.
-//       |  6. hbaseZkQuorum: s2graph zookeeper address.
-//       |  7. hTableName: physical hbase table name.
-//       |  8. labelMapping: oldLabel:newLabel delimited by ,
-//       |*/
-//   """.stripMargin
-//  override def run() = {
-//    if (args.length != 9) {
-//      System.err.println(usages)
-//      System.exit(1)
-//    }
-//    val kafkaZkQuorum = args(0)
-//    val brokerList = args(1)
-//    val topics = args(2)
-//    val intervalInSec = seconds(args(3).toLong)
-//    val dbUrl = args(4)
-//    val batchSize = args(5).toInt
-//    val hbaseZkQuorum = args(6)
-//    val hTableName = args(7)
-//    val labelMapping = GraphSubscriberHelper.toLabelMapping(args(8))
-//
-//
-//    if (!GraphSubscriberHelper.isValidQuorum(hbaseZkQuorum))
-//      throw new RuntimeException(s"$hbaseZkQuorum is not valid.")
-//
-//    val conf = sparkConf(s"$topics: GraphSubscriberStreaming")
-//    val ssc = streamingContext(conf, intervalInSec)
-//    val sc = ssc.sparkContext
-//
-//    val groupId = topics.replaceAll(",", "_") + "_stream"
-//    val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed"
-//
-//    val kafkaParams = Map(
-//      "zookeeper.connect" -> kafkaZkQuorum,
-//      "group.id" -> groupId,
-//      "zookeeper.connection.timeout.ms" -> "10000",
-//      "metadata.broker.list" -> brokerList,
-//      "auto.offset.reset" -> "largest")
-//
-//    val stream = createKafkaValueStreamMulti(ssc, kafkaParams, topics, 8, 
None).flatMap(s => s.split("\n"))
-//
-//    val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), 
"Throughput")(HashMapParam[String, Long](_ + _))
-//
-//
-//    stream.foreachRDD(rdd => {
-//
-//      rdd.foreachPartition(partition => {
-//        // set executor setting.
-//        val phase = System.getProperty("phase")
-//        GraphSubscriberHelper.apply(phase, dbUrl, hbaseZkQuorum, brokerList)
-//
-//        partition.grouped(batchSize).foreach { msgs =>
-//          try {
-//            val start = System.currentTimeMillis()
-//            //            val counts =
-//            //              GraphSubscriberHelper.store(msgs, 
GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
-//            val dummyStatFunc = (s: String, i: Int) => {}
-//            val elements = 
GraphSubscriberHelper.toGraphElements(msgs)(dummyStatFunc)
-//            for {
-//              element <- elements
-//            } {
-//              element.serviceName
-//            }
-////            val counts =
-////              GraphSubscriberHelper.storeBulk(hbaseZkQuorum, 
hTableName)(msgs, labelMapping)(Some(mapAcc))
-////
-////            for ((k, v) <- counts) {
-////              mapAcc +=(k, v)
-////            }
-//            val duration = System.currentTimeMillis() - start
-//            println(s"[Success]: store, $mapAcc, $duration, $hbaseZkQuorum, 
$hTableName")
-//          } catch {
-//            case e: Throwable =>
-//              println(s"[Failed]: store $e")
-//
-//              msgs.foreach { msg =>
-//                GraphSubscriberHelper.report(msg, Some(e.getMessage()), 
topic = fallbackTopic)
-//              }
-//          }
-//        }
-//      })
-//    })
-//
-//
-//    logInfo(s"counter: ${mapAcc.value}")
-//    println(s"counter: ${mapAcc.value}")
-//    ssc.start()
-//    ssc.awaitTermination()
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/TestEdgeBuilder.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/TestEdgeBuilder.scala b/loader/src/main/scala/subscriber/TestEdgeBuilder.scala
deleted file mode 100644
index f5f43f8..0000000
--- a/loader/src/main/scala/subscriber/TestEdgeBuilder.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-package subscriber
-
-import org.apache.spark.SparkContext
-import s2.spark.{SparkApp, WithKafka}
-
-import scala.util.Random
-
-/**
- * Created by shon on 7/27/15.
- */
-//object TestEdgeBuilder  extends SparkApp with WithKafka {
-//  val sleepPeriod = 5000
-//  val usages =
-//    s"""
-//       |/**
-//       |0: numOfRows =
-//       |*/
-//     """.stripMargin
-//
-//  override def run() = {
-//    /**
-//     * label definition can be found on migrate/s2graph/bmt.schema
-//     * Main function
-//     * numOfRows: number of rows
-//     * numOfCols: number of cols
-//     * numOfMetas: number of metas
-//     *
-//     */
-//    println(args.toList)
-//    val conf = sparkConf(s"TestEdgeBuilder")
-//    val sc = new SparkContext(conf)
-//    val phase = args(0)
-//    val dbUrl = args(1)
-//    val zkQuorum = args(2)
-//    val hTableName = args(3)
-//    val labelName = args(4)
-//    val metaName = args(5)
-//
-//    val numOfRows = if (args.length >= 7) args(6).toInt else 100000
-//    val numOfCols = if (args.length >= 8) args(7).toInt else 10000
-//    val dimOfCols = if (args.length >= 9) args(8).toInt else 10000
-//    val numOfSlice = if (args.length >= 10) args(9).toInt else 10
-//    val batchSize = if (args.length >= 11) args(10).toInt else 100
-//
-//    sc.parallelize((0 until numOfRows), numOfSlice).foreachPartition { 
partition =>
-//
-//      GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, "none")
-//
-//      partition.grouped(batchSize).foreach { rows =>
-//        for {
-//          rowId <- rows
-//        } {
-//          val ts = System.currentTimeMillis()
-//          val msgs = for {
-//            colId <- (0 until numOfCols)
-//            metaId = Random.nextInt(dimOfCols)
-//          } yield {
-//              List(ts, "insertBulk", "edge", rowId, colId, labelName, 
s"""{"$metaName": $metaId}""").mkString("\t")
-//            }
-//          GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs)(None)
-//        }
-//      }
-//    }
-//  }
-//}
-
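
The msgs generated above are in the s2graph bulk TSV format (timestamp, operation, element type, from, to, label, JSON props) that TransferToHFile below consumes; a small sketch of building one such record, with illustrative field values:

    object BulkLineSketch {
      def main(args: Array[String]): Unit = {
        val ts = System.currentTimeMillis()
        // Tab-separated bulk edge line, as built in TestEdgeBuilder above.
        val line = List(ts, "insertBulk", "edge", "user_1", "item_42", "some_label", """{"score": 7}""").mkString("\t")
        println(line)
      }
    }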

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/TransferToHFile.scala b/loader/src/main/scala/subscriber/TransferToHFile.scala
deleted file mode 100644
index 516bb39..0000000
--- a/loader/src/main/scala/subscriber/TransferToHFile.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-package subscriber
-
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{LabelMeta, Label}
-import com.kakao.s2graph.core.types.{InnerValLikeWithTs, LabelWithDirection, SourceVertexId}
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat}
-import org.apache.hadoop.hbase._
-import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.{SparkContext}
-import org.apache.spark.rdd.RDD
-import org.hbase.async.{PutRequest}
-import play.api.libs.json.Json
-import s2.spark.{SparkApp}
-import spark.{FamilyHFileWriteOptions, KeyFamilyQualifier, HBaseContext}
-import scala.collection.JavaConversions._
-
-
-object TransferToHFile extends SparkApp with JSONParser {
-
-  val usages =
-    s"""
-       |create HFiles for hbase table on zkQuorum specified.
-       |note that the hbase table must already be created and pre-split properly.
-       |
-       |params:
-       |0. input: hdfs path for tsv file(bulk format).
-       |1. output: hdfs path for storing HFiles.
-       |2. zkQuorum: running hbase cluster zkQuorum.
-       |3. tableName: table name for this bulk upload.
-       |4. dbUrl: db url for parsing to graph element.
-     """.stripMargin
-
-  //TODO: Process AtomicIncrementRequest too.
-  /** build key values */
-  case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
-
-  private def insertBulkForLoaderAsync(edge: Edge, createRelEdges: Boolean = true): List[PutRequest] = {
-    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
-    buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
-      e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) }
-    }
-  }
-
-  def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = {
-    for {
-      msg <- msgs
-      tokens = GraphUtil.split(msg)
-      if tokens(2) == "e" || tokens(2) == "edge"
-      tempDirection = if (tokens.length == 7) "out" else tokens(7)
-      direction = if (tempDirection != "out" && tempDirection != "in") "out" 
else tempDirection
-      reverseDirection = if (direction == "out") "in" else "out"
-      convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5))
-      (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4))
-      degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction)
-      degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection)
-      extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil
-      output <- List(degreeKey -> 1L) ++ extra
-    } yield output
-  }
-  def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
-    val kvs = GraphSubscriberHelper.g.storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
-    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
-  }
-  def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
-    val kvs = GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.toList
-    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
-  }
-  def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
-    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
-    val dir = GraphUtil.directions(direction)
-    val innerVal = jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
-      throw new RuntimeException(s"$vertexId can not be converted into innerval")
-    }
-    val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal))
-
-    val ts = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs)
-
-    edge.edgesWithIndex.flatMap { indexEdge =>
-      GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
-        new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
-      }
-    }
-  }
-
-  def toKeyValues(degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = 
{
-    val kvs = for {
-      (key, value) <- degreeKeyVals
-      putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, 
key.direction, value)
-    } yield {
-        val p = putRequest
-        val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, 
p.value)
-        kv
-      }
-    kvs.toIterator
-  }
-
-  def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], 
autoEdgeCreate: Boolean): Iterator[KeyValue] = {
-    val kvs = for {
-      s <- strs
-      element <- Graph.toGraphElement(s, labelMapping).toSeq if 
element.isInstanceOf[Edge]
-      edge = element.asInstanceOf[Edge]
-      putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate)
-    } yield {
-        val p = putRequest
-        val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, 
p.value)
-
-        //        println(s"[Edge]: $edge\n[Put]: $p\n[KeyValue]: 
${kv.getRow.toList}, ${kv.getQualifier.toList}, ${kv.getValue.toList}, 
${kv.getTimestamp}")
-
-        kv
-      }
-    kvs.toIterator
-  }
-
-
-  override def run() = {
-    val input = args(0)
-    val tmpPath = args(1)
-    val zkQuorum = args(2)
-    val tableName = args(3)
-    val dbUrl = args(4)
-    val maxHFilePerResionServer = args(5).toInt
-    val labelMapping = if (args.length >= 7) 
GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
-    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
-    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
-    val conf = sparkConf(s"$input: TransferToHFile")
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer.mb", "24")
-
-    val sc = new SparkContext(conf)
-
-    Management.createTable(zkQuorum, tableName, List("e", "v"), 
maxHFilePerResionServer, None, compressionAlgorithm)
-
-    /** set up the HBase configuration */
-    val hbaseConf = HBaseConfiguration.create()
-    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
-    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
-    hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")
-
-
-    val rdd = sc.textFile(input)
-
-
-    val kvs = rdd.mapPartitions { iter =>
-      val phase = System.getProperty("phase")
-      GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-      toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
-    }
-    //
-    //    val newRDD = if (!buildDegree) new HFileRDD(kvs)
-    //    else {
-    //      val degreeKVs = buildDegrees(rdd, labelMapping, 
autoEdgeCreate).reduceByKey { (agg, current) =>
-    //        agg + current
-    //      }.mapPartitions { iter =>
-    //        val phase = System.getProperty("phase")
-    //        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-    //        toKeyValues(iter.toSeq)
-    //      }
-    //      new HFileRDD(kvs ++ degreeKVs)
-    //    }
-    //
-    //    newRDD.toHFile(hbaseConf, zkQuorum, tableName, 
maxHFilePerResionServer, tmpPath)
-    val merged = if (!buildDegree) kvs
-    else {
-      kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { 
(agg, current) =>
-        agg + current
-      }.mapPartitions { iter =>
-        val phase = System.getProperty("phase")
-        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-        toKeyValues(iter.toSeq)
-      }
-    }
-
-    val hbaseSc = new HBaseContext(sc, hbaseConf)
-    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
-      val k = new KeyFamilyQualifier(kv.getRow(), kv.getFamily(), kv.getQualifier())
-      val v = kv.getValue()
-      Seq((k -> v)).toIterator
-    }
-    val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
-      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
-    val familyOptionsMap = Map("e".getBytes() -> familyOptions, "v".getBytes() -> familyOptions)
-
-    hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap)
-  }
-
-}
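
For context on the removed job above: TransferToHFile turns bulk-format TSV lines into HBase KeyValues (snapshot edges, indexed edges and, optionally, degrees) and hands them to HBaseContext.bulkLoad, which writes HFiles under tmpPath; loading those files into the table is a separate step not shown here. Below is a minimal sketch of that final hand-off, reusing the loader's own HBaseContext, KeyFamilyQualifier and FamilyHFileWriteOptions helpers. Note that writeHFiles and its parameters are placeholder names, and the string arguments to FamilyHFileWriteOptions are simply what the enum expressions in the job evaluate to.

    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
    import scala.collection.JavaConversions._  // implicit Scala-to-Java map conversion, as in the original job

    // write an RDD of KeyValues as HFiles for the "e" and "v" column families
    def writeHFiles(sc: SparkContext, kvs: RDD[KeyValue],
                    zkQuorum: String, tableName: String, tmpPath: String): Unit = {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
      val hbaseSc = new HBaseContext(sc, hbaseConf)

      // each KeyValue is flattened into (row + family + qualifier, value) for the HFile writer
      def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] =
        Iterator(new KeyFamilyQualifier(kv.getRow(), kv.getFamily(), kv.getQualifier()) -> kv.getValue())

      // same write options the job uses: LZ4 compression, ROW bloom filter, 32 KB blocks, FAST_DIFF encoding
      val familyOptions = new FamilyHFileWriteOptions("LZ4", "ROW", 32768, "FAST_DIFF")
      val familyOptionsMap = Map("e".getBytes() -> familyOptions, "v".getBytes() -> familyOptions)

      hbaseSc.bulkLoad(kvs, TableName.valueOf(tableName), flatMap _, tmpPath, familyOptionsMap)
    }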

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala 
b/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala
deleted file mode 100644
index c8255dd..0000000
--- a/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package subscriber
-
-/**
- * Created by shon on 7/3/15.
- */
-
-import com.kakao.s2graph.core.{GraphUtil, Management}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import s2.spark.{HashMapParam, WithKafka, SparkApp}
-
-import scala.collection.mutable
-
-//object VertexDegreeBuilder extends SparkApp with WithKafka {
-//  val sleepPeriod = 5000
-//  val usages =
-//    s"""
-//       |/**
-//       |* this job reads edge-format (TSV) data from HDFS and bulk loads the edges into s2graph. it assumes that newLabelName has already been created via the API.
-//       |* params:
-//       |*  0. hdfsPath: where your data lives in HDFS. requires the full path with the hdfs:// prefix.
-//       |*  1. dbUrl: JDBC connection string specifying the database that holds the meta.
-//       |*  2. labelMapping: oldLabel:newLabel pairs delimited by ,
-//       |*  3. outputPath: degree output path.
-//       |*  4. edgeAutoCreate: true if the reversed edge should be created automatically.
-//       |*
-//       |* after this job finishes, s2graph will have data stored under newLabelName.
-//       |* change newLabelName back to the original name if you want to replace the label online.
-//       |*
-//       |*/
-//     """.stripMargin
-//
-//  override def run() = {
-//    /**
-//     * Main function
-//     */
-//    println(args.toList)
-//    if (args.length != 5) {
-//      System.err.println(usages)
-//      System.exit(1)
-//    }
-//    val hdfsPath = args(0)
-//    val dbUrl = args(1)
-//    val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2))
-//    val outputPath = args(3)
-//    val edgeAutoCreate = args(4).toBoolean
-//
-//    val conf = sparkConf(s"$hdfsPath: VertexDegreeBuilder")
-//    val sc = new SparkContext(conf)
-//    val mapAcc = sc.accumulable(mutable.HashMap.empty[String, Long], 
"counter")(HashMapParam[String, Long](_ + _))
-//
-//    /** this job expects only one hTableName; all labels in this job will be stored in the same physical HBase table */
-//    try {
-//
-//      // set local driver setting.
-//      val phase = System.getProperty("phase")
-//      GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-//
-//      /** copy when oldLabel exists and newLabel does not exist; otherwise ignore. */
-//
-//
-//      val msgs = sc.textFile(hdfsPath)
-//
-//      /** changed assumption here: this job only handles data for one label */
-//      val degreeStart: RDD[((String, String, String), Int)] = msgs.filter { 
msg =>
-//        val tokens = GraphUtil.split(msg)
-//        (tokens(2) == "e" || tokens(2) == "edge")
-//      } flatMap { msg =>
-//        val tokens = GraphUtil.split(msg)
-//        val tempDirection = if (tokens.length == 7) "out" else tokens(7)
-//        val direction = if (tempDirection != "out" && tempDirection != "in") 
"out" else tempDirection
-//        val reverseDirection = if (direction == "out") "in" else "out"
-//        val convertedLabelName = 
labelMapping.get(tokens(5)).getOrElse(tokens(5))
-//        val (vertexWithLabel, reverseVertexWithLabel) = if (direction == 
"out") {
-//          (
-//            (tokens(3), convertedLabelName, direction),
-//            (tokens(4), convertedLabelName, reverseDirection)
-//            )
-//        } else {
-//          (
-//            (tokens(4), convertedLabelName, direction),
-//            (tokens(3), convertedLabelName, reverseDirection)
-//            )
-//        }
-//        if (edgeAutoCreate) {
-//          List((vertexWithLabel -> 1), (reverseVertexWithLabel -> 1))
-//        } else {
-//          List((vertexWithLabel -> 1))
-//        }
-//      }
-//      val vertexDegrees = degreeStart.reduceByKey(_ + _)
-//      vertexDegrees.map { case ((vertexId, labelName, dir), degreeVal) =>
-//        Seq(vertexId, labelName, dir, degreeVal).mkString("\t")
-//      }.saveAsTextFile(outputPath)
-//
-//    }
-//  }
-//}
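
Both this commented-out job and TransferToHFile.buildDegrees compute the same aggregate: a count over (vertexId, labelName, direction) keys taken from bulk-format edge lines. Below is a standalone sketch of that aggregation for the non-edgeAutoCreate case; degreeCounts is a placeholder name, a plain tab split stands in for GraphUtil.split, and the labelMapping and reversed-edge counting are omitted for brevity.

    import org.apache.spark.rdd.RDD

    // degree per (vertexId, labelName, direction) from bulk-format lines:
    //   ts  op  e  srcVertexId  tgtVertexId  label  props  [direction]
    def degreeCounts(lines: RDD[String]): RDD[((String, String, String), Long)] =
      lines
        .map(_.split("\t", -1))
        .filter(tokens => tokens.length >= 7 && (tokens(2) == "e" || tokens(2) == "edge"))
        .map { tokens =>
          val direction = if (tokens.length > 7 && (tokens(7) == "in" || tokens(7) == "out")) tokens(7) else "out"
          ((tokens(3), tokens(5), direction), 1L)
        }
        .reduceByKey(_ + _)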

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/WalLogStat.scala 
b/loader/src/main/scala/subscriber/WalLogStat.scala
deleted file mode 100644
index f5db2c1..0000000
--- a/loader/src/main/scala/subscriber/WalLogStat.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-package subscriber
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import com.kakao.s2graph.core.Graph
-import kafka.producer.KeyedMessage
-import kafka.serializer.StringDecoder
-import org.apache.spark.streaming.Durations._
-import org.apache.spark.streaming.kafka.HasOffsetRanges
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.language.postfixOps
-
-object WalLogStat extends SparkApp with WithKafka {
-
-  override def run() = {
-
-    validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", 
"dbUrl", "statTopic")
-
-    val kafkaZkQuorum = args(0)
-    val brokerList = args(1)
-    val topics = args(2)
-    val intervalInSec = seconds(args(3).toLong)
-    val dbUrl = args(4)
-    val statTopic = args(5)
-
-
-    val conf = sparkConf(s"$topics: ${getClass.getSimpleName}")
-    val ssc = streamingContext(conf, intervalInSec)
-    val sc = ssc.sparkContext
-
-    val groupId = topics.replaceAll(",", "_") + "_stat"
-
-    val kafkaParams = Map(
-      "zookeeper.connect" -> kafkaZkQuorum,
-      "group.id" -> groupId,
-      "metadata.broker.list" -> brokerList,
-      "zookeeper.connection.timeout.ms" -> "10000",
-      "auto.offset.reset" -> "largest")
-
-    val stream = getStreamHelper(kafkaParams).createStream[String, String, 
StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
-    val statProducer = getProducer[String, String](brokerList)
-
-    stream.foreachRDD { (rdd, time) =>
-
-      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-
-      val ts = time.milliseconds
-
-      val elements = rdd.mapPartitions { partition =>
-        // set executor setting.
-        val phase = System.getProperty("phase")
-        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
-        partition.map { case (key, msg) =>
-          Graph.toGraphElement(msg) match {
-            case Some(elem) =>
-              val serviceName = elem.serviceName
-              msg.split("\t", 7) match {
-                case Array(_, operation, log_type, _, _, label, _*) =>
-                  Seq(serviceName, label, operation, log_type).mkString("\t")
-                case _ =>
-                  Seq("no_service_name", "no_label", "no_operation", 
"parsing_error").mkString("\t")
-              }
-            case None =>
-              Seq("no_service_name", "no_label", "no_operation", 
"no_element_error").mkString("\t")
-          }
-        }
-      }
-
-      val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect()
-      val totalCount = countByKey.map(_._2).sum
-      val keyedMessage = countByKey.map { case (key, value) =>
-        new KeyedMessage[String, String](statTopic, 
s"$ts\t$key\t$value\t$totalCount")
-      }
-
-      statProducer.send(keyedMessage: _*)
-
-      elements.mapPartitionsWithIndex { (i, part) =>
-        // commit offset range
-        val osr = offsets(i)
-        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
-        Iterator.empty
-      }.foreach {
-        (_: Nothing) => ()
-      }
-
-    }
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
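
The heart of WalLogStat's foreachRDD body is a per-batch count of "serviceName<TAB>label<TAB>operation<TAB>log_type" lines plus a batch total, which it then publishes to the stat topic. Factored out as a small helper for clarity; batchStatLines is a placeholder name, and the Kafka send stays exactly as in the code above.

    import org.apache.spark.rdd.RDD

    // one "ts \t key \t count \t total" record per distinct stat key in the batch
    def batchStatLines(ts: Long, elements: RDD[String]): Seq[String] = {
      val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect()
      val totalCount = countByKey.map(_._2).sum
      countByKey.map { case (key, value) => s"$ts\t$key\t$value\t$totalCount" }.toSeq
    }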

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/WalLogToHDFS.scala 
b/loader/src/main/scala/subscriber/WalLogToHDFS.scala
deleted file mode 100644
index 44902c2..0000000
--- a/loader/src/main/scala/subscriber/WalLogToHDFS.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-package subscriber
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import com.kakao.s2graph.core.Graph
-import kafka.serializer.StringDecoder
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.streaming.Durations._
-import org.apache.spark.streaming.kafka.HasOffsetRanges
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.language.postfixOps
-
-object WalLogToHDFS extends SparkApp with WithKafka {
-
-  override def run() = {
-
-    validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", 
"dbUrl", "outputPath", "hiveDatabase", "hiveTable", "splitListPath")
-
-    val kafkaZkQuorum = args(0)
-    val brokerList = args(1)
-    val topics = args(2)
-    val intervalInSec = seconds(args(3).toLong)
-    val dbUrl = args(4)
-    val outputPath = args(5)
-    val hiveDatabase = args(6)
-    val hiveTable = args(7)
-    val splitListPath = args(8)
-
-    val conf = sparkConf(s"$topics: WalLogToHDFS")
-    val ssc = streamingContext(conf, intervalInSec)
-    val sc = ssc.sparkContext
-
-    val groupId = topics.replaceAll(",", "_") + "_stream"
-    val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed"
-
-    val kafkaParams = Map(
-      "zookeeper.connect" -> kafkaZkQuorum,
-      "group.id" -> groupId,
-      "metadata.broker.list" -> brokerList,
-      "zookeeper.connection.timeout.ms" -> "10000",
-      "auto.offset.reset" -> "largest")
-
-    val stream = getStreamHelper(kafkaParams).createStream[String, String, 
StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
-
-    val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), 
"Throughput")(HashMapParam[String, Long](_ + _))
-
-    val hdfsBlockSize = 134217728 // 128M
-    val hiveContext = new HiveContext(sc)
-    var splits = Array[String]()
-    var excludeLabels = Set[String]()
-    var excludeServices = Set[String]()
-    stream.foreachRDD { (rdd, time) =>
-      try {
-        val read = 
sc.textFile(splitListPath).collect().map(_.split("=")).flatMap {
-          case Array(value) => Some(("split", value))
-          case Array(key, value) => Some((key, value))
-          case _ => None
-        }
-        splits = read.filter(_._1 == "split").map(_._2)
-        excludeLabels = read.filter(_._1 == "exclude_label").map(_._2).toSet
-        excludeServices = read.filter(_._1 == 
"exclude_service").map(_._2).toSet
-      } catch {
-        case _: Throwable => // use previous information
-      }
-
-      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-
-      val elements = rdd.mapPartitions { partition =>
-        // set executor setting.
-        val phase = System.getProperty("phase")
-        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
-
-        partition.flatMap { case (key, msg) =>
-          val optMsg = Graph.toGraphElement(msg).flatMap { element =>
-            val arr = msg.split("\t", 7)
-            val service = element.serviceName
-            val label = arr(5)
-            val n = arr.length
-
-            if (excludeServices.contains(service) || 
excludeLabels.contains(label)) {
-              None
-            } else if(n == 6) {
-              Some(Seq(msg, "{}", service).mkString("\t"))
-            }
-            else if(n == 7) {
-              Some(Seq(msg, service).mkString("\t"))
-            }
-            else {
-              None
-            }
-          }
-          optMsg
-        }
-      }
-
-      val ts = time.milliseconds
-      val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
-
-      /** make sure the per-split writes over `elements` do not run at the same time: cache once, then write each split */
-      val elementsWritten = {
-        elements.cache()
-        (Array("all") ++ splits).foreach {
-          case split if split == "all" =>
-            val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
-            elements.saveAsTextFile(path)
-          case split =>
-            val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
-            val strlen = split.length
-            val splitData = elements.filter(_.takeRight(strlen) == 
split).cache()
-            val totalSize = splitData
-                .mapPartitions { iterator =>
-                  val s = iterator.map(_.length.toLong).sum
-                  Iterator.single(s)
-                }
-                .sum
-                .toLong
-            val numPartitions = math.max(1, (totalSize / 
hdfsBlockSize.toDouble).toInt)
-            splitData.coalesce(math.min(splitData.partitions.length, 
numPartitions)).saveAsTextFile(path)
-            splitData.unpersist()
-        }
-        elements.unpersist()
-        elements
-      }
-
-      elementsWritten.mapPartitionsWithIndex { (i, part) =>
-        // commit offset range
-        val osr = offsets(i)
-        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
-        Iterator.empty
-      }.foreach {
-        (_: Nothing) => ()
-      }
-
-      (Array("all") ++ splits).foreach { split =>
-        val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
-        hiveContext.sql(s"use $hiveDatabase")
-        hiveContext.sql(s"alter table $hiveTable add partition 
(split='$split', date_id='$dateId', ts='$ts') location '$path'")
-      }
-    }
-
-    logInfo(s"counter: ${mapAcc.value}")
-    println(s"counter: ${mapAcc.value}")
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
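
WalLogToHDFS sizes each split's output so the saved files land near one HDFS block: it sums the character lengths of the lines as a rough byte estimate and coalesces to totalSize / 128 MB partitions, never below one and never above the current partition count. The sizing logic in isolation is sketched below; partitionsForBlockSize is a placeholder name, and the one-byte-per-character estimate is the same approximation the job makes.

    import org.apache.spark.rdd.RDD

    // pick a partition count so each saved file is roughly one 128 MB HDFS block
    def partitionsForBlockSize(data: RDD[String], hdfsBlockSize: Long = 134217728L): Int = {
      val totalSize = data
        .mapPartitions(iter => Iterator.single(iter.map(_.length.toLong).sum))
        .sum()
        .toLong
      math.max(1, (totalSize / hdfsBlockSize.toDouble).toInt)
    }

    // usage, as in the job:
    //   data.coalesce(math.min(data.partitions.length, partitionsForBlockSize(data))).saveAsTextFile(path)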

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala
----------------------------------------------------------------------
diff --git 
a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala
 
b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala
new file mode 100644
index 0000000..1f93134
--- /dev/null
+++ 
b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala
@@ -0,0 +1,45 @@
+package org.apache.s2graph.loader.subscriber
+
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.spark.spark.WithKafka
+import org.scalatest.{ FunSuite, Matchers }
+import play.api.libs.json.{JsBoolean, JsNumber}
+
+class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {
+  val phase = "dev"
+  val dbUrl = "jdbc:mysql://localhost:3306/graph_dev"
+  val zkQuorum = "localhost"
+  val kafkaBrokerList = "localhost:9099"
+  val currentTs = System.currentTimeMillis()
+  val op = "insertBulk"
+  val testLabelName = "s2graph_label_test"
+  val labelToReplace = "s2graph_label_test_new"
+  val serviceName = "s2graph"
+  val columnName = "user_id"
+  val columnType = "long"
+  val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
+  val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> 
JsBoolean(false))
+  val hTableName = "s2graph-dev_new"
+  val ttl = 86000
+  val testStrings = 
List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\":
 true}")
+
+  GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
+
+  test("GraphSubscriberHelper.store") {
+    // actually we need to delete labelToReplace first for each test.
+    val labelMapping = Map(testLabelName -> labelToReplace)
+    GraphSubscriberHelper.management.copyLabel(testLabelName, labelToReplace, 
Some(hTableName))
+
+//
+//    val msgs = (for {
+//      i <- (1 until 10)
+//      j <- (100 until 110)
+//    } yield {
+//        s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName"
+//      }).toSeq
+    val msgs = testStrings
+
+//    val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, 
labelMapping = labelMapping, autoCreateEdge = false)(None)
+//    println(stat)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git 
a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
 
b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
new file mode 100644
index 0000000..b0dd80d
--- /dev/null
+++ 
b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -0,0 +1,169 @@
+package org.apache.s2graph.loader.subscriber
+
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest._
+import TransferToHFile._
+
+/**
+  * Created by Eric on 2015. 12. 2..
+  */
+class TransferToHFileTest extends FlatSpec with BeforeAndAfterAll with Matchers {
+
+  private val master = "local[2]"
+  private val appName = "example-spark"
+
+  private var sc: SparkContext = _
+
+  val dataWithoutDir =
+    """
+      |1447686000000   insertBulk      e       a       b       friends_rel     
{}
+      |1447686000000   insertBulk      e       a       c       friends_rel     
{}
+      |1447686000000   insertBulk      e       a       d       friends_rel     
{}
+      |1447686000000   insertBulk      e       b       d       friends_rel     
{}
+      |1447686000000   insertBulk      e       b       e       friends_rel     
{}
+    """.stripMargin.trim
+
+  val dataWithDir =
+    """
+     |1447686000000    insertBulk      e       a       b       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       b       a       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       a       c       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       c       a       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       a       d       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       d       a       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       b       d       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       d       b       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       b       e       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       e       b       friends_rel     
{}      in
+    """.stripMargin.trim
+
+  override def beforeAll(): Unit = {
+    println("### beforeAll")
+
+    GraphSubscriberHelper.apply("dev", "none", "none", "none")
+    // 1. create service
+    if(Management.findService("loader-test").isEmpty) {
+      println(">>> create service...")
+      Management.createService("loader-test", "localhost", "loader-test-dev", 
1, None, "gz")
+    }
+
+    // 2. create label
+    if(Management.findLabel("friends_rel").isEmpty) {
+      println(">>> create label...")
+      Management.createLabel(
+        "friends_rel",
+        "loader-test", "user_id", "string",
+        "loader-test", "user_id", "string",
+        true,
+        "loader-test",
+        Seq(),
+        Seq(),
+        "weak",
+        None, None,
+        HBaseType.DEFAULT_VERSION,
+        false,
+        Management.defaultCompressionAlgorithm
+      )
+    }
+
+    // create spark context
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+    println("### afterALL")
+    if (sc != null) {
+      sc.stop()
+    }
+
+    Management.deleteLabel("friends_rel")
+  }
+
+  "buildDegreePutRequest" should "transform degree to PutRequest" in {
+    val putReqs = buildDegreePutRequests("a", "friends_rel", "out", 3L)
+    putReqs.size should equal(1)
+  }
+
+  "toKeyValues" should "transform edges to KeyValues on edge format data 
without direction" in {
+    val rdd = sc.parallelize(dataWithoutDir.split("\n"))
+
+    val kvs = rdd.mapPartitions { iter =>
+      GraphSubscriberHelper.apply("dev", "none", "none", "none")
+      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
+    }
+    kvs.foreach(println)
+    // edges * 2 (snapshot edges + indexed edges)
+    kvs.count() should equal(10)
+
+
+    val kvsAutoCreated = rdd.mapPartitions { iter =>
+      GraphSubscriberHelper.apply("dev", "none", "none", "none")
+      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], true)
+    }
+
+    // edges * 3 (snapshot edges + indexed edges + reverse edges)
+    kvsAutoCreated.count() should equal(15)
+  }
+
+  "toKeyValues" should "transform edges to KeyValues on edge format data with 
direction" in {
+    val rdd = sc.parallelize(dataWithDir.split("\n"))
+
+    val kvs = rdd.mapPartitions { iter =>
+      GraphSubscriberHelper.apply("dev", "none", "none", "none")
+      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
+    }
+
+    // edges * 2 (snapshot edges + indexed edges)
+    kvs.count() should equal(20)
+  }
+
+  "buildDegrees" should "build degrees on edge format data without direction" 
in {
+    val rdd = sc.parallelize(dataWithoutDir.split("\n"))
+
+    // autoCreate = false
+    val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], 
false).reduceByKey { (agg, current) =>
+      agg + current
+    }.collectAsMap()
+    degrees.size should equal(2)
+
+    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
+    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
+
+
+    // autoCreate = true
+    val degreesAutoCreated = TransferToHFile.buildDegrees(rdd, 
Map.empty[String, String], true).reduceByKey { (agg, current) =>
+      agg + current
+    }.collectAsMap()
+    degreesAutoCreated.size should equal(6)
+
+    degreesAutoCreated should contain(DegreeKey("a", "friends_rel", "out") -> 
3L)
+    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "out") -> 
2L)
+    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "in") -> 
1L)
+    degreesAutoCreated should contain(DegreeKey("c", "friends_rel", "in") -> 
1L)
+    degreesAutoCreated should contain(DegreeKey("d", "friends_rel", "in") -> 
2L)
+    degreesAutoCreated should contain(DegreeKey("e", "friends_rel", "in") -> 
1L)
+  }
+
+  "buildDegrees" should "build degrees on edge format data with direction" in {
+    val rdd = sc.parallelize(dataWithDir.split("\n"))
+
+    val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], 
false).reduceByKey { (agg, current) =>
+      agg + current
+    }.collectAsMap()
+
+    degrees.size should equal(6)
+
+    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
+    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
+    degrees should contain(DegreeKey("b", "friends_rel", "in") -> 1L)
+    degrees should contain(DegreeKey("c", "friends_rel", "in") -> 1L)
+    degrees should contain(DegreeKey("d", "friends_rel", "in") -> 2L)
+    degrees should contain(DegreeKey("e", "friends_rel", "in") -> 1L)
+  }
+}
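
As a quick cross-check of the degree expectations in the tests above, the same counts can be reproduced without Spark by folding the fixture lines in plain Scala. A whitespace split is used because the literal above is whitespace-separated; this snippet is only an illustration, not part of the test.

    // expected out-degrees for dataWithoutDir: ("a", "friends_rel", "out") -> 3, ("b", "friends_rel", "out") -> 2
    val expectedDegrees: Map[(String, String, String), Long] =
      dataWithoutDir.split("\n")
        .map(_.trim.split("\\s+"))
        .collect { case tokens if tokens.length >= 6 && tokens(2) == "e" => (tokens(3), tokens(5), "out") }
        .groupBy(identity)
        .map { case (key, occurrences) => key -> occurrences.length.toLong }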

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala 
b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
deleted file mode 100644
index f927509..0000000
--- a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-package subscriber
-
-import com.kakao.s2graph.core.Management
-import org.scalatest.{ FunSuite, Matchers }
-import play.api.libs.json.{JsBoolean, JsNumber}
-import s2.spark.WithKafka
-
-class GraphSubscriberTest extends FunSuite with Matchers with WithKafka {
-  val phase = "dev"
-  val dbUrl = "jdbc:mysql://localhost:3306/graph_dev"
-  val zkQuorum = "localhost"
-  val kafkaBrokerList = "localhost:9099"
-  val currentTs = System.currentTimeMillis()
-  val op = "insertBulk"
-  val testLabelName = "s2graph_label_test"
-  val labelToReplace = "s2graph_label_test_new"
-  val serviceName = "s2graph"
-  val columnName = "user_id"
-  val columnType = "long"
-  val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0))
-  val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> 
JsBoolean(false))
-  val hTableName = "s2graph-dev_new"
-  val ttl = 86000
-  val testStrings = 
List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\":
 true}")
-
-  GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
-
-  test("GraphSubscriberHelper.store") {
-    // actually we need to delete labelToReplace first for each test.
-    val labelMapping = Map(testLabelName -> labelToReplace)
-    GraphSubscriberHelper.management.copyLabel(testLabelName, labelToReplace, 
Some(hTableName))
-
-//
-//    val msgs = (for {
-//      i <- (1 until 10)
-//      j <- (100 until 110)
-//    } yield {
-//        s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName"
-//      }).toSeq
-    val msgs = testStrings
-
-//    val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, 
labelMapping = labelMapping, autoCreateEdge = false)(None)
-//    println(stat)
-  }
-}
