[S2GRAPH-122]: Change data types of Edge/IndexEdge/SnapshotEdge. JIRA: [S2GRAPH-122] https://issues.apache.org/jira/browse/S2GRAPH-122
Pull Request: Closes #97 Authors DO YUNG YOON: [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/66bdf1bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/66bdf1bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/66bdf1bc Branch: refs/heads/master Commit: 66bdf1bc09b1a976a8d9d0f07b6af572a6074aae Parents: 8dbb9a3 Author: DO YUNG YOON <[email protected]> Authored: Wed Nov 16 17:32:02 2016 +0100 Committer: DO YUNG YOON <[email protected]> Committed: Wed Nov 16 17:32:02 2016 +0100 ---------------------------------------------------------------------- CHANGES | 2 + dev_support/docker-compose.yml | 5 - dev_support/graph_mysql/schema.sql | 248 ++-- .../loader/subscriber/GraphSubscriber.scala | 145 --- .../loader/subscriber/TransferToHFile.scala | 28 +- .../scala/org/apache/s2graph/core/Edge.scala | 266 ++-- .../scala/org/apache/s2graph/core/Graph.scala | 1164 ++++++++++++++---- .../apache/s2graph/core/GraphExceptions.scala | 24 +- .../org/apache/s2graph/core/GraphUtil.scala | 8 +- .../org/apache/s2graph/core/Management.scala | 97 +- .../org/apache/s2graph/core/PostProcess.scala | 195 ++- .../org/apache/s2graph/core/QueryParam.scala | 575 +++------ .../org/apache/s2graph/core/QueryResult.scala | 328 ++--- .../scala/org/apache/s2graph/core/Vertex.scala | 59 - .../org/apache/s2graph/core/mysqls/Bucket.scala | 26 +- .../apache/s2graph/core/mysqls/Experiment.scala | 13 +- .../org/apache/s2graph/core/mysqls/Label.scala | 203 ++- .../apache/s2graph/core/mysqls/LabelIndex.scala | 13 +- .../apache/s2graph/core/mysqls/LabelMeta.scala | 64 +- .../org/apache/s2graph/core/mysqls/Model.scala | 37 +- .../apache/s2graph/core/mysqls/Service.scala | 15 +- .../s2graph/core/parsers/WhereParser.scala | 103 +- .../s2graph/core/rest/RequestParser.scala | 334 ++--- .../apache/s2graph/core/rest/RestHandler.scala | 118 +- .../apache/s2graph/core/storage/SKeyValue.scala | 12 +- .../apache/s2graph/core/storage/Storage.scala | 771 ++++-------- .../core/storage/StorageDeserializable.scala | 35 +- .../core/storage/StorageSerializable.scala | 24 +- .../core/storage/hbase/AsynchbaseStorage.scala | 454 ++++--- .../tall/IndexEdgeDeserializable.scala | 222 ++-- .../indexedge/tall/IndexEdgeSerializable.scala | 26 +- .../wide/IndexEdgeDeserializable.scala | 230 ++-- .../indexedge/wide/IndexEdgeSerializable.scala | 13 +- .../tall/SnapshotEdgeDeserializable.scala | 30 +- .../tall/SnapshotEdgeSerializable.scala | 4 +- .../wide/SnapshotEdgeDeserializable.scala | 28 +- .../wide/SnapshotEdgeSerializable.scala | 4 +- .../serde/vertex/VertexDeserializable.scala | 20 +- .../serde/vertex/VertexSerializable.scala | 15 +- .../apache/s2graph/core/types/HBaseType.scala | 5 +- .../s2graph/core/types/InnerValLike.scala | 15 +- .../apache/s2graph/core/utils/DeferCache.scala | 5 +- .../apache/s2graph/core/utils/Extentions.scala | 65 +- .../s2graph/core/utils/SafeUpdateCache.scala | 9 +- .../org/apache/s2graph/core/EdgeTest.scala | 48 +- .../s2graph/core/Integrate/CrudTest.scala | 6 +- .../core/Integrate/IntegrateCommon.scala | 20 +- .../s2graph/core/Integrate/QueryTest.scala | 530 ++++---- .../core/Integrate/WeakLabelDeleteTest.scala | 6 +- .../apache/s2graph/core/ManagementTest.scala | 117 +- .../apache/s2graph/core/QueryParamTest.scala | 316 ++--- .../s2graph/core/TestCommonWithModels.scala | 8 + .../apache/s2graph/core/models/ModelTest.scala | 90 -- .../s2graph/core/mysqls/ExperimentSpec.scala | 83 -- .../s2graph/core/parsers/WhereParserTest.scala | 168 +-- .../core/storage/hbase/IndexEdgeTest.scala | 31 +- .../org/apache/s2graph/rest/netty/Server.scala | 63 +- .../rest/play/controllers/AdminController.scala | 2 +- .../rest/play/controllers/EdgeController.scala | 6 +- 59 files changed, 4071 insertions(+), 3480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 73b3597..01d1cc8 100644 --- a/CHANGES +++ b/CHANGES @@ -95,6 +95,8 @@ Release 0.1.0 - unreleased S2GRAPH-127: Refactor ExceptionHander Object into Class (Committed by DOYUNG YOON). S2GRAPH-121: Create `Result` class to hold traverse result edges (Committed by DOYUNG YOON). + + S2GRAPH-122: Change data types of Edge/IndexEdge/SnapshotEdge (Committed by DOYUNG YOON). BUG FIXES http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/dev_support/docker-compose.yml ---------------------------------------------------------------------- diff --git a/dev_support/docker-compose.yml b/dev_support/docker-compose.yml index 5531fde..fb95a39 100644 --- a/dev_support/docker-compose.yml +++ b/dev_support/docker-compose.yml @@ -13,11 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -graph: - image: s2rest_play:latest - container_name: graph - net: container:graph_hbase - graph_mysql: build: graph_mysql container_name: graph_mysql http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/dev_support/graph_mysql/schema.sql ---------------------------------------------------------------------- diff --git a/dev_support/graph_mysql/schema.sql b/dev_support/graph_mysql/schema.sql index 862062b..df283a8 100644 --- a/dev_support/graph_mysql/schema.sql +++ b/dev_support/graph_mysql/schema.sql @@ -35,17 +35,17 @@ SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- DROP TABLE IF EXISTS `services`; CREATE TABLE `services` ( - `id` integer NOT NULL AUTO_INCREMENT, - `service_name` varchar(64) NOT NULL, - `access_token` varchar(64) NOT NULL, - `cluster` varchar(255) NOT NULL, - `hbase_table_name` varchar(255) NOT NULL, - `pre_split_size` integer NOT NULL default 0, - `hbase_table_ttl` integer, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_service_name` (`service_name`), - INDEX `idx_access_token` (`access_token`), - INDEX `idx_cluster` (cluster(75)) + `id` integer NOT NULL AUTO_INCREMENT, + `service_name` varchar(64) NOT NULL, + `access_token` varchar(64) NOT NULL, + `cluster` varchar(255) NOT NULL, + `hbase_table_name` varchar(255) NOT NULL, + `pre_split_size` integer NOT NULL default 0, + `hbase_table_ttl` integer, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_service_name` (`service_name`), + INDEX `idx_access_token` (`access_token`), + INDEX `idx_cluster` (cluster(75)) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; @@ -54,13 +54,13 @@ CREATE TABLE `services` ( -- ---------------------------- DROP TABLE IF EXISTS `service_columns`; CREATE TABLE `service_columns` ( - `id` integer NOT NULL AUTO_INCREMENT, - `service_id` integer NOT NULL, - `column_name` varchar(64) NOT NULL, - `column_type` varchar(8) NOT NULL, - `schema_version` varchar(8) NOT NULL default 'v2', - PRIMARY KEY (`id`), - UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`) + `id` integer NOT NULL AUTO_INCREMENT, + `service_id` integer NOT NULL, + `column_name` varchar(64) NOT NULL, + `column_type` varchar(8) NOT NULL, + `schema_version` varchar(8) NOT NULL default 'v2', + PRIMARY KEY (`id`), + UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES services(id) ON DELETE CASCADE; @@ -71,14 +71,14 @@ ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES services(id) -- ---------------------------- DROP TABLE IF EXISTS `column_metas`; CREATE TABLE `column_metas` ( - `id` integer NOT NULL AUTO_INCREMENT, - `column_id` integer NOT NULL, - `name` varchar(64) NOT NULL, - `seq` tinyint NOT NULL, - `data_type` varchar(8) NOT NULL DEFAULT 'string', - PRIMARY KEY (`id`), - UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), - INDEX `idx_column_id_seq` (`column_id`, `seq`) + `id` integer NOT NULL AUTO_INCREMENT, + `column_id` integer NOT NULL, + `name` varchar(64) NOT NULL, + `seq` tinyint NOT NULL, + `data_type` varchar(8) NOT NULL DEFAULT 'string', + PRIMARY KEY (`id`), + UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), + INDEX `idx_column_id_seq` (`column_id`, `seq`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES service_columns(id) ON DELETE CASCADE; @@ -89,32 +89,33 @@ ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES service_columns(i DROP TABLE IF EXISTS `labels`; CREATE TABLE `labels` ( - `id` integer NOT NULL AUTO_INCREMENT, - `label` varchar(64) NOT NULL, - `src_service_id` integer NOT NULL, - `src_column_name` varchar(64) NOT NULL, - `src_column_type` varchar(8) NOT NULL, - `tgt_service_id` integer NOT NULL, - `tgt_column_name` varchar(64) NOT NULL, - `tgt_column_type` varchar(8) NOT NULL, - `is_directed` tinyint NOT NULL DEFAULT 1, - `service_name` varchar(64), - `service_id` integer NOT NULL, - `consistency_level` varchar(8) NOT NULL DEFAULT 'weak', - `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph', - `hbase_table_ttl` integer, - `schema_version` varchar(8) NOT NULL default 'v2', - `is_async` tinyint(4) NOT NULL default '0', - `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4', - `options` text, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_label` (`label`), - INDEX `idx_src_column_name` (`src_column_name`), - INDEX `idx_tgt_column_name` (`tgt_column_name`), - INDEX `idx_src_service_id` (`src_service_id`), - INDEX `idx_tgt_service_id` (`tgt_service_id`), - INDEX `idx_service_name` (`service_name`), - INDEX `idx_service_id` (`service_id`) + `id` integer NOT NULL AUTO_INCREMENT, + `label` varchar(128) NOT NULL, + `src_service_id` integer NOT NULL, + `src_column_name` varchar(64) NOT NULL, + `src_column_type` varchar(8) NOT NULL, + `tgt_service_id` integer NOT NULL, + `tgt_column_name` varchar(64) NOT NULL, + `tgt_column_type` varchar(8) NOT NULL, + `is_directed` tinyint NOT NULL DEFAULT 1, + `service_name` varchar(64), + `service_id` integer NOT NULL, + `consistency_level` varchar(8) NOT NULL DEFAULT 'weak', + `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph', + `hbase_table_ttl` integer, + `schema_version` varchar(8) NOT NULL default 'v2', + `is_async` tinyint(4) NOT NULL default '0', + `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4', + `options` text, + `deleted_at` datetime DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_label` (`label`), + INDEX `idx_src_column_name` (`src_column_name`), + INDEX `idx_tgt_column_name` (`tgt_column_name`), + INDEX `idx_src_service_id` (`src_service_id`), + INDEX `idx_tgt_service_id` (`tgt_service_id`), + INDEX `idx_service_name` (`service_name`), + INDEX `idx_service_id` (`service_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id); @@ -126,16 +127,16 @@ ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id); -- ---------------------------- DROP TABLE IF EXISTS `label_metas`; CREATE TABLE `label_metas` ( - `id` integer NOT NULL AUTO_INCREMENT, - `label_id` integer NOT NULL, - `name` varchar(64) NOT NULL, - `seq` tinyint NOT NULL, - `default_value` varchar(64) NOT NULL, - `data_type` varchar(8) NOT NULL DEFAULT 'long', - `used_in_index` tinyint NOT NULL DEFAULT 0, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_label_id_name` (`label_id`, `name`), - INDEX `idx_label_id_seq` (`label_id`, `seq`) + `id` integer NOT NULL AUTO_INCREMENT, + `label_id` integer NOT NULL, + `name` varchar(64) NOT NULL, + `seq` tinyint NOT NULL, + `default_value` varchar(64) NOT NULL, + `data_type` varchar(8) NOT NULL DEFAULT 'long', + `used_in_index` tinyint NOT NULL DEFAULT 0, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_label_id_name` (`label_id`, `name`), + INDEX `idx_label_id_seq` (`label_id`, `seq`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE; @@ -146,15 +147,18 @@ ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELET -- ---------------------------- DROP TABLE IF EXISTS `label_indices`; CREATE TABLE `label_indices` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `label_id` int(11) NOT NULL, - `name` varchar(64) NOT NULL DEFAULT '_PK', - `seq` tinyint(4) NOT NULL, - `meta_seqs` varchar(64) NOT NULL, - `formulars` varchar(255) DEFAULT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`), - UNIQUE KEY `ux_label_id_name` (`label_id`,`name`) + `id` int(11) NOT NULL AUTO_INCREMENT, + `label_id` int(11) NOT NULL, + `name` varchar(64) NOT NULL DEFAULT '_PK', + `seq` tinyint(4) NOT NULL, + `meta_seqs` varchar(64) NOT NULL, + `formulars` varchar(255) DEFAULT NULL, + `dir` int DEFAULT NULL, + `options` text, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`), + UNIQUE KEY `ux_label_id_name` (`label_id`,`name`), + UNIQUE KEY `ux_label_id_meta_seqs_dir` (`label_id`,`meta_seqs`,`dir`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE; @@ -165,15 +169,15 @@ ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DEL -- ---------------------------- DROP TABLE IF EXISTS `experiments`; CREATE TABLE `experiments` ( - `id` integer NOT NULL AUTO_INCREMENT, - `service_id` integer NOT NULL, - `service_name` varchar(128) NOT NULL, - `name` varchar(64) NOT NULL, - `description` varchar(255) NOT NULL, - `experiment_type` varchar(8) NOT NULL DEFAULT 'u', - `total_modular` int NOT NULL DEFAULT 100, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_service_id_name` (`service_id`, `name`) + `id` integer NOT NULL AUTO_INCREMENT, + `service_id` integer NOT NULL, + `service_name` varchar(128) NOT NULL, + `name` varchar(64) NOT NULL, + `description` varchar(255) NOT NULL, + `experiment_type` varchar(8) NOT NULL DEFAULT 'u', + `total_modular` int NOT NULL DEFAULT 100, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_service_id_name` (`service_id`, `name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -- ALTER TABLE experiments ADD FOREIGN KEY(service_id) REFERENCES service(id) ON DELETE CASCADE; @@ -182,23 +186,24 @@ CREATE TABLE `experiments` ( -- ---------------------------- -- Table structure for `buckets` -- ---------------------------- +DROP TABLE IF EXISTS `buckets`; CREATE TABLE `buckets` ( - `id` integer NOT NULL AUTO_INCREMENT, - `experiment_id` integer NOT NULL, - `modular` varchar(64) NOT NULL, - `http_verb` varchar(8) NOT NULL, - `api_path` text NOT NULL, - `uuid_key` varchar(128), - `uuid_placeholder` varchar(64), - `request_body` text NOT NULL, - `timeout` int NOT NULL DEFAULT 1000, - `impression_id` varchar(64) NOT NULL, - `is_graph_query` tinyint NOT NULL DEFAULT 1, - `is_empty` tinyint NOT NULL DEFAULT 0, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_impression_id` (`impression_id`), - INDEX `idx_experiment_id` (`experiment_id`), - INDEX `idx_impression_id` (`impression_id`) + `id` integer NOT NULL AUTO_INCREMENT, + `experiment_id` integer NOT NULL, + `modular` varchar(64) NOT NULL, + `http_verb` varchar(8) NOT NULL, + `api_path` text NOT NULL, + `uuid_key` varchar(128), + `uuid_placeholder` varchar(64), + `request_body` text NOT NULL, + `timeout` int NOT NULL DEFAULT 1000, + `impression_id` varchar(64) NOT NULL, + `is_graph_query` tinyint NOT NULL DEFAULT 1, + `is_empty` tinyint NOT NULL DEFAULT 0, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_impression_id` (`impression_id`), + INDEX `idx_experiment_id` (`experiment_id`), + INDEX `idx_impression_id` (`impression_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; SET FOREIGN_KEY_CHECKS = 1; @@ -209,29 +214,30 @@ SET FOREIGN_KEY_CHECKS = 1; -- ---------------------------- DROP TABLE IF EXISTS `counter`; CREATE TABLE `counter` ( - `id` int(11) unsigned NOT NULL AUTO_INCREMENT, - `use_flag` tinyint(1) NOT NULL DEFAULT '0', - `version` smallint(1) NOT NULL DEFAULT '1', - `service` varchar(64) NOT NULL DEFAULT '', - `action` varchar(64) NOT NULL DEFAULT '', - `item_type` int(11) NOT NULL DEFAULT '0', - `auto_comb` tinyint(1) NOT NULL DEFAULT '1', - `dimension` varchar(1024) NOT NULL, - `use_profile` tinyint(1) NOT NULL DEFAULT '0', - `bucket_imp_id` varchar(64) DEFAULT NULL, - `use_exact` tinyint(1) NOT NULL DEFAULT '1', - `use_rank` tinyint(1) NOT NULL DEFAULT '1', - `ttl` int(11) NOT NULL DEFAULT '172800', - `daily_ttl` int(11) DEFAULT NULL, - `hbase_table` varchar(1024) DEFAULT NULL, - `interval_unit` varchar(1024) DEFAULT NULL, - `rate_action_id` int(11) unsigned DEFAULT NULL, - `rate_base_id` int(11) unsigned DEFAULT NULL, - `rate_threshold` int(11) DEFAULT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `svc` (`service`,`action`), - KEY `rate_action_id` (`rate_action_id`), - KEY `rate_base_id` (`rate_base_id`), - CONSTRAINT `rate_action_id` FOREIGN KEY (`rate_action_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION, - CONSTRAINT `rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION + `id` int(11) unsigned NOT NULL AUTO_INCREMENT, + `use_flag` tinyint(1) NOT NULL DEFAULT '0', + `version` smallint(1) NOT NULL DEFAULT '1', + `service` varchar(64) NOT NULL DEFAULT '', + `action` varchar(64) NOT NULL DEFAULT '', + `item_type` int(11) NOT NULL DEFAULT '0', + `auto_comb` tinyint(1) NOT NULL DEFAULT '1', + `dimension` varchar(1024) NOT NULL, + `use_profile` tinyint(1) NOT NULL DEFAULT '0', + `bucket_imp_id` varchar(64) DEFAULT NULL, + `use_exact` tinyint(1) NOT NULL DEFAULT '1', + `use_rank` tinyint(1) NOT NULL DEFAULT '1', + `ttl` int(11) NOT NULL DEFAULT '172800', + `daily_ttl` int(11) DEFAULT NULL, + `hbase_table` varchar(1024) DEFAULT NULL, + `interval_unit` varchar(1024) DEFAULT NULL, + `rate_action_id` int(11) unsigned DEFAULT NULL, + `rate_base_id` int(11) unsigned DEFAULT NULL, + `rate_threshold` int(11) DEFAULT NULL, + `top_k` int(11) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `svc` (`service`,`action`), + KEY `rate_action_id` (`rate_action_id`), + KEY `rate_base_id` (`rate_base_id`), + CONSTRAINT `rate_action_id` FOREIGN KEY (`rate_action_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION, + CONSTRAINT `rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala index 9e9fe4c..05aed34 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala @@ -207,148 +207,3 @@ object GraphSubscriberHelper extends WithKafka { } } -//object GraphSubscriber extends SparkApp with WithKafka { -// val sleepPeriod = 5000 -// val usages = -// s""" -// |/** -// |* this job read edge format(TSV) from HDFS file system then bulk load edges into s2graph. assumes that newLabelName is already created by API. -// |* params: -// |* 0. hdfsPath: where is your data in hdfs. require full path with hdfs:// predix -// |* 1. dbUrl: jdbc database connection string to specify database for meta. -// |* 2. labelMapping: oldLabel:newLabel delimited by , -// |* 3. zkQuorum: target hbase zkQuorum where this job will publish data to. -// |* 4. hTableName: target hbase physical table name where this job will publish data to. -// |* 5. batchSize: how many edges will be batched for Put request to target hbase. -// |* 6. kafkaBrokerList: using kafka as fallback queue. when something goes wrong during batch, data needs to be replay will be stored in kafka. -// |* 7. kafkaTopic: fallback queue topic. -// |* 8. edgeAutoCreate: true if need to create reversed edge automatically. -// |* -// |* after this job finished, s2graph will have data with sequence corresponding newLabelName. -// |* change this newLabelName to ogirinalName if you want to online replace of label. -// |* -// |*/ -// """.stripMargin -// -// override def run() = { -// /** -// * Main function -// */ -// println(args.toList) -//// if (args.length != 10) { -//// System.err.println(usages) -//// System.exit(1) -//// } -// val hdfsPath = args(0) -// val dbUrl = args(1) -// val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2)) -// -// val zkQuorum = args(3) -// val hTableName = args(4) -// val batchSize = args(5).toInt -// val kafkaBrokerList = args(6) -// val kafkaTopic = args(7) -// val edgeAutoCreate = args(8).toBoolean -// val vertexDegreePathOpt = if (args.length >= 10) GraphSubscriberHelper.toOption(args(9)) else None -// -// val conf = sparkConf(s"$hdfsPath: GraphSubscriber") -// val sc = new SparkContext(conf) -// val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _)) -// -// -// if (!GraphSubscriberHelper.isValidQuorum(zkQuorum)) throw new RuntimeException(s"$zkQuorum is not valid.") -// -// /** this job expect only one hTableName. all labels in this job will be stored in same physical hbase table */ -// try { -// -// import GraphSubscriberHelper._ -// // set local driver setting. -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) -// -// /** copy when oldLabel exist and newLabel done exist. otherwise ignore. */ -// -// if (labelMapping.isEmpty) { -// // pass -// } else { -// for { -// (oldLabelName, newLabelName) <- labelMapping -// } { -// Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName)) -// } -// } -// -// vertexDegreePathOpt.foreach { vertexDegreePath => -// val vertexDegrees = sc.textFile(vertexDegreePath).filter(line => line.split("\t").length == 4).map { line => -// val tokens = line.split("\t") -// (tokens(0), tokens(1), tokens(2), tokens(3).toInt) -// } -// vertexDegrees.foreachPartition { partition => -// -// // init Graph -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) -// -// partition.grouped(batchSize).foreach { msgs => -// try { -// val start = System.currentTimeMillis() -// val counts = GraphSubscriberHelper.storeDegreeBulk(zkQuorum, hTableName)(msgs, labelMapping)(Some(mapAcc)) -// for ((k, v) <- counts) { -// mapAcc +=(k, v) -// } -// val duration = System.currentTimeMillis() - start -// println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName") -// } catch { -// case e: Throwable => -// println(s"[Failed]: store $e") -// -// msgs.foreach { msg => -// GraphSubscriberHelper.report(msg.toString(), Some(e.getMessage()), topic = kafkaTopic) -// } -// } -// } -// } -// } -// -// -// val msgs = sc.textFile(hdfsPath) -// msgs.foreachPartition(partition => { -// // set executor setting. -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) -// -// partition.grouped(batchSize).foreach { msgs => -// try { -// val start = System.currentTimeMillis() -// // val counts = -// // GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc)) -// val counts = -// GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping, edgeAutoCreate)(Some(mapAcc)) -// -// for ((k, v) <- counts) { -// mapAcc +=(k, v) -// } -// val duration = System.currentTimeMillis() - start -// println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName") -// } catch { -// case e: Throwable => -// println(s"[Failed]: store $e") -// -// msgs.foreach { msg => -// GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = kafkaTopic) -// } -// } -// } -// }) -// -// logInfo(s"counter: $mapAcc") -// println(s"Stats: ${mapAcc}") -// -// } catch { -// case e: Throwable => -// println(s"job failed with exception: $e") -// throw e -// } -// -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala index f347ba9..9ebff03 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -84,11 +84,11 @@ object TransferToHFile extends SparkApp { } yield output } def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = { - val kvs = GraphSubscriberHelper.g.storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList + val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } } def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = { - val kvs = GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.toList + val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.toList kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } } def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = { @@ -100,12 +100,11 @@ object TransferToHFile extends SparkApp { val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal)) val ts = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) - val labelWithDir = LabelWithDirection(label.id.get, dir) - val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs) + val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) + val edge = Edge(vertex, vertex, label, dir, propsWithTs=propsWithTs) edge.edgesWithIndex.flatMap { indexEdge => - GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => + GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.map { kv => new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp) } } @@ -158,7 +157,7 @@ object TransferToHFile extends SparkApp { val sc = new SparkContext(conf) - GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm) + GraphSubscriberHelper.management.createStorageTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm) /* set up hbase init */ val hbaseConf = HBaseConfiguration.create() @@ -175,20 +174,7 @@ object TransferToHFile extends SparkApp { GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate) } - // - // val newRDD = if (!buildDegree) new HFileRDD(kvs) - // else { - // val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => - // agg + current - // }.mapPartitions { iter => - // val phase = System.getProperty("phase") - // GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") - // toKeyValues(iter.toSeq) - // } - // new HFileRDD(kvs ++ degreeKVs) - // } - // - // newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerResionServer, tmpPath) + val merged = if (!buildDegree) kvs else { kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index 97730f3..657cfed 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -24,38 +24,39 @@ import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.logger -import play.api.libs.json.{JsNumber, Json} -import scala.collection.JavaConversions._ +import play.api.libs.json.{JsNumber, JsObject, Json} import scala.util.hashing.MurmurHash3 case class SnapshotEdge(srcVertex: Vertex, tgtVertex: Vertex, - labelWithDir: LabelWithDirection, + label: Label, + direction: Int, op: Byte, version: Long, - props: Map[Byte, InnerValLikeWithTs], + props: Map[LabelMeta, InnerValLikeWithTs], pendingEdgeOpt: Option[Edge], statusCode: Byte = 0, - lockTs: Option[Long]) { + lockTs: Option[Long], + tsInnerValOpt: Option[InnerValLike] = None) { - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") + lazy val labelWithDir = LabelWithDirection(label.id.get, direction) + if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") - val label = Label.findById(labelWithDir.labelId) - val schemaVer = label.schemaVersion +// val label = Label.findById(labelWithDir.labelId) + lazy val schemaVer = label.schemaVersion lazy val propsWithoutTs = props.mapValues(_.innerVal) - val ts = props(LabelMeta.timeStampSeq).innerVal.toString().toLong + lazy val ts = props(LabelMeta.timestamp).innerVal.toString().toLong def toEdge: Edge = { - val ts = props.get(LabelMeta.timeStampSeq).map(v => v.ts).getOrElse(version) - Edge(srcVertex, tgtVertex, labelWithDir, op, + val ts = props.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version) + Edge(srcVertex, tgtVertex, label, direction, op, version, props, pendingEdgeOpt = pendingEdgeOpt, - statusCode = statusCode, lockTs = lockTs) + statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) } def propsWithName = (for { - (seq, v) <- props - meta <- label.metaPropsMap.get(seq) + (meta, v) <- props jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version)) @@ -67,17 +68,23 @@ case class SnapshotEdge(srcVertex: Vertex, case class IndexEdge(srcVertex: Vertex, tgtVertex: Vertex, - labelWithDir: LabelWithDirection, + label: Label, + direction: Int, op: Byte, version: Long, labelIndexSeq: Byte, - props: Map[Byte, InnerValLikeWithTs]) { - // if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") - // assert(props.containsKey(LabelMeta.timeStampSeq)) + props: Map[LabelMeta, InnerValLikeWithTs], + tsInnerValOpt: Option[InnerValLike] = None) { +// if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") + // assert(props.contains(LabelMeta.timeStampSeq)) + lazy val labelWithDir = LabelWithDirection(label.id.get, direction) + + lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in") + lazy val isOutEdge = !isInEdge + + lazy val ts = props(LabelMeta.timestamp).innerVal.toString.toLong + lazy val degreeEdge = props.contains(LabelMeta.degree) - lazy val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong - lazy val degreeEdge = props.contains(LabelMeta.degreeSeq) - lazy val label = Label.findById(labelWithDir.labelId) lazy val schemaVer = label.schemaVersion lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta => @@ -85,33 +92,33 @@ case class IndexEdge(srcVertex: Vertex, meta.seq -> innerVal }.toMap - lazy val labelIndexMetaSeqs = labelIndex.metaSeqs + lazy val labelIndexMetaSeqs = labelIndex.sortKeyTypes /** TODO: make sure call of this class fill props as this assumes */ - lazy val orders = for (k <- labelIndexMetaSeqs) yield { - props.get(k) match { + lazy val orders = for (meta <- labelIndexMetaSeqs) yield { + props.get(meta) match { case None => - /* - * TODO: agly hack - * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once - */ - val v = k match { - case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer) - case LabelMeta.toSeq => tgtVertex.innerId - case LabelMeta.fromSeq => //srcVertex.innerId - // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data. - throw new RuntimeException("_from on indexProps is not supported") - case _ => defaultIndexMetas(k) + /** + * TODO: agly hack + * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once + */ + val v = meta match { + case LabelMeta.timestamp=> InnerVal.withLong(version, schemaVer) + case LabelMeta.to => toEdge.tgtVertex.innerId + case LabelMeta.from => toEdge.srcVertex.innerId + // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data. + // throw new RuntimeException("_from on indexProps is not supported") + case _ => toInnerVal(meta.defaultValue, meta.dataType, schemaVer) } - k -> v - case Some(v) => k -> v.innerVal + meta -> v + case Some(v) => meta -> v.innerVal } } lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet - lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal + lazy val metas = for ((meta, v) <- props if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } @@ -121,13 +128,12 @@ case class IndexEdge(srcVertex: Vertex, lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length def propsWithName = for { - (seq, v) <- props - meta <- label.metaPropsMap.get(seq) if seq >= 0 + (meta, v) <- props if meta.seq >= 0 jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue - def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, props) + def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, props, tsInnerValOpt = tsInnerValOpt) // only for debug def toLogString() = { @@ -137,25 +143,37 @@ case class IndexEdge(srcVertex: Vertex, case class Edge(srcVertex: Vertex, tgtVertex: Vertex, - labelWithDir: LabelWithDirection, + label: Label, + dir: Int, op: Byte = GraphUtil.defaultOpByte, version: Long = System.currentTimeMillis(), - propsWithTs: Map[Byte, InnerValLikeWithTs], + propsWithTs: Map[LabelMeta, InnerValLikeWithTs], parentEdges: Seq[EdgeWithScore] = Nil, originalEdgeOpt: Option[Edge] = None, pendingEdgeOpt: Option[Edge] = None, statusCode: Byte = 0, - lockTs: Option[Long] = None) extends GraphElement { + lockTs: Option[Long] = None, + tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement { - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") - // assert(propsWithTs.containsKey(LabelMeta.timeStampSeq)) - val schemaVer = label.schemaVersion - val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong + lazy val labelWithDir = LabelWithDirection(label.id.get, dir) +// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") + // assert(propsWithTs.contains(LabelMeta.timeStampSeq)) + lazy val schemaVer = label.schemaVersion + lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.value match { + case b: BigDecimal => b.longValue() + case l: Long => l + case i: Int => i.toLong + case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") + } + //FIXME + lazy val tsInnerVal = tsInnerValOpt.get.value +// propsWithTs(LabelMeta.timestamp).innerVal.value +// lazy val label = Label.findById(labelWithDir.labelId) lazy val srcId = srcVertex.innerIdVal lazy val tgtId = tgtVertex.innerIdVal lazy val labelName = label.label - lazy val direction = GraphUtil.fromDirection(labelWithDir.dir) + lazy val direction = GraphUtil.fromDirection(dir) lazy val properties = toProps() def props = propsWithTs.mapValues(_.innerVal) @@ -165,7 +183,7 @@ case class Edge(srcVertex: Vertex, for { (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner } yield { - labelMeta.name -> propsWithTs.getOrElse(labelMeta.seq, defaultVal).innerVal.value + labelMeta.name -> propsWithTs.getOrElse(labelMeta, defaultVal).innerVal.value } } @@ -174,8 +192,9 @@ case class Edge(srcVertex: Vertex, val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) if (skipReverse) List(this) else List(this, duplicateEdge) } else { - val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) - val base = copy(labelWithDir = outDir) +// val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) +// val base = copy(labelWithDir = outDir) + val base = copy(dir = GraphUtil.directions("out")) List(base, base.reverseSrcTgtEdge) } } @@ -202,12 +221,11 @@ case class Edge(srcVertex: Vertex, def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge - def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) +// def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) + def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir)) def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex) - def label = Label.findById(labelWithDir.labelId) - def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) override def serviceName = label.serviceName @@ -218,32 +236,32 @@ case class Edge(srcVertex: Vertex, override def isAsync = label.isAsync - def isDegree = propsWithTs.contains(LabelMeta.degreeSeq) + def isDegree = propsWithTs.contains(LabelMeta.degree) // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { // case Some(_) => props // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) // } - def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0) + def propsPlusTsValid = propsWithTs.filter(kv => LabelMeta.isValidSeq(kv._1.seq)) def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsWithTs) + IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) } def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTsValid) + IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) } /** force direction as out on invertedEdge */ def toSnapshotEdge: SnapshotEdge = { val (smaller, larger) = (srcForVertex, tgtForVertex) - val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) +// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) - val ret = SnapshotEdge(smaller, larger, newLabelWithDir, op, version, - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs, - pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs) + val ret = SnapshotEdge(smaller, larger, label, GraphUtil.directions("out"), op, version, + Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs, + pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) ret } @@ -259,16 +277,20 @@ case class Edge(srcVertex: Vertex, case _ => false } - def propsWithName = for { - (seq, v) <- props - meta <- label.metaPropsMap.get(seq) if seq > 0 - jsValue <- innerValToJsValue(v, meta.dataType) - } yield meta.name -> jsValue + def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), + "label" -> label.label, "service" -> label.serviceName) + + def propsWithName = + for { + (meta, v) <- props if meta.seq > 0 + jsValue <- innerValToJsValue(v, meta.dataType) + } yield meta.name -> jsValue + def updateTgtVertex(id: InnerValLike) = { val newId = TargetVertexId(tgtVertex.id.colId, id) val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props) - Edge(srcVertex, newTgtVertex, labelWithDir, op, version, propsWithTs) + Edge(srcVertex, newTgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) } def rank(r: RankParam): Double = @@ -277,20 +299,15 @@ case class Edge(srcVertex: Vertex, var sum: Double = 0 for ((seq, w) <- r.keySeqAndWeights) { - seq match { - case LabelMeta.countSeq => sum += 1 - case _ => { - propsWithTs.get(seq) match { - case None => // do nothing - case Some(innerValWithTs) => { - val cost = try innerValWithTs.innerVal.toString.toDouble catch { - case e: Exception => - logger.error("toInnerval failed in rank", e) - 1.0 - } - sum += w * cost - } + propsWithTs.get(seq) match { + case None => // do nothing + case Some(innerValWithTs) => { + val cost = try innerValWithTs.innerVal.toString.toDouble catch { + case e: Exception => + logger.error("toInnerval failed in rank", e) + 1.0 } + sum += w * cost } } } @@ -298,35 +315,8 @@ case class Edge(srcVertex: Vertex, } def toLogString: String = { - val ret = - if (propsWithName.nonEmpty) - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).map(_.toString) - else - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label).map(_.toString) - - ret.mkString("\t") - } - - def selectValues(selectColumns: Seq[String], - useToString: Boolean = true, - score: Double = 0.0): Seq[Option[Any]] = { - //TODO: Option should be matched in JsonParser anyTo* - for { - selectColumn <- selectColumns - } yield { - val valueOpt = selectColumn match { - case LabelMeta.from.name | "from" => Option(srcId) - case LabelMeta.to.name | "to" => Option(tgtId) - case "label" => Option(labelName) - case "direction" => Option(direction) - case "score" => Option(score) - case LabelMeta.timestamp.name | "timestamp" => Option(ts) - case _ => - properties.get(selectColumn) - } - if (useToString) valueOpt.map(_.toString) - else valueOpt - } + val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) + List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, allPropsWithName).mkString("\t") } } @@ -373,17 +363,17 @@ object Edge { val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - new Edge(srcVertex, tgtVertex, labelWithDir, op = op, version = ts, propsWithTs = propsWithTs) + new Edge(srcVertex, tgtVertex, label, dir, op = op, version = ts, propsWithTs = propsWithTs) } /** now version information is required also **/ - type State = Map[Byte, InnerValLikeWithTs] + type State = Map[LabelMeta, InnerValLikeWithTs] type PropsPairWithTs = (State, State, Long, String) type MergeState = PropsPairWithTs => (State, Boolean) type UpdateFunc = (Option[Edge], Edge, MergeState) - def allPropsDeleted(props: Map[Byte, InnerValLikeWithTs]): Boolean = - if (!props.containsKey(LabelMeta.lastDeletedAt)) false + def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean = + if (!props.contains(LabelMeta.lastDeletedAt)) false else { val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt @@ -398,14 +388,14 @@ object Edge { val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } val edgeInverted = Option(requestEdge.toSnapshotEdge) - (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, edgeInverted)) + (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted)) } def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = { // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}") // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}") val oldPropsWithTs = - if (invertedEdge.isEmpty) Map.empty[Byte, InnerValLikeWithTs] else invertedEdge.get.propsWithTs + if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] else invertedEdge.get.propsWithTs val funcs = requestEdges.map { edge => if (edge.op == GraphUtil.operations("insert")) { @@ -438,17 +428,18 @@ object Edge { for { (requestEdge, func) <- requestWithFuncs } { - val (_newPropsWithTs, _) = func((prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)) + val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer) prevPropsWithTs = _newPropsWithTs // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") } val requestTs = requestEdge.ts - /* version should be monotoniously increasing so our RPC mutation should be applied safely */ + /** version should be monotoniously increasing so our RPC mutation should be applied safely */ val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs) val maxTs = prevPropsWithTs.map(_._2.ts).max val newTs = if (maxTs > requestTs) maxTs else requestTs val propsWithTs = prevPropsWithTs ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs)) + Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs)) + val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}") @@ -460,14 +451,14 @@ object Edge { def buildMutation(snapshotEdgeOpt: Option[Edge], requestEdge: Edge, newVersion: Long, - oldPropsWithTs: Map[Byte, InnerValLikeWithTs], - newPropsWithTs: Map[Byte, InnerValLikeWithTs]): EdgeMutate = { + oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs], + newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = { + if (oldPropsWithTs == newPropsWithTs) { // all requests should be dropped. so empty mutation. - // logger.error(s"Case 1") EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None) } else { - val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAt) + val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq) val newOp = snapshotEdgeOpt match { case None => requestEdge.op case Some(old) => @@ -479,26 +470,29 @@ object Edge { val newSnapshotEdgeOpt = Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge) // delete request must always update snapshot. - if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.containsKey(LabelMeta.lastDeletedAt)) { + if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) { // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt. - // logger.error(s"Case 2") EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt) } else { - // logger.error(s"Case 3") val edgesToDelete = snapshotEdgeOpt match { case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") => - snapshotEdge.copy(op = GraphUtil.defaultOpByte). - relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } + snapshotEdge.copy(op = GraphUtil.defaultOpByte) + .relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } case _ => Nil } val edgesToInsert = if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil else - requestEdge.copy(version = newVersion, propsWithTs = newPropsWithTs, op = GraphUtil.defaultOpByte). - relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } - - EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt) + requestEdge.copy( + version = newVersion, + propsWithTs = newPropsWithTs, + op = GraphUtil.defaultOpByte + ).relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } + + EdgeMutate(edgesToDelete = edgesToDelete, + edgesToInsert = edgesToInsert, + newSnapshotEdge = newSnapshotEdgeOpt) } } } @@ -520,7 +514,7 @@ object Edge { case None => assert(oldValWithTs.ts >= lastDeletedAt) - if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs) + if (oldValWithTs.ts >= requestTs || k.seq < 0) Some(k -> oldValWithTs) else { shouldReplace = true None @@ -575,7 +569,7 @@ object Edge { val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { propsWithTs.get(k) match { case Some(newValWithTs) => - if (k == LabelMeta.timeStampSeq) { + if (k == LabelMeta.timestamp) { val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs else { shouldReplace = true @@ -625,7 +619,7 @@ object Edge { } } val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - if (k == LabelMeta.timeStampSeq) { + if (k == LabelMeta.timestamp) { if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs) else { shouldReplace = true
