[S2GRAPH-122]: Change data types of Edge/IndexEdge/SnapshotEdge.

JIRA:
  [S2GRAPH-122] https://issues.apache.org/jira/browse/S2GRAPH-122

Pull Request:
  Closes #97

Authors
  DO YUNG YOON: [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/66bdf1bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/66bdf1bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/66bdf1bc

Branch: refs/heads/master
Commit: 66bdf1bc09b1a976a8d9d0f07b6af572a6074aae
Parents: 8dbb9a3
Author: DO YUNG YOON <[email protected]>
Authored: Wed Nov 16 17:32:02 2016 +0100
Committer: DO YUNG YOON <[email protected]>
Committed: Wed Nov 16 17:32:02 2016 +0100

----------------------------------------------------------------------
 CHANGES                                         |    2 +
 dev_support/docker-compose.yml                  |    5 -
 dev_support/graph_mysql/schema.sql              |  248 ++--
 .../loader/subscriber/GraphSubscriber.scala     |  145 ---
 .../loader/subscriber/TransferToHFile.scala     |   28 +-
 .../scala/org/apache/s2graph/core/Edge.scala    |  266 ++--
 .../scala/org/apache/s2graph/core/Graph.scala   | 1164 ++++++++++++++----
 .../apache/s2graph/core/GraphExceptions.scala   |   24 +-
 .../org/apache/s2graph/core/GraphUtil.scala     |    8 +-
 .../org/apache/s2graph/core/Management.scala    |   97 +-
 .../org/apache/s2graph/core/PostProcess.scala   |  195 ++-
 .../org/apache/s2graph/core/QueryParam.scala    |  575 +++------
 .../org/apache/s2graph/core/QueryResult.scala   |  328 ++---
 .../scala/org/apache/s2graph/core/Vertex.scala  |   59 -
 .../org/apache/s2graph/core/mysqls/Bucket.scala |   26 +-
 .../apache/s2graph/core/mysqls/Experiment.scala |   13 +-
 .../org/apache/s2graph/core/mysqls/Label.scala  |  203 ++-
 .../apache/s2graph/core/mysqls/LabelIndex.scala |   13 +-
 .../apache/s2graph/core/mysqls/LabelMeta.scala  |   64 +-
 .../org/apache/s2graph/core/mysqls/Model.scala  |   37 +-
 .../apache/s2graph/core/mysqls/Service.scala    |   15 +-
 .../s2graph/core/parsers/WhereParser.scala      |  103 +-
 .../s2graph/core/rest/RequestParser.scala       |  334 ++---
 .../apache/s2graph/core/rest/RestHandler.scala  |  118 +-
 .../apache/s2graph/core/storage/SKeyValue.scala |   12 +-
 .../apache/s2graph/core/storage/Storage.scala   |  771 ++++--------
 .../core/storage/StorageDeserializable.scala    |   35 +-
 .../core/storage/StorageSerializable.scala      |   24 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |  454 ++++---
 .../tall/IndexEdgeDeserializable.scala          |  222 ++--
 .../indexedge/tall/IndexEdgeSerializable.scala  |   26 +-
 .../wide/IndexEdgeDeserializable.scala          |  230 ++--
 .../indexedge/wide/IndexEdgeSerializable.scala  |   13 +-
 .../tall/SnapshotEdgeDeserializable.scala       |   30 +-
 .../tall/SnapshotEdgeSerializable.scala         |    4 +-
 .../wide/SnapshotEdgeDeserializable.scala       |   28 +-
 .../wide/SnapshotEdgeSerializable.scala         |    4 +-
 .../serde/vertex/VertexDeserializable.scala     |   20 +-
 .../serde/vertex/VertexSerializable.scala       |   15 +-
 .../apache/s2graph/core/types/HBaseType.scala   |    5 +-
 .../s2graph/core/types/InnerValLike.scala       |   15 +-
 .../apache/s2graph/core/utils/DeferCache.scala  |    5 +-
 .../apache/s2graph/core/utils/Extentions.scala  |   65 +-
 .../s2graph/core/utils/SafeUpdateCache.scala    |    9 +-
 .../org/apache/s2graph/core/EdgeTest.scala      |   48 +-
 .../s2graph/core/Integrate/CrudTest.scala       |    6 +-
 .../core/Integrate/IntegrateCommon.scala        |   20 +-
 .../s2graph/core/Integrate/QueryTest.scala      |  530 ++++----
 .../core/Integrate/WeakLabelDeleteTest.scala    |    6 +-
 .../apache/s2graph/core/ManagementTest.scala    |  117 +-
 .../apache/s2graph/core/QueryParamTest.scala    |  316 ++---
 .../s2graph/core/TestCommonWithModels.scala     |    8 +
 .../apache/s2graph/core/models/ModelTest.scala  |   90 --
 .../s2graph/core/mysqls/ExperimentSpec.scala    |   83 --
 .../s2graph/core/parsers/WhereParserTest.scala  |  168 +--
 .../core/storage/hbase/IndexEdgeTest.scala      |   31 +-
 .../org/apache/s2graph/rest/netty/Server.scala  |   63 +-
 .../rest/play/controllers/AdminController.scala |    2 +-
 .../rest/play/controllers/EdgeController.scala  |    6 +-
 59 files changed, 4071 insertions(+), 3480 deletions(-)
----------------------------------------------------------------------
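
For readers skimming the diff below: the heart of this change is in Edge.scala.
Edge, IndexEdge, and SnapshotEdge no longer carry a packed LabelWithDirection,
taking a Label plus an Int direction instead, and their props maps are keyed by
LabelMeta objects rather than raw Byte seq values. A minimal standalone Scala
sketch of that before/after shape (simplified stand-in types for illustration,
not the actual s2core classes):

  // Simplified stand-ins for the real model classes (illustration only).
  case class LabelMeta(seq: Byte, name: String, dataType: String)
  case class Label(id: Int, label: String)

  // Before (roughly): label id and direction packed together, props keyed
  // by each meta's raw seq byte, forcing a seq -> meta lookup at read time.
  case class OldEdge(labelWithDir: Int, props: Map[Byte, Long])

  // After (roughly): Label and direction carried separately, props keyed
  // by the resolved LabelMeta itself.
  case class NewEdge(label: Label, dir: Int, props: Map[LabelMeta, Long])

  object TypeChangeSketch extends App {
    val tsMeta = LabelMeta(0, "_timestamp", "long")
    val edge = NewEdge(Label(1, "friend"), dir = 0, props = Map(tsMeta -> 1479314722000L))
    // Property names are now available directly from the map key:
    edge.props.foreach { case (meta, v) => println(s"${meta.name} = $v") }
  }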


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 73b3597..01d1cc8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -95,6 +95,8 @@ Release 0.1.0 - unreleased
    S2GRAPH-127: Refactor ExceptionHander Object into Class (Committed by DOYUNG YOON).
 
    S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON).
+ 
+    S2GRAPH-122: Change data types of Edge/IndexEdge/SnapshotEdge (Committed by DOYUNG YOON).
 
   BUG FIXES
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/dev_support/docker-compose.yml
----------------------------------------------------------------------
diff --git a/dev_support/docker-compose.yml b/dev_support/docker-compose.yml
index 5531fde..fb95a39 100644
--- a/dev_support/docker-compose.yml
+++ b/dev_support/docker-compose.yml
@@ -13,11 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-graph:
-    image: s2rest_play:latest
-    container_name: graph
-    net: container:graph_hbase
-
 graph_mysql:
     build: graph_mysql
     container_name: graph_mysql

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/dev_support/graph_mysql/schema.sql
----------------------------------------------------------------------
diff --git a/dev_support/graph_mysql/schema.sql b/dev_support/graph_mysql/schema.sql
index 862062b..df283a8 100644
--- a/dev_support/graph_mysql/schema.sql
+++ b/dev_support/graph_mysql/schema.sql
@@ -35,17 +35,17 @@ SET FOREIGN_KEY_CHECKS = 0;
 -- ----------------------------
 DROP TABLE IF EXISTS `services`;
 CREATE TABLE `services` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `service_name` varchar(64) NOT NULL,
-  `access_token` varchar(64) NOT NULL,
-  `cluster` varchar(255) NOT NULL,
-  `hbase_table_name` varchar(255) NOT NULL,
-  `pre_split_size` integer NOT NULL default 0,
-  `hbase_table_ttl` integer,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_service_name` (`service_name`),
-  INDEX `idx_access_token` (`access_token`),
-  INDEX `idx_cluster` (cluster(75))
+       `id` integer NOT NULL AUTO_INCREMENT,
+       `service_name` varchar(64) NOT NULL,
+       `access_token` varchar(64) NOT NULL,
+       `cluster` varchar(255) NOT NULL,
+       `hbase_table_name` varchar(255) NOT NULL,
+       `pre_split_size` integer NOT NULL default 0,
+       `hbase_table_ttl` integer,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_service_name` (`service_name`),
+       INDEX `idx_access_token` (`access_token`),
+       INDEX `idx_cluster` (cluster(75))
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 
@@ -54,13 +54,13 @@ CREATE TABLE `services` (
 -- ----------------------------
 DROP TABLE IF EXISTS `service_columns`;
 CREATE TABLE `service_columns` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `service_id` integer NOT NULL,
-  `column_name` varchar(64) NOT NULL,
-  `column_type` varchar(8) NOT NULL,
-  `schema_version` varchar(8) NOT NULL default 'v2',
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`)
+       `id` integer NOT NULL AUTO_INCREMENT,
+       `service_id` integer NOT NULL,
+       `column_name` varchar(64) NOT NULL,
+       `column_type` varchar(8) NOT NULL,
+       `schema_version` varchar(8) NOT NULL default 'v2',
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES services(id) ON DELETE CASCADE;
@@ -71,14 +71,14 @@ ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES services(id)
 -- ----------------------------
 DROP TABLE IF EXISTS `column_metas`;
 CREATE TABLE `column_metas` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `column_id` integer NOT NULL,
-  `name` varchar(64) NOT NULL,
-  `seq` tinyint        NOT NULL,
-  `data_type` varchar(8) NOT NULL DEFAULT 'string',
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_column_id_name` (`column_id`, `name`),
-  INDEX `idx_column_id_seq` (`column_id`, `seq`)
+       `id` integer NOT NULL AUTO_INCREMENT,
+       `column_id` integer NOT NULL,
+       `name` varchar(64) NOT NULL,
+       `seq` tinyint   NOT NULL,
+       `data_type` varchar(8) NOT NULL DEFAULT 'string',
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_column_id_name` (`column_id`, `name`),
+       INDEX `idx_column_id_seq` (`column_id`, `seq`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES service_columns(id) ON DELETE CASCADE;
@@ -89,32 +89,33 @@ ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES service_columns(i
 
 DROP TABLE IF EXISTS `labels`;
 CREATE TABLE `labels` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `label` varchar(64) NOT NULL,
-  `src_service_id` integer NOT NULL,
-  `src_column_name` varchar(64) NOT NULL,
-  `src_column_type` varchar(8) NOT NULL,
-  `tgt_service_id` integer NOT NULL,
-  `tgt_column_name` varchar(64) NOT NULL,
-  `tgt_column_type` varchar(8) NOT NULL,
-  `is_directed` tinyint        NOT NULL DEFAULT 1,
-  `service_name` varchar(64),
-  `service_id` integer NOT NULL,
-  `consistency_level` varchar(8) NOT NULL DEFAULT 'weak',
-  `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph',
-  `hbase_table_ttl` integer,
-  `schema_version` varchar(8) NOT NULL default 'v2',
-  `is_async` tinyint(4) NOT NULL default '0',
-  `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
-  `options` text,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_label` (`label`),
-  INDEX `idx_src_column_name` (`src_column_name`),
-  INDEX        `idx_tgt_column_name` (`tgt_column_name`),
-  INDEX `idx_src_service_id` (`src_service_id`),
-  INDEX `idx_tgt_service_id` (`tgt_service_id`),
-  INDEX `idx_service_name` (`service_name`), 
-  INDEX `idx_service_id` (`service_id`)
+       `id` integer NOT NULL AUTO_INCREMENT,
+       `label` varchar(128) NOT NULL,
+       `src_service_id` integer NOT NULL,
+       `src_column_name` varchar(64) NOT NULL,
+       `src_column_type` varchar(8) NOT NULL,
+       `tgt_service_id` integer NOT NULL,
+       `tgt_column_name` varchar(64) NOT NULL,
+       `tgt_column_type` varchar(8) NOT NULL,
+       `is_directed` tinyint   NOT NULL DEFAULT 1,
+       `service_name` varchar(64),
+       `service_id` integer NOT NULL,
+       `consistency_level` varchar(8) NOT NULL DEFAULT 'weak',
+       `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph',
+       `hbase_table_ttl` integer,
+       `schema_version` varchar(8) NOT NULL default 'v2',
+       `is_async` tinyint(4) NOT NULL default '0',
+       `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
+       `options` text,
+       `deleted_at` datetime DEFAULT NULL,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_label` (`label`),
+       INDEX `idx_src_column_name` (`src_column_name`),
+       INDEX   `idx_tgt_column_name` (`tgt_column_name`),
+       INDEX `idx_src_service_id` (`src_service_id`),
+       INDEX `idx_tgt_service_id` (`tgt_service_id`),
+       INDEX `idx_service_name` (`service_name`),
+       INDEX `idx_service_id` (`service_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id);
@@ -126,16 +127,16 @@ ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id);
 -- ----------------------------
 DROP TABLE IF EXISTS `label_metas`;
 CREATE TABLE `label_metas` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `label_id` integer NOT NULL,
-  `name` varchar(64) NOT NULL,
-  `seq` tinyint        NOT NULL,
-  `default_value` varchar(64) NOT NULL,
-  `data_type` varchar(8) NOT NULL DEFAULT 'long',
-  `used_in_index` tinyint      NOT NULL DEFAULT 0,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_label_id_name` (`label_id`, `name`),
-  INDEX `idx_label_id_seq` (`label_id`, `seq`)
+       `id` integer NOT NULL AUTO_INCREMENT,
+       `label_id` integer NOT NULL,
+       `name` varchar(64) NOT NULL,
+       `seq` tinyint   NOT NULL,
+       `default_value` varchar(64) NOT NULL,
+       `data_type` varchar(8) NOT NULL DEFAULT 'long',
+       `used_in_index` tinyint NOT NULL DEFAULT 0,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_label_id_name` (`label_id`, `name`),
+       INDEX `idx_label_id_seq` (`label_id`, `seq`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE;
@@ -146,15 +147,18 @@ ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELET
 -- ----------------------------
 DROP TABLE IF EXISTS `label_indices`;
 CREATE TABLE `label_indices` (
-  `id` int(11) NOT NULL AUTO_INCREMENT,
-  `label_id` int(11) NOT NULL,
-  `name` varchar(64) NOT NULL DEFAULT '_PK',
-  `seq` tinyint(4) NOT NULL,
-  `meta_seqs` varchar(64) NOT NULL,
-  `formulars` varchar(255) DEFAULT NULL,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`),
-  UNIQUE KEY `ux_label_id_name` (`label_id`,`name`)
+       `id` int(11) NOT NULL AUTO_INCREMENT,
+       `label_id` int(11) NOT NULL,
+       `name` varchar(64) NOT NULL DEFAULT '_PK',
+       `seq` tinyint(4) NOT NULL,
+       `meta_seqs` varchar(64) NOT NULL,
+       `formulars` varchar(255) DEFAULT NULL,
+       `dir` int DEFAULT NULL,
+       `options` text,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`),
+       UNIQUE KEY `ux_label_id_name` (`label_id`,`name`),
+       UNIQUE KEY `ux_label_id_meta_seqs_dir` (`label_id`,`meta_seqs`,`dir`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE;
@@ -165,15 +169,15 @@ ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DEL
 -- ----------------------------
 DROP TABLE IF EXISTS `experiments`;
 CREATE TABLE `experiments` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `service_id` integer NOT NULL,
-  `service_name` varchar(128) NOT NULL,
-  `name` varchar(64) NOT NULL,
-  `description` varchar(255) NOT NULL,
-  `experiment_type` varchar(8) NOT NULL DEFAULT 'u',
-  `total_modular` int NOT NULL DEFAULT 100,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_service_id_name` (`service_id`, `name`)
+       `id` integer NOT NULL AUTO_INCREMENT,
+       `service_id` integer NOT NULL,
+       `service_name` varchar(128) NOT NULL,
+       `name` varchar(64) NOT NULL,
+       `description` varchar(255) NOT NULL,
+       `experiment_type` varchar(8) NOT NULL DEFAULT 'u',
+       `total_modular` int NOT NULL DEFAULT 100,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_service_id_name` (`service_id`, `name`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 -- ALTER TABLE experiments ADD FOREIGN KEY(service_id) REFERENCES service(id) ON DELETE CASCADE;
@@ -182,23 +186,24 @@ CREATE TABLE `experiments` (
 -- ----------------------------
 --  Table structure for `buckets`
 -- ----------------------------
+DROP TABLE IF EXISTS `buckets`;
 CREATE TABLE `buckets` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `experiment_id` integer NOT NULL,
-  `modular` varchar(64) NOT NULL,
-  `http_verb` varchar(8) NOT NULL,
-  `api_path` text NOT NULL,
-  `uuid_key` varchar(128),
-  `uuid_placeholder` varchar(64),
-  `request_body` text NOT NULL,
-  `timeout` int NOT NULL DEFAULT 1000,
-  `impression_id` varchar(64) NOT NULL,
-  `is_graph_query` tinyint NOT NULL DEFAULT 1,
-  `is_empty` tinyint NOT NULL DEFAULT 0,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_impression_id` (`impression_id`),
-  INDEX `idx_experiment_id` (`experiment_id`),
-  INDEX `idx_impression_id` (`impression_id`)
+       `id` integer NOT NULL AUTO_INCREMENT,
+       `experiment_id` integer NOT NULL,
+       `modular` varchar(64) NOT NULL,
+       `http_verb` varchar(8) NOT NULL,
+       `api_path` text NOT NULL,
+       `uuid_key` varchar(128),
+       `uuid_placeholder` varchar(64),
+       `request_body` text NOT NULL,
+       `timeout` int NOT NULL DEFAULT 1000,
+       `impression_id` varchar(64) NOT NULL,
+       `is_graph_query` tinyint NOT NULL DEFAULT 1,
+       `is_empty` tinyint NOT NULL DEFAULT 0,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_impression_id` (`impression_id`),
+       INDEX `idx_experiment_id` (`experiment_id`),
+       INDEX `idx_impression_id` (`impression_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 SET FOREIGN_KEY_CHECKS = 1;
@@ -209,29 +214,30 @@ SET FOREIGN_KEY_CHECKS = 1;
 -- ----------------------------
 DROP TABLE IF EXISTS `counter`;
 CREATE TABLE `counter` (
-  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
-  `use_flag` tinyint(1) NOT NULL DEFAULT '0',
-  `version` smallint(1) NOT NULL DEFAULT '1',
-  `service` varchar(64) NOT NULL DEFAULT '',
-  `action` varchar(64) NOT NULL DEFAULT '',
-  `item_type` int(11) NOT NULL DEFAULT '0',
-  `auto_comb` tinyint(1) NOT NULL DEFAULT '1',
-  `dimension` varchar(1024) NOT NULL,
-  `use_profile` tinyint(1) NOT NULL DEFAULT '0',
-  `bucket_imp_id` varchar(64) DEFAULT NULL,
-  `use_exact` tinyint(1) NOT NULL DEFAULT '1',
-  `use_rank` tinyint(1) NOT NULL DEFAULT '1',
-  `ttl` int(11) NOT NULL DEFAULT '172800',
-  `daily_ttl` int(11) DEFAULT NULL,
-  `hbase_table` varchar(1024) DEFAULT NULL,
-  `interval_unit` varchar(1024) DEFAULT NULL,
-  `rate_action_id` int(11) unsigned DEFAULT NULL,
-  `rate_base_id` int(11) unsigned DEFAULT NULL,
-  `rate_threshold` int(11) DEFAULT NULL,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `svc` (`service`,`action`),
-  KEY `rate_action_id` (`rate_action_id`),
-  KEY `rate_base_id` (`rate_base_id`),
-  CONSTRAINT `rate_action_id` FOREIGN KEY (`rate_action_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
-  CONSTRAINT `rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION
+       `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
+       `use_flag` tinyint(1) NOT NULL DEFAULT '0',
+       `version` smallint(1) NOT NULL DEFAULT '1',
+       `service` varchar(64) NOT NULL DEFAULT '',
+       `action` varchar(64) NOT NULL DEFAULT '',
+       `item_type` int(11) NOT NULL DEFAULT '0',
+       `auto_comb` tinyint(1) NOT NULL DEFAULT '1',
+       `dimension` varchar(1024) NOT NULL,
+       `use_profile` tinyint(1) NOT NULL DEFAULT '0',
+       `bucket_imp_id` varchar(64) DEFAULT NULL,
+       `use_exact` tinyint(1) NOT NULL DEFAULT '1',
+       `use_rank` tinyint(1) NOT NULL DEFAULT '1',
+       `ttl` int(11) NOT NULL DEFAULT '172800',
+       `daily_ttl` int(11) DEFAULT NULL,
+       `hbase_table` varchar(1024) DEFAULT NULL,
+       `interval_unit` varchar(1024) DEFAULT NULL,
+       `rate_action_id` int(11) unsigned DEFAULT NULL,
+       `rate_base_id` int(11) unsigned DEFAULT NULL,
+       `rate_threshold` int(11) DEFAULT NULL,
+       `top_k` int(11) DEFAULT NULL,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `svc` (`service`,`action`),
+       KEY `rate_action_id` (`rate_action_id`),
+       KEY `rate_base_id` (`rate_base_id`),
+       CONSTRAINT `rate_action_id` FOREIGN KEY (`rate_action_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
+       CONSTRAINT `rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 9e9fe4c..05aed34 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -207,148 +207,3 @@ object GraphSubscriberHelper extends WithKafka {
   }
 }
 
-//object GraphSubscriber extends SparkApp with WithKafka {
-//  val sleepPeriod = 5000
-//  val usages =
-//    s"""
-//       |/**
-//       |* this job read edge format(TSV) from HDFS file system then bulk load edges into s2graph. assumes that newLabelName is already created by API.
-//       |* params:
-//       |*  0. hdfsPath: where is your data in hdfs. require full path with hdfs:// predix
-//       |*  1. dbUrl: jdbc database connection string to specify database for meta.
-//       |*  2. labelMapping: oldLabel:newLabel delimited by ,
-//       |*  3. zkQuorum: target hbase zkQuorum where this job will publish data to.
-//       |*  4. hTableName: target hbase physical table name where this job will publish data to.
-//       |*  5. batchSize: how many edges will be batched for Put request to target hbase.
-//       |*  6. kafkaBrokerList: using kafka as fallback queue. when something goes wrong during batch, data needs to be replay will be stored in kafka.
-//       |*  7. kafkaTopic: fallback queue topic.
-//       |*  8. edgeAutoCreate: true if need to create reversed edge automatically.
-//       |*
-//       |* after this job finished, s2graph will have data with sequence corresponding newLabelName.
-//       |* change this newLabelName to ogirinalName if you want to online replace of label.
-//       |*
-//       |*/
-//     """.stripMargin
-//
-//  override def run() = {
-//    /**
-//     * Main function
-//     */
-//    println(args.toList)
-////    if (args.length != 10) {
-////      System.err.println(usages)
-////      System.exit(1)
-////    }
-//    val hdfsPath = args(0)
-//    val dbUrl = args(1)
-//    val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2))
-//
-//    val zkQuorum = args(3)
-//    val hTableName = args(4)
-//    val batchSize = args(5).toInt
-//    val kafkaBrokerList = args(6)
-//    val kafkaTopic = args(7)
-//    val edgeAutoCreate = args(8).toBoolean
-//    val vertexDegreePathOpt = if (args.length >= 10) GraphSubscriberHelper.toOption(args(9)) else None
-//
-//    val conf = sparkConf(s"$hdfsPath: GraphSubscriber")
-//    val sc = new SparkContext(conf)
-//    val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _))
-//
-//
-//    if (!GraphSubscriberHelper.isValidQuorum(zkQuorum)) throw new RuntimeException(s"$zkQuorum is not valid.")
-//
-//    /** this job expect only one hTableName. all labels in this job will be stored in same physical hbase table */
-//    try {
-//
-//      import GraphSubscriberHelper._
-//      // set local driver setting.
-//      val phase = System.getProperty("phase")
-//      GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
-//
-//      /** copy when oldLabel exist and newLabel done exist. otherwise ignore. */
-//
-//      if (labelMapping.isEmpty) {
-//        // pass
-//      } else {
-//        for {
-//          (oldLabelName, newLabelName) <- labelMapping
-//        } {
-//          Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName))
-//        }
-//      }
-//
-//      vertexDegreePathOpt.foreach { vertexDegreePath =>
-//        val vertexDegrees = sc.textFile(vertexDegreePath).filter(line => line.split("\t").length == 4).map { line =>
-//          val tokens = line.split("\t")
-//          (tokens(0), tokens(1), tokens(2), tokens(3).toInt)
-//        }
-//        vertexDegrees.foreachPartition { partition =>
-//
-//          // init Graph
-//          val phase = System.getProperty("phase")
-//          GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
-//
-//          partition.grouped(batchSize).foreach { msgs =>
-//            try {
-//              val start = System.currentTimeMillis()
-//              val counts = GraphSubscriberHelper.storeDegreeBulk(zkQuorum, hTableName)(msgs, labelMapping)(Some(mapAcc))
-//              for ((k, v) <- counts) {
-//                mapAcc +=(k, v)
-//              }
-//              val duration = System.currentTimeMillis() - start
-//              println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName")
-//            } catch {
-//              case e: Throwable =>
-//                println(s"[Failed]: store $e")
-//
-//                msgs.foreach { msg =>
-//                  GraphSubscriberHelper.report(msg.toString(), Some(e.getMessage()), topic = kafkaTopic)
-//                }
-//            }
-//          }
-//        }
-//      }
-//
-//
-//      val msgs = sc.textFile(hdfsPath)
-//      msgs.foreachPartition(partition => {
-//        // set executor setting.
-//        val phase = System.getProperty("phase")
-//        GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList)
-//
-//        partition.grouped(batchSize).foreach { msgs =>
-//          try {
-//            val start = System.currentTimeMillis()
-//            //            val counts =
-//            //              GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc))
-//            val counts =
-//              GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping, edgeAutoCreate)(Some(mapAcc))
-//
-//            for ((k, v) <- counts) {
-//              mapAcc +=(k, v)
-//            }
-//            val duration = System.currentTimeMillis() - start
-//            println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName")
-//          } catch {
-//            case e: Throwable =>
-//              println(s"[Failed]: store $e")
-//
-//              msgs.foreach { msg =>
-//                GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = kafkaTopic)
-//              }
-//          }
-//        }
-//      })
-//
-//      logInfo(s"counter: $mapAcc")
-//      println(s"Stats: ${mapAcc}")
-//
-//    } catch {
-//      case e: Throwable =>
-//        println(s"job failed with exception: $e")
-//        throw e
-//    }
-//
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index f347ba9..9ebff03 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -84,11 +84,11 @@ object TransferToHFile extends SparkApp {
     } yield output
   }
   def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
-    val kvs = GraphSubscriberHelper.g.storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
+    val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
     kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
   }
   def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
-    val kvs = GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.toList
+    val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.toList
     kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
   }
   def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
@@ -100,12 +100,11 @@ object TransferToHFile extends SparkApp {
     val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal))
 
     val ts = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs)
+    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
+    val edge = Edge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
 
     edge.edgesWithIndex.flatMap { indexEdge =>
-      GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
+      GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
         new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
       }
     }
@@ -158,7 +157,7 @@ object TransferToHFile extends SparkApp {
 
     val sc = new SparkContext(conf)
 
-    GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
+    GraphSubscriberHelper.management.createStorageTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
 
     /* set up hbase init */
     val hbaseConf = HBaseConfiguration.create()
@@ -175,20 +174,7 @@ object TransferToHFile extends SparkApp {
       GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
       toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
     }
-    //
-    //    val newRDD = if (!buildDegree) new HFileRDD(kvs)
-    //    else {
-    //      val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
-    //        agg + current
-    //      }.mapPartitions { iter =>
-    //        val phase = System.getProperty("phase")
-    //        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-    //        toKeyValues(iter.toSeq)
-    //      }
-    //      new HFileRDD(kvs ++ degreeKVs)
-    //    }
-    //
-    //    newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerResionServer, tmpPath)
+
     val merged = if (!buildDegree) kvs
     else {
      kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
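
The serializer call sites above track an API change in Graph: the single
g.storage accessor becomes a per-label g.getStorage(label) lookup, and
Management.createTable becomes createStorageTable. A rough standalone sketch
of the lookup shape this implies (Storage and Label here are simplified
stand-ins, and the fall-back-to-a-default behavior is an assumption rather
than something shown in this diff):

  trait Storage { def name: String }
  case class Label(label: String, storageNameOpt: Option[String])

  class Graph(storages: Map[String, Storage], defaultStorage: Storage) {
    // Assumed shape: resolve a storage per label, falling back to the
    // default when the label does not name its own storage.
    def getStorage(label: Label): Storage =
      label.storageNameOpt.flatMap(storages.get).getOrElse(defaultStorage)
  }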

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 97730f3..657cfed 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -24,38 +24,39 @@ import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.{JsNumber, Json}
-import scala.collection.JavaConversions._
+import play.api.libs.json.{JsNumber, JsObject, Json}
 import scala.util.hashing.MurmurHash3
 
 
 case class SnapshotEdge(srcVertex: Vertex,
                         tgtVertex: Vertex,
-                        labelWithDir: LabelWithDirection,
+                        label: Label,
+                        direction: Int,
                         op: Byte,
                         version: Long,
-                        props: Map[Byte, InnerValLikeWithTs],
+                        props: Map[LabelMeta, InnerValLikeWithTs],
                         pendingEdgeOpt: Option[Edge],
                         statusCode: Byte = 0,
-                        lockTs: Option[Long]) {
+                        lockTs: Option[Long],
+                        tsInnerValOpt: Option[InnerValLike] = None) {
 
-  if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+  lazy val labelWithDir = LabelWithDirection(label.id.get, direction)
+  if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
 
-  val label = Label.findById(labelWithDir.labelId)
-  val schemaVer = label.schemaVersion
+//  val label = Label.findById(labelWithDir.labelId)
+  lazy val schemaVer = label.schemaVersion
   lazy val propsWithoutTs = props.mapValues(_.innerVal)
-  val ts = props(LabelMeta.timeStampSeq).innerVal.toString().toLong
+  lazy val ts = props(LabelMeta.timestamp).innerVal.toString().toLong
 
   def toEdge: Edge = {
-    val ts = props.get(LabelMeta.timeStampSeq).map(v => v.ts).getOrElse(version)
-    Edge(srcVertex, tgtVertex, labelWithDir, op,
+    val ts = props.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version)
+    Edge(srcVertex, tgtVertex, label, direction, op,
       version, props, pendingEdgeOpt = pendingEdgeOpt,
-      statusCode = statusCode, lockTs = lockTs)
+      statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
   }
 
   def propsWithName = (for {
-    (seq, v) <- props
-    meta <- label.metaPropsMap.get(seq)
+    (meta, v) <- props
     jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
   } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
 
@@ -67,17 +68,23 @@ case class SnapshotEdge(srcVertex: Vertex,
 
 case class IndexEdge(srcVertex: Vertex,
                      tgtVertex: Vertex,
-                     labelWithDir: LabelWithDirection,
+                     label: Label,
+                     direction: Int,
                      op: Byte,
                      version: Long,
                      labelIndexSeq: Byte,
-                     props: Map[Byte, InnerValLikeWithTs]) {
-  //  if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
-  //  assert(props.containsKey(LabelMeta.timeStampSeq))
+                     props: Map[LabelMeta, InnerValLikeWithTs],
+                     tsInnerValOpt: Option[InnerValLike] = None)  {
+//  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+  //  assert(props.contains(LabelMeta.timeStampSeq))
+  lazy val labelWithDir = LabelWithDirection(label.id.get, direction)
+
+  lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in")
+  lazy val isOutEdge = !isInEdge
+
+  lazy val ts = props(LabelMeta.timestamp).innerVal.toString.toLong
+  lazy val degreeEdge = props.contains(LabelMeta.degree)
 
-  lazy val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong
-  lazy val degreeEdge = props.contains(LabelMeta.degreeSeq)
-  lazy val label = Label.findById(labelWithDir.labelId)
   lazy val schemaVer = label.schemaVersion
   lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
   lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
@@ -85,33 +92,33 @@ case class IndexEdge(srcVertex: Vertex,
     meta.seq -> innerVal
   }.toMap
 
-  lazy val labelIndexMetaSeqs = labelIndex.metaSeqs
+  lazy val labelIndexMetaSeqs = labelIndex.sortKeyTypes
 
   /** TODO: make sure call of this class fill props as this assumes */
-  lazy val orders = for (k <- labelIndexMetaSeqs) yield {
-    props.get(k) match {
+  lazy val orders = for (meta <- labelIndexMetaSeqs) yield {
+    props.get(meta) match {
       case None =>
 
-        /*
-         * TODO: agly hack
-         * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
-         */
-        val v = k match {
-          case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer)
-          case LabelMeta.toSeq => tgtVertex.innerId
-          case LabelMeta.fromSeq => //srcVertex.innerId
-            // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
-            throw new RuntimeException("_from on indexProps is not supported")
-          case _ => defaultIndexMetas(k)
+        /**
+          * TODO: agly hack
+          * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
+          */
+        val v = meta match {
+          case LabelMeta.timestamp=> InnerVal.withLong(version, schemaVer)
+          case LabelMeta.to => toEdge.tgtVertex.innerId
+          case LabelMeta.from => toEdge.srcVertex.innerId
+          // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
+          //            throw new RuntimeException("_from on indexProps is not supported")
+          case _ => toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
         }
 
-        k -> v
-      case Some(v) => k -> v.innerVal
+        meta -> v
+      case Some(v) => meta -> v.innerVal
     }
   }
 
   lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
-  lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal
+  lazy val metas = for ((meta, v) <- props if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal
 
 //  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
 
@@ -121,13 +128,12 @@ case class IndexEdge(srcVertex: Vertex,
   lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
 
   def propsWithName = for {
-    (seq, v) <- props
-    meta <- label.metaPropsMap.get(seq) if seq >= 0
+    (meta, v) <- props if meta.seq >= 0
     jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
   } yield meta.name -> jsValue
 
 
-  def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, props)
+  def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, props, tsInnerValOpt = tsInnerValOpt)
 
   // only for debug
   def toLogString() = {
@@ -137,25 +143,37 @@ case class IndexEdge(srcVertex: Vertex,
 
 case class Edge(srcVertex: Vertex,
                 tgtVertex: Vertex,
-                labelWithDir: LabelWithDirection,
+                label: Label,
+                dir: Int,
                 op: Byte = GraphUtil.defaultOpByte,
                 version: Long = System.currentTimeMillis(),
-                propsWithTs: Map[Byte, InnerValLikeWithTs],
+                propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
                 parentEdges: Seq[EdgeWithScore] = Nil,
                 originalEdgeOpt: Option[Edge] = None,
                 pendingEdgeOpt: Option[Edge] = None,
                 statusCode: Byte = 0,
-                lockTs: Option[Long] = None) extends GraphElement {
+                lockTs: Option[Long] = None,
+                tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement {
 
-  if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
-  //  assert(propsWithTs.containsKey(LabelMeta.timeStampSeq))
-  val schemaVer = label.schemaVersion
-  val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
+  lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
+//  if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
+  //  assert(propsWithTs.contains(LabelMeta.timeStampSeq))
+  lazy val schemaVer = label.schemaVersion
+  lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.value match {
+    case b: BigDecimal => b.longValue()
+    case l: Long => l
+    case i: Int => i.toLong
+    case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
+  }
+  //FIXME
+  lazy val tsInnerVal = tsInnerValOpt.get.value
+//    propsWithTs(LabelMeta.timestamp).innerVal.value
 
+//  lazy val label = Label.findById(labelWithDir.labelId)
   lazy val srcId = srcVertex.innerIdVal
   lazy val tgtId = tgtVertex.innerIdVal
   lazy val labelName = label.label
-  lazy val direction = GraphUtil.fromDirection(labelWithDir.dir)
+  lazy val direction = GraphUtil.fromDirection(dir)
   lazy val properties = toProps()
 
   def props = propsWithTs.mapValues(_.innerVal)
@@ -165,7 +183,7 @@ case class Edge(srcVertex: Vertex,
     for {
       (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner
     } yield {
-      labelMeta.name -> propsWithTs.getOrElse(labelMeta.seq, defaultVal).innerVal.value
+      labelMeta.name -> propsWithTs.getOrElse(labelMeta, defaultVal).innerVal.value
     }
   }
 
@@ -174,8 +192,9 @@ case class Edge(srcVertex: Vertex,
      val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
       if (skipReverse) List(this) else List(this, duplicateEdge)
     } else {
-      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
-      val base = copy(labelWithDir = outDir)
+//      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
+//      val base = copy(labelWithDir = outDir)
+      val base = copy(dir = GraphUtil.directions("out"))
       List(base, base.reverseSrcTgtEdge)
     }
   }
@@ -202,12 +221,11 @@ case class Edge(srcVertex: Vertex,
 
   def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
 
-  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
+//  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
+  def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
 
   def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
 
-  def label = Label.findById(labelWithDir.labelId)
-
   def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
 
   override def serviceName = label.serviceName
@@ -218,32 +236,32 @@ case class Edge(srcVertex: Vertex,
 
   override def isAsync = label.isAsync
 
-  def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
+  def isDegree = propsWithTs.contains(LabelMeta.degree)
 
 //  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
 //    case Some(_) => props
 //    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
 //  }
 
-  def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0)
+  def propsPlusTsValid = propsWithTs.filter(kv => LabelMeta.isValidSeq(kv._1.seq))
 
   def edgesWithIndex = for (labelOrder <- labelOrders) yield {
-    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsWithTs)
+    IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
   }
 
   def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
-    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTsValid)
+    IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
   }
 
   /** force direction as out on invertedEdge */
   def toSnapshotEdge: SnapshotEdge = {
     val (smaller, larger) = (srcForVertex, tgtForVertex)
 
-    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
+//    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
 
-    val ret = SnapshotEdge(smaller, larger, newLabelWithDir, op, version,
-      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
-      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs)
+    val ret = SnapshotEdge(smaller, larger, label, GraphUtil.directions("out"), op, version,
+      Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
+      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
     ret
   }
 
@@ -259,16 +277,20 @@ case class Edge(srcVertex: Vertex,
     case _ => false
   }
 
-  def propsWithName = for {
-    (seq, v) <- props
-    meta <- label.metaPropsMap.get(seq) if seq > 0
-    jsValue <- innerValToJsValue(v, meta.dataType)
-  } yield meta.name -> jsValue
+  def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
+    "label" -> label.label, "service" -> label.serviceName)
+
+  def propsWithName =
+    for {
+      (meta, v) <- props if meta.seq > 0
+      jsValue <- innerValToJsValue(v, meta.dataType)
+    } yield meta.name -> jsValue
+
 
   def updateTgtVertex(id: InnerValLike) = {
     val newId = TargetVertexId(tgtVertex.id.colId, id)
     val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
-    Edge(srcVertex, newTgtVertex, labelWithDir, op, version, propsWithTs)
+    Edge(srcVertex, newTgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
   }
 
   def rank(r: RankParam): Double =
@@ -277,20 +299,15 @@ case class Edge(srcVertex: Vertex,
       var sum: Double = 0
 
       for ((seq, w) <- r.keySeqAndWeights) {
-        seq match {
-          case LabelMeta.countSeq => sum += 1
-          case _ => {
-            propsWithTs.get(seq) match {
-              case None => // do nothing
-              case Some(innerValWithTs) => {
-                val cost = try innerValWithTs.innerVal.toString.toDouble catch {
-                  case e: Exception =>
-                    logger.error("toInnerval failed in rank", e)
-                    1.0
-                }
-                sum += w * cost
-              }
+        propsWithTs.get(seq) match {
+          case None => // do nothing
+          case Some(innerValWithTs) => {
+            val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+              case e: Exception =>
+                logger.error("toInnerval failed in rank", e)
+                1.0
             }
+            sum += w * cost
           }
         }
       }
@@ -298,35 +315,8 @@ case class Edge(srcVertex: Vertex,
     }
 
   def toLogString: String = {
-    val ret =
-      if (propsWithName.nonEmpty)
-        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).map(_.toString)
-      else
-        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label).map(_.toString)
-
-    ret.mkString("\t")
-  }
-
-  def selectValues(selectColumns: Seq[String],
-                   useToString: Boolean = true,
-                   score: Double = 0.0): Seq[Option[Any]] = {
-    //TODO: Option should be matched in JsonParser anyTo*
-    for {
-      selectColumn <- selectColumns
-    } yield {
-      val valueOpt = selectColumn match {
-        case LabelMeta.from.name | "from" => Option(srcId)
-        case LabelMeta.to.name | "to" => Option(tgtId)
-        case "label" => Option(labelName)
-        case "direction" => Option(direction)
-        case "score" => Option(score)
-        case LabelMeta.timestamp.name | "timestamp" => Option(ts)
-        case _ =>
-          properties.get(selectColumn)
-      }
-      if (useToString) valueOpt.map(_.toString)
-      else valueOpt
-    }
+    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
+    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, allPropsWithName).mkString("\t")
   }
 }
 
@@ -373,17 +363,17 @@ object Edge {
     val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
     val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
 
-    new Edge(srcVertex, tgtVertex, labelWithDir, op = op, version = ts, propsWithTs = propsWithTs)
+    new Edge(srcVertex, tgtVertex, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
   }
 
   /** now version information is required also **/
-  type State = Map[Byte, InnerValLikeWithTs]
+  type State = Map[LabelMeta, InnerValLikeWithTs]
   type PropsPairWithTs = (State, State, Long, String)
   type MergeState = PropsPairWithTs => (State, Boolean)
   type UpdateFunc = (Option[Edge], Edge, MergeState)
 
-  def allPropsDeleted(props: Map[Byte, InnerValLikeWithTs]): Boolean =
-    if (!props.containsKey(LabelMeta.lastDeletedAt)) false
+  def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean =
+    if (!props.contains(LabelMeta.lastDeletedAt)) false
     else {
       val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts
       val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
@@ -398,14 +388,14 @@ object Edge {
     val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
     val edgeInverted = Option(requestEdge.toSnapshotEdge)
 
-    (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, edgeInverted))
+    (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted))
   }
 
   def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = {
     //            logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
     //            logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
     val oldPropsWithTs =
-      if (invertedEdge.isEmpty) Map.empty[Byte, InnerValLikeWithTs] else invertedEdge.get.propsWithTs
+      if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] else invertedEdge.get.propsWithTs
 
     val funcs = requestEdges.map { edge =>
       if (edge.op == GraphUtil.operations("insert")) {
@@ -438,17 +428,18 @@ object Edge {
       for {
         (requestEdge, func) <- requestWithFuncs
       } {
-        val (_newPropsWithTs, _) = func((prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer))
+        val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)
         prevPropsWithTs = _newPropsWithTs
        //        logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
       }
       val requestTs = requestEdge.ts
-      /* version should be monotoniously increasing so our RPC mutation should be applied safely */
+      /** version should be monotoniously increasing so our RPC mutation should be applied safely */
       val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
       val maxTs = prevPropsWithTs.map(_._2.ts).max
       val newTs = if (maxTs > requestTs) maxTs else requestTs
       val propsWithTs = prevPropsWithTs ++
-        Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs))
+        Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs))
+
       val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
 
       //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
@@ -460,14 +451,14 @@ object Edge {
   def buildMutation(snapshotEdgeOpt: Option[Edge],
                     requestEdge: Edge,
                     newVersion: Long,
-                    oldPropsWithTs: Map[Byte, InnerValLikeWithTs],
-                    newPropsWithTs: Map[Byte, InnerValLikeWithTs]): EdgeMutate = {
+                    oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+                    newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = {
+
     if (oldPropsWithTs == newPropsWithTs) {
       // all requests should be dropped. so empty mutation.
-      //      logger.error(s"Case 1")
       EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
     } else {
-      val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAt)
+      val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq)
       val newOp = snapshotEdgeOpt match {
         case None => requestEdge.op
         case Some(old) =>
@@ -479,26 +470,29 @@ object Edge {
       val newSnapshotEdgeOpt =
         Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge)
       // delete request must always update snapshot.
-      if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.containsKey(LabelMeta.lastDeletedAt)) {
+      if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
         // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
-        //        logger.error(s"Case 2")
         EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
       } else {
-        //        logger.error(s"Case 3")
         val edgesToDelete = snapshotEdgeOpt match {
           case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
-            snapshotEdge.copy(op = GraphUtil.defaultOpByte).
-              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+            snapshotEdge.copy(op = GraphUtil.defaultOpByte)
+              .relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
           case _ => Nil
         }
 
         val edgesToInsert =
           if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
           else
-            requestEdge.copy(version = newVersion, propsWithTs = newPropsWithTs, op = GraphUtil.defaultOpByte).
-              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
-
-        EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
+            requestEdge.copy(
+              version = newVersion,
+              propsWithTs = newPropsWithTs,
+              op = GraphUtil.defaultOpByte
+            ).relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+
+        EdgeMutate(edgesToDelete = edgesToDelete,
+          edgesToInsert = edgesToInsert,
+          newSnapshotEdge = newSnapshotEdgeOpt)
       }
     }
   }
@@ -520,7 +514,7 @@ object Edge {
 
         case None =>
           assert(oldValWithTs.ts >= lastDeletedAt)
-          if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs)
+          if (oldValWithTs.ts >= requestTs || k.seq < 0) Some(k -> oldValWithTs)
           else {
             shouldReplace = true
             None
@@ -575,7 +569,7 @@ object Edge {
     val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
       propsWithTs.get(k) match {
         case Some(newValWithTs) =>
-          if (k == LabelMeta.timeStampSeq) {
+          if (k == LabelMeta.timestamp) {
             val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
             else {
               shouldReplace = true
@@ -625,7 +619,7 @@ object Edge {
       }
     }
     val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      if (k == LabelMeta.timeStampSeq) {
+      if (k == LabelMeta.timestamp) {
         if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
         else {
           shouldReplace = true
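
The timestamp normalization introduced in Edge above, as a standalone runnable
sketch: the _timestamp inner value may surface as a BigDecimal, Long, or Int
depending on how it was stored, so it is matched down to a plain Long.

  def toTsLong(value: Any): Long = value match {
    case b: BigDecimal => b.longValue()
    case l: Long       => l
    case i: Int        => i.toLong
    case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
  }

  // e.g. toTsLong(BigDecimal(1479314722000L)) returns 1479314722000L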
