[S2GRAPH-17]: Remove unnecessary abstraction layer, Storage.

  1. Refactor all common code from AsynchbaseStorage into the Storage trait (see the sketch after this list).
  2. Add a little documentation.
  3. Restructure packages (move the serializers/deserializers from storage/hbase into storage).
  4. Extract the future cache from AsynchbaseStorage into utils.
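
To picture items 1 and 3, here is a minimal sketch, not the real s2core API: `KV`, `StorageLike`, and `InMemoryStorage` are hypothetical stand-ins for SKeyValue, Storage, and AsynchbaseStorage. The point is only the shape of the refactoring: the trait owns the shared mutation path and the serializer hook, while a concrete backend supplies just the low-level primitives.

```
import scala.concurrent.{ExecutionContext, Future}

// a byte-level key/value record, standing in for SKeyValue
final case class KV(table: Array[Byte], row: Array[Byte], value: Array[Byte])

trait StorageLike {
  implicit def ec: ExecutionContext

  // backend-specific primitives each implementation must provide
  protected def writeToStorage(kvs: Seq[KV]): Future[Boolean]
  protected def fetchFromStorage(rowKey: Array[Byte]): Future[Seq[KV]]

  // serializer hook lives on the trait (storage package), not in a backend package
  def serialize(element: String): Seq[KV] =
    Seq(KV("e".getBytes("UTF-8"), element.getBytes("UTF-8"), Array.empty[Byte]))

  // common logic shared by every backend
  def mutate(element: String): Future[Boolean] = writeToStorage(serialize(element))
}

// hypothetical backend: only the primitives are implemented here
class InMemoryStorage(implicit val ec: ExecutionContext) extends StorageLike {
  private val store = scala.collection.concurrent.TrieMap[Seq[Byte], KV]()

  protected def writeToStorage(kvs: Seq[KV]): Future[Boolean] =
    Future { kvs.foreach(kv => store.put(kv.row.toSeq, kv)); true }

  protected def fetchFromStorage(rowKey: Array[Byte]): Future[Seq[KV]] =
    Future { store.get(rowKey.toSeq).toSeq }
}
```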

JIRA:
  [S2GRAPH-17] https://issues.apache.org/jira/browse/S2GRAPH-17

Pull Request:
  Closes #34


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e207f676
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e207f676
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e207f676

Branch: refs/heads/master
Commit: e207f676fe145690b027a8015fd25794ec47ff94
Parents: 36d5485
Author: DO YUNG YOON <[email protected]>
Authored: Mon Feb 29 15:14:07 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Feb 29 15:14:07 2016 +0900

----------------------------------------------------------------------
 .gitignore                                      |    2 +-
 CHANGES                                         |    2 +
 README.md                                       |    2 +-
 dev_support/README.md                           |   45 +
 dev_support/docker-compose.yml                  |   30 +
 dev_support/graph_mysql/Dockerfile              |    5 +
 dev_support/graph_mysql/schema.sql              |  218 ++++
 docker-compose.yml                              |   30 -
 loader/build.sbt                                |    7 +
 .../main/scala/subscriber/GraphSubscriber.scala |    9 +-
 .../main/scala/subscriber/TransferToHFile.scala |   19 +-
 .../src/main/scala/subscriber/WalLogStat.scala  |   94 ++
 .../main/scala/subscriber/WalLogToHDFS.scala    |  235 ++--
 .../scala/subscriber/GraphSubscriberTest.scala  |    8 +-
 .../scala/subscriber/TransferToHFileTest.scala  |  169 +++
 project/plugins.sbt                             |    5 +-
 s2core/build.sbt                                |    1 -
 s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar        |  Bin 1305124 -> 1302410 bytes
 s2core/migrate/mysql/schema.sql                 |    1 +
 s2core/src/main/resources/create_model.hql      |    1 -
 s2core/src/main/resources/logback.xml           |   13 +-
 s2core/src/main/resources/reference.conf        |   16 +-
 .../scala/com/kakao/s2graph/core/Edge.scala     |    1 +
 .../scala/com/kakao/s2graph/core/Graph.scala    |   21 +-
 .../com/kakao/s2graph/core/Management.scala     |    1 -
 .../com/kakao/s2graph/core/PostProcess.scala    |   33 +
 .../com/kakao/s2graph/core/QueryParam.scala     |   20 +-
 .../com/kakao/s2graph/core/QueryRequest.scala   |   12 -
 .../com/kakao/s2graph/core/QueryResult.scala    |   28 +-
 .../scala/com/kakao/s2graph/core/Vertex.scala   |    1 +
 .../kakao/s2graph/core/mysqls/Experiment.scala  |    2 +
 .../kakao/s2graph/core/rest/RequestParser.scala |   43 +-
 .../s2graph/core/storage/Deserializable.scala   |   25 +
 .../core/storage/IndexEdgeDeserializable.scala  |  128 ++
 .../core/storage/IndexEdgeSerializable.scala    |   58 +
 .../s2graph/core/storage/MutationBuilder.scala  |   57 -
 .../s2graph/core/storage/QueryBuilder.scala     |  104 --
 .../kakao/s2graph/core/storage/SKeyValue.scala  |   22 +-
 .../s2graph/core/storage/Serializable.scala     |   10 +
 .../storage/SnapshotEdgeDeserializable.scala    |  142 +++
 .../core/storage/SnapshotEdgeSerializable.scala |   76 ++
 .../kakao/s2graph/core/storage/Storage.scala    | 1137 +++++++++++++++++-
 .../core/storage/StorageDeserializable.scala    |   17 +-
 .../core/storage/VertexDeserializable.scala     |   46 +
 .../core/storage/VertexSerializable.scala       |   18 +
 .../hbase/AsynchbaseMutationBuilder.scala       |  119 --
 .../storage/hbase/AsynchbaseQueryBuilder.scala  |  255 ----
 .../core/storage/hbase/AsynchbaseStorage.scala  |  961 ++++-----------
 .../core/storage/hbase/HDeserializable.scala    |   25 -
 .../core/storage/hbase/HSerializable.scala      |   10 -
 .../storage/hbase/IndexEdgeDeserializable.scala |  116 --
 .../storage/hbase/IndexEdgeSerializable.scala   |   45 -
 .../hbase/SnapshotEdgeDeserializable.scala      |  140 ---
 .../hbase/SnapshotEdgeSerializable.scala        |   91 --
 .../storage/hbase/VertexDeserializable.scala    |   47 -
 .../core/storage/hbase/VertexSerializable.scala |   19 -
 .../kakao/s2graph/core/utils/DeferCache.scala   |   82 ++
 .../kakao/s2graph/core/utils/Extentions.scala   |    2 -
 .../kakao/s2graph/core/utils/FutureCache.scala  |   82 ++
 .../kakao/s2graph/core/Integrate/CrudTest.scala |    2 +-
 .../core/Integrate/IntegrateCommon.scala        |   22 +-
 .../s2graph/core/Integrate/QueryTest.scala      |  546 +++++++--
 .../core/Integrate/StrongLabelDeleteTest.scala  |   17 +-
 .../core/Integrate/WeakLabelDeleteTest.scala    |    4 +-
 .../com/kakao/s2graph/core/ManagementTest.scala |  121 --
 .../s2graph/core/parsers/WhereParserTest.scala  |   54 +-
 .../hbase/AsynchbaseQueryBuilderTest.scala      |  106 +-
 .../s2/counter/core/CounterFunctions.scala      |    2 +-
 .../stream/ExactCounterStreamingSpec.scala      |  196 +++
 s2rest_netty/build.sbt                          |    7 +
 s2rest_netty/conf/logger.xml                    |   83 ++
 s2rest_netty/conf/reference.conf                |  131 ++
 .../src/main/resources/application.conf         |    0
 s2rest_netty/src/main/resources/reference.conf  |  131 ++
 s2rest_netty/src/main/scala/Server.scala        |   14 +-
 s2rest_play/app/Bootstrap.scala                 |    2 +-
 .../app/controllers/EdgeController.scala        |    5 +-
 s2rest_play/build.sbt                           |    0
 .../test/benchmark/JsonBenchmarkSpec.scala      |   24 +-
 .../benchmark/OrderingUtilBenchmarkSpec.scala   |   21 +-
 80 files changed, 4039 insertions(+), 2357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 76f2bb8..1e21302 100644
--- a/.gitignore
+++ b/.gitignore
@@ -94,7 +94,7 @@ bin/
 /db
 .eclipse
 /lib/
-/logs/
+logs/
 /modules
 /project/project
 /project/target

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1512043..97434d6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -63,6 +63,8 @@ Release 0.12.1 - unreleased
 
     S2GRAPH-5: Add Apache RAT to valid LICENSE errors. (Committed by DOYUNG 
YOON).
 
+    S2GRAPH-17: Remove unnecessary abstraction layer, Storage. (Committed by 
DOYUNG YOON).
+
   SUB TASKS
 
     S2GRAPH-9: Provide rest server using netty. (Committed by daewon).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 07a49f6..61430ee 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@
 S2Graph comes with a Vagrantfile that lets you spin up a virtual environment 
for test and development purposes.
 (On setting up S2Graph in your local environment directly, please refer to 
[Quick Start in Your Local 
Environment](https://steamshon.gitbooks.io/s2graph-book/content/getting_started.html).)
 
-You will need [VirtualBox](https://www.virtualbox.org/wiki/Downloads) and 
[Vagrant](http://docs.ansible.com/ansible/intro_installation.html) installed on 
your system.
+You will need [VirtualBox](https://www.virtualbox.org/wiki/Downloads) and 
[Vagrant](https://www.vagrantup.com/downloads.html) installed on your system.
 
 With everything ready, let's get started by running the following commands:
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/README.md
----------------------------------------------------------------------
diff --git a/dev_support/README.md b/dev_support/README.md
new file mode 100644
index 0000000..556a7d9
--- /dev/null
+++ b/dev_support/README.md
@@ -0,0 +1,45 @@
+# Run S2Graph using Docker
+
+1. Build a docker image of S2Graph in the project's root directory.
+       - `sbt "project s2rest_play" docker:publishLocal`
+2. Run the MySQL and HBase containers first.
+       - change to the dev_support directory: `cd dev_support`
+       - `docker-compose build`
+       - `docker-compose up -d graph_mysql` will run MySQL and HBase at the same time.
+3. Run the graph container.
+       - `docker-compose up -d`
+
+> S2Graph needs to connect to MySQL at startup, so you have to run MySQL and HBase before starting the graph container.
+
+## For OS X
+
+On OS X, the docker containers run inside a VirtualBox VM. In order to connect to HBase in the docker container from your local machine, you have to register the IP of the docker-machine in your `/etc/hosts` file.
+
+The `docker-compose.yml` file assumes the docker-machine is named `default`, so register the docker-machine's IP in `/etc/hosts` under the name `default`.
+
+```
+ex)
+192.168.99.100 default
+```
+
+# Run S2Graph on your local machine
+
+To develop and test S2Graph, you might want to run it in `dev` mode on your local machine. In that case, the following commands are helpful.
+
+- Run only MySQL and HBase
+
+```
+# docker-compose up -d graph_mysql
+```
+
+- Run S2Graph in 'dev' mode
+
+```
+# sbt "project s2rest_play" run -Dhost=default
+```
+
+- or run test cases
+
+```
+# sbt test -Dhost=default
+```

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/docker-compose.yml
----------------------------------------------------------------------
diff --git a/dev_support/docker-compose.yml b/dev_support/docker-compose.yml
new file mode 100644
index 0000000..9818251
--- /dev/null
+++ b/dev_support/docker-compose.yml
@@ -0,0 +1,30 @@
+graph:
+    image: s2rest_play:0.12.1-SNAPSHOT
+    container_name: graph
+    net: container:graph_hbase
+    links:
+        - graph_mysql
+        - graph_hbase
+
+graph_mysql:
+    build: graph_mysql
+    container_name: graph_mysql
+    environment:
+        MYSQL_ROOT_PASSWORD: graph
+    net: container:graph_hbase
+
+graph_hbase:
+    image: nerdammer/hbase:0.98.10.1
+    container_name: graph_hbase
+    hostname: "${DOCKER_MACHINE_NAME}"
+    ports:
+        - "3306:3306"
+        - "2181:2181"
+        - "60010:60010"
+        - "60000:60000"
+        - "60020:60020"
+        - "60030:60030"
+        - "9000:9000"
+    expose:
+        - "3306"
+        - "9000"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/graph_mysql/Dockerfile
----------------------------------------------------------------------
diff --git a/dev_support/graph_mysql/Dockerfile 
b/dev_support/graph_mysql/Dockerfile
new file mode 100644
index 0000000..c6849f0
--- /dev/null
+++ b/dev_support/graph_mysql/Dockerfile
@@ -0,0 +1,5 @@
+FROM mysql
+
+MAINTAINER Jaesang Kim <[email protected]>
+
+ADD ./schema.sql /docker-entrypoint-initdb.d/

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/graph_mysql/schema.sql
----------------------------------------------------------------------
diff --git a/dev_support/graph_mysql/schema.sql 
b/dev_support/graph_mysql/schema.sql
new file mode 100644
index 0000000..2216896
--- /dev/null
+++ b/dev_support/graph_mysql/schema.sql
@@ -0,0 +1,218 @@
+CREATE DATABASE IF NOT EXISTS graph_dev;
+
+CREATE USER 'graph'@'%' IDENTIFIED BY 'graph';
+
+GRANT ALL PRIVILEGES ON graph_dev.* TO 'graph'@'%' identified by 'graph';
+
+flush privileges;
+
+use graph_dev;
+
+
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- ----------------------------
+--  Table structure for `services`
+-- ----------------------------
+DROP TABLE IF EXISTS `services`;
+CREATE TABLE `services` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `service_name` varchar(64) NOT NULL,
+  `access_token` varchar(64) NOT NULL,
+  `cluster` varchar(255) NOT NULL,
+  `hbase_table_name` varchar(255) NOT NULL,
+  `pre_split_size` integer NOT NULL default 0,
+  `hbase_table_ttl` integer,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_service_name` (`service_name`),
+  INDEX `idx_access_token` (`access_token`),
+  INDEX `idx_cluster` (cluster(75))
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+
+-- ----------------------------
+--  Table structure for `services_columns`
+-- ----------------------------
+DROP TABLE IF EXISTS `service_columns`;
+CREATE TABLE `service_columns` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `service_id` integer NOT NULL,
+  `column_name` varchar(64) NOT NULL,
+  `column_type` varchar(8) NOT NULL,
+  `schema_version` varchar(8) NOT NULL default 'v2',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES 
services(id) ON DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `column_metas`
+-- ----------------------------
+DROP TABLE IF EXISTS `column_metas`;
+CREATE TABLE `column_metas` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `column_id` integer NOT NULL,
+  `name` varchar(64) NOT NULL,
+  `seq` tinyint        NOT NULL,
+  `data_type` varchar(8) NOT NULL DEFAULT 'string',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_column_id_name` (`column_id`, `name`),
+  INDEX `idx_column_id_seq` (`column_id`, `seq`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES 
service_columns(id) ON DELETE CASCADE;
+
+-- ----------------------------
+--  Table structure for `labels`
+-- ----------------------------
+
+DROP TABLE IF EXISTS `labels`;
+CREATE TABLE `labels` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `label` varchar(64) NOT NULL,
+  `src_service_id` integer NOT NULL,
+  `src_column_name` varchar(64) NOT NULL,
+  `src_column_type` varchar(8) NOT NULL,
+  `tgt_service_id` integer NOT NULL,
+  `tgt_column_name` varchar(64) NOT NULL,
+  `tgt_column_type` varchar(8) NOT NULL,
+  `is_directed` tinyint        NOT NULL DEFAULT 1,
+  `service_name` varchar(64),
+  `service_id` integer NOT NULL,
+  `consistency_level` varchar(8) NOT NULL DEFAULT 'weak',
+  `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph',
+  `hbase_table_ttl` integer,
+  `schema_version` varchar(8) NOT NULL default 'v2',
+  `is_async` tinyint(4) NOT NULL default '0',
+  `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
+  `options` text,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_label` (`label`),
+  INDEX `idx_src_column_name` (`src_column_name`),
+  INDEX        `idx_tgt_column_name` (`tgt_column_name`),
+  INDEX `idx_src_service_id` (`src_service_id`),
+  INDEX `idx_tgt_service_id` (`tgt_service_id`),
+  INDEX `idx_service_name` (`service_name`), 
+  INDEX `idx_service_id` (`service_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id);
+
+
+
+-- ----------------------------
+--  Table structure for `label_metas`
+-- ----------------------------
+DROP TABLE IF EXISTS `label_metas`;
+CREATE TABLE `label_metas` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `label_id` integer NOT NULL,
+  `name` varchar(64) NOT NULL,
+  `seq` tinyint        NOT NULL,
+  `default_value` varchar(64) NOT NULL,
+  `data_type` varchar(8) NOT NULL DEFAULT 'long',
+  `used_in_index` tinyint      NOT NULL DEFAULT 0,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_label_id_name` (`label_id`, `name`),
+  INDEX `idx_label_id_seq` (`label_id`, `seq`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON 
DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `label_indices`
+-- ----------------------------
+DROP TABLE IF EXISTS `label_indices`;
+CREATE TABLE `label_indices` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `label_id` int(11) NOT NULL,
+  `name` varchar(64) NOT NULL DEFAULT '_PK',
+  `seq` tinyint(4) NOT NULL,
+  `meta_seqs` varchar(64) NOT NULL,
+  `formulars` varchar(255) DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`),
+  UNIQUE KEY `ux_label_id_name` (`label_id`,`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON 
DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `experiments`
+-- ----------------------------
+DROP TABLE IF EXISTS `experiments`;
+CREATE TABLE `experiments` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `service_id` integer NOT NULL,
+  `service_name` varchar(128) NOT NULL,
+  `name` varchar(64) NOT NULL,
+  `description` varchar(255) NOT NULL,
+  `experiment_type` varchar(8) NOT NULL DEFAULT 'u',
+  `total_modular` int NOT NULL DEFAULT 100,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_service_id_name` (`service_id`, `name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+-- ALTER TABLE experiments ADD FOREIGN KEY(service_id) REFERENCES service(id) 
ON DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `buckets`
+-- ----------------------------
+CREATE TABLE `buckets` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `experiment_id` integer NOT NULL,
+  `modular` varchar(64) NOT NULL,
+  `http_verb` varchar(8) NOT NULL,
+  `api_path` text NOT NULL,
+  `uuid_key` varchar(128),
+  `uuid_placeholder` varchar(64),
+  `request_body` text NOT NULL,
+  `timeout` int NOT NULL DEFAULT 1000,
+  `impression_id` varchar(64) NOT NULL,
+  `is_graph_query` tinyint NOT NULL DEFAULT 1,
+  `is_empty` tinyint NOT NULL DEFAULT 0,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_impression_id` (`impression_id`),
+  INDEX `idx_experiment_id` (`experiment_id`),
+  INDEX `idx_impression_id` (`impression_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+SET FOREIGN_KEY_CHECKS = 1;
+
+
+-- ----------------------------
+--  Table structure for `counter`
+-- ----------------------------
+DROP TABLE IF EXISTS `counter`;
+CREATE TABLE `counter` (
+  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
+  `use_flag` tinyint(1) NOT NULL DEFAULT '0',
+  `version` smallint(1) NOT NULL DEFAULT '1',
+  `service` varchar(64) NOT NULL DEFAULT '',
+  `action` varchar(64) NOT NULL DEFAULT '',
+  `item_type` int(11) NOT NULL DEFAULT '0',
+  `auto_comb` tinyint(1) NOT NULL DEFAULT '1',
+  `dimension` varchar(1024) NOT NULL,
+  `use_profile` tinyint(1) NOT NULL DEFAULT '0',
+  `bucket_imp_id` varchar(64) DEFAULT NULL,
+  `use_exact` tinyint(1) NOT NULL DEFAULT '1',
+  `use_rank` tinyint(1) NOT NULL DEFAULT '1',
+  `ttl` int(11) NOT NULL DEFAULT '172800',
+  `daily_ttl` int(11) DEFAULT NULL,
+  `hbase_table` varchar(1024) DEFAULT NULL,
+  `interval_unit` varchar(1024) DEFAULT NULL,
+  `rate_action_id` int(11) unsigned DEFAULT NULL,
+  `rate_base_id` int(11) unsigned DEFAULT NULL,
+  `rate_threshold` int(11) DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `svc` (`service`,`action`),
+  KEY `rate_action_id` (`rate_action_id`),
+  KEY `rate_base_id` (`rate_base_id`),
+  CONSTRAINT `rate_action_id` FOREIGN KEY (`rate_action_id`) REFERENCES 
`counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
+  CONSTRAINT `rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES `counter` 
(`id`) ON DELETE NO ACTION ON UPDATE NO ACTION
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/docker-compose.yml
----------------------------------------------------------------------
diff --git a/docker-compose.yml b/docker-compose.yml
deleted file mode 100644
index 3149a89..0000000
--- a/docker-compose.yml
+++ /dev/null
@@ -1,30 +0,0 @@
-graph:
-    image: s2graph:0.12.0-SNAPSHOT
-    container_name: graph
-    net: container:graph_hbase
-    links:
-        - graph_mysql
-        - graph_hbase
-
-graph_mysql:
-    image: devsupport_graph_mysql
-    container_name: graph_mysql
-    environment:
-        MYSQL_ROOT_PASSWORD: graph
-    net: container:graph_hbase
-
-graph_hbase:
-    image: nerdammer/hbase:0.98.10.1
-    container_name: graph_hbase
-    hostname: default
-    ports:
-        - "3306:3306"
-        - "2181:2181"
-        - "60010:60010"
-        - "60000:60000"
-        - "60020:60020"
-        - "60030:60030"
-        - "9000:9000"
-    expose:
-        - "3306:3306"
-        - "9000"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/build.sbt
----------------------------------------------------------------------
diff --git a/loader/build.sbt b/loader/build.sbt
index b497a9c..43e1126 100644
--- a/loader/build.sbt
+++ b/loader/build.sbt
@@ -4,9 +4,14 @@ name := "s2loader"
 
 scalacOptions ++= Seq("-deprecation")
 
+projectDependencies := Seq(
+  (projectID in "s2core").value exclude("org.mortbay.jetty", "*") 
exclude("javax.xml.stream", "*") exclude("javax.servlet", "*")
+)
+
 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % Common.sparkVersion % "provided",
   "org.apache.spark" %% "spark-streaming" % Common.sparkVersion % "provided",
+  "org.apache.spark" %% "spark-hive" % Common.sparkVersion % "provided",
   "org.apache.spark" %% "spark-streaming-kafka" % Common.sparkVersion,
   "org.apache.httpcomponents" % "fluent-hc" % "4.2.5",
   "org.specs2" %% "specs2-core" % "2.4.11" % "test",
@@ -30,3 +35,5 @@ excludedJars in assembly := {
 }
 
 test in assembly := {}
+
+parallelExecution in Test := false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala 
b/loader/src/main/scala/subscriber/GraphSubscriber.scala
index 88de3d7..f4f7865 100644
--- a/loader/src/main/scala/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/subscriber/GraphSubscriber.scala
@@ -22,7 +22,7 @@ object GraphConfig {
   var kafkaBrokers = ""
   var cacheTTL = s"${60 * 60 * 1}"
   def apply(phase: String, dbUrl: Option[String], zkAddr: Option[String], 
kafkaBrokerList: Option[String]): Config = {
-    database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph")
+    database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph_dev")
     zkQuorum = zkAddr.getOrElse("localhost")
 
 //    val newConf = new util.HashMap[String, Object]()
@@ -50,6 +50,7 @@ object GraphSubscriberHelper extends WithKafka {
   private val maxTryNum = 10
 
   var g: Graph = null
+  var management: Management = null
   val conns = new scala.collection.mutable.HashMap[String, Connection]()
 
   def toOption(s: String) = {
@@ -63,7 +64,9 @@ object GraphSubscriberHelper extends WithKafka {
     config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), 
toOption(kafkaBrokerList))
 
     if (g == null) {
-      g = new Graph(config)(ExecutionContext.Implicits.global)
+      val ec = ExecutionContext.Implicits.global
+      g = new Graph(config)(ec)
+      management = new Management(g)(ec)
     }
   }
 
@@ -95,6 +98,8 @@ object GraphSubscriberHelper extends WithKafka {
         case Some(v) if v.isInstanceOf[Vertex] =>
           statFunc("VertexParseOk", 1)
           v.asInstanceOf[Vertex]
+        case Some(x) =>
+          throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: 
parsing failed. ${x.serviceName}")
         case None =>
           throw new RuntimeException(s"GraphSubscriber.toGraphElements: 
parsing failed. $msg")
       }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/TransferToHFile.scala 
b/loader/src/main/scala/subscriber/TransferToHFile.scala
index aba9673..516bb39 100644
--- a/loader/src/main/scala/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/subscriber/TransferToHFile.scala
@@ -2,8 +2,8 @@ package subscriber
 
 
 import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.Label
-import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId}
+import com.kakao.s2graph.core.mysqls.{LabelMeta, Label}
+import com.kakao.s2graph.core.types.{InnerValLikeWithTs, LabelWithDirection, 
SourceVertexId}
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
@@ -55,10 +55,7 @@ object TransferToHFile extends SparkApp with JSONParser {
       direction = if (tempDirection != "out" && tempDirection != "in") "out" 
else tempDirection
       reverseDirection = if (direction == "out") "in" else "out"
       convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5))
-      (vertexIdStr, vertexIdStrReversed) = direction match {
-        case "out" => (tokens(3), tokens(4))
-        case _ => (tokens(4), tokens(3))
-      }
+      (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4))
       degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction)
       degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, 
reverseDirection)
       extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil
@@ -80,8 +77,12 @@ object TransferToHFile extends SparkApp with JSONParser {
       throw new RuntimeException(s"$vertexId can not be converted into 
innerval")
     }
     val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal))
+
+    val ts = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
     val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val edge = Edge(vertex, vertex, labelWithDir)
+    val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs)
+
     edge.edgesWithIndex.flatMap { indexEdge =>
       
GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map 
{ kv =>
         new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], 
Bytes.toBytes(degreeVal), kv.timestamp)
@@ -129,14 +130,14 @@ object TransferToHFile extends SparkApp with JSONParser {
     val labelMapping = if (args.length >= 7) 
GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
     val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
     val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-
+    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
     val conf = sparkConf(s"$input: TransferToHFile")
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     conf.set("spark.kryoserializer.buffer.mb", "24")
 
     val sc = new SparkContext(conf)
 
-
+    Management.createTable(zkQuorum, tableName, List("e", "v"), 
maxHFilePerResionServer, None, compressionAlgorithm)
 
     /** set up hbase init */
     val hbaseConf = HBaseConfiguration.create()

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/WalLogStat.scala 
b/loader/src/main/scala/subscriber/WalLogStat.scala
new file mode 100644
index 0000000..f5db2c1
--- /dev/null
+++ b/loader/src/main/scala/subscriber/WalLogStat.scala
@@ -0,0 +1,94 @@
+package subscriber
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import com.kakao.s2graph.core.Graph
+import kafka.producer.KeyedMessage
+import kafka.serializer.StringDecoder
+import org.apache.spark.streaming.Durations._
+import org.apache.spark.streaming.kafka.HasOffsetRanges
+import s2.spark.{HashMapParam, SparkApp, WithKafka}
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.language.postfixOps
+
+object WalLogStat extends SparkApp with WithKafka {
+
+  override def run() = {
+
+    validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", 
"dbUrl", "statTopic")
+
+    val kafkaZkQuorum = args(0)
+    val brokerList = args(1)
+    val topics = args(2)
+    val intervalInSec = seconds(args(3).toLong)
+    val dbUrl = args(4)
+    val statTopic = args(5)
+
+
+    val conf = sparkConf(s"$topics: ${getClass.getSimpleName}")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    val groupId = topics.replaceAll(",", "_") + "_stat"
+
+    val kafkaParams = Map(
+      "zookeeper.connect" -> kafkaZkQuorum,
+      "group.id" -> groupId,
+      "metadata.broker.list" -> brokerList,
+      "zookeeper.connection.timeout.ms" -> "10000",
+      "auto.offset.reset" -> "largest")
+
+    val stream = getStreamHelper(kafkaParams).createStream[String, String, 
StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
+    val statProducer = getProducer[String, String](brokerList)
+
+    stream.foreachRDD { (rdd, time) =>
+
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+      val ts = time.milliseconds
+
+      val elements = rdd.mapPartitions { partition =>
+        // set executor setting.
+        val phase = System.getProperty("phase")
+        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
+        partition.map { case (key, msg) =>
+          Graph.toGraphElement(msg) match {
+            case Some(elem) =>
+              val serviceName = elem.serviceName
+              msg.split("\t", 7) match {
+                case Array(_, operation, log_type, _, _, label, _*) =>
+                  Seq(serviceName, label, operation, log_type).mkString("\t")
+                case _ =>
+                  Seq("no_service_name", "no_label", "no_operation", 
"parsing_error").mkString("\t")
+              }
+            case None =>
+              Seq("no_service_name", "no_label", "no_operation", 
"no_element_error").mkString("\t")
+          }
+        }
+      }
+
+      val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect()
+      val totalCount = countByKey.map(_._2).sum
+      val keyedMessage = countByKey.map { case (key, value) =>
+        new KeyedMessage[String, String](statTopic, 
s"$ts\t$key\t$value\t$totalCount")
+      }
+
+      statProducer.send(keyedMessage: _*)
+
+      elements.mapPartitionsWithIndex { (i, part) =>
+        // commit offset range
+        val osr = offsets(i)
+        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
+        Iterator.empty
+      }.foreach {
+        (_: Nothing) => ()
+      }
+
+    }
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/subscriber/WalLogToHDFS.scala 
b/loader/src/main/scala/subscriber/WalLogToHDFS.scala
index 2c37aef..44902c2 100644
--- a/loader/src/main/scala/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/subscriber/WalLogToHDFS.scala
@@ -5,6 +5,7 @@ import java.util.Date
 
 import com.kakao.s2graph.core.Graph
 import kafka.serializer.StringDecoder
+import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.streaming.Durations._
 import org.apache.spark.streaming.kafka.HasOffsetRanges
 import s2.spark.{HashMapParam, SparkApp, WithKafka}
@@ -12,103 +13,137 @@ import s2.spark.{HashMapParam, SparkApp, WithKafka}
 import scala.collection.mutable.{HashMap => MutableHashMap}
 import scala.language.postfixOps
 
-//object WalLogToHDFS extends SparkApp with WithKafka {
-//  private def toOutputPath(ts: Long): String = {
-//    val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
-//    s"date_id=$dateId/ts=$ts"
-//  }
-//
-//  val usages =
-//    s"""
-//       |/**
-//       |this job consume edges/vertices from kafka topic then load them into 
s2graph.
-//       |params:
-//       |  1. kafkaZkQuorum: kafka zk address to consume events
-//       |  2. brokerList: kafka cluster`s broker list.
-//       |  3. topics: , delimited list of topics to consume
-//       |  4. intervalInSec: batch interval for this job.
-//       |  5. dbUrl:
-//       |  6. outputPath:
-//       |*/
-//   """.stripMargin
-//  override def run() = {
-//    validateArgument("kafkaZkQuorum", "brokerList", "topics", 
"intervalInSec", "dbUrl", "outputPath")
-////    if (args.length != 7) {
-////      System.err.println(usages)
-////      System.exit(1)
-////    }
-//    val kafkaZkQuorum = args(0)
-//    val brokerList = args(1)
-//    val topics = args(2)
-//    val intervalInSec = seconds(args(3).toLong)
-//    val dbUrl = args(4)
-//    val outputPath = args(5)
-//
-//    val conf = sparkConf(s"$topics: WalLogToHDFS")
-//    val ssc = streamingContext(conf, intervalInSec)
-//    val sc = ssc.sparkContext
-//
-//    val groupId = topics.replaceAll(",", "_") + "_stream"
-//    val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed"
-//
-//    val kafkaParams = Map(
-//      "zookeeper.connect" -> kafkaZkQuorum,
-//      "group.id" -> groupId,
-//      "metadata.broker.list" -> brokerList,
-//      "zookeeper.connection.timeout.ms" -> "10000",
-//      "auto.offset.reset" -> "largest")
-//
-//    val stream = getStreamHelper(kafkaParams).createStream[String, String, 
StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
-//
-//    val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), 
"Throughput")(HashMapParam[String, Long](_ + _))
-//
-//    stream.foreachRDD { (rdd, time) =>
-//      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-//
-//      val elements = rdd.mapPartitions { partition =>
-//        // set executor setting.
-//        val phase = System.getProperty("phase")
-//        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
-//
-//        partition.flatMap { case (key, msg) =>
-//          val optMsg = Graph.toGraphElement(msg).map { element =>
-//            val n = msg.split("\t", -1).length
-//            if(n == 6) {
-//              Seq(msg, "{}", element.serviceName).mkString("\t")
-//            }
-//            else if(n == 7) {
-//              Seq(msg, element.serviceName).mkString("\t")
-//            }
-//            else {
-//              null
-//            }
-//          }
-//          optMsg
-//        }
-//      }
-//
-//      val ts = time.milliseconds
-//      val path = s"$outputPath/${toOutputPath(ts)}"
-//
-//      /** make sure that `elements` are not running at the same time */
-//      val elementsWritten = {
-//        elements.saveAsTextFile(path)
-//        elements
-//      }
-//
-//      elementsWritten.mapPartitionsWithIndex { (i, part) =>
-//        // commit offset range
-//        val osr = offsets(i)
-//        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
-//        Iterator.empty
-//      }.foreach {
-//        (_: Nothing) => ()
-//      }
-//    }
-//
-//    logInfo(s"counter: ${mapAcc.value}")
-//    println(s"counter: ${mapAcc.value}")
-//    ssc.start()
-//    ssc.awaitTermination()
-//  }
-//}
+object WalLogToHDFS extends SparkApp with WithKafka {
+
+  override def run() = {
+
+    validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", 
"dbUrl", "outputPath", "hiveDatabase", "hiveTable", "splitListPath")
+
+    val kafkaZkQuorum = args(0)
+    val brokerList = args(1)
+    val topics = args(2)
+    val intervalInSec = seconds(args(3).toLong)
+    val dbUrl = args(4)
+    val outputPath = args(5)
+    val hiveDatabase = args(6)
+    val hiveTable = args(7)
+    val splitListPath = args(8)
+
+    val conf = sparkConf(s"$topics: WalLogToHDFS")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    val groupId = topics.replaceAll(",", "_") + "_stream"
+    val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed"
+
+    val kafkaParams = Map(
+      "zookeeper.connect" -> kafkaZkQuorum,
+      "group.id" -> groupId,
+      "metadata.broker.list" -> brokerList,
+      "zookeeper.connection.timeout.ms" -> "10000",
+      "auto.offset.reset" -> "largest")
+
+    val stream = getStreamHelper(kafkaParams).createStream[String, String, 
StringDecoder, StringDecoder](ssc, topics.split(",").toSet)
+
+    val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), 
"Throughput")(HashMapParam[String, Long](_ + _))
+
+    val hdfsBlockSize = 134217728 // 128M
+    val hiveContext = new HiveContext(sc)
+    var splits = Array[String]()
+    var excludeLabels = Set[String]()
+    var excludeServices = Set[String]()
+    stream.foreachRDD { (rdd, time) =>
+      try {
+        val read = 
sc.textFile(splitListPath).collect().map(_.split("=")).flatMap {
+          case Array(value) => Some(("split", value))
+          case Array(key, value) => Some((key, value))
+          case _ => None
+        }
+        splits = read.filter(_._1 == "split").map(_._2)
+        excludeLabels = read.filter(_._1 == "exclude_label").map(_._2).toSet
+        excludeServices = read.filter(_._1 == 
"exclude_service").map(_._2).toSet
+      } catch {
+        case _: Throwable => // use previous information
+      }
+
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+      val elements = rdd.mapPartitions { partition =>
+        // set executor setting.
+        val phase = System.getProperty("phase")
+        GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
+
+        partition.flatMap { case (key, msg) =>
+          val optMsg = Graph.toGraphElement(msg).flatMap { element =>
+            val arr = msg.split("\t", 7)
+            val service = element.serviceName
+            val label = arr(5)
+            val n = arr.length
+
+            if (excludeServices.contains(service) || 
excludeLabels.contains(label)) {
+              None
+            } else if(n == 6) {
+              Some(Seq(msg, "{}", service).mkString("\t"))
+            }
+            else if(n == 7) {
+              Some(Seq(msg, service).mkString("\t"))
+            }
+            else {
+              None
+            }
+          }
+          optMsg
+        }
+      }
+
+      val ts = time.milliseconds
+      val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
+
+      /** make sure that `elements` are not running at the same time */
+      val elementsWritten = {
+        elements.cache()
+        (Array("all") ++ splits).foreach {
+          case split if split == "all" =>
+            val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
+            elements.saveAsTextFile(path)
+          case split =>
+            val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
+            val strlen = split.length
+            val splitData = elements.filter(_.takeRight(strlen) == 
split).cache()
+            val totalSize = splitData
+                .mapPartitions { iterator =>
+                  val s = iterator.map(_.length.toLong).sum
+                  Iterator.single(s)
+                }
+                .sum
+                .toLong
+            val numPartitions = math.max(1, (totalSize / 
hdfsBlockSize.toDouble).toInt)
+            splitData.coalesce(math.min(splitData.partitions.length, 
numPartitions)).saveAsTextFile(path)
+            splitData.unpersist()
+        }
+        elements.unpersist()
+        elements
+      }
+
+      elementsWritten.mapPartitionsWithIndex { (i, part) =>
+        // commit offset range
+        val osr = offsets(i)
+        getStreamHelper(kafkaParams).commitConsumerOffset(osr)
+        Iterator.empty
+      }.foreach {
+        (_: Nothing) => ()
+      }
+
+      (Array("all") ++ splits).foreach { split =>
+        val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts"
+        hiveContext.sql(s"use $hiveDatabase")
+        hiveContext.sql(s"alter table $hiveTable add partition 
(split='$split', date_id='$dateId', ts='$ts') location '$path'")
+      }
+    }
+
+    logInfo(s"counter: ${mapAcc.value}")
+    println(s"counter: ${mapAcc.value}")
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

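For reference, a minimal sketch of the split-list file that the `foreachRDD` block above reads via `sc.textFile(splitListPath)`. The label and service names below are made up; only the parsing rules come from the code above (a bare line selects an extra output split, `exclude_label=` / `exclude_service=` lines build the exclusion sets).

```
object SplitListFormat extends App {
  // hypothetical contents of the file at splitListPath
  val exampleFileContents =
    """talk
      |story
      |exclude_label=some_noisy_label
      |exclude_service=some_internal_service""".stripMargin

  // same shape as the parsing in WalLogToHDFS.run
  val read = exampleFileContents.split("\n").map(_.split("=")).flatMap {
    case Array(value)      => Some(("split", value)) // bare line -> extra output split
    case Array(key, value) => Some((key, value))     // exclude_label / exclude_service
    case _                 => None
  }

  val splits          = read.filter(_._1 == "split").map(_._2)
  val excludeLabels   = read.filter(_._1 == "exclude_label").map(_._2).toSet
  val excludeServices = read.filter(_._1 == "exclude_service").map(_._2).toSet

  println((splits.toList, excludeLabels, excludeServices))
  // (List(talk, story),Set(some_noisy_label),Set(some_internal_service))
}
```
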
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala 
b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
index a70c0f6..f927509 100644
--- a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
+++ b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala
@@ -1,6 +1,6 @@
 package subscriber
 
-import com.kakao.s2graph.core.{Label, Service, Management}
+import com.kakao.s2graph.core.Management
 import org.scalatest.{ FunSuite, Matchers }
 import play.api.libs.json.{JsBoolean, JsNumber}
 import s2.spark.WithKafka
@@ -28,7 +28,7 @@ class GraphSubscriberTest extends FunSuite with Matchers with 
WithKafka {
   test("GraphSubscriberHelper.store") {
     // actually we need to delete labelToReplace first for each test.
     val labelMapping = Map(testLabelName -> labelToReplace)
-    Management.copyLabel(testLabelName, labelToReplace, Some(hTableName))
+    GraphSubscriberHelper.management.copyLabel(testLabelName, labelToReplace, 
Some(hTableName))
 
 //
 //    val msgs = (for {
@@ -39,7 +39,7 @@ class GraphSubscriberTest extends FunSuite with Matchers with 
WithKafka {
 //      }).toSeq
     val msgs = testStrings
 
-    val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, 
labelMapping = labelMapping, autoCreateEdge = false)(None)
-    println(stat)
+//    val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, 
labelMapping = labelMapping, autoCreateEdge = false)(None)
+//    println(stat)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/test/scala/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/subscriber/TransferToHFileTest.scala 
b/loader/src/test/scala/subscriber/TransferToHFileTest.scala
new file mode 100644
index 0000000..7b3f72f
--- /dev/null
+++ b/loader/src/test/scala/subscriber/TransferToHFileTest.scala
@@ -0,0 +1,169 @@
+package subscriber
+
+import com.kakao.s2graph.core.Management
+import com.kakao.s2graph.core.types.HBaseType
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest._
+import subscriber.TransferToHFile._
+
+/**
+  * Created by Eric on 2015. 12. 2..
+  */
+class TransferToHFileTest  extends FlatSpec with BeforeAndAfterAll with 
Matchers {
+
+  private val master = "local[2]"
+  private val appName = "example-spark"
+
+  private var sc: SparkContext = _
+
+  val dataWithoutDir =
+    """
+      |1447686000000   insertBulk      e       a       b       friends_rel     
{}
+      |1447686000000   insertBulk      e       a       c       friends_rel     
{}
+      |1447686000000   insertBulk      e       a       d       friends_rel     
{}
+      |1447686000000   insertBulk      e       b       d       friends_rel     
{}
+      |1447686000000   insertBulk      e       b       e       friends_rel     
{}
+    """.stripMargin.trim
+
+  val dataWithDir =
+    """
+     |1447686000000    insertBulk      e       a       b       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       b       a       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       a       c       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       c       a       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       a       d       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       d       a       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       b       d       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       d       b       friends_rel     
{}      in
+     |1447686000000    insertBulk      e       b       e       friends_rel     
{}      out
+     |1447686000000    insertBulk      e       e       b       friends_rel     
{}      in
+    """.stripMargin.trim
+
+  override def beforeAll(): Unit = {
+    println("### beforeAll")
+
+    GraphSubscriberHelper.apply("dev", "none", "none", "none")
+    // 1. create service
+    if(Management.findService("loader-test").isEmpty) {
+      println(">>> create service...")
+      Management.createService("loader-test", "localhost", "loader-test-dev", 
1, None, "gz")
+    }
+
+    // 2. create label
+    if(Management.findLabel("friends_rel").isEmpty) {
+      println(">>> create label...")
+      Management.createLabel(
+        "friends_rel",
+        "loader-test", "user_id", "string",
+        "loader-test", "user_id", "string",
+        true,
+        "loader-test",
+        Seq(),
+        Seq(),
+        "weak",
+        None, None,
+        HBaseType.DEFAULT_VERSION,
+        false,
+        Management.defaultCompressionAlgorithm
+      )
+    }
+
+    // create spark context
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+    println("### afterALL")
+    if (sc != null) {
+      sc.stop()
+    }
+
+    Management.deleteLabel("friends_rel")
+  }
+
+  "buildDegreePutRequest" should "transform degree to PutRequest" in {
+    val putReqs = buildDegreePutRequests("a", "friends_rel", "out", 3L)
+    putReqs.size should equal(1)
+  }
+
+  "toKeyValues" should "transform edges to KeyValues on edge format data 
without direction" in {
+    val rdd = sc.parallelize(dataWithoutDir.split("\n"))
+
+    val kvs = rdd.mapPartitions { iter =>
+      GraphSubscriberHelper.apply("dev", "none", "none", "none")
+      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
+    }
+    kvs.foreach(println)
+    // edges * 2 (snapshot edges + indexed edges)
+    kvs.count() should equal(10)
+
+
+    val kvsAutoCreated = rdd.mapPartitions { iter =>
+      GraphSubscriberHelper.apply("dev", "none", "none", "none")
+      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], true)
+    }
+
+    // edges * 3 (snapshot edges + indexed edges + reverse edges)
+    kvsAutoCreated.count() should equal(15)
+  }
+
+  "toKeyValues" should "transform edges to KeyValues on edge format data with 
direction" in {
+    val rdd = sc.parallelize(dataWithDir.split("\n"))
+
+    val kvs = rdd.mapPartitions { iter =>
+      GraphSubscriberHelper.apply("dev", "none", "none", "none")
+      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
+    }
+
+    // edges * 2 (snapshot edges + indexed edges)
+    kvs.count() should equal(20)
+  }
+
+  "buildDegrees" should "build degrees on edge format data without direction" 
in {
+    val rdd = sc.parallelize(dataWithoutDir.split("\n"))
+
+    // autoCreate = false
+    val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], 
false).reduceByKey { (agg, current) =>
+      agg + current
+    }.collectAsMap()
+    degrees.size should equal(2)
+
+    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
+    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
+
+
+    // autoCreate = true
+    val degreesAutoCreated = TransferToHFile.buildDegrees(rdd, 
Map.empty[String, String], true).reduceByKey { (agg, current) =>
+      agg + current
+    }.collectAsMap()
+    degreesAutoCreated.size should equal(6)
+
+    degreesAutoCreated should contain(DegreeKey("a", "friends_rel", "out") -> 
3L)
+    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "out") -> 
2L)
+    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "in") -> 
1L)
+    degreesAutoCreated should contain(DegreeKey("c", "friends_rel", "in") -> 
1L)
+    degreesAutoCreated should contain(DegreeKey("d", "friends_rel", "in") -> 
2L)
+    degreesAutoCreated should contain(DegreeKey("e", "friends_rel", "in") -> 
1L)
+  }
+
+  "buildDegrees" should "build degrees on edge format data with direction" in {
+    val rdd = sc.parallelize(dataWithDir.split("\n"))
+
+    val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], 
false).reduceByKey { (agg, current) =>
+      agg + current
+    }.collectAsMap()
+
+    degrees.size should equal(6)
+
+    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
+    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
+    degrees should contain(DegreeKey("b", "friends_rel", "in") -> 1L)
+    degrees should contain(DegreeKey("c", "friends_rel", "in") -> 1L)
+    degrees should contain(DegreeKey("d", "friends_rel", "in") -> 2L)
+    degrees should contain(DegreeKey("e", "friends_rel", "in") -> 1L)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/project/plugins.sbt
----------------------------------------------------------------------
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4c90a1f..7f33c0e 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,4 +1,5 @@
-// Use the Play sbt plugin for Play projects
+// use the Play sbt plugin for Play projects
+
 addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.3.10")
 
 // http://www.scalastyle.org/sbt.html
@@ -8,3 +9,5 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % 
"0.7.0")
 addSbtPlugin("io.spray" % "sbt-revolver" % "0.8.0")
 
 addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.3")
+
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.0")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 7a3c2d5..ca40ca7 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -30,4 +30,3 @@ libraryDependencies := {
       libraryDependencies.value
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar
----------------------------------------------------------------------
diff --git a/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar 
b/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar
index 6dcc75d..801d87d 100644
Binary files a/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar and 
b/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/migrate/mysql/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/migrate/mysql/schema.sql b/s2core/migrate/mysql/schema.sql
index 06c029b..4911348 100644
--- a/s2core/migrate/mysql/schema.sql
+++ b/s2core/migrate/mysql/schema.sql
@@ -87,6 +87,7 @@ CREATE TABLE `labels` (
   `schema_version` varchar(8) NOT NULL default 'v2',
   `is_async` tinyint(4) NOT NULL default '0',
   `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
+  `options` text,
   PRIMARY KEY (`id`),
   UNIQUE KEY `ux_label` (`label`),
   INDEX `idx_src_column_name` (`src_column_name`),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/resources/create_model.hql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/create_model.hql 
b/s2core/src/main/resources/create_model.hql
index df8e4dc..18c870b 100644
--- a/s2core/src/main/resources/create_model.hql
+++ b/s2core/src/main/resources/create_model.hql
@@ -1,2 +1 @@
-create 'models-dev', {NAME => 'm'}
 create 's2graph-dev', {NAME => 'e'}, {NAME => 'v'}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/logback.xml 
b/s2core/src/main/resources/logback.xml
index 543365c..18aa2bd 100644
--- a/s2core/src/main/resources/logback.xml
+++ b/s2core/src/main/resources/logback.xml
@@ -9,12 +9,17 @@
         </encoder>
     </appender>
 
-    <root level="ERROR">
+    <!--<root level="INFO">-->
+        <!--<appender-ref ref="STDOUT"/>-->
+    <!--</root>-->
+
+    <logger name="application" level="DEBUG">
         <appender-ref ref="STDOUT"/>
-    </root>
+    </logger>
 
-    <logger name="application" level="DEBUG"></logger>
+    <logger name="error" level="DEBUG">
+        <appender-ref ref="STDOUT"/>
+    </logger>
 
-    <logger name="error" level="DEBUG"></logger>
 </configuration>
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/reference.conf 
b/s2core/src/main/resources/reference.conf
index cef0284..c93d204 100644
--- a/s2core/src/main/resources/reference.conf
+++ b/s2core/src/main/resources/reference.conf
@@ -1,6 +1,7 @@
 # APP PHASE
-phase=dev
-host=localhost
+phase = dev
+
+host = localhost
 
 # Hbase
 hbase.table.compression.algorithm="gz"
@@ -25,11 +26,12 @@ cache.ttl.seconds=60
 cache.max.size=100000
 
 # DB
-s2graph.models.table.name="models-dev"
-db.default.driver="com.mysql.jdbc.Driver"
-db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
-db.default.user="graph"
-db.default.password="graph"
+s2graph.models.table.name = "models-dev"
+db.default.driver = "com.mysql.jdbc.Driver"
+db.default.url = "jdbc:mysql://"${host}":3306/graph_dev"
+db.default.user = "graph"
+db.default.password = "graph"
+
 
 akka {
   loggers = ["akka.event.slf4j.Slf4jLogger"]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala
index 10443c3..8e6ad7d 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala
@@ -57,6 +57,7 @@ case class IndexEdge(srcVertex: Vertex,
   //  assert(props.containsKey(LabelMeta.timeStampSeq))
 
   val ts = props(LabelMeta.timeStampSeq).toString.toLong
+  val degreeEdge = props.contains(LabelMeta.degreeSeq)
   lazy val label = Label.findById(labelWithDir.labelId)
   val schemaVer = label.schemaVersion
   lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, 
labelIndexSeq).get

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
index 76d3151..fdc8553 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
@@ -42,7 +42,8 @@ object Graph {
     "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
     "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
     "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
-    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000)
+    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+    "s2graph.storage.backend" -> "hbase"
   )
 
   var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
@@ -99,9 +100,6 @@ object Graph {
               case _ => innerVal.toString().toLong
             }
           } getOrElse(edge.ts)
-          //          val innerVal = 
edge.propsWithTs(timeDecay.labelMetaSeq).innerVal
-          //
-          //          
edge.propsWithTs.get(timeDecay.labelMetaSeq).map(_.toString.toLong).getOrElse(edge.ts)
         } catch {
           case e: Exception =>
             logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
@@ -332,19 +330,24 @@ object Graph {
       logger.error(s"toVertex: $e", e)
       throw e
   } get
+
+  def initStorage(config: Config)(ec: ExecutionContext) = {
+    config.getString("s2graph.storage.backend") match {
+      case "hbase" => new AsynchbaseStorage(config)(ec)
+      case _ => throw new RuntimeException("not supported storage.")
+    }
+  }
 }
 
-class Graph(_config: Config)(implicit ec: ExecutionContext) {
+class Graph(_config: Config)(implicit val ec: ExecutionContext) {
   val config = _config.withFallback(Graph.DefaultConfig)
-  val cacheSize = config.getInt("cache.max.size")
-//  val cache = 
CacheBuilder.newBuilder().maximumSize(cacheSize).build[java.lang.Integer, 
Seq[QueryResult]]()
-  val vertexCache = 
CacheBuilder.newBuilder().maximumSize(cacheSize).build[java.lang.Integer, 
Option[Vertex]]()
 
   Model.apply(config)
   Model.loadCache()
 
   // TODO: Make storage client by config param
-  val storage: Storage = new AsynchbaseStorage(config, vertexCache)(ec)
+  val storage = Graph.initStorage(config)(ec)
+
 
   for {
     entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey)

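With initStorage in place, the backend is chosen from s2graph.storage.backend instead of being hard-wired into the Graph constructor. A minimal bootstrap sketch (not part of the commit; it assumes the MySQL metadata store and the HBase quorum configured in reference.conf are reachable at runtime):

    import com.typesafe.config.ConfigFactory
    import scala.concurrent.ExecutionContext.Implicits.global
    import com.kakao.s2graph.core.Graph

    object GraphBootstrap extends App {
      // reference.conf (see the hunk above) supplies the DB and HBase settings;
      // Graph falls back to Graph.DefaultConfig, which now defaults
      // s2graph.storage.backend to "hbase", the only value initStorage accepts.
      val graph = new Graph(ConfigFactory.load())
    }
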
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
index 32f7a5a..ccf9d1f 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
@@ -4,7 +4,6 @@ package com.kakao.s2graph.core
 import com.kakao.s2graph.core.GraphExceptions.{InvalidHTableException, 
LabelAlreadyExistException, LabelNotExistException}
 import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
 import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.storage.Storage
 import com.kakao.s2graph.core.types.HBaseType._
 import com.kakao.s2graph.core.types._
 import play.api.libs.json.Reads._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
index 42b0146..f301d68 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
@@ -165,6 +165,37 @@ object PostProcess extends JSONParser {
     }
   }
 
+  private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): 
JsValue = {
+    def traverse(js: JsValue): JsValue = js match {
+      case JsNull => mapper(JsNull)
+      case JsUndefined() => mapper(JsUndefined(""))
+      case JsNumber(v) => mapper(js)
+      case JsString(v) => mapper(js)
+      case JsBoolean(v) => mapper(js)
+      case JsArray(elements) => JsArray(elements.map { t => 
traverse(mapper(t)) })
+      case JsObject(values) => JsObject(values.map { case (k, v) => k -> 
traverse(mapper(v)) })
+    }
+
+    traverse(jsValue)
+  }
+
+  /** test query with filterOut is not working since it cannot differentiate filterOut */
+  private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): 
JsValue = {
+    val cursors = _cursors.flatten.iterator
+
+    buildReplaceJson(jsonQuery) {
+      case js@JsObject(fields) =>
+        val isStep = fields.find { case (k, _) => k == "label" } // find label 
group
+        if (isStep.isEmpty) js
+        else {
+          // TODO: Order not ensured
+          val withCursor = js.fieldSet | Set("cursor" -> 
JsString(cursors.next))
+          JsObject(withCursor.toSeq)
+        }
+      case js => js
+    }
+  }
+
   private def buildRawEdges(queryOption: QueryOption,
                             queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
                             excludeIds: Map[Int, Boolean],
@@ -267,6 +298,7 @@ object PostProcess extends JSONParser {
       Json.obj(
         "size" -> edges.size,
         "degrees" -> resultDegrees,
+//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
         "results" -> edges
       )
     } else {
@@ -316,6 +348,7 @@ object PostProcess extends JSONParser {
       Json.obj(
         "size" -> groupedSortedJsons.size,
         "degrees" -> resultDegrees,
+//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
         "results" -> groupedSortedJsons
       )
     }

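buildReplaceJson is a bottom-up rewrite over a Play JSON tree, and buildNextQuery uses it to append a "cursor" field to every object that contains a "label" key (the feature itself is still commented out in the response builders above). The sketch below is a hypothetical, stand-alone version of the same traversal, not the private method itself:

    import play.api.libs.json._

    object QueryNextSketch extends App {
      def addCursor(js: JsValue, cursor: String): JsValue = js match {
        case JsArray(elems) => JsArray(elems.map(addCursor(_, cursor)))
        case JsObject(fields) =>
          val rewritten = fields.toSeq.map { case (k, v) => k -> addCursor(v, cursor) }
          if (fields.toSeq.exists(_._1 == "label"))
            JsObject(rewritten :+ ("cursor" -> JsString(cursor)))   // label group: attach cursor
          else
            JsObject(rewritten)
        case other => other                                         // numbers, strings, booleans, null
      }

      val query = Json.parse("""{"srcVertices":[],"steps":[{"step":[{"label":"friend","limit":10}]}]}""")
      println(Json.stringify(addCursor(query, "AAA")))
    }
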
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
index a9a5112..184cd08 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
@@ -6,7 +6,7 @@ import com.kakao.s2graph.core.parsers.{Where, WhereParser}
 import com.kakao.s2graph.core.types._
 import org.apache.hadoop.hbase.util.Bytes
 import org.hbase.async.ColumnRangeFilter
-import play.api.libs.json.{JsNumber, JsValue, Json}
+import play.api.libs.json.{JsNull, JsNumber, JsValue, Json}
 
 import scala.util.hashing.MurmurHash3
 import scala.util.{Success, Try}
@@ -32,6 +32,11 @@ object Query {
   }
 }
 
+case class MultiQuery(queries: Seq[Query],
+                      weights: Seq[Double],
+                      queryOption: QueryOption,
+                      jsonQuery: JsValue = JsNull)
+
 case class QueryOption(removeCycle: Boolean = false,
                        selectColumns: Seq[String] = Seq.empty,
                        groupByColumns: Seq[String] = Seq.empty,
@@ -45,14 +50,15 @@ case class QueryOption(removeCycle: Boolean = false,
                        scoreThreshold: Double = Double.MinValue,
                        returnDegree: Boolean = true)
 
-case class MultiQuery(queries: Seq[Query], weights: Seq[Double], queryOption: 
QueryOption)
 
 case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
                  steps: IndexedSeq[Step] = Vector.empty[Step],
-                 queryOption: QueryOption = QueryOption()) {
+                 queryOption: QueryOption = QueryOption(),
+                 jsonQuery: JsValue = JsNull) {
 
   val removeCycle = queryOption.removeCycle
   val selectColumns = queryOption.selectColumns
+//  val groupBy = queryOption.groupBy
   val groupByColumns = queryOption.groupByColumns
   val orderByColumns = queryOption.orderByColumns
   val filterOutQuery = queryOption.filterOutQuery
@@ -477,13 +483,13 @@ case class QueryParam(labelWithDir: LabelWithDirection, 
timestamp: Long = System
     this
   }
 
-  def whereRawOpt(sqlOpt: Option[String]): QueryParam = {
-    this.whereRawOpt = sqlOpt
+  def shouldNormalize(shouldNormalize: Boolean): QueryParam = {
+    this.shouldNormalize = shouldNormalize
     this
   }
 
-  def shouldNormalize(shouldNormalize: Boolean): QueryParam = {
-    this.shouldNormalize = shouldNormalize
+  def whereRawOpt(sqlOpt: Option[String]): QueryParam = {
+    this.whereRawOpt = sqlOpt
     this
   }
 

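MultiQuery (and Query) now carry the original request JSON so that later stages, such as the commented-out "queryNext" block in PostProcess, can rebuild a follow-up query. A hypothetical construction under that assumption (field values are placeholders, not part of the commit):

    import play.api.libs.json.Json
    import com.kakao.s2graph.core.{MultiQuery, Query, QueryOption}

    object MultiQuerySketch extends App {
      val requestJson = Json.obj("queries" -> Json.arr(), "weights" -> Json.arr())

      val mq = MultiQuery(
        queries     = Seq(Query()),      // Query() keeps its own jsonQuery default (JsNull)
        weights     = Seq(1.0),
        queryOption = QueryOption(),
        jsonQuery   = requestJson)
    }
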
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala
deleted file mode 100644
index 413bfdb..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-//package com.kakao.s2graph.core
-//
-//import scala.collection.Seq
-//
-//case class QueryRequest(query: Query,
-//                        stepIdx: Int,
-//                        vertex: Vertex,
-//                        queryParam: QueryParam,
-//                        prevStepScore: Double,
-//                        tgtVertexOpt: Option[Vertex] = None,
-//                        parentEdges: Seq[EdgeWithScore] = Nil,
-//                        isInnerCall: Boolean = false)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
index 36c5a01..0ca92b5 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
@@ -7,18 +7,22 @@ import scala.collection.Seq
 
 object QueryResult {
   def fromVertices(query: Query): Seq[QueryRequestWithResult] = {
-    val queryParam = query.steps.head.queryParams.head
-    val label = queryParam.label
-    val currentTs = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timeStampSeq ->
-      InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), 
currentTs))
-    for {
-      vertex <- query.vertices
-    } yield {
-      val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = 
propsWithTs)
-      val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
-      QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
-        QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
+    if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
+      Seq.empty
+    } else {
+      val queryParam = query.steps.head.queryParams.head
+      val label = queryParam.label
+      val currentTs = System.currentTimeMillis()
+      val propsWithTs = Map(LabelMeta.timeStampSeq ->
+        InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), 
currentTs))
+      for {
+        vertex <- query.vertices
+      } yield {
+        val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = 
propsWithTs)
+        val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
+        QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
+          QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
+      }
     }
   }
 }

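The guard above turns an empty query into a no-op instead of a NoSuchElementException on steps.head. A one-line check under that assumption (a Query() with the new defaults has no vertices and no steps):

    import com.kakao.s2graph.core.{Query, QueryResult}

    object EmptyQueryCheck extends App {
      assert(QueryResult.fromVertices(Query()).isEmpty)   // previously threw on steps.head
    }
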
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala
index 9cc0607..c2b86c2 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala
@@ -1,5 +1,6 @@
 package com.kakao.s2graph.core
 
+
 import com.kakao.s2graph.core.mysqls._
 
 //import com.kakao.s2graph.core.models._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
index 88c85b7..46b92ab 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
@@ -1,6 +1,7 @@
 package com.kakao.s2graph.core.mysqls
 
 import com.kakao.s2graph.core.GraphUtil
+import com.kakao.s2graph.core.utils.logger
 import scalikejdbc._
 
 import scala.util.Random
@@ -50,6 +51,7 @@ case class Experiment(id: Option[Int],
                       totalModular: Int) {
 
   def buckets = Bucket.finds(id.get)
+
   def rangeBuckets = for {
     bucket <- buckets
     range <- bucket.rangeOpt

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
index 2449082..22fbd8b 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -2,7 +2,7 @@ package com.kakao.s2graph.core.rest
 
 import java.util.concurrent.{Callable, TimeUnit}
 
-import com.google.common.cache.CacheBuilder
+import com.google.common.cache.{CacheLoader, CacheBuilder}
 import com.kakao.s2graph.core.GraphExceptions.{BadQueryException, 
ModelNotFoundException}
 import com.kakao.s2graph.core._
 import com.kakao.s2graph.core.mysqls._
@@ -15,11 +15,9 @@ import scala.util.{Failure, Success, Try}
 object TemplateHelper {
   val findVar = """\"?\$\{(.*?)\}\"?""".r
   val num = 
"""(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
-
   val hour = 60 * 60 * 1000L
   val day = hour * 24L
   val week = day * 7L
-
   def calculate(now: Long, n: Int, unit: String): Long = {
     val duration = unit match {
       case "hour" | "HOUR" => n * hour
@@ -59,6 +57,7 @@ class RequestParser(config: Config) extends JSONParser {
 
   val hardLimit = 100000
   val defaultLimit = 100
+  val maxLimit = Int.MaxValue - 1
   val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
   val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
   val DefaultCluster = config.getString("hbase.zookeeper.quorum")
@@ -71,7 +70,6 @@ class RequestParser(config: Config) extends JSONParser {
     .initialCapacity(1000)
     .build[String, Try[Where]]
 
-
   private def extractScoring(labelId: Int, value: JsValue) = {
     val ret = for {
       js <- parse[Option[JsObject]](value, "scoring")
@@ -149,7 +147,6 @@ class RequestParser(config: Config) extends JSONParser {
     ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
   }
   
-
   def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = 
{
     whereClauseOpt match {
       case None => Success(WhereParser.success)
@@ -186,7 +183,8 @@ class RequestParser(config: Config) extends JSONParser {
       toQuery(queryJson, isEdgeQuery)
     }
     val weights = (jsValue \ 
"weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
-    MultiQuery(queries = queries, weights = weights, queryOption = 
toQueryOption(jsValue))
+    MultiQuery(queries = queries, weights = weights,
+      queryOption = toQueryOption(jsValue), jsonQuery = jsValue)
   }
 
   def toQueryOption(jsValue: JsValue): QueryOption = {
@@ -313,7 +311,7 @@ class RequestParser(config: Config) extends JSONParser {
 
         }
 
-      val ret = Query(vertices, querySteps, queryOption)
+      val ret = Query(vertices, querySteps, queryOption, jsValue)
       //      logger.debug(ret.toString)
       ret
     } catch {
@@ -335,7 +333,7 @@ class RequestParser(config: Config) extends JSONParser {
       val limit = {
         parse[Option[Int]](labelGroup, "limit") match {
           case None => defaultLimit
-          case Some(l) if l < 0 => Int.MaxValue
+          case Some(l) if l < 0 => maxLimit
           case Some(l) if l >= 0 =>
             val default = hardLimit
             Math.min(l, default)
@@ -381,7 +379,6 @@ class RequestParser(config: Config) extends JSONParser {
       val scorePropagateOp = (labelGroup \ 
"scorePropagateOp").asOpt[String].getOrElse("multiply")
       val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
       val shouldNormalize = (labelGroup \ 
"normalize").asOpt[Boolean].getOrElse(false)
-
       // FIXME: Order of command matter
       QueryParam(labelWithDir)
         .sample(sample)
@@ -432,31 +429,39 @@ class RequestParser(config: Config) extends JSONParser {
 
   def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], 
List[JsValue]) = {
     val jsValues = toJsValues(jsValue)
-    val edges = jsValues.map(toEdge(_, operation))
+    val edges = jsValues.flatMap(toEdge(_, operation))
 
     (edges, jsValues)
   }
 
   def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
-    toJsValues(jsValue).map(toEdge(_, operation))
+    toJsValues(jsValue).flatMap { edgeJson =>
+      toEdge(edgeJson, operation)
+    }
   }
 
-  def toEdge(jsValue: JsValue, operation: String) = {
 
-    val srcId = parse[JsValue](jsValue, "from") match {
-      case s: JsString => s.as[String]
-      case o@_ => s"${o}"
-    }
-    val tgtId = parse[JsValue](jsValue, "to") match {
+  private def toEdge(jsValue: JsValue, operation: String): List[Edge] = {
+
+    def parseId(js: JsValue) = js match {
       case s: JsString => s.as[String]
       case o@_ => s"${o}"
     }
+    val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_))
+    val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_))
+    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms 
=> froms.map(js => parseId(js))) ++ srcId
+    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms 
=> froms.map(js => parseId(js))) ++ tgtId
+
     val label = parse[String](jsValue, "label")
     val timestamp = parse[Long](jsValue, "timestamp")
     val direction = parse[Option[String]](jsValue, "direction").getOrElse("")
     val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
-    Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, 
props.toString)
-
+    for {
+      srcId <- srcIds
+      tgtId <- tgtIds
+    } yield {
+      Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, 
props.toString)
+    }
   }
 
   def toVertices(jsValue: JsValue, operation: String, serviceName: 
Option[String] = None, columnName: Option[String] = None) = {

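toEdge now also accepts plural "froms" / "tos" arrays and emits the cross product of all id pairs, which is why its callers switch from map to flatMap. The payload sketch below only illustrates the accepted shape and the resulting pair count; the label name, ids, and timestamp are placeholders:

    import play.api.libs.json._

    object BulkEdgePayload extends App {
      val payload = Json.parse(
        """{
          |  "froms": [101, 102],
          |  "tos": ["u1", "u2"],
          |  "label": "friend",
          |  "timestamp": 1456724047000,
          |  "props": {"weight": 1.0}
          |}""".stripMargin)

      def ids(key: String): List[JsValue] = (payload \ key).asOpt[List[JsValue]].getOrElse(Nil)

      val pairs = for (src <- ids("froms"); tgt <- ids("tos")) yield (src, tgt)
      println(pairs.size)   // 4 edges would be built, one per (from, to) pair
    }
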
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala
new file mode 100644
index 0000000..cff968f
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala
@@ -0,0 +1,25 @@
+package com.kakao.s2graph.core.storage
+
+import com.kakao.s2graph.core.storage.{SKeyValue, StorageDeserializable}
+import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId, 
VertexId}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+trait Deserializable[E] extends StorageDeserializable[E] {
+  import StorageDeserializable._
+
+  type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)
+
+  /** version 1 and version 2 share the same code for parsing the row key part */
+  def parseRow(kv: SKeyValue, version: String): RowKeyRaw = {
+    var pos = 0
+    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, version)
+    pos += srcIdLen
+    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+    pos += 4
+    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, 
pos)
+
+    val rowLen = srcIdLen + 4 + 1
+    (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
+  }
+}
\ No newline at end of file

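parseRow assumes a fixed suffix after the variable-length source vertex id: a 4-byte LabelWithDirection followed by one byte that packs the label index sequence with the isInverted flag, hence rowLen = srcIdLen + 4 + 1. A byte-level illustration of that layout (the id bytes and flag value are made up):

    import org.apache.hadoop.hbase.util.Bytes

    object RowKeyLayout extends App {
      val srcIdBytes   = Array[Byte](1, 2, 3)   // stand-in for SourceVertexId bytes
      val labelWithDir = Bytes.toBytes(42)      // 4 bytes: label id + direction
      val idxWithFlag  = Array[Byte](2)         // 1 byte: index seq + inverted bit

      val row = Bytes.add(srcIdBytes, labelWithDir, idxWithFlag)
      println(row.length == srcIdBytes.length + 4 + 1)   // true
    }
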
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..2190222
--- /dev/null
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala
@@ -0,0 +1,128 @@
+package com.kakao.s2graph.core.storage
+
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{CanSKeyValue, StorageDeserializable, 
SKeyValue}
+import com.kakao.s2graph.core.types._
+import org.apache.hadoop.hbase.util.Bytes
+import StorageDeserializable._
+
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = 
bytesToLong) extends Deserializable[IndexEdge] {
+
+  import StorageDeserializable._
+
+  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, 
Int)
+  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+  private def parseDegreeQualifier(kv: SKeyValue, version: String): 
QualifierRaw = {
+//    val degree = Bytes.toLong(kv.value)
+    val degree = bytesToLongFunc(kv.value, 0)
+    val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, 
version))
+    val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, 
InnerVal.withStr("0", version))
+    (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+  }
+
+  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+    var qualifierLen = 0
+    var pos = 0
+    val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+      val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+      pos = endAt
+      qualifierLen += endAt
+      val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+        (HBaseType.defaultTgtVertexId, 0)
+      } else {
+        TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, 
version)
+      }
+      qualifierLen += tgtVertexIdLen
+      (props, endAt, tgtVertexId, tgtVertexIdLen)
+    }
+    val (op, opLen) =
+      if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+      else (kv.qualifier(qualifierLen), 1)
+
+    qualifierLen += opLen
+
+    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+  }
+
+  private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+    val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, 
version)
+    (props, endAt)
+  }
+
+  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+    (Array.empty[(Byte, InnerValLike)], 0)
+  }
+
+
+
+  /** version 1 and version 2 use the same logic */
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                              _kvs: Seq[T],
+                                              version: String,
+                                              cacheElementOpt: 
Option[IndexEdge] = None): IndexEdge = {
+    fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt)
+  }
+
+  def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam,
+                                          _kvs: Seq[T],
+                                          version: String,
+                                          cacheElementOpt: Option[IndexEdge] = 
None): IndexEdge = {
+    assert(_kvs.size == 1)
+
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+    val kv = kvs.head
+    val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { 
e =>
+      (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+    }.getOrElse(parseRow(kv, version))
+
+    val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+      if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
+      else parseQualifier(kv, version)
+
+    val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+//      val countVal = Bytes.toLong(kv.value)
+      val countVal = bytesToLongFunc(kv.value, 0)
+      val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, 
version))
+      (dummyProps, 8)
+    } else if (kv.qualifier.isEmpty) {
+      parseDegreeValue(kv, version)
+    } else {
+      parseValue(kv, version)
+    }
+
+    val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new 
RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, 
${labelIdxSeq}"))
+
+
+    //    assert(kv.qualifier.nonEmpty && index.metaSeqs.size == 
idxPropsRaw.size)
+
+    val idxProps = for {
+      (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+    } yield {
+        if (k == LabelMeta.degreeSeq) k -> v
+        else seq -> v
+      }
+
+    val idxPropsMap = idxProps.toMap
+    val tgtVertexId = if (tgtVertexIdInQualifier) {
+      idxPropsMap.get(LabelMeta.toSeq) match {
+        case None => tgtVertexIdRaw
+        case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+      }
+    } else tgtVertexIdRaw
+
+    val _mergedProps = (idxProps ++ props).toMap
+    val mergedProps =
+      if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+      else _mergedProps + (LabelMeta.timeStampSeq -> 
InnerVal.withLong(kv.timestamp, version))
+
+    //    logger.error(s"$mergedProps")
+    //    val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
+
+    val ts = kv.timestamp
+    IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, 
op, ts, labelIdxSeq, mergedProps)
+
+  }
+}

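The deserializer takes bytesToLongFunc so a backend can supply its own decoding for degree and count values instead of the default bytesToLong. A small wiring sketch, mirroring the constructor above, that passes HBase's Bytes.toLong explicitly:

    import org.apache.hadoop.hbase.util.Bytes
    import com.kakao.s2graph.core.storage.IndexEdgeDeserializable

    object DeserializerWiring extends App {
      val hbaseLong: (Array[Byte], Int) => Long = (bytes, offset) => Bytes.toLong(bytes, offset)
      val deserializer = new IndexEdgeDeserializable(hbaseLong)
    }
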
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..56f70b9
--- /dev/null
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala
@@ -0,0 +1,58 @@
+package com.kakao.s2graph.core.storage
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue}
+import com.kakao.s2graph.core.types.{HBaseType, VertexId}
+import com.kakao.s2graph.core.utils.logger
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.hadoop.hbase.util.Bytes
+
+case class IndexEdgeSerializable(indexEdge: IndexEdge) extends 
Serializable[IndexEdge] {
+
+  import StorageSerializable._
+
+  val label = indexEdge.label
+  val table = label.hbaseTableName.getBytes()
+  val cf = Serializable.edgeCf
+
+  val idxPropsMap = indexEdge.orders.toMap
+  val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+  /** version 1 and version 2 share the same code for serializing the row key part */
+  override def toKeyValues: Seq[SKeyValue] = {
+    toKeyValuesInner
+  }
+  def toKeyValuesInner: Seq[SKeyValue] = {
+    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+    val labelWithDirBytes = indexEdge.labelWithDir.bytes
+    val labelIndexSeqWithIsInvertedBytes = 
labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+    val row = Bytes.add(srcIdBytes, labelWithDirBytes, 
labelIndexSeqWithIsInvertedBytes)
+    //    
logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+    val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
+    val qualifier =
+      if (indexEdge.degreeEdge) Array.empty[Byte]
+      else {
+        if (indexEdge.op == GraphUtil.operations("incrementCount")) {
+          Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
+        } else {
+          idxPropsMap.get(LabelMeta.toSeq) match {
+            case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+            case Some(vId) => idxPropsBytes
+          }
+        }
+      }
+
+
+    val value =
+      if (indexEdge.degreeEdge)
+        
Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+        
Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+      else propsToKeyValues(indexEdge.metas.toSeq)
+
+    val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
+
+    Seq(kv)
+  }
+}
\ No newline at end of file

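The value branches above write either a raw 8-byte long (degree and incrementCount edges) or the serialized props. A tiny check of the long branch using the same HBase Bytes helpers:

    import org.apache.hadoop.hbase.util.Bytes

    object ValueEncodingSketch extends App {
      val degree = 123L
      val value  = Bytes.toBytes(degree)        // what the degreeEdge branch stores
      println(value.length)                     // 8
      println(Bytes.toLong(value) == degree)    // true
    }
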
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala
deleted file mode 100644
index c5d4882..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core._
-
-import scala.collection.Seq
-import scala.concurrent.ExecutionContext
-
-
-abstract class MutationBuilder[T](storage: Storage)(implicit ex: 
ExecutionContext) {
-  /** operation that needs to be supported by backend persistent storage 
system */
-  def put(kvs: Seq[SKeyValue]): Seq[T]
-
-  def increment(kvs: Seq[SKeyValue]): Seq[T]
-
-  def delete(kvs: Seq[SKeyValue]): Seq[T]
-
-
-  /** build mutation for backend persistent storage system */
-
-  /** EdgeMutate */
-  def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[T]
-
-  def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[T]
-
-  def increments(edgeMutate: EdgeMutate): Seq[T]
-
-  /** IndexEdge */
-  def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[T]
-
-  def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): 
Seq[T]
-
-  def buildDeletesAsync(indexedEdge: IndexEdge): Seq[T]
-
-  def buildPutsAsync(indexedEdge: IndexEdge): Seq[T]
-
-  /** SnapshotEdge */
-  def buildPutAsync(snapshotEdge: SnapshotEdge): Seq[T]
-
-  def buildDeleteAsync(snapshotEdge: SnapshotEdge): Seq[T]
-
-  /** Vertex */
-  def buildPutsAsync(vertex: Vertex): Seq[T]
-
-  def buildDeleteAsync(vertex: Vertex): Seq[T]
-
-  def buildDeleteBelongsToId(vertex: Vertex): Seq[T]
-
-  def buildVertexPutsAsync(edge: Edge): Seq[T]
-
-  def buildPutsAll(vertex: Vertex): Seq[T] = {
-    vertex.op match {
-      case d: Byte if d == GraphUtil.operations("delete") => 
buildDeleteAsync(vertex)
-      case _ => buildPutsAsync(vertex)
-    }
-  }
-
-}
