[S2GRAPH-17]: Remove unnecessary abstraction layer, Storage.

1. Refactor all common code from AsynchbaseStorage into the Storage trait.
2. Add a bit of documentation.
3. Restructure packages (move serializer/deserializer from storage/hbase into storage).
4. Extract the future cache from AsynchbaseStorage into utils.
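The diff below shows the resulting shape in `Graph.initStorage`, which now picks the backend via the new `s2graph.storage.backend` config key (defaulting to `hbase`). As a rough illustration only, here is a minimal sketch of that pattern: backend-agnostic logic lives in a `Storage` trait, the HBase-specific write stays in `AsynchbaseStorage`, and a factory selects the implementation from config. The names `StorageFactory`, `writeToStorage`, and the simplified `mutateEdge` signature are hypothetical and not the actual s2graph API.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Common, backend-agnostic behaviour lifted up from AsynchbaseStorage.
trait Storage {
  implicit val ec: ExecutionContext

  // Shared mutation path: build key-values, then delegate the raw write.
  def mutateEdge(edge: String): Future[Boolean] =
    writeToStorage(Seq(s"edge:$edge"))

  // Only the raw write remains backend-specific.
  protected def writeToStorage(kvs: Seq[String]): Future[Boolean]
}

// HBase-backed implementation; the real code would issue asynchbase requests,
// here the write is just simulated.
class AsynchbaseStorage(config: Map[String, String])(implicit val ec: ExecutionContext)
  extends Storage {

  protected def writeToStorage(kvs: Seq[String]): Future[Boolean] =
    Future {
      kvs.foreach(kv => println(s"HBase put: $kv"))
      true
    }
}

object StorageFactory {
  // Mirrors Graph.initStorage in the diff: choose the backend by configuration.
  def apply(config: Map[String, String])(implicit ec: ExecutionContext): Storage =
    config.getOrElse("s2graph.storage.backend", "hbase") match {
      case "hbase" => new AsynchbaseStorage(config)
      case other   => throw new RuntimeException(s"not supported storage: $other")
    }
}
```

With this shape, Graph only needs something like `val storage = StorageFactory(config)`, and adding another backend means implementing the trait and extending the factory match, which is the point of collapsing the extra abstraction layer into a single Storage trait.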
JIRA: [S2GRAPH-17] https://issues.apache.org/jira/browse/S2GRAPH-17 Pull Request: Closes #34 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e207f676 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e207f676 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e207f676 Branch: refs/heads/master Commit: e207f676fe145690b027a8015fd25794ec47ff94 Parents: 36d5485 Author: DO YUNG YOON <[email protected]> Authored: Mon Feb 29 15:14:07 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Feb 29 15:14:07 2016 +0900 ---------------------------------------------------------------------- .gitignore | 2 +- CHANGES | 2 + README.md | 2 +- dev_support/README.md | 45 + dev_support/docker-compose.yml | 30 + dev_support/graph_mysql/Dockerfile | 5 + dev_support/graph_mysql/schema.sql | 218 ++++ docker-compose.yml | 30 - loader/build.sbt | 7 + .../main/scala/subscriber/GraphSubscriber.scala | 9 +- .../main/scala/subscriber/TransferToHFile.scala | 19 +- .../src/main/scala/subscriber/WalLogStat.scala | 94 ++ .../main/scala/subscriber/WalLogToHDFS.scala | 235 ++-- .../scala/subscriber/GraphSubscriberTest.scala | 8 +- .../scala/subscriber/TransferToHFileTest.scala | 169 +++ project/plugins.sbt | 5 +- s2core/build.sbt | 1 - s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar | Bin 1305124 -> 1302410 bytes s2core/migrate/mysql/schema.sql | 1 + s2core/src/main/resources/create_model.hql | 1 - s2core/src/main/resources/logback.xml | 13 +- s2core/src/main/resources/reference.conf | 16 +- .../scala/com/kakao/s2graph/core/Edge.scala | 1 + .../scala/com/kakao/s2graph/core/Graph.scala | 21 +- .../com/kakao/s2graph/core/Management.scala | 1 - .../com/kakao/s2graph/core/PostProcess.scala | 33 + .../com/kakao/s2graph/core/QueryParam.scala | 20 +- .../com/kakao/s2graph/core/QueryRequest.scala | 12 - .../com/kakao/s2graph/core/QueryResult.scala | 28 +- .../scala/com/kakao/s2graph/core/Vertex.scala | 1 + .../kakao/s2graph/core/mysqls/Experiment.scala | 2 + .../kakao/s2graph/core/rest/RequestParser.scala | 43 +- .../s2graph/core/storage/Deserializable.scala | 25 + .../core/storage/IndexEdgeDeserializable.scala | 128 ++ .../core/storage/IndexEdgeSerializable.scala | 58 + .../s2graph/core/storage/MutationBuilder.scala | 57 - .../s2graph/core/storage/QueryBuilder.scala | 104 -- .../kakao/s2graph/core/storage/SKeyValue.scala | 22 +- .../s2graph/core/storage/Serializable.scala | 10 + .../storage/SnapshotEdgeDeserializable.scala | 142 +++ .../core/storage/SnapshotEdgeSerializable.scala | 76 ++ .../kakao/s2graph/core/storage/Storage.scala | 1137 +++++++++++++++++- .../core/storage/StorageDeserializable.scala | 17 +- .../core/storage/VertexDeserializable.scala | 46 + .../core/storage/VertexSerializable.scala | 18 + .../hbase/AsynchbaseMutationBuilder.scala | 119 -- .../storage/hbase/AsynchbaseQueryBuilder.scala | 255 ---- .../core/storage/hbase/AsynchbaseStorage.scala | 961 ++++----------- .../core/storage/hbase/HDeserializable.scala | 25 - .../core/storage/hbase/HSerializable.scala | 10 - .../storage/hbase/IndexEdgeDeserializable.scala | 116 -- .../storage/hbase/IndexEdgeSerializable.scala | 45 - .../hbase/SnapshotEdgeDeserializable.scala | 140 --- .../hbase/SnapshotEdgeSerializable.scala | 91 -- .../storage/hbase/VertexDeserializable.scala | 47 - .../core/storage/hbase/VertexSerializable.scala | 19 - .../kakao/s2graph/core/utils/DeferCache.scala | 82 ++ .../kakao/s2graph/core/utils/Extentions.scala | 2 
- .../kakao/s2graph/core/utils/FutureCache.scala | 82 ++ .../kakao/s2graph/core/Integrate/CrudTest.scala | 2 +- .../core/Integrate/IntegrateCommon.scala | 22 +- .../s2graph/core/Integrate/QueryTest.scala | 546 +++++++-- .../core/Integrate/StrongLabelDeleteTest.scala | 17 +- .../core/Integrate/WeakLabelDeleteTest.scala | 4 +- .../com/kakao/s2graph/core/ManagementTest.scala | 121 -- .../s2graph/core/parsers/WhereParserTest.scala | 54 +- .../hbase/AsynchbaseQueryBuilderTest.scala | 106 +- .../s2/counter/core/CounterFunctions.scala | 2 +- .../stream/ExactCounterStreamingSpec.scala | 196 +++ s2rest_netty/build.sbt | 7 + s2rest_netty/conf/logger.xml | 83 ++ s2rest_netty/conf/reference.conf | 131 ++ .../src/main/resources/application.conf | 0 s2rest_netty/src/main/resources/reference.conf | 131 ++ s2rest_netty/src/main/scala/Server.scala | 14 +- s2rest_play/app/Bootstrap.scala | 2 +- .../app/controllers/EdgeController.scala | 5 +- s2rest_play/build.sbt | 0 .../test/benchmark/JsonBenchmarkSpec.scala | 24 +- .../benchmark/OrderingUtilBenchmarkSpec.scala | 21 +- 80 files changed, 4039 insertions(+), 2357 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 76f2bb8..1e21302 100644 --- a/.gitignore +++ b/.gitignore @@ -94,7 +94,7 @@ bin/ /db .eclipse /lib/ -/logs/ +logs/ /modules /project/project /project/target http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 1512043..97434d6 100644 --- a/CHANGES +++ b/CHANGES @@ -63,6 +63,8 @@ Release 0.12.1 - unreleased S2GRAPH-5: Add Apache RAT to valid LICENSE errors. (Committed by DOYUNG YOON). + S2GRAPH-17: Remove unnecessary abstraction layer, Storage. (Committed by DOYUNG YOON). + SUB TASKS S2GRAPH-9: Provide rest server using netty. (Committed by daewon). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 07a49f6..61430ee 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ S2Graph comes with a Vagrantfile that lets you spin up a virtual environment for test and development purposes. (On setting up S2Graph in your local environment directly, please refer to [Quick Start in Your Local Environment](https://steamshon.gitbooks.io/s2graph-book/content/getting_started.html).) -You will need [VirtualBox](https://www.virtualbox.org/wiki/Downloads) and [Vagrant](http://docs.ansible.com/ansible/intro_installation.html) installed on your system. +You will need [VirtualBox](https://www.virtualbox.org/wiki/Downloads) and [Vagrant](https://www.vagrantup.com/downloads.html) installed on your system. With everything ready, let's get started by running the following commands: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/README.md ---------------------------------------------------------------------- diff --git a/dev_support/README.md b/dev_support/README.md new file mode 100644 index 0000000..556a7d9 --- /dev/null +++ b/dev_support/README.md @@ -0,0 +1,45 @@ +# Run S2Graph using Docker + +1. Build a docker image of the s2graph in the project's root directory + - `sbt "project s2rest_play" docker:publishLocal` +2. 
Run MySQL and HBase container first. + - change directory to dev-support. `cd dev-support` + - `docker-compose build` + - `docker-compose up -d graph_mysql` will run MySQL and HBase at same time. +3. Run graph container + - `docker-compose up -d` + +> S2Graph should be connected with MySQL at initial state. Therefore you have to run MySQL and HBase before running it. + +## For OS X + +In OS X, the docker container is running on VirtualBox. In order to connect with HBase in the docker container from your local machine. You have to register the IP of the docker-machine into the `/etc/hosts` file. + +Within the `docker-compose.yml` file, I had supposed the name of docker-machine as `default`. So, in the `/etc/hosts` file, register the docker-machine name as `default`. + +``` +ex) +192.168.99.100 default +``` + +# Run S2Graph on your local machine + +In order to develop and test S2Graph. You might be want to run S2Graph as `dev` mode on your local machine. In this case, the following commands are helpful. + +- Run only MySQL and HBase + +``` +# docker-compose up -d graph_mysql +``` + +- Run s2graph as 'dev' mode + +``` +# sbt "project s2rest_play" run -Dhost=default +``` + +- or run test cases + +``` +# sbt test -Dhost=default +``` http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/docker-compose.yml ---------------------------------------------------------------------- diff --git a/dev_support/docker-compose.yml b/dev_support/docker-compose.yml new file mode 100644 index 0000000..9818251 --- /dev/null +++ b/dev_support/docker-compose.yml @@ -0,0 +1,30 @@ +graph: + image: s2rest_play:0.12.1-SNAPSHOT + container_name: graph + net: container:graph_hbase + links: + - graph_mysql + - graph_hbase + +graph_mysql: + build: graph_mysql + container_name: graph_mysql + environment: + MYSQL_ROOT_PASSWORD: graph + net: container:graph_hbase + +graph_hbase: + image: nerdammer/hbase:0.98.10.1 + container_name: graph_hbase + hostname: "${DOCKER_MACHINE_NAME}" + ports: + - "3306:3306" + - "2181:2181" + - "60010:60010" + - "60000:60000" + - "60020:60020" + - "60030:60030" + - "9000:9000" + expose: + - "3306" + - "9000" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/graph_mysql/Dockerfile ---------------------------------------------------------------------- diff --git a/dev_support/graph_mysql/Dockerfile b/dev_support/graph_mysql/Dockerfile new file mode 100644 index 0000000..c6849f0 --- /dev/null +++ b/dev_support/graph_mysql/Dockerfile @@ -0,0 +1,5 @@ +FROM mysql + +MAINTAINER Jaesang Kim <[email protected]> + +ADD ./schema.sql /docker-entrypoint-initdb.d/ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/dev_support/graph_mysql/schema.sql ---------------------------------------------------------------------- diff --git a/dev_support/graph_mysql/schema.sql b/dev_support/graph_mysql/schema.sql new file mode 100644 index 0000000..2216896 --- /dev/null +++ b/dev_support/graph_mysql/schema.sql @@ -0,0 +1,218 @@ +CREATE DATABASE IF NOT EXISTS graph_dev; + +CREATE USER 'graph'@'%' IDENTIFIED BY 'graph'; + +GRANT ALL PRIVILEGES ON graph_dev.* TO 'graph'@'%' identified by 'graph'; + +flush privileges; + +use graph_dev; + + +SET FOREIGN_KEY_CHECKS = 0; + +-- ---------------------------- +-- Table structure for `services` +-- ---------------------------- +DROP TABLE IF EXISTS `services`; +CREATE TABLE `services` ( + `id` integer NOT NULL AUTO_INCREMENT, + `service_name` varchar(64) NOT NULL, + `access_token` varchar(64) NOT NULL, 
+ `cluster` varchar(255) NOT NULL, + `hbase_table_name` varchar(255) NOT NULL, + `pre_split_size` integer NOT NULL default 0, + `hbase_table_ttl` integer, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_service_name` (`service_name`), + INDEX `idx_access_token` (`access_token`), + INDEX `idx_cluster` (cluster(75)) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +-- ---------------------------- +-- Table structure for `services_columns` +-- ---------------------------- +DROP TABLE IF EXISTS `service_columns`; +CREATE TABLE `service_columns` ( + `id` integer NOT NULL AUTO_INCREMENT, + `service_id` integer NOT NULL, + `column_name` varchar(64) NOT NULL, + `column_type` varchar(8) NOT NULL, + `schema_version` varchar(8) NOT NULL default 'v2', + PRIMARY KEY (`id`), + UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES services(id) ON DELETE CASCADE; + + +-- ---------------------------- +-- Table structure for `column_metas` +-- ---------------------------- +DROP TABLE IF EXISTS `column_metas`; +CREATE TABLE `column_metas` ( + `id` integer NOT NULL AUTO_INCREMENT, + `column_id` integer NOT NULL, + `name` varchar(64) NOT NULL, + `seq` tinyint NOT NULL, + `data_type` varchar(8) NOT NULL DEFAULT 'string', + PRIMARY KEY (`id`), + UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), + INDEX `idx_column_id_seq` (`column_id`, `seq`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES service_columns(id) ON DELETE CASCADE; + +-- ---------------------------- +-- Table structure for `labels` +-- ---------------------------- + +DROP TABLE IF EXISTS `labels`; +CREATE TABLE `labels` ( + `id` integer NOT NULL AUTO_INCREMENT, + `label` varchar(64) NOT NULL, + `src_service_id` integer NOT NULL, + `src_column_name` varchar(64) NOT NULL, + `src_column_type` varchar(8) NOT NULL, + `tgt_service_id` integer NOT NULL, + `tgt_column_name` varchar(64) NOT NULL, + `tgt_column_type` varchar(8) NOT NULL, + `is_directed` tinyint NOT NULL DEFAULT 1, + `service_name` varchar(64), + `service_id` integer NOT NULL, + `consistency_level` varchar(8) NOT NULL DEFAULT 'weak', + `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph', + `hbase_table_ttl` integer, + `schema_version` varchar(8) NOT NULL default 'v2', + `is_async` tinyint(4) NOT NULL default '0', + `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4', + `options` text, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_label` (`label`), + INDEX `idx_src_column_name` (`src_column_name`), + INDEX `idx_tgt_column_name` (`tgt_column_name`), + INDEX `idx_src_service_id` (`src_service_id`), + INDEX `idx_tgt_service_id` (`tgt_service_id`), + INDEX `idx_service_name` (`service_name`), + INDEX `idx_service_id` (`service_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id); + + + +-- ---------------------------- +-- Table structure for `label_metas` +-- ---------------------------- +DROP TABLE IF EXISTS `label_metas`; +CREATE TABLE `label_metas` ( + `id` integer NOT NULL AUTO_INCREMENT, + `label_id` integer NOT NULL, + `name` varchar(64) NOT NULL, + `seq` tinyint NOT NULL, + `default_value` varchar(64) NOT NULL, + `data_type` varchar(8) NOT NULL DEFAULT 'long', + `used_in_index` tinyint NOT NULL DEFAULT 0, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_label_id_name` (`label_id`, `name`), + INDEX `idx_label_id_seq` (`label_id`, `seq`) +) ENGINE=InnoDB 
DEFAULT CHARSET=utf8mb4; + +ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE; + + +-- ---------------------------- +-- Table structure for `label_indices` +-- ---------------------------- +DROP TABLE IF EXISTS `label_indices`; +CREATE TABLE `label_indices` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `label_id` int(11) NOT NULL, + `name` varchar(64) NOT NULL DEFAULT '_PK', + `seq` tinyint(4) NOT NULL, + `meta_seqs` varchar(64) NOT NULL, + `formulars` varchar(255) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`), + UNIQUE KEY `ux_label_id_name` (`label_id`,`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE; + + +-- ---------------------------- +-- Table structure for `experiments` +-- ---------------------------- +DROP TABLE IF EXISTS `experiments`; +CREATE TABLE `experiments` ( + `id` integer NOT NULL AUTO_INCREMENT, + `service_id` integer NOT NULL, + `service_name` varchar(128) NOT NULL, + `name` varchar(64) NOT NULL, + `description` varchar(255) NOT NULL, + `experiment_type` varchar(8) NOT NULL DEFAULT 'u', + `total_modular` int NOT NULL DEFAULT 100, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_service_id_name` (`service_id`, `name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- ALTER TABLE experiments ADD FOREIGN KEY(service_id) REFERENCES service(id) ON DELETE CASCADE; + + +-- ---------------------------- +-- Table structure for `buckets` +-- ---------------------------- +CREATE TABLE `buckets` ( + `id` integer NOT NULL AUTO_INCREMENT, + `experiment_id` integer NOT NULL, + `modular` varchar(64) NOT NULL, + `http_verb` varchar(8) NOT NULL, + `api_path` text NOT NULL, + `uuid_key` varchar(128), + `uuid_placeholder` varchar(64), + `request_body` text NOT NULL, + `timeout` int NOT NULL DEFAULT 1000, + `impression_id` varchar(64) NOT NULL, + `is_graph_query` tinyint NOT NULL DEFAULT 1, + `is_empty` tinyint NOT NULL DEFAULT 0, + PRIMARY KEY (`id`), + UNIQUE KEY `ux_impression_id` (`impression_id`), + INDEX `idx_experiment_id` (`experiment_id`), + INDEX `idx_impression_id` (`impression_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +SET FOREIGN_KEY_CHECKS = 1; + + +-- ---------------------------- +-- Table structure for `counter` +-- ---------------------------- +DROP TABLE IF EXISTS `counter`; +CREATE TABLE `counter` ( + `id` int(11) unsigned NOT NULL AUTO_INCREMENT, + `use_flag` tinyint(1) NOT NULL DEFAULT '0', + `version` smallint(1) NOT NULL DEFAULT '1', + `service` varchar(64) NOT NULL DEFAULT '', + `action` varchar(64) NOT NULL DEFAULT '', + `item_type` int(11) NOT NULL DEFAULT '0', + `auto_comb` tinyint(1) NOT NULL DEFAULT '1', + `dimension` varchar(1024) NOT NULL, + `use_profile` tinyint(1) NOT NULL DEFAULT '0', + `bucket_imp_id` varchar(64) DEFAULT NULL, + `use_exact` tinyint(1) NOT NULL DEFAULT '1', + `use_rank` tinyint(1) NOT NULL DEFAULT '1', + `ttl` int(11) NOT NULL DEFAULT '172800', + `daily_ttl` int(11) DEFAULT NULL, + `hbase_table` varchar(1024) DEFAULT NULL, + `interval_unit` varchar(1024) DEFAULT NULL, + `rate_action_id` int(11) unsigned DEFAULT NULL, + `rate_base_id` int(11) unsigned DEFAULT NULL, + `rate_threshold` int(11) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `svc` (`service`,`action`), + KEY `rate_action_id` (`rate_action_id`), + KEY `rate_base_id` (`rate_base_id`), + CONSTRAINT `rate_action_id` FOREIGN KEY (`rate_action_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO 
ACTION, + CONSTRAINT `rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/docker-compose.yml ---------------------------------------------------------------------- diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 3149a89..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,30 +0,0 @@ -graph: - image: s2graph:0.12.0-SNAPSHOT - container_name: graph - net: container:graph_hbase - links: - - graph_mysql - - graph_hbase - -graph_mysql: - image: devsupport_graph_mysql - container_name: graph_mysql - environment: - MYSQL_ROOT_PASSWORD: graph - net: container:graph_hbase - -graph_hbase: - image: nerdammer/hbase:0.98.10.1 - container_name: graph_hbase - hostname: default - ports: - - "3306:3306" - - "2181:2181" - - "60010:60010" - - "60000:60000" - - "60020:60020" - - "60030:60030" - - "9000:9000" - expose: - - "3306:3306" - - "9000" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/build.sbt ---------------------------------------------------------------------- diff --git a/loader/build.sbt b/loader/build.sbt index b497a9c..43e1126 100644 --- a/loader/build.sbt +++ b/loader/build.sbt @@ -4,9 +4,14 @@ name := "s2loader" scalacOptions ++= Seq("-deprecation") +projectDependencies := Seq( + (projectID in "s2core").value exclude("org.mortbay.jetty", "*") exclude("javax.xml.stream", "*") exclude("javax.servlet", "*") +) + libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % Common.sparkVersion % "provided", "org.apache.spark" %% "spark-streaming" % Common.sparkVersion % "provided", + "org.apache.spark" %% "spark-hive" % Common.sparkVersion % "provided", "org.apache.spark" %% "spark-streaming-kafka" % Common.sparkVersion, "org.apache.httpcomponents" % "fluent-hc" % "4.2.5", "org.specs2" %% "specs2-core" % "2.4.11" % "test", @@ -30,3 +35,5 @@ excludedJars in assembly := { } test in assembly := {} + +parallelExecution in Test := false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala index 88de3d7..f4f7865 100644 --- a/loader/src/main/scala/subscriber/GraphSubscriber.scala +++ b/loader/src/main/scala/subscriber/GraphSubscriber.scala @@ -22,7 +22,7 @@ object GraphConfig { var kafkaBrokers = "" var cacheTTL = s"${60 * 60 * 1}" def apply(phase: String, dbUrl: Option[String], zkAddr: Option[String], kafkaBrokerList: Option[String]): Config = { - database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph") + database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph_dev") zkQuorum = zkAddr.getOrElse("localhost") // val newConf = new util.HashMap[String, Object]() @@ -50,6 +50,7 @@ object GraphSubscriberHelper extends WithKafka { private val maxTryNum = 10 var g: Graph = null + var management: Management = null val conns = new scala.collection.mutable.HashMap[String, Connection]() def toOption(s: String) = { @@ -63,7 +64,9 @@ object GraphSubscriberHelper extends WithKafka { config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList)) if (g == null) { - g = new Graph(config)(ExecutionContext.Implicits.global) + val ec = 
ExecutionContext.Implicits.global + g = new Graph(config)(ec) + management = new Management(g)(ec) } } @@ -95,6 +98,8 @@ object GraphSubscriberHelper extends WithKafka { case Some(v) if v.isInstanceOf[Vertex] => statFunc("VertexParseOk", 1) v.asInstanceOf[Vertex] + case Some(x) => + throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: parsing failed. ${x.serviceName}") case None => throw new RuntimeException(s"GraphSubscriber.toGraphElements: parsing failed. $msg") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/TransferToHFile.scala b/loader/src/main/scala/subscriber/TransferToHFile.scala index aba9673..516bb39 100644 --- a/loader/src/main/scala/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/subscriber/TransferToHFile.scala @@ -2,8 +2,8 @@ package subscriber import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.Label -import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId} +import com.kakao.s2graph.core.mysqls.{LabelMeta, Label} +import com.kakao.s2graph.core.types.{InnerValLikeWithTs, LabelWithDirection, SourceVertexId} import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding @@ -55,10 +55,7 @@ object TransferToHFile extends SparkApp with JSONParser { direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection reverseDirection = if (direction == "out") "in" else "out" convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5)) - (vertexIdStr, vertexIdStrReversed) = direction match { - case "out" => (tokens(3), tokens(4)) - case _ => (tokens(4), tokens(3)) - } + (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4)) degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction) degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection) extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil @@ -80,8 +77,12 @@ object TransferToHFile extends SparkApp with JSONParser { throw new RuntimeException(s"$vertexId can not be converted into innerval") } val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal)) + + val ts = System.currentTimeMillis() + val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) val labelWithDir = LabelWithDirection(label.id.get, dir) - val edge = Edge(vertex, vertex, labelWithDir) + val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs) + edge.edgesWithIndex.flatMap { indexEdge => GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp) @@ -129,14 +130,14 @@ object TransferToHFile extends SparkApp with JSONParser { val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String] val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false val buildDegree = if (args.length >= 9) args(8).toBoolean else true - + val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4" val conf = sparkConf(s"$input: TransferToHFile") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryoserializer.buffer.mb", "24") val sc = 
new SparkContext(conf) - + Management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm) /** set up hbase init */ val hbaseConf = HBaseConfiguration.create() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/WalLogStat.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/WalLogStat.scala b/loader/src/main/scala/subscriber/WalLogStat.scala new file mode 100644 index 0000000..f5db2c1 --- /dev/null +++ b/loader/src/main/scala/subscriber/WalLogStat.scala @@ -0,0 +1,94 @@ +package subscriber + +import java.text.SimpleDateFormat +import java.util.Date + +import com.kakao.s2graph.core.Graph +import kafka.producer.KeyedMessage +import kafka.serializer.StringDecoder +import org.apache.spark.streaming.Durations._ +import org.apache.spark.streaming.kafka.HasOffsetRanges +import s2.spark.{HashMapParam, SparkApp, WithKafka} + +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.language.postfixOps + +object WalLogStat extends SparkApp with WithKafka { + + override def run() = { + + validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "statTopic") + + val kafkaZkQuorum = args(0) + val brokerList = args(1) + val topics = args(2) + val intervalInSec = seconds(args(3).toLong) + val dbUrl = args(4) + val statTopic = args(5) + + + val conf = sparkConf(s"$topics: ${getClass.getSimpleName}") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + val groupId = topics.replaceAll(",", "_") + "_stat" + + val kafkaParams = Map( + "zookeeper.connect" -> kafkaZkQuorum, + "group.id" -> groupId, + "metadata.broker.list" -> brokerList, + "zookeeper.connection.timeout.ms" -> "10000", + "auto.offset.reset" -> "largest") + + val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet) + val statProducer = getProducer[String, String](brokerList) + + stream.foreachRDD { (rdd, time) => + + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + val ts = time.milliseconds + + val elements = rdd.mapPartitions { partition => + // set executor setting. 
+ val phase = System.getProperty("phase") + GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) + partition.map { case (key, msg) => + Graph.toGraphElement(msg) match { + case Some(elem) => + val serviceName = elem.serviceName + msg.split("\t", 7) match { + case Array(_, operation, log_type, _, _, label, _*) => + Seq(serviceName, label, operation, log_type).mkString("\t") + case _ => + Seq("no_service_name", "no_label", "no_operation", "parsing_error").mkString("\t") + } + case None => + Seq("no_service_name", "no_label", "no_operation", "no_element_error").mkString("\t") + } + } + } + + val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect() + val totalCount = countByKey.map(_._2).sum + val keyedMessage = countByKey.map { case (key, value) => + new KeyedMessage[String, String](statTopic, s"$ts\t$key\t$value\t$totalCount") + } + + statProducer.send(keyedMessage: _*) + + elements.mapPartitionsWithIndex { (i, part) => + // commit offset range + val osr = offsets(i) + getStreamHelper(kafkaParams).commitConsumerOffset(osr) + Iterator.empty + }.foreach { + (_: Nothing) => () + } + + } + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/main/scala/subscriber/WalLogToHDFS.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/subscriber/WalLogToHDFS.scala index 2c37aef..44902c2 100644 --- a/loader/src/main/scala/subscriber/WalLogToHDFS.scala +++ b/loader/src/main/scala/subscriber/WalLogToHDFS.scala @@ -5,6 +5,7 @@ import java.util.Date import com.kakao.s2graph.core.Graph import kafka.serializer.StringDecoder +import org.apache.spark.sql.hive.HiveContext import org.apache.spark.streaming.Durations._ import org.apache.spark.streaming.kafka.HasOffsetRanges import s2.spark.{HashMapParam, SparkApp, WithKafka} @@ -12,103 +13,137 @@ import s2.spark.{HashMapParam, SparkApp, WithKafka} import scala.collection.mutable.{HashMap => MutableHashMap} import scala.language.postfixOps -//object WalLogToHDFS extends SparkApp with WithKafka { -// private def toOutputPath(ts: Long): String = { -// val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts)) -// s"date_id=$dateId/ts=$ts" -// } -// -// val usages = -// s""" -// |/** -// |this job consume edges/vertices from kafka topic then load them into s2graph. -// |params: -// | 1. kafkaZkQuorum: kafka zk address to consume events -// | 2. brokerList: kafka cluster`s broker list. -// | 3. topics: , delimited list of topics to consume -// | 4. intervalInSec: batch interval for this job. -// | 5. dbUrl: -// | 6. 
outputPath: -// |*/ -// """.stripMargin -// override def run() = { -// validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "outputPath") -//// if (args.length != 7) { -//// System.err.println(usages) -//// System.exit(1) -//// } -// val kafkaZkQuorum = args(0) -// val brokerList = args(1) -// val topics = args(2) -// val intervalInSec = seconds(args(3).toLong) -// val dbUrl = args(4) -// val outputPath = args(5) -// -// val conf = sparkConf(s"$topics: WalLogToHDFS") -// val ssc = streamingContext(conf, intervalInSec) -// val sc = ssc.sparkContext -// -// val groupId = topics.replaceAll(",", "_") + "_stream" -// val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed" -// -// val kafkaParams = Map( -// "zookeeper.connect" -> kafkaZkQuorum, -// "group.id" -> groupId, -// "metadata.broker.list" -> brokerList, -// "zookeeper.connection.timeout.ms" -> "10000", -// "auto.offset.reset" -> "largest") -// -// val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet) -// -// val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _)) -// -// stream.foreachRDD { (rdd, time) => -// val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges -// -// val elements = rdd.mapPartitions { partition => -// // set executor setting. -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) -// -// partition.flatMap { case (key, msg) => -// val optMsg = Graph.toGraphElement(msg).map { element => -// val n = msg.split("\t", -1).length -// if(n == 6) { -// Seq(msg, "{}", element.serviceName).mkString("\t") -// } -// else if(n == 7) { -// Seq(msg, element.serviceName).mkString("\t") -// } -// else { -// null -// } -// } -// optMsg -// } -// } -// -// val ts = time.milliseconds -// val path = s"$outputPath/${toOutputPath(ts)}" -// -// /** make sure that `elements` are not running at the same time */ -// val elementsWritten = { -// elements.saveAsTextFile(path) -// elements -// } -// -// elementsWritten.mapPartitionsWithIndex { (i, part) => -// // commit offset range -// val osr = offsets(i) -// getStreamHelper(kafkaParams).commitConsumerOffset(osr) -// Iterator.empty -// }.foreach { -// (_: Nothing) => () -// } -// } -// -// logInfo(s"counter: ${mapAcc.value}") -// println(s"counter: ${mapAcc.value}") -// ssc.start() -// ssc.awaitTermination() -// } -//} +object WalLogToHDFS extends SparkApp with WithKafka { + + override def run() = { + + validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "outputPath", "hiveDatabase", "hiveTable", "splitListPath") + + val kafkaZkQuorum = args(0) + val brokerList = args(1) + val topics = args(2) + val intervalInSec = seconds(args(3).toLong) + val dbUrl = args(4) + val outputPath = args(5) + val hiveDatabase = args(6) + val hiveTable = args(7) + val splitListPath = args(8) + + val conf = sparkConf(s"$topics: WalLogToHDFS") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + val groupId = topics.replaceAll(",", "_") + "_stream" + val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed" + + val kafkaParams = Map( + "zookeeper.connect" -> kafkaZkQuorum, + "group.id" -> groupId, + "metadata.broker.list" -> brokerList, + "zookeeper.connection.timeout.ms" -> "10000", + "auto.offset.reset" -> "largest") + + val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, 
StringDecoder](ssc, topics.split(",").toSet) + + val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _)) + + val hdfsBlockSize = 134217728 // 128M + val hiveContext = new HiveContext(sc) + var splits = Array[String]() + var excludeLabels = Set[String]() + var excludeServices = Set[String]() + stream.foreachRDD { (rdd, time) => + try { + val read = sc.textFile(splitListPath).collect().map(_.split("=")).flatMap { + case Array(value) => Some(("split", value)) + case Array(key, value) => Some((key, value)) + case _ => None + } + splits = read.filter(_._1 == "split").map(_._2) + excludeLabels = read.filter(_._1 == "exclude_label").map(_._2).toSet + excludeServices = read.filter(_._1 == "exclude_service").map(_._2).toSet + } catch { + case _: Throwable => // use previous information + } + + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + val elements = rdd.mapPartitions { partition => + // set executor setting. + val phase = System.getProperty("phase") + GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) + + partition.flatMap { case (key, msg) => + val optMsg = Graph.toGraphElement(msg).flatMap { element => + val arr = msg.split("\t", 7) + val service = element.serviceName + val label = arr(5) + val n = arr.length + + if (excludeServices.contains(service) || excludeLabels.contains(label)) { + None + } else if(n == 6) { + Some(Seq(msg, "{}", service).mkString("\t")) + } + else if(n == 7) { + Some(Seq(msg, service).mkString("\t")) + } + else { + None + } + } + optMsg + } + } + + val ts = time.milliseconds + val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts)) + + /** make sure that `elements` are not running at the same time */ + val elementsWritten = { + elements.cache() + (Array("all") ++ splits).foreach { + case split if split == "all" => + val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" + elements.saveAsTextFile(path) + case split => + val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" + val strlen = split.length + val splitData = elements.filter(_.takeRight(strlen) == split).cache() + val totalSize = splitData + .mapPartitions { iterator => + val s = iterator.map(_.length.toLong).sum + Iterator.single(s) + } + .sum + .toLong + val numPartitions = math.max(1, (totalSize / hdfsBlockSize.toDouble).toInt) + splitData.coalesce(math.min(splitData.partitions.length, numPartitions)).saveAsTextFile(path) + splitData.unpersist() + } + elements.unpersist() + elements + } + + elementsWritten.mapPartitionsWithIndex { (i, part) => + // commit offset range + val osr = offsets(i) + getStreamHelper(kafkaParams).commitConsumerOffset(osr) + Iterator.empty + }.foreach { + (_: Nothing) => () + } + + (Array("all") ++ splits).foreach { split => + val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" + hiveContext.sql(s"use $hiveDatabase") + hiveContext.sql(s"alter table $hiveTable add partition (split='$split', date_id='$dateId', ts='$ts') location '$path'") + } + } + + logInfo(s"counter: ${mapAcc.value}") + println(s"counter: ${mapAcc.value}") + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/test/scala/subscriber/GraphSubscriberTest.scala ---------------------------------------------------------------------- diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala index a70c0f6..f927509 100644 --- 
a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala +++ b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala @@ -1,6 +1,6 @@ package subscriber -import com.kakao.s2graph.core.{Label, Service, Management} +import com.kakao.s2graph.core.Management import org.scalatest.{ FunSuite, Matchers } import play.api.libs.json.{JsBoolean, JsNumber} import s2.spark.WithKafka @@ -28,7 +28,7 @@ class GraphSubscriberTest extends FunSuite with Matchers with WithKafka { test("GraphSubscriberHelper.store") { // actually we need to delete labelToReplace first for each test. val labelMapping = Map(testLabelName -> labelToReplace) - Management.copyLabel(testLabelName, labelToReplace, Some(hTableName)) + GraphSubscriberHelper.management.copyLabel(testLabelName, labelToReplace, Some(hTableName)) // // val msgs = (for { @@ -39,7 +39,7 @@ class GraphSubscriberTest extends FunSuite with Matchers with WithKafka { // }).toSeq val msgs = testStrings - val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping = labelMapping, autoCreateEdge = false)(None) - println(stat) +// val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping = labelMapping, autoCreateEdge = false)(None) +// println(stat) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/loader/src/test/scala/subscriber/TransferToHFileTest.scala ---------------------------------------------------------------------- diff --git a/loader/src/test/scala/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/subscriber/TransferToHFileTest.scala new file mode 100644 index 0000000..7b3f72f --- /dev/null +++ b/loader/src/test/scala/subscriber/TransferToHFileTest.scala @@ -0,0 +1,169 @@ +package subscriber + +import com.kakao.s2graph.core.Management +import com.kakao.s2graph.core.types.HBaseType +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest._ +import subscriber.TransferToHFile._ + +/** + * Created by Eric on 2015. 12. 2.. + */ +class TransferToHFileTest extends FlatSpec with BeforeAndAfterAll with Matchers { + + private val master = "local[2]" + private val appName = "example-spark" + + private var sc: SparkContext = _ + + val dataWithoutDir = + """ + |1447686000000 insertBulk e a b friends_rel {} + |1447686000000 insertBulk e a c friends_rel {} + |1447686000000 insertBulk e a d friends_rel {} + |1447686000000 insertBulk e b d friends_rel {} + |1447686000000 insertBulk e b e friends_rel {} + """.stripMargin.trim + + val dataWithDir = + """ + |1447686000000 insertBulk e a b friends_rel {} out + |1447686000000 insertBulk e b a friends_rel {} in + |1447686000000 insertBulk e a c friends_rel {} out + |1447686000000 insertBulk e c a friends_rel {} in + |1447686000000 insertBulk e a d friends_rel {} out + |1447686000000 insertBulk e d a friends_rel {} in + |1447686000000 insertBulk e b d friends_rel {} out + |1447686000000 insertBulk e d b friends_rel {} in + |1447686000000 insertBulk e b e friends_rel {} out + |1447686000000 insertBulk e e b friends_rel {} in + """.stripMargin.trim + + override def beforeAll(): Unit = { + println("### beforeAll") + + GraphSubscriberHelper.apply("dev", "none", "none", "none") + // 1. create service + if(Management.findService("loader-test").isEmpty) { + println(">>> create service...") + Management.createService("loader-test", "localhost", "loader-test-dev", 1, None, "gz") + } + + // 2. 
create label + if(Management.findLabel("friends_rel").isEmpty) { + println(">>> create label...") + Management.createLabel( + "friends_rel", + "loader-test", "user_id", "string", + "loader-test", "user_id", "string", + true, + "loader-test", + Seq(), + Seq(), + "weak", + None, None, + HBaseType.DEFAULT_VERSION, + false, + Management.defaultCompressionAlgorithm + ) + } + + // create spark context + val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + sc = new SparkContext(conf) + } + + override def afterAll(): Unit = { + println("### afterALL") + if (sc != null) { + sc.stop() + } + + Management.deleteLabel("friends_rel") + } + + "buildDegreePutRequest" should "transform degree to PutRequest" in { + val putReqs = buildDegreePutRequests("a", "friends_rel", "out", 3L) + putReqs.size should equal(1) + } + + "toKeyValues" should "transform edges to KeyValues on edge format data without direction" in { + val rdd = sc.parallelize(dataWithoutDir.split("\n")) + + val kvs = rdd.mapPartitions { iter => + GraphSubscriberHelper.apply("dev", "none", "none", "none") + TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false) + } + kvs.foreach(println) + // edges * 2 (snapshot edges + indexed edges) + kvs.count() should equal(10) + + + val kvsAutoCreated = rdd.mapPartitions { iter => + GraphSubscriberHelper.apply("dev", "none", "none", "none") + TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], true) + } + + // edges * 3 (snapshot edges + indexed edges + reverse edges) + kvsAutoCreated.count() should equal(15) + } + + "toKeyValues" should "transform edges to KeyValues on edge format data with direction" in { + val rdd = sc.parallelize(dataWithDir.split("\n")) + + val kvs = rdd.mapPartitions { iter => + GraphSubscriberHelper.apply("dev", "none", "none", "none") + TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false) + } + + // edges * 2 (snapshot edges + indexed edges) + kvs.count() should equal(20) + } + + "buildDegrees" should "build degrees on edge format data without direction" in { + val rdd = sc.parallelize(dataWithoutDir.split("\n")) + + // autoCreate = false + val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], false).reduceByKey { (agg, current) => + agg + current + }.collectAsMap() + degrees.size should equal(2) + + degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L) + degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L) + + + // autoCreate = true + val degreesAutoCreated = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], true).reduceByKey { (agg, current) => + agg + current + }.collectAsMap() + degreesAutoCreated.size should equal(6) + + degreesAutoCreated should contain(DegreeKey("a", "friends_rel", "out") -> 3L) + degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "out") -> 2L) + degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "in") -> 1L) + degreesAutoCreated should contain(DegreeKey("c", "friends_rel", "in") -> 1L) + degreesAutoCreated should contain(DegreeKey("d", "friends_rel", "in") -> 2L) + degreesAutoCreated should contain(DegreeKey("e", "friends_rel", "in") -> 1L) + } + + "buildDegrees" should "build degrees on edge format data with direction" in { + val rdd = sc.parallelize(dataWithDir.split("\n")) + + val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], false).reduceByKey { (agg, current) => + agg + current + }.collectAsMap() + + degrees.size should equal(6) + + degrees should 
contain(DegreeKey("a", "friends_rel", "out") -> 3L) + degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L) + degrees should contain(DegreeKey("b", "friends_rel", "in") -> 1L) + degrees should contain(DegreeKey("c", "friends_rel", "in") -> 1L) + degrees should contain(DegreeKey("d", "friends_rel", "in") -> 2L) + degrees should contain(DegreeKey("e", "friends_rel", "in") -> 1L) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/project/plugins.sbt ---------------------------------------------------------------------- diff --git a/project/plugins.sbt b/project/plugins.sbt index 4c90a1f..7f33c0e 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,5 @@ -// Use the Play sbt plugin for Play projects +// use the Play sbt plugin for Play projects + addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.3.10") // http://www.scalastyle.org/sbt.html @@ -8,3 +9,5 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.7.0") addSbtPlugin("io.spray" % "sbt-revolver" % "0.8.0") addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.3") + +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.0") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 7a3c2d5..ca40ca7 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -30,4 +30,3 @@ libraryDependencies := { libraryDependencies.value } } - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar ---------------------------------------------------------------------- diff --git a/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar b/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar index 6dcc75d..801d87d 100644 Binary files a/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar and b/s2core/lib/asynchbase-1.7.1-SNAPSHOT.jar differ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/migrate/mysql/schema.sql ---------------------------------------------------------------------- diff --git a/s2core/migrate/mysql/schema.sql b/s2core/migrate/mysql/schema.sql index 06c029b..4911348 100644 --- a/s2core/migrate/mysql/schema.sql +++ b/s2core/migrate/mysql/schema.sql @@ -87,6 +87,7 @@ CREATE TABLE `labels` ( `schema_version` varchar(8) NOT NULL default 'v2', `is_async` tinyint(4) NOT NULL default '0', `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4', + `options` text, PRIMARY KEY (`id`), UNIQUE KEY `ux_label` (`label`), INDEX `idx_src_column_name` (`src_column_name`), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/resources/create_model.hql ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/create_model.hql b/s2core/src/main/resources/create_model.hql index df8e4dc..18c870b 100644 --- a/s2core/src/main/resources/create_model.hql +++ b/s2core/src/main/resources/create_model.hql @@ -1,2 +1 @@ -create 'models-dev', {NAME => 'm'} create 's2graph-dev', {NAME => 'e'}, {NAME => 'v'} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/logback.xml b/s2core/src/main/resources/logback.xml index 543365c..18aa2bd 100644 --- a/s2core/src/main/resources/logback.xml +++ b/s2core/src/main/resources/logback.xml @@ -9,12 +9,17 
@@ </encoder> </appender> - <root level="ERROR"> + <!--<root level="INFO">--> + <!--<appender-ref ref="STDOUT"/>--> + <!--</root>--> + + <logger name="application" level="DEBUG"> <appender-ref ref="STDOUT"/> - </root> + </logger> - <logger name="application" level="DEBUG"></logger> + <logger name="error" level="DEBUG"> + <appender-ref ref="STDOUT"/> + </logger> - <logger name="error" level="DEBUG"></logger> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/resources/reference.conf ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/reference.conf b/s2core/src/main/resources/reference.conf index cef0284..c93d204 100644 --- a/s2core/src/main/resources/reference.conf +++ b/s2core/src/main/resources/reference.conf @@ -1,6 +1,7 @@ # APP PHASE -phase=dev -host=localhost +phase = dev + +host = localhost # Hbase hbase.table.compression.algorithm="gz" @@ -25,11 +26,12 @@ cache.ttl.seconds=60 cache.max.size=100000 # DB -s2graph.models.table.name="models-dev" -db.default.driver="com.mysql.jdbc.Driver" -db.default.url="jdbc:mysql://"${host}":3306/graph_dev" -db.default.user="graph" -db.default.password="graph" +s2graph.models.table.name = "models-dev" +db.default.driver = "com.mysql.jdbc.Driver" +db.default.url = "jdbc:mysql://"${host}":3306/graph_dev" +db.default.user = "graph" +db.default.password = "graph" + akka { loggers = ["akka.event.slf4j.Slf4jLogger"] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala index 10443c3..8e6ad7d 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala @@ -57,6 +57,7 @@ case class IndexEdge(srcVertex: Vertex, // assert(props.containsKey(LabelMeta.timeStampSeq)) val ts = props(LabelMeta.timeStampSeq).toString.toLong + val degreeEdge = props.contains(LabelMeta.degreeSeq) lazy val label = Label.findById(labelWithDir.labelId) val schemaVer = label.schemaVersion lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala index 76d3151..fdc8553 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala @@ -42,7 +42,8 @@ object Graph { "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), "future.cache.max.size" -> java.lang.Integer.valueOf(100000), "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), - "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000) + "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), + "s2graph.storage.backend" -> "hbase" ) var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) @@ -99,9 +100,6 @@ object Graph { case _ => innerVal.toString().toLong } } getOrElse(edge.ts) - // val innerVal = edge.propsWithTs(timeDecay.labelMetaSeq).innerVal - // - // 
edge.propsWithTs.get(timeDecay.labelMetaSeq).map(_.toString.toLong).getOrElse(edge.ts) } catch { case e: Exception => logger.error(s"processTimeDecay error. ${edge.toLogString}", e) @@ -332,19 +330,24 @@ object Graph { logger.error(s"toVertex: $e", e) throw e } get + + def initStorage(config: Config)(ec: ExecutionContext) = { + config.getString("s2graph.storage.backend") match { + case "hbase" => new AsynchbaseStorage(config)(ec) + case _ => throw new RuntimeException("not supported storage.") + } + } } -class Graph(_config: Config)(implicit ec: ExecutionContext) { +class Graph(_config: Config)(implicit val ec: ExecutionContext) { val config = _config.withFallback(Graph.DefaultConfig) - val cacheSize = config.getInt("cache.max.size") -// val cache = CacheBuilder.newBuilder().maximumSize(cacheSize).build[java.lang.Integer, Seq[QueryResult]]() - val vertexCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build[java.lang.Integer, Option[Vertex]]() Model.apply(config) Model.loadCache() // TODO: Make storage client by config param - val storage: Storage = new AsynchbaseStorage(config, vertexCache)(ec) + val storage = Graph.initStorage(config)(ec) + for { entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala index 32f7a5a..ccf9d1f 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala @@ -4,7 +4,6 @@ package com.kakao.s2graph.core import com.kakao.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException} import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop} import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.storage.Storage import com.kakao.s2graph.core.types.HBaseType._ import com.kakao.s2graph.core.types._ import play.api.libs.json.Reads._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala index 42b0146..f301d68 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala @@ -165,6 +165,37 @@ object PostProcess extends JSONParser { } } + private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = { + def traverse(js: JsValue): JsValue = js match { + case JsNull => mapper(JsNull) + case JsUndefined() => mapper(JsUndefined("")) + case JsNumber(v) => mapper(js) + case JsString(v) => mapper(js) + case JsBoolean(v) => mapper(js) + case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) }) + case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) }) + } + + traverse(jsValue) + } + + /** test query with filterOut is not working since it can not diffrentate filterOut */ + private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = { + val cursors = _cursors.flatten.iterator + + buildReplaceJson(jsonQuery) 
{ + case js@JsObject(fields) => + val isStep = fields.find { case (k, _) => k == "label" } // find label group + if (isStep.isEmpty) js + else { + // TODO: Order not ensured + val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next)) + JsObject(withCursor.toSeq) + } + case js => js + } + } + private def buildRawEdges(queryOption: QueryOption, queryRequestWithResultLs: Seq[QueryRequestWithResult], excludeIds: Map[Int, Boolean], @@ -267,6 +298,7 @@ object PostProcess extends JSONParser { Json.obj( "size" -> edges.size, "degrees" -> resultDegrees, +// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()), "results" -> edges ) } else { @@ -316,6 +348,7 @@ object PostProcess extends JSONParser { Json.obj( "size" -> groupedSortedJsons.size, "degrees" -> resultDegrees, +// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()), "results" -> groupedSortedJsons ) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala index a9a5112..184cd08 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala @@ -6,7 +6,7 @@ import com.kakao.s2graph.core.parsers.{Where, WhereParser} import com.kakao.s2graph.core.types._ import org.apache.hadoop.hbase.util.Bytes import org.hbase.async.ColumnRangeFilter -import play.api.libs.json.{JsNumber, JsValue, Json} +import play.api.libs.json.{JsNull, JsNumber, JsValue, Json} import scala.util.hashing.MurmurHash3 import scala.util.{Success, Try} @@ -32,6 +32,11 @@ object Query { } } +case class MultiQuery(queries: Seq[Query], + weights: Seq[Double], + queryOption: QueryOption, + jsonQuery: JsValue = JsNull) + case class QueryOption(removeCycle: Boolean = false, selectColumns: Seq[String] = Seq.empty, groupByColumns: Seq[String] = Seq.empty, @@ -45,14 +50,15 @@ case class QueryOption(removeCycle: Boolean = false, scoreThreshold: Double = Double.MinValue, returnDegree: Boolean = true) -case class MultiQuery(queries: Seq[Query], weights: Seq[Double], queryOption: QueryOption) case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], steps: IndexedSeq[Step] = Vector.empty[Step], - queryOption: QueryOption = QueryOption()) { + queryOption: QueryOption = QueryOption(), + jsonQuery: JsValue = JsNull) { val removeCycle = queryOption.removeCycle val selectColumns = queryOption.selectColumns +// val groupBy = queryOption.groupBy val groupByColumns = queryOption.groupByColumns val orderByColumns = queryOption.orderByColumns val filterOutQuery = queryOption.filterOutQuery @@ -477,13 +483,13 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System this } - def whereRawOpt(sqlOpt: Option[String]): QueryParam = { - this.whereRawOpt = sqlOpt + def shouldNormalize(shouldNormalize: Boolean): QueryParam = { + this.shouldNormalize = shouldNormalize this } - def shouldNormalize(shouldNormalize: Boolean): QueryParam = { - this.shouldNormalize = shouldNormalize + def whereRawOpt(sqlOpt: Option[String]): QueryParam = { + this.whereRawOpt = sqlOpt this } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala deleted file mode 100644 index 413bfdb..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryRequest.scala +++ /dev/null @@ -1,12 +0,0 @@ -//package com.kakao.s2graph.core -// -//import scala.collection.Seq -// -//case class QueryRequest(query: Query, -// stepIdx: Int, -// vertex: Vertex, -// queryParam: QueryParam, -// prevStepScore: Double, -// tgtVertexOpt: Option[Vertex] = None, -// parentEdges: Seq[EdgeWithScore] = Nil, -// isInnerCall: Boolean = false) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala index 36c5a01..0ca92b5 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala @@ -7,18 +7,22 @@ import scala.collection.Seq object QueryResult { def fromVertices(query: Query): Seq[QueryRequestWithResult] = { - val queryParam = query.steps.head.queryParams.head - val label = queryParam.label - val currentTs = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timeStampSeq -> - InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) - for { - vertex <- query.vertices - } yield { - val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) - val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore) - QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam), - QueryResult(edgeWithScoreLs = Seq(edgeWithScore))) + if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { + Seq.empty + } else { + val queryParam = query.steps.head.queryParams.head + val label = queryParam.label + val currentTs = System.currentTimeMillis() + val propsWithTs = Map(LabelMeta.timeStampSeq -> + InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) + for { + vertex <- query.vertices + } yield { + val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) + val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore) + QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam), + QueryResult(edgeWithScoreLs = Seq(edgeWithScore))) + } } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala index 9cc0607..c2b86c2 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala @@ -1,5 +1,6 @@ package com.kakao.s2graph.core + import com.kakao.s2graph.core.mysqls._ //import com.kakao.s2graph.core.models._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala index 88c85b7..46b92ab 100644 --- 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala @@ -1,6 +1,7 @@ package com.kakao.s2graph.core.mysqls import com.kakao.s2graph.core.GraphUtil +import com.kakao.s2graph.core.utils.logger import scalikejdbc._ import scala.util.Random @@ -50,6 +51,7 @@ case class Experiment(id: Option[Int], totalModular: Int) { def buckets = Bucket.finds(id.get) + def rangeBuckets = for { bucket <- buckets range <- bucket.rangeOpt http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala index 2449082..22fbd8b 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala @@ -2,7 +2,7 @@ package com.kakao.s2graph.core.rest import java.util.concurrent.{Callable, TimeUnit} -import com.google.common.cache.CacheBuilder +import com.google.common.cache.{CacheLoader, CacheBuilder} import com.kakao.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException} import com.kakao.s2graph.core._ import com.kakao.s2graph.core.mysqls._ @@ -15,11 +15,9 @@ import scala.util.{Failure, Success, Try} object TemplateHelper { val findVar = """\"?\$\{(.*?)\}\"?""".r val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r - val hour = 60 * 60 * 1000L val day = hour * 24L val week = day * 7L - def calculate(now: Long, n: Int, unit: String): Long = { val duration = unit match { case "hour" | "HOUR" => n * hour @@ -59,6 +57,7 @@ class RequestParser(config: Config) extends JSONParser { val hardLimit = 100000 val defaultLimit = 100 + val maxLimit = Int.MaxValue - 1 val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout") val DefaultMaxAttempt = config.getInt("hbase.client.retries.number") val DefaultCluster = config.getString("hbase.zookeeper.quorum") @@ -71,7 +70,6 @@ class RequestParser(config: Config) extends JSONParser { .initialCapacity(1000) .build[String, Try[Where]] - private def extractScoring(labelId: Int, value: JsValue) = { val ret = for { js <- parse[Option[JsObject]](value, "scoring") @@ -149,7 +147,6 @@ class RequestParser(config: Config) extends JSONParser { ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike]) } - def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = { whereClauseOpt match { case None => Success(WhereParser.success) @@ -186,7 +183,8 @@ class RequestParser(config: Config) extends JSONParser { toQuery(queryJson, isEdgeQuery) } val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0)) - MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue)) + MultiQuery(queries = queries, weights = weights, + queryOption = toQueryOption(jsValue), jsonQuery = jsValue) } def toQueryOption(jsValue: JsValue): QueryOption = { @@ -313,7 +311,7 @@ class RequestParser(config: Config) extends JSONParser { } - val ret = Query(vertices, querySteps, queryOption) + val ret = Query(vertices, querySteps, queryOption, jsValue) // logger.debug(ret.toString) ret } catch { @@ -335,7 +333,7 @@ class RequestParser(config: Config) extends JSONParser { val limit = { parse[Option[Int]](labelGroup, "limit") match { case 
None => defaultLimit - case Some(l) if l < 0 => Int.MaxValue + case Some(l) if l < 0 => maxLimit case Some(l) if l >= 0 => val default = hardLimit Math.min(l, default) @@ -381,7 +379,6 @@ class RequestParser(config: Config) extends JSONParser { val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false) - // FIXME: Order of command matter QueryParam(labelWithDir) .sample(sample) @@ -432,31 +429,39 @@ class RequestParser(config: Config) extends JSONParser { def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = { val jsValues = toJsValues(jsValue) - val edges = jsValues.map(toEdge(_, operation)) + val edges = jsValues.flatMap(toEdge(_, operation)) (edges, jsValues) } def toEdges(jsValue: JsValue, operation: String): List[Edge] = { - toJsValues(jsValue).map(toEdge(_, operation)) + toJsValues(jsValue).flatMap { edgeJson => + toEdge(edgeJson, operation) + } } - def toEdge(jsValue: JsValue, operation: String) = { - val srcId = parse[JsValue](jsValue, "from") match { - case s: JsString => s.as[String] - case o@_ => s"${o}" - } - val tgtId = parse[JsValue](jsValue, "to") match { + private def toEdge(jsValue: JsValue, operation: String): List[Edge] = { + + def parseId(js: JsValue) = js match { case s: JsString => s.as[String] case o@_ => s"${o}" } + val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_)) + val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_)) + val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ srcId + val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ tgtId + val label = parse[String](jsValue, "label") val timestamp = parse[Long](jsValue, "timestamp") val direction = parse[Option[String]](jsValue, "direction").getOrElse("") val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}") - Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) - + for { + srcId <- srcIds + tgtId <- tgtIds + } yield { + Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) + } } def toVertices(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None) = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala new file mode 100644 index 0000000..cff968f --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala @@ -0,0 +1,25 @@ +package com.kakao.s2graph.core.storage + +import com.kakao.s2graph.core.storage.{SKeyValue, StorageDeserializable} +import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId, VertexId} +import org.apache.hadoop.hbase.util.Bytes + + +trait Deserializable[E] extends StorageDeserializable[E] { + import StorageDeserializable._ + + type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int) + + /** version 1 and version 2 share same code for parsing row key part */ + def parseRow(kv: SKeyValue, version: String): RowKeyRaw = { + var pos = 0 + val 
(srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + + val rowLen = srcIdLen + 4 + 1 + (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala new file mode 100644 index 0000000..2190222 --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala @@ -0,0 +1,128 @@ +package com.kakao.s2graph.core.storage + +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls.LabelMeta +import com.kakao.s2graph.core.storage.{CanSKeyValue, StorageDeserializable, SKeyValue} +import com.kakao.s2graph.core.types._ +import org.apache.hadoop.hbase.util.Bytes +import StorageDeserializable._ + +class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { + + import StorageDeserializable._ + + type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(Byte, InnerValLike)], Int) + + private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { +// val degree = Bytes.toLong(kv.value) + val degree = bytesToLongFunc(kv.value, 0) + val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) + val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) + (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) + } + + private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { + var qualifierLen = 0 + var pos = 0 + val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { + val (props, endAt) = bytesToProps(kv.qualifier, pos, version) + pos = endAt + qualifierLen += endAt + val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) + } + qualifierLen += tgtVertexIdLen + (props, endAt, tgtVertexId, tgtVertexIdLen) + } + val (op, opLen) = + if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) + else (kv.qualifier(qualifierLen), 1) + + qualifierLen += opLen + + (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) + } + + private def parseValue(kv: SKeyValue, version: String): ValueRaw = { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + (props, endAt) + } + + private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { + (Array.empty[(Byte, InnerValLike)], 0) + } + + + + /** version 1 and version 2 is same logic */ + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + version: String, + cacheElementOpt: Option[IndexEdge] = None): IndexEdge = { + fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt) + } + + def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + version: String, + cacheElementOpt: 
Option[IndexEdge] = None): IndexEdge = { + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) + }.getOrElse(parseRow(kv, version)) + + val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) + else parseQualifier(kv, version) + + val (props, _) = if (op == GraphUtil.operations("incrementCount")) { +// val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) + (dummyProps, 8) + } else if (kv.qualifier.isEmpty) { + parseDegreeValue(kv, version) + } else { + parseValue(kv, version) + } + + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + + // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) + + val idxProps = for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } yield { + if (k == LabelMeta.degreeSeq) k -> v + else seq -> v + } + + val idxPropsMap = idxProps.toMap + val tgtVertexId = if (tgtVertexIdInQualifier) { + idxPropsMap.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + } + } else tgtVertexIdRaw + + val _mergedProps = (idxProps ++ props).toMap + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + + // logger.error(s"$mergedProps") + // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong + + val ts = kv.timestamp + IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) + + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala new file mode 100644 index 0000000..56f70b9 --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala @@ -0,0 +1,58 @@ +package com.kakao.s2graph.core.storage + +import com.kakao.s2graph.core.mysqls.LabelMeta +import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue} +import com.kakao.s2graph.core.types.{HBaseType, VertexId} +import com.kakao.s2graph.core.utils.logger +import com.kakao.s2graph.core.{GraphUtil, IndexEdge} +import org.apache.hadoop.hbase.util.Bytes + +case class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { + + import StorageSerializable._ + + val label = indexEdge.label + val table = label.hbaseTableName.getBytes() + val cf = Serializable.edgeCf + + val idxPropsMap = indexEdge.orders.toMap + val idxPropsBytes = propsToBytes(indexEdge.orders) + + /** version 1 and version 2 share same code for serialize row key part */ + override def toKeyValues: Seq[SKeyValue] = { + toKeyValuesInner + } + def toKeyValuesInner: Seq[SKeyValue] = { + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes 
= indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + + val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") + val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes + val qualifier = + if (indexEdge.degreeEdge) Array.empty[Byte] + else { + if (indexEdge.op == GraphUtil.operations("incrementCount")) { + Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) + } else { + idxPropsMap.get(LabelMeta.toSeq) match { + case None => Bytes.add(idxPropsBytes, tgtIdBytes) + case Some(vId) => idxPropsBytes + } + } + } + + + val value = + if (indexEdge.degreeEdge) + Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + else if (indexEdge.op == GraphUtil.operations("incrementCount")) + Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + else propsToKeyValues(indexEdge.metas.toSeq) + + val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) + + Seq(kv) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala deleted file mode 100644 index c5d4882..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/MutationBuilder.scala +++ /dev/null @@ -1,57 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core._ - -import scala.collection.Seq -import scala.concurrent.ExecutionContext - - -abstract class MutationBuilder[T](storage: Storage)(implicit ex: ExecutionContext) { - /** operation that needs to be supported by backend persistent storage system */ - def put(kvs: Seq[SKeyValue]): Seq[T] - - def increment(kvs: Seq[SKeyValue]): Seq[T] - - def delete(kvs: Seq[SKeyValue]): Seq[T] - - - /** build mutation for backend persistent storage system */ - - /** EdgeMutate */ - def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[T] - - def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[T] - - def increments(edgeMutate: EdgeMutate): Seq[T] - - /** IndexEdge */ - def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[T] - - def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[T] - - def buildDeletesAsync(indexedEdge: IndexEdge): Seq[T] - - def buildPutsAsync(indexedEdge: IndexEdge): Seq[T] - - /** SnapshotEdge */ - def buildPutAsync(snapshotEdge: SnapshotEdge): Seq[T] - - def buildDeleteAsync(snapshotEdge: SnapshotEdge): Seq[T] - - /** Vertex */ - def buildPutsAsync(vertex: Vertex): Seq[T] - - def buildDeleteAsync(vertex: Vertex): Seq[T] - - def buildDeleteBelongsToId(vertex: Vertex): Seq[T] - - def buildVertexPutsAsync(edge: Edge): Seq[T] - - def buildPutsAll(vertex: Vertex): Seq[T] = { - vertex.op match { - case d: Byte if d == GraphUtil.operations("delete") => buildDeleteAsync(vertex) - case _ => buildPutsAsync(vertex) - } - } - -}
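
The Graph.scala hunk near the top of this diff replaces the direct `new AsynchbaseStorage(...)` construction with `Graph.initStorage`, which picks a backend from the "s2graph.storage.backend" config key, and the MutationBuilder abstraction shown deleted just above no longer sits between Graph and the backend. The following standalone sketch only illustrates that config-driven selection pattern; StorageSketch, HBaseStorageSketch, and StorageFactorySketch are placeholder names invented for the example and are not types in s2core.

import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.{ExecutionContext, Future}

// Placeholder interface standing in for the storage abstraction the graph talks to.
trait StorageSketch {
  def backendName: String
  def flush(): Future[Boolean]
}

// Placeholder backend standing in for an HBase-backed implementation (hypothetical).
class HBaseStorageSketch(config: Config)(implicit ec: ExecutionContext) extends StorageSketch {
  val backendName = "hbase"
  def flush(): Future[Boolean] = Future.successful(true)
}

object StorageFactorySketch {
  // Mirrors the shape of Graph.initStorage in the diff above: read the backend
  // name from configuration and fail fast on anything that is not supported.
  def initStorage(config: Config)(implicit ec: ExecutionContext): StorageSketch =
    config.getString("s2graph.storage.backend") match {
      case "hbase" => new HBaseStorageSketch(config)
      case other   => throw new RuntimeException(s"not supported storage: $other")
    }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val config = ConfigFactory.parseString("""s2graph.storage.backend = "hbase"""")
    val storage = initStorage(config)
    println(storage.backendName) // prints "hbase"
  }
}

Under this shape, supporting an additional backend means adding one case to the match and one implementation of the shared interface, instead of maintaining a parallel per-backend builder class like the deleted MutationBuilder.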
