[PIO-49] Add support for Elasticsearch 5

* Add support for Elasticsearch 5 over the REST API
* Refactor storage implementations into submodules
* Build storage implementations as separate assemblies
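For context, this change moves from the Elasticsearch 1.x TransportClient (binary transport, port 9300) to the Elasticsearch 5 REST API (HTTP, port 9200). Below is a minimal, hypothetical sketch of that connection style using the official 5.x low-level REST client in Scala. It is an illustration only, not the StorageClient added by this commit; the object name is made up, and the host, port, and scheme simply mirror the new PIO_STORAGE_SOURCES_ELASTICSEARCH_* defaults in the pio-env.sh.template diff further down.

    import org.apache.http.HttpHost
    import org.elasticsearch.client.RestClient

    // Hypothetical example object, not part of this commit.
    object Es5RestSketch {
      def main(args: Array[String]): Unit = {
        // HTTP endpoint on 9200, replacing the 1.x transport client on 9300.
        val client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()
        try {
          // Synchronous cluster ping over the REST API.
          val response = client.performRequest("GET", "/")
          println(response.getStatusLine)
        } finally {
          client.close()
        }
      }
    }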
Closes #352

Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/d78b3cbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/d78b3cbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/d78b3cbe

Branch: refs/heads/develop
Commit: d78b3cbe912cf57e2f0278e407a0d6432bd12849
Parents: 8fd59fd
Author: Shinsuke Sugaya <[email protected]>
Authored: Tue Mar 7 20:33:30 2017 -0800
Committer: Donald Szeto <[email protected]>
Committed: Tue Mar 7 20:33:30 2017 -0800

----------------------------------------------------------------------
 bin/compute-classpath.sh | 2 +-
 build.sbt | 28 +-
 conf/pio-env.sh.template | 10 +-
 core/build.sbt | 1 -
 data/build.sbt | 14 -
 .../storage/elasticsearch/ESAccessKeys.scala | 119 ------
 .../data/storage/elasticsearch/ESApps.scala | 130 ------
 .../data/storage/elasticsearch/ESChannels.scala | 117 ------
 .../elasticsearch/ESEngineInstances.scala | 158 -------
 .../elasticsearch/ESEvaluationInstances.scala | 136 ------
 .../storage/elasticsearch/ESSequences.scala | 64 ---
 .../data/storage/elasticsearch/ESUtils.scala | 48 ---
 .../storage/elasticsearch/StorageClient.scala | 50 ---
 .../data/storage/elasticsearch/package.scala | 25 --
 .../data/storage/hbase/HBEventsUtil.scala | 415 -------
 .../data/storage/hbase/HBLEvents.scala | 195 ---------
 .../data/storage/hbase/HBPEvents.scala | 131 ------
 .../data/storage/hbase/PIOHBaseUtil.scala | 32 --
 .../data/storage/hbase/StorageClient.scala | 86 ----
 .../data/storage/hbase/package.scala | 25 --
 .../data/storage/hbase/upgrade/HB_0_8_0.scala | 193 ---------
 .../data/storage/hbase/upgrade/Upgrade.scala | 75 ----
 .../storage/hbase/upgrade/Upgrade_0_8_3.scala | 224 ----------
 .../data/storage/hdfs/HDFSModels.scala | 63 ---
 .../data/storage/hdfs/StorageClient.scala | 36 --
 .../data/storage/hdfs/package.scala | 25 --
 .../data/storage/jdbc/JDBCAccessKeys.scala | 87 ----
 .../data/storage/jdbc/JDBCApps.scala | 89 ----
 .../data/storage/jdbc/JDBCChannels.scala | 69 ---
 .../data/storage/jdbc/JDBCEngineInstances.scala | 197 ---------
 .../storage/jdbc/JDBCEvaluationInstances.scala | 165 --------
 .../data/storage/jdbc/JDBCLEvents.scala | 244 -----------
 .../data/storage/jdbc/JDBCModels.scala | 55 ---
 .../data/storage/jdbc/JDBCPEvents.scala | 188 ---------
 .../data/storage/jdbc/JDBCUtils.scala | 106 -----
 .../data/storage/jdbc/StorageClient.scala | 53 ---
 .../data/storage/jdbc/package.scala | 26 --
 .../data/storage/localfs/LocalFSModels.scala | 62 ---
 .../data/storage/localfs/StorageClient.scala | 46 --
 .../data/storage/localfs/package.scala | 25 --
 .../predictionio/data/view/PBatchView.scala | 212 ----------
 make-distribution.sh | 6 +-
 storage/elasticsearch/.gitignore | 1 +
 storage/elasticsearch/build.sbt | 57 +++
 .../storage/elasticsearch/ESAccessKeys.scala | 178 ++++++++
 .../data/storage/elasticsearch/ESApps.scala | 194 +++++++++
 .../data/storage/elasticsearch/ESChannels.scala | 165 ++++++++
 .../elasticsearch/ESEngineInstances.scala | 248 +++++++++++
 .../elasticsearch/ESEvaluationInstances.scala | 194 +++++++++
 .../storage/elasticsearch/ESEventsUtil.scala | 123 ++++++
 .../data/storage/elasticsearch/ESLEvents.scala | 291 +++++++++++++
 .../data/storage/elasticsearch/ESPEvents.scala | 144 +++++++
 .../storage/elasticsearch/ESSequences.scala | 79 ++++
 .../data/storage/elasticsearch/ESUtils.scala | 184 ++++++++
 .../storage/elasticsearch/StorageClient.scala | 44 ++
 .../data/storage/elasticsearch/package.scala | 25 ++
 .../src/test/resources/application.conf | 28 ++
 storage/elasticsearch1/.gitignore | 1 +
 storage/elasticsearch1/build.sbt | 47 +++
 .../storage/elasticsearch/ESAccessKeys.scala | 119 ++++++
 .../data/storage/elasticsearch/ESApps.scala | 130 ++++++
 .../data/storage/elasticsearch/ESChannels.scala | 117 ++++++
 .../elasticsearch/ESEngineInstances.scala | 158 +++++++
 .../elasticsearch/ESEvaluationInstances.scala | 136 ++++++
 .../storage/elasticsearch/ESSequences.scala | 64 +++
 .../data/storage/elasticsearch/ESUtils.scala | 48 +++
 .../storage/elasticsearch/StorageClient.scala | 50 +++
 .../data/storage/elasticsearch/package.scala | 25 ++
 .../src/test/resources/application.conf | 28 ++
 storage/hbase/.gitignore | 1 +
 storage/hbase/build.sbt | 56 +++
 .../data/storage/hbase/HBEventsUtil.scala | 415 +++++++++++++++++++
 .../data/storage/hbase/HBLEvents.scala | 195 +++++++++
 .../data/storage/hbase/HBPEvents.scala | 131 ++++++
 .../data/storage/hbase/PIOHBaseUtil.scala | 32 ++
 .../data/storage/hbase/StorageClient.scala | 86 ++++
 .../data/storage/hbase/package.scala | 25 ++
 .../data/storage/hbase/upgrade/HB_0_8_0.scala | 193 +++++++++
 .../data/storage/hbase/upgrade/Upgrade.scala | 75 ++++
 .../storage/hbase/upgrade/Upgrade_0_8_3.scala | 224 ++++++++++
 .../predictionio/data/view/PBatchView.scala | 212 ++++++++++
 .../hbase/src/test/resources/application.conf | 28 ++
 storage/hdfs/.gitignore | 1 +
 storage/hdfs/build.sbt | 44 ++
 .../data/storage/hdfs/HDFSModels.scala | 63 +++
 .../data/storage/hdfs/StorageClient.scala | 36 ++
 .../data/storage/hdfs/package.scala | 25 ++
 .../hdfs/src/test/resources/application.conf | 28 ++
 storage/jdbc/.gitignore | 1 +
 storage/jdbc/build.sbt | 47 +++
 .../data/storage/jdbc/JDBCAccessKeys.scala | 87 ++++
 .../data/storage/jdbc/JDBCApps.scala | 89 ++++
 .../data/storage/jdbc/JDBCChannels.scala | 69 +++
 .../data/storage/jdbc/JDBCEngineInstances.scala | 197 +++++++++
 .../storage/jdbc/JDBCEvaluationInstances.scala | 165 ++++++++
 .../data/storage/jdbc/JDBCLEvents.scala | 244 +++++++++++
 .../data/storage/jdbc/JDBCModels.scala | 55 +++
 .../data/storage/jdbc/JDBCPEvents.scala | 188 +++++++++
 .../data/storage/jdbc/JDBCUtils.scala | 106 +++++
 .../data/storage/jdbc/StorageClient.scala | 53 +++
 .../data/storage/jdbc/package.scala | 26 ++
 .../jdbc/src/test/resources/application.conf | 28 ++
 storage/localfs/.gitignore | 1 +
 storage/localfs/build.sbt | 44 ++
 .../data/storage/localfs/LocalFSModels.scala | 62 +++
 .../data/storage/localfs/StorageClient.scala | 46 ++
 .../data/storage/localfs/package.scala | 25 ++
 .../localfs/src/test/resources/application.conf | 28 ++
 tests/Dockerfile | 10 +-
 tests/build-docker.sh | 8 +-
 tests/docker-compose.yml | 2 +-
 tests/docker-files/env-conf/pio-env.sh | 3 +-
 tests/pio_tests/scenarios/eventserver_test.py | 5 +-
 tests/run_docker.sh | 4 +-
 .../org/apache/predictionio/tools/Common.scala | 7 +
 .../org/apache/predictionio/tools/Runner.scala | 3 +-
 .../predictionio/tools/commands/Engine.scala | 4 +-
 117 files changed, 6382 insertions(+), 4005 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/bin/compute-classpath.sh
----------------------------------------------------------------------
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 3bf6814..69cbb25 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -27,7 +27,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"

 # Build up classpath
 CLASSPATH="${FWDIR}/conf"
-CLASSPATH="$CLASSPATH:${FWDIR}/plugins/*"
+CLASSPATH="$CLASSPATH:${FWDIR}/plugins/*:${FWDIR}/lib/spark/*"

 ASSEMBLY_DIR="${FWDIR}/assembly"

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index eeb3724..98444b9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -34,11 +34,9 @@ fork in (ThisBuild, run) := true
 javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7",
   "-Xlint:deprecation", "-Xlint:unchecked")

-elasticsearchVersion in ThisBuild := "1.4.4"
-
 json4sVersion in ThisBuild := "3.2.10"

-sparkVersion in ThisBuild := "1.4.0"
+sparkVersion in ThisBuild := "1.6.3"

 val pioBuildInfoSettings = buildInfoSettings ++ Seq(
   sourceGenerators in Compile <+= buildInfo,
@@ -65,6 +63,30 @@ val data = (project in file("data")).
   settings(commonSettings: _*).
   settings(genjavadocSettings: _*)

+val dataElasticsearch1 = (project in file("storage/elasticsearch1")).
+  settings(commonSettings: _*).
+  settings(genjavadocSettings: _*)
+
+val dataElasticsearch = (project in file("storage/elasticsearch")).
+  settings(commonSettings: _*).
+  settings(genjavadocSettings: _*)
+
+val dataHbase = (project in file("storage/hbase")).
+  settings(commonSettings: _*).
+  settings(genjavadocSettings: _*)
+
+val dataHdfs = (project in file("storage/hdfs")).
+  settings(commonSettings: _*).
+  settings(genjavadocSettings: _*)
+
+val dataJdbc = (project in file("storage/jdbc")).
+  settings(commonSettings: _*).
+  settings(genjavadocSettings: _*)
+
+val dataLocalfs = (project in file("storage/localfs")).
+  settings(commonSettings: _*).
+  settings(genjavadocSettings: _*)
+
 val core = (project in file("core")).
   dependsOn(data).
   settings(commonSettings: _*).

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index a06cd8e..0d76102 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -24,7 +24,7 @@
 # you need to change these to fit your site.

 # SPARK_HOME: Apache Spark is a hard dependency and must be configured.
-SPARK_HOME=$PIO_HOME/vendors/spark-1.5.1-bin-hadoop2.6 +SPARK_HOME=$PIO_HOME/vendors/spark-1.6.3-bin-hadoop2.6 POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-9.4-1204.jdbc41.jar MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.37.jar @@ -85,10 +85,16 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # Elasticsearch Example # PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost +# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.2.1 +# Elasticsearch 1.x Example +# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch # PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name> # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost # PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300 -# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.4.4 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6 # Local File System Example # PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/core/build.sbt ---------------------------------------------------------------------- diff --git a/core/build.sbt b/core/build.sbt index 637d4ea..bfb8bf3 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -32,7 +32,6 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value, "org.json4s" %% "json4s-native" % json4sVersion.value, "org.json4s" %% "json4s-ext" % json4sVersion.value, "org.scalaj" %% "scalaj-http" % "1.1.6", http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/build.sbt ---------------------------------------------------------------------- diff --git a/data/build.sbt b/data/build.sbt index 4526c39..f5e95b5 100644 --- a/data/build.sbt +++ b/data/build.sbt @@ -26,29 +26,15 @@ libraryDependencies ++= Seq( "mysql" % "mysql-connector-java" % "5.1.37" % "optional", "org.apache.hadoop" % "hadoop-common" % "2.6.2" exclude("javax.servlet", "servlet-api"), - "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2", - "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2" - exclude("org.apache.zookeeper", "zookeeper"), - // added for Parallel storage interface - "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2" - exclude("org.apache.hbase", "hbase-client") - exclude("org.apache.zookeeper", "zookeeper") - exclude("javax.servlet", "servlet-api") - exclude("org.mortbay.jetty", "servlet-api-2.5") - exclude("org.mortbay.jetty", "jsp-api-2.1") - exclude("org.mortbay.jetty", "jsp-2.1"), "org.apache.zookeeper" % "zookeeper" % "3.4.7" exclude("org.slf4j", "slf4j-api") exclude("org.slf4j", "slf4j-log4j12"), "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value, "org.json4s" %% "json4s-native" % json4sVersion.value, "org.json4s" %% "json4s-ext" % json4sVersion.value, - "org.postgresql" % "postgresql" % "9.4-1204-jdbc41", "org.scalatest" %% "scalatest" % "2.1.7" % "test", - "org.scalikejdbc" %% "scalikejdbc" % "2.3.5", "org.slf4j" % "slf4j-log4j12" % "1.7.18", "org.spark-project.akka" %% 
"akka-actor" % "2.3.4-spark", "org.specs2" %% "specs2" % "2.3.13" % "test") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala deleted file mode 100644 index 077168a..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.AccessKey -import org.apache.predictionio.data.storage.AccessKeys -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -import scala.util.Random - -/** Elasticsearch implementation of AccessKeys. */ -class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) - extends AccessKeys with Logging { - implicit val formats = DefaultFormats.lossless - private val estype = "accesskeys" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). 
- setSource(compact(render(json))).get - } - - def insert(accessKey: AccessKey): Option[String] = { - val key = if (accessKey.key.isEmpty) generateKey else accessKey.key - update(accessKey.copy(key = key)) - Some(key) - } - - def get(key: String): Option[AccessKey] = { - try { - val response = client.prepareGet( - index, - estype, - key).get() - Some(read[AccessKey](response.getSourceAsString)) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - case e: NullPointerException => None - } - } - - def getAll(): Seq[AccessKey] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[AccessKey](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[AccessKey]() - } - } - - def getByAppid(appid: Int): Seq[AccessKey] = { - try { - val builder = client.prepareSearch(index).setTypes(estype). - setPostFilter(termFilter("appid", appid)) - ESUtils.getAll[AccessKey](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[AccessKey]() - } - } - - def update(accessKey: AccessKey): Unit = { - try { - client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get() - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } - - def delete(key: String): Unit = { - try { - client.prepareDelete(index, estype, key).get - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala deleted file mode 100644 index 3781a4b..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.App -import org.apache.predictionio.data.storage.Apps -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -/** Elasticsearch implementation of Items. 
*/ -class ESApps(client: Client, config: StorageClientConfig, index: String) - extends Apps with Logging { - implicit val formats = DefaultFormats.lossless - private val estype = "apps" - private val seq = new ESSequences(client, config, index) - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(app: App): Option[Int] = { - val id = - if (app.id == 0) { - var roll = seq.genNext("apps") - while (!get(roll).isEmpty) roll = seq.genNext("apps") - roll - } - else app.id - val realapp = app.copy(id = id) - update(realapp) - Some(id) - } - - def get(id: Int): Option[App] = { - try { - val response = client.prepareGet( - index, - estype, - id.toString).get() - Some(read[App](response.getSourceAsString)) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - case e: NullPointerException => None - } - } - - def getByName(name: String): Option[App] = { - try { - val response = client.prepareSearch(index).setTypes(estype). - setPostFilter(termFilter("name", name)).get - val hits = response.getHits().hits() - if (hits.size > 0) { - Some(read[App](hits.head.getSourceAsString)) - } else { - None - } - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - } - } - - def getAll(): Seq[App] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[App](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[App]() - } - } - - def update(app: App): Unit = { - try { - val response = client.prepareIndex(index, estype, app.id.toString). - setSource(write(app)).get() - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } - - def delete(id: Int): Unit = { - try { - client.prepareDelete(index, estype, id.toString).get - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala deleted file mode 100644 index 52697fd..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.Channel -import org.apache.predictionio.data.storage.Channels -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders.termFilter -import org.json4s.DefaultFormats -import org.json4s.JsonDSL._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -class ESChannels(client: Client, config: StorageClientConfig, index: String) - extends Channels with Logging { - - implicit val formats = DefaultFormats.lossless - private val estype = "channels" - private val seq = new ESSequences(client, config, index) - private val seqName = "channels" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(channel: Channel): Option[Int] = { - val id = - if (channel.id == 0) { - var roll = seq.genNext(seqName) - while (!get(roll).isEmpty) roll = seq.genNext(seqName) - roll - } else channel.id - - val realChannel = channel.copy(id = id) - if (update(realChannel)) Some(id) else None - } - - def get(id: Int): Option[Channel] = { - try { - val response = client.prepareGet( - index, - estype, - id.toString).get() - Some(read[Channel](response.getSourceAsString)) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - case e: NullPointerException => None - } - } - - def getByAppid(appid: Int): Seq[Channel] = { - try { - val builder = client.prepareSearch(index).setTypes(estype). - setPostFilter(termFilter("appid", appid)) - ESUtils.getAll[Channel](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[Channel]() - } - } - - def update(channel: Channel): Boolean = { - try { - val response = client.prepareIndex(index, estype, channel.id.toString). 
- setSource(write(channel)).get() - true - } catch { - case e: ElasticsearchException => - error(e.getMessage) - false - } - } - - def delete(id: Int): Unit = { - try { - client.prepareDelete(index, estype, id.toString).get - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala deleted file mode 100644 index 21690bf..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EngineInstance -import org.apache.predictionio.data.storage.EngineInstanceSerializer -import org.apache.predictionio.data.storage.EngineInstances -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.elasticsearch.search.sort.SortOrder -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -class ESEngineInstances(client: Client, config: StorageClientConfig, index: String) - extends EngineInstances with Logging { - implicit val formats = DefaultFormats + new EngineInstanceSerializer - private val estype = "engine_instances" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineVersion" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineVariant" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineFactory" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("batch" -> - ("type" -> "string") ~ ("index" 
-> "not_analyzed")) ~ - ("dataSourceParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("preparatorParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("algorithmsParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("servingParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(i: EngineInstance): String = { - try { - val response = client.prepareIndex(index, estype). - setSource(write(i)).get - response.getId - } catch { - case e: ElasticsearchException => - error(e.getMessage) - "" - } - } - - def get(id: String): Option[EngineInstance] = { - try { - val response = client.prepareGet(index, estype, id).get - if (response.isExists) { - Some(read[EngineInstance](response.getSourceAsString)) - } else { - None - } - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - } - } - - def getAll(): Seq[EngineInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[EngineInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def getCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Seq[EngineInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( - andFilter( - termFilter("status", "COMPLETED"), - termFilter("engineId", engineId), - termFilter("engineVersion", engineVersion), - termFilter("engineVariant", engineVariant))). - addSort("startTime", SortOrder.DESC) - ESUtils.getAll[EngineInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def getLatestCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Option[EngineInstance] = - getCompleted( - engineId, - engineVersion, - engineVariant).headOption - - def update(i: EngineInstance): Unit = { - try { - client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } - - def delete(id: String): Unit = { - try { - val response = client.prepareDelete(index, estype, id).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala deleted file mode 100644 index 85bf820..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EvaluationInstance -import org.apache.predictionio.data.storage.EvaluationInstanceSerializer -import org.apache.predictionio.data.storage.EvaluationInstances -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.elasticsearch.search.sort.SortOrder -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String) - extends EvaluationInstances with Logging { - implicit val formats = DefaultFormats + new EvaluationInstanceSerializer - private val estype = "evaluation_instances" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("evaluationClass" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineParamsGeneratorClass" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("batch" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("evaluatorResults" -> - ("type" -> "string") ~ ("index" -> "no")) ~ - ("evaluatorResultsHTML" -> - ("type" -> "string") ~ ("index" -> "no")) ~ - ("evaluatorResultsJSON" -> - ("type" -> "string") ~ ("index" -> "no")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(i: EvaluationInstance): String = { - try { - val response = client.prepareIndex(index, estype). - setSource(write(i)).get - response.getId - } catch { - case e: ElasticsearchException => - error(e.getMessage) - "" - } - } - - def get(id: String): Option[EvaluationInstance] = { - try { - val response = client.prepareGet(index, estype, id).get - if (response.isExists) { - Some(read[EvaluationInstance](response.getSourceAsString)) - } else { - None - } - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - } - } - - def getAll(): Seq[EvaluationInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[EvaluationInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def getCompleted(): Seq[EvaluationInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( - termFilter("status", "EVALCOMPLETED")). 
- addSort("startTime", SortOrder.DESC) - ESUtils.getAll[EvaluationInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def update(i: EvaluationInstance): Unit = { - try { - client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } - - def delete(id: String): Unit = { - try { - client.prepareDelete(index, estype, id).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala deleted file mode 100644 index 5c9e170..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ - -class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging { - implicit val formats = DefaultFormats - private val estype = "sequences" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - // val settingsJson = - // ("number_of_shards" -> 1) ~ - // ("auto_expand_replicas" -> "0-all") - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val mappingJson = - (estype -> - ("_source" -> ("enabled" -> 0)) ~ - ("_all" -> ("enabled" -> 0)) ~ - ("_type" -> ("index" -> "no")) ~ - ("enabled" -> 0)) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(mappingJson))).get - } - - def genNext(name: String): Int = { - try { - val response = client.prepareIndex(index, estype, name). 
- setSource(compact(render("n" -> name))).get - response.getVersion().toInt - } catch { - case e: ElasticsearchException => - error(e.getMessage) - 0 - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala deleted file mode 100644 index f5c99bf..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import org.elasticsearch.action.search.SearchRequestBuilder -import org.elasticsearch.client.Client -import org.elasticsearch.common.unit.TimeValue -import org.json4s.Formats -import org.json4s.native.Serialization.read - -import scala.collection.mutable.ArrayBuffer - -object ESUtils { - val scrollLife = new TimeValue(60000) - - def getAll[T : Manifest]( - client: Client, - builder: SearchRequestBuilder)( - implicit formats: Formats): Seq[T] = { - val results = ArrayBuffer[T]() - var response = builder.setScroll(scrollLife).get - var hits = response.getHits().hits() - results ++= hits.map(h => read[T](h.getSourceAsString)) - while (hits.size > 0) { - response = client.prepareSearchScroll(response.getScrollId). - setScroll(scrollLife).get - hits = response.getHits().hits() - results ++= hits.map(h => read[T](h.getSourceAsString)) - } - results - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala deleted file mode 100644 index 75ac2b0..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.BaseStorageClient -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.StorageClientException -import org.elasticsearch.client.transport.TransportClient -import org.elasticsearch.common.settings.ImmutableSettings -import org.elasticsearch.common.transport.InetSocketTransportAddress -import org.elasticsearch.transport.ConnectTransportException - -class StorageClient(val config: StorageClientConfig) extends BaseStorageClient - with Logging { - override val prefix = "ES" - val client = try { - val hosts = config.properties.get("HOSTS"). - map(_.split(",").toSeq).getOrElse(Seq("localhost")) - val ports = config.properties.get("PORTS"). - map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300)) - val settings = ImmutableSettings.settingsBuilder() - .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch")) - val transportClient = new TransportClient(settings) - (hosts zip ports) foreach { hp => - transportClient.addTransportAddress( - new InetSocketTransportAddress(hp._1, hp._2)) - } - transportClient - } catch { - case e: ConnectTransportException => - throw new StorageClientException(e.getMessage, e) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala deleted file mode 100644 index 0c549b8..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.predictionio.data.storage - -/** Elasticsearch implementation of storage traits, supporting meta data only - * - * @group Implementation - */ -package object elasticsearch {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala deleted file mode 100644 index 2cdb734..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.hbase - -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventValidation -import org.apache.predictionio.data.storage.DataMap - -import org.apache.hadoop.hbase.client.Result -import org.apache.hadoop.hbase.client.Put -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.filter.FilterList -import org.apache.hadoop.hbase.filter.RegexStringComparator -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp -import org.apache.hadoop.hbase.filter.BinaryComparator -import org.apache.hadoop.hbase.filter.QualifierFilter -import org.apache.hadoop.hbase.filter.SkipFilter - -import org.json4s.DefaultFormats -import org.json4s.JObject -import org.json4s.native.Serialization.{ read, write } - -import org.joda.time.DateTime -import org.joda.time.DateTimeZone - -import org.apache.commons.codec.binary.Base64 -import java.security.MessageDigest - -import java.util.UUID - -/* common utility function for accessing EventsStore in HBase */ -object HBEventsUtil { - - implicit val formats = DefaultFormats - - def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = { - channelId.map { ch => - s"${namespace}:events_${appId}_${ch}" - }.getOrElse { - s"${namespace}:events_${appId}" - } - } - - // column names for "e" column family - val colNames: Map[String, Array[Byte]] = Map( - "event" -> "e", - "entityType" -> "ety", - "entityId" -> "eid", - "targetEntityType" -> "tety", - "targetEntityId" -> "teid", - "properties" -> "p", - "prId" -> "prid", - "eventTime" -> "et", - "eventTimeZone" -> "etz", - "creationTime" -> "ct", - "creationTimeZone" -> "ctz" - ).mapValues(Bytes.toBytes(_)) - - def hash(entityType: String, entityId: String): Array[Byte] = { - val s = entityType + "-" + entityId - // get a 
new MessageDigest object each time for thread-safe - val md5 = MessageDigest.getInstance("MD5") - md5.digest(Bytes.toBytes(s)) - } - - class RowKey( - val b: Array[Byte] - ) { - require((b.size == 32), s"Incorrect b size: ${b.size}") - lazy val entityHash: Array[Byte] = b.slice(0, 16) - lazy val millis: Long = Bytes.toLong(b.slice(16, 24)) - lazy val uuidLow: Long = Bytes.toLong(b.slice(24, 32)) - - lazy val toBytes: Array[Byte] = b - - override def toString: String = { - Base64.encodeBase64URLSafeString(toBytes) - } - } - - object RowKey { - def apply( - entityType: String, - entityId: String, - millis: Long, - uuidLow: Long): RowKey = { - // add UUID least significant bits for multiple actions at the same time - // (UUID's most significant bits are actually timestamp, - // use eventTime instead). - val b = hash(entityType, entityId) ++ - Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow) - new RowKey(b) - } - - // get RowKey from string representation - def apply(s: String): RowKey = { - try { - apply(Base64.decodeBase64(s)) - } catch { - case e: Exception => throw new RowKeyException( - s"Failed to convert String ${s} to RowKey because ${e}", e) - } - } - - def apply(b: Array[Byte]): RowKey = { - if (b.size != 32) { - val bString = b.mkString(",") - throw new RowKeyException( - s"Incorrect byte array size. Bytes: ${bString}.") - } - new RowKey(b) - } - - } - - class RowKeyException(val msg: String, val cause: Exception) - extends Exception(msg, cause) { - def this(msg: String) = this(msg, null) - } - - case class PartialRowKey(entityType: String, entityId: String, - millis: Option[Long] = None) { - val toBytes: Array[Byte] = { - hash(entityType, entityId) ++ - (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]())) - } - } - - def eventToPut(event: Event, appId: Int): (Put, RowKey) = { - // generate new rowKey if eventId is None - val rowKey = event.eventId.map { id => - RowKey(id) // create rowKey from eventId - }.getOrElse { - // TOOD: use real UUID. 
not pseudo random - val uuidLow: Long = UUID.randomUUID().getLeastSignificantBits - RowKey( - entityType = event.entityType, - entityId = event.entityId, - millis = event.eventTime.getMillis, - uuidLow = uuidLow - ) - } - - val eBytes = Bytes.toBytes("e") - // use eventTime as HBase's cell timestamp - val put = new Put(rowKey.toBytes, event.eventTime.getMillis) - - def addStringToE(col: Array[Byte], v: String): Put = { - put.add(eBytes, col, Bytes.toBytes(v)) - } - - def addLongToE(col: Array[Byte], v: Long): Put = { - put.add(eBytes, col, Bytes.toBytes(v)) - } - - addStringToE(colNames("event"), event.event) - addStringToE(colNames("entityType"), event.entityType) - addStringToE(colNames("entityId"), event.entityId) - - event.targetEntityType.foreach { targetEntityType => - addStringToE(colNames("targetEntityType"), targetEntityType) - } - - event.targetEntityId.foreach { targetEntityId => - addStringToE(colNames("targetEntityId"), targetEntityId) - } - - // TODO: make properties Option[] - if (!event.properties.isEmpty) { - addStringToE(colNames("properties"), write(event.properties.toJObject)) - } - - event.prId.foreach { prId => - addStringToE(colNames("prId"), prId) - } - - addLongToE(colNames("eventTime"), event.eventTime.getMillis) - val eventTimeZone = event.eventTime.getZone - if (!eventTimeZone.equals(EventValidation.defaultTimeZone)) { - addStringToE(colNames("eventTimeZone"), eventTimeZone.getID) - } - - addLongToE(colNames("creationTime"), event.creationTime.getMillis) - val creationTimeZone = event.creationTime.getZone - if (!creationTimeZone.equals(EventValidation.defaultTimeZone)) { - addStringToE(colNames("creationTimeZone"), creationTimeZone.getID) - } - - // can use zero-length byte array for tag cell value - (put, rowKey) - } - - def resultToEvent(result: Result, appId: Int): Event = { - val rowKey = RowKey(result.getRow()) - - val eBytes = Bytes.toBytes("e") - // val e = result.getFamilyMap(eBytes) - - def getStringCol(col: String): String = { - val r = result.getValue(eBytes, colNames(col)) - require(r != null, - s"Failed to get value for column ${col}. " + - s"Rowkey: ${rowKey.toString} " + - s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.") - - Bytes.toString(r) - } - - def getLongCol(col: String): Long = { - val r = result.getValue(eBytes, colNames(col)) - require(r != null, - s"Failed to get value for column ${col}. 
" + - s"Rowkey: ${rowKey.toString} " + - s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.") - - Bytes.toLong(r) - } - - def getOptStringCol(col: String): Option[String] = { - val r = result.getValue(eBytes, colNames(col)) - if (r == null) { - None - } else { - Some(Bytes.toString(r)) - } - } - - def getTimestamp(col: String): Long = { - result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp() - } - - val event = getStringCol("event") - val entityType = getStringCol("entityType") - val entityId = getStringCol("entityId") - val targetEntityType = getOptStringCol("targetEntityType") - val targetEntityId = getOptStringCol("targetEntityId") - val properties: DataMap = getOptStringCol("properties") - .map(s => DataMap(read[JObject](s))).getOrElse(DataMap()) - val prId = getOptStringCol("prId") - val eventTimeZone = getOptStringCol("eventTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val eventTime = new DateTime( - getLongCol("eventTime"), eventTimeZone) - val creationTimeZone = getOptStringCol("creationTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val creationTime: DateTime = new DateTime( - getLongCol("creationTime"), creationTimeZone) - - Event( - eventId = Some(RowKey(result.getRow()).toString), - event = event, - entityType = entityType, - entityId = entityId, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId, - properties = properties, - eventTime = eventTime, - tags = Seq(), - prId = prId, - creationTime = creationTime - ) - } - - - // for mandatory field. None means don't care. - // for optional field. None means don't care. - // Some(None) means not exist. - // Some(Some(x)) means it should match x - def createScan( - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None, - reversed: Option[Boolean] = None): Scan = { - - val scan: Scan = new Scan() - - (entityType, entityId) match { - case (Some(et), Some(eid)) => { - val start = PartialRowKey(et, eid, - startTime.map(_.getMillis)).toBytes - // if no untilTime, stop when reach next bytes of entityTypeAndId - val stop = PartialRowKey(et, eid, - untilTime.map(_.getMillis).orElse(Some(-1))).toBytes - - if (reversed.getOrElse(false)) { - // Reversed order. - // If you specify a startRow and stopRow, - // to scan in reverse, the startRow needs to be lexicographically - // after the stopRow. 
- scan.setStartRow(stop) - scan.setStopRow(start) - scan.setReversed(true) - } else { - scan.setStartRow(start) - scan.setStopRow(stop) - } - } - case (_, _) => { - val minTime: Long = startTime.map(_.getMillis).getOrElse(0) - val maxTime: Long = untilTime.map(_.getMillis).getOrElse(Long.MaxValue) - scan.setTimeRange(minTime, maxTime) - if (reversed.getOrElse(false)) { - scan.setReversed(true) - } - } - } - - val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL) - - val eBytes = Bytes.toBytes("e") - - def createBinaryFilter(col: String, value: Array[Byte]): SingleColumnValueFilter = { - val comp = new BinaryComparator(value) - new SingleColumnValueFilter( - eBytes, colNames(col), CompareOp.EQUAL, comp) - } - - // skip the row if the column exists - def createSkipRowIfColumnExistFilter(col: String): SkipFilter = { - val comp = new BinaryComparator(colNames(col)) - val q = new QualifierFilter(CompareOp.NOT_EQUAL, comp) - // filters an entire row if any of the Cell checks do not pass - new SkipFilter(q) - } - - entityType.foreach { et => - val compType = new BinaryComparator(Bytes.toBytes(et)) - val filterType = new SingleColumnValueFilter( - eBytes, colNames("entityType"), CompareOp.EQUAL, compType) - filters.addFilter(filterType) - } - - entityId.foreach { eid => - val compId = new BinaryComparator(Bytes.toBytes(eid)) - val filterId = new SingleColumnValueFilter( - eBytes, colNames("entityId"), CompareOp.EQUAL, compId) - filters.addFilter(filterId) - } - - eventNames.foreach { eventsList => - // match any of event in the eventsList - val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE) - eventsList.foreach { e => - val compEvent = new BinaryComparator(Bytes.toBytes(e)) - val filterEvent = new SingleColumnValueFilter( - eBytes, colNames("event"), CompareOp.EQUAL, compEvent) - eventFilters.addFilter(filterEvent) - } - if (!eventFilters.getFilters().isEmpty) { - filters.addFilter(eventFilters) - } - } - - targetEntityType.foreach { tetOpt => - if (tetOpt.isEmpty) { - val filter = createSkipRowIfColumnExistFilter("targetEntityType") - filters.addFilter(filter) - } else { - tetOpt.foreach { tet => - val filter = createBinaryFilter( - "targetEntityType", Bytes.toBytes(tet)) - // the entire row will be skipped if the column is not found. - filter.setFilterIfMissing(true) - filters.addFilter(filter) - } - } - } - - targetEntityId.foreach { teidOpt => - if (teidOpt.isEmpty) { - val filter = createSkipRowIfColumnExistFilter("targetEntityId") - filters.addFilter(filter) - } else { - teidOpt.foreach { teid => - val filter = createBinaryFilter( - "targetEntityId", Bytes.toBytes(teid)) - // the entire row will be skipped if the column is not found. - filter.setFilterIfMissing(true) - filters.addFilter(filter) - } - } - } - - if (!filters.getFilters().isEmpty) { - scan.setFilter(filters) - } - - scan - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala deleted file mode 100644 index 360b007..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.hbase
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.LEvents
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.hbase.HBEventsUtil.RowKey
-import org.apache.hadoop.hbase.HColumnDescriptor
-import org.apache.hadoop.hbase.HTableDescriptor
-import org.apache.hadoop.hbase.NamespaceDescriptor
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client._
-import org.joda.time.DateTime
-
-import scala.collection.JavaConversions._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-
-class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String)
-  extends LEvents with Logging {
-
-  // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer
-
-  def resultToEvent(result: Result, appId: Int): Event =
-    HBEventsUtil.resultToEvent(result, appId)
-
-  def getTable(appId: Int, channelId: Option[Int] = None): HTableInterface =
-    client.connection.getTable(HBEventsUtil.tableName(namespace, appId, channelId))
-
-  override
-  def init(appId: Int, channelId: Option[Int] = None): Boolean = {
-    // check namespace exist
-    val existingNamespace = client.admin.listNamespaceDescriptors()
-      .map(_.getName)
-    if (!existingNamespace.contains(namespace)) {
-      val nameDesc = NamespaceDescriptor.create(namespace).build()
-      info(s"The namespace ${namespace} doesn't exist yet. Creating now...")
-      client.admin.createNamespace(nameDesc)
-    }
-
-    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
-    if (!client.admin.tableExists(tableName)) {
-      info(s"The table ${tableName.getNameAsString()} doesn't exist yet." +
-        " Creating now...")
-      val tableDesc = new HTableDescriptor(tableName)
-      tableDesc.addFamily(new HColumnDescriptor("e"))
-      tableDesc.addFamily(new HColumnDescriptor("r")) // reserved
-      client.admin.createTable(tableDesc)
-    }
-    true
-  }
-
-  override
-  def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
-    val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId))
-    try {
-      if (client.admin.tableExists(tableName)) {
-        info(s"Removing table ${tableName.getNameAsString()}...")
-        client.admin.disableTable(tableName)
-        client.admin.deleteTable(tableName)
-      } else {
-        info(s"Table ${tableName.getNameAsString()} doesn't exist." +
-          s" Nothing is deleted.")
-      }
-      true
-    } catch {
-      case e: Exception => {
-        error(s"Fail to remove table for appId ${appId}. Exception: ${e}")
-        false
-      }
-    }
-  }
-
-  override
-  def close(): Unit = {
-    client.admin.close()
-    client.connection.close()
-  }
-
-  override
-  def futureInsert(
-    event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
-    Future[String] = {
-    Future {
-      val table = getTable(appId, channelId)
-      val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
-      table.put(put)
-      table.flushCommits()
-      table.close()
-      rowKey.toString
-    }
-  }
-
-  override
-  def futureGet(
-    eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
-    Future[Option[Event]] = {
-    Future {
-      val table = getTable(appId, channelId)
-      val rowKey = RowKey(eventId)
-      val get = new Get(rowKey.toBytes)
-
-      val result = table.get(get)
-      table.close()
-
-      if (!result.isEmpty()) {
-        val event = resultToEvent(result, appId)
-        Some(event)
-      } else {
-        None
-      }
-    }
-  }
-
-  override
-  def futureDelete(
-    eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
-    Future[Boolean] = {
-    Future {
-      val table = getTable(appId, channelId)
-      val rowKey = RowKey(eventId)
-      val exists = table.exists(new Get(rowKey.toBytes))
-      table.delete(new Delete(rowKey.toBytes))
-      table.close()
-      exists
-    }
-  }
-
-  override
-  def futureFind(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    limit: Option[Int] = None,
-    reversed: Option[Boolean] = None)(implicit ec: ExecutionContext):
-    Future[Iterator[Event]] = {
-    Future {
-
-      require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)),
-        "the parameter reversed can only be used with both entityType and entityId specified.")
-
-      val table = getTable(appId, channelId)
-
-      val scan = HBEventsUtil.createScan(
-        startTime = startTime,
-        untilTime = untilTime,
-        entityType = entityType,
-        entityId = entityId,
-        eventNames = eventNames,
-        targetEntityType = targetEntityType,
-        targetEntityId = targetEntityId,
-        reversed = reversed)
-      val scanner = table.getScanner(scan)
-      table.close()
-
-      val eventsIter = scanner.iterator()
-
-      // Get all events if None or Some(-1)
-      val results: Iterator[Result] = limit match {
-        case Some(-1) => eventsIter
-        case None => eventsIter
-        case Some(x) => eventsIter.take(x)
-      }
-
-      val eventsIt = results.map { resultToEvent(_, appId) }
-
-      eventsIt
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
deleted file mode 100644
index 7324fa6..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.data.storage.hbase
-
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.client.{Delete, HTable, Result}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.OutputFormat
-import org.apache.predictionio.data.storage.{Event, PEvents, StorageClientConfig}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.joda.time.DateTime
-
-class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String) extends PEvents {
-
-  def checkTableExists(appId: Int, channelId: Option[Int]): Unit = {
-    if (!client.admin.tableExists(HBEventsUtil.tableName(namespace, appId, channelId))) {
-      if (channelId.nonEmpty) {
-        logger.error(s"The appId $appId with channelId $channelId does not exist." +
-          s" Please use valid appId and channelId.")
-        throw new Exception(s"HBase table not found for appId $appId" +
-          s" with channelId $channelId.")
-      } else {
-        logger.error(s"The appId $appId does not exist. Please use valid appId.")
-        throw new Exception(s"HBase table not found for appId $appId.")
-      }
-    }
-  }
-
-  override
-  def find(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None
-    )(sc: SparkContext): RDD[Event] = {
-
-    checkTableExists(appId, channelId)
-
-    val conf = HBaseConfiguration.create()
-    conf.set(TableInputFormat.INPUT_TABLE,
-      HBEventsUtil.tableName(namespace, appId, channelId))
-
-    val scan = HBEventsUtil.createScan(
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = entityType,
-      entityId = entityId,
-      eventNames = eventNames,
-      targetEntityType = targetEntityType,
-      targetEntityId = targetEntityId,
-      reversed = None)
-    scan.setCaching(500) // TODO
-    scan.setCacheBlocks(false) // TODO
-
-    conf.set(TableInputFormat.SCAN, PIOHBaseUtil.convertScanToString(scan))
-
-    // HBase is not accessed until this rdd is actually used.
-    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
-      classOf[ImmutableBytesWritable],
-      classOf[Result]).map {
-        case (key, row) => HBEventsUtil.resultToEvent(row, appId)
-      }
-
-    rdd
-  }
-
-  override
-  def write(
-    events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-
-    checkTableExists(appId, channelId)
-
-    val conf = HBaseConfiguration.create()
-    conf.set(TableOutputFormat.OUTPUT_TABLE,
-      HBEventsUtil.tableName(namespace, appId, channelId))
-    conf.setClass("mapreduce.outputformat.class",
-      classOf[TableOutputFormat[Object]],
-      classOf[OutputFormat[Object, Writable]])
-
-    events.map { event =>
-      val (put, rowKey) = HBEventsUtil.eventToPut(event, appId)
-      (new ImmutableBytesWritable(rowKey.toBytes), put)
-    }.saveAsNewAPIHadoopDataset(conf)
-
-  }
-
-  def delete(
-    eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-
-    checkTableExists(appId, channelId)
-
-    val tableName = HBEventsUtil.tableName(namespace, appId, channelId)
-
-    eventIds.foreachPartition{ iter =>
-      val conf = HBaseConfiguration.create()
-      conf.set(TableOutputFormat.OUTPUT_TABLE,
-        tableName)
-
-      val table = new HTable(conf, tableName)
-      iter.foreach { id =>
-        val rowKey = HBEventsUtil.RowKey(id)
-        val delete = new Delete(rowKey.b)
-        table.delete(delete)
-      }
-      table.close
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
deleted file mode 100644
index 745fcb9..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.data.storage.hbase
-
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil
-import org.apache.hadoop.hbase.util.Base64
-
-object PIOHBaseUtil {
-  /*
-   * Copying this from Apache HBase because of its restrictive scope in 0.98.x
-   */
-  def convertScanToString(scan: Scan): String = {
-    val proto = ProtobufUtil.toScan(scan)
-    Base64.encodeBytes(proto.toByteArray)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
deleted file mode 100644
index 1720410..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.hbase
-
-import org.apache.predictionio.data.storage.BaseStorageClient
-import org.apache.predictionio.data.storage.StorageClientConfig
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.MasterNotRunningException
-import org.apache.hadoop.hbase.ZooKeeperConnectionException
-import org.apache.hadoop.hbase.client.HConnectionManager
-import org.apache.hadoop.hbase.client.HConnection
-import org.apache.hadoop.hbase.client.HBaseAdmin
-
-import grizzled.slf4j.Logging
-
-case class HBClient(
-  val conf: Configuration,
-  val connection: HConnection,
-  val admin: HBaseAdmin
-)
-
-class StorageClient(val config: StorageClientConfig)
-  extends BaseStorageClient with Logging {
-
-  val conf = HBaseConfiguration.create()
-
-  if (config.test) {
-    // use fewer retries and shorter timeout for test mode
-    conf.set("hbase.client.retries.number", "1")
-    conf.set("zookeeper.session.timeout", "30000");
-    conf.set("zookeeper.recovery.retry", "1")
-  }
-
-  try {
-    HBaseAdmin.checkHBaseAvailable(conf)
-  } catch {
-    case e: MasterNotRunningException =>
-      error("HBase master is not running (ZooKeeper ensemble: " +
        conf.get("hbase.zookeeper.quorum") + "). Please make sure that HBase " +
        "is running properly, and that the configuration is pointing at the " +
        "correct ZooKeeper ensemble.")
      throw e
    case e: ZooKeeperConnectionException =>
      error("Cannot connect to ZooKeeper (ZooKeeper ensemble: " +
        conf.get("hbase.zookeeper.quorum") + "). Please make sure that the " +
        "configuration is pointing at the correct ZooKeeper ensemble. By " +
-        "default, HBase manages its own ZooKeeper, so if you have not " +
-        "configured HBase to use an external ZooKeeper, that means your " +
-        "HBase is not started or configured properly.")
-      throw e
-    case e: Exception => {
-      error("Failed to connect to HBase." +
-        " Please check if HBase is running properly.")
-      throw e
-    }
-  }
-
-  val connection = HConnectionManager.createConnection(conf)
-
-  val client = HBClient(
-    conf = conf,
-    connection = connection,
-    admin = new HBaseAdmin(connection)
-  )
-
-  override
-  val prefix = "HB"
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
deleted file mode 100644
index 49bf031..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-/** HBase implementation of storage traits, supporting event data only
- *
- * @group Implementation
- */
-package object hbase {}
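
A note for readers tracing the removed HBase code above: when Scan.setReversed(true) is used, HBase requires the start row to sort lexicographically after the stop row, which is why the deleted createScan swaps its start and stop byte arrays for reversed scans. The filter plumbing follows a standard HBase 0.98 pattern: an outer FilterList that ANDs every constraint, with the requested event names ORed inside a nested list. Below is a minimal sketch of that pattern against the same 0.98-era client API; the object name EventScanSketch, the method scanFor, and the hard-coded column family "e" are illustrative, not part of the removed sources.

    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
    import org.apache.hadoop.hbase.filter.{BinaryComparator, FilterList, SingleColumnValueFilter}
    import org.apache.hadoop.hbase.util.Bytes

    object EventScanSketch {
      // "e" is the column family the deleted event schema used
      private val family = Bytes.toBytes("e")

      def scanFor(entityType: String, eventNames: Seq[String]): Scan = {
        // outer list: every constraint must pass
        val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)

        val typeFilter = new SingleColumnValueFilter(
          family, Bytes.toBytes("entityType"), CompareOp.EQUAL,
          new BinaryComparator(Bytes.toBytes(entityType)))
        // rows lacking the column would otherwise pass the filter
        typeFilter.setFilterIfMissing(true)
        filters.addFilter(typeFilter)

        // nested list: any one of the requested event names may match
        val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        eventNames.foreach { name =>
          eventFilters.addFilter(new SingleColumnValueFilter(
            family, Bytes.toBytes("event"), CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes(name))))
        }
        if (!eventFilters.getFilters.isEmpty) filters.addFilter(eventFilters)

        val scan = new Scan()
        scan.setFilter(filters)
        scan
      }
    }

The setFilterIfMissing(true) call matters for the same reason the deleted code sets it on the target-entity filters: a SingleColumnValueFilter silently passes rows that do not carry the column at all.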
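
Similarly, the removed HBPEvents.find hands a serialized Scan to TableInputFormat and only touches HBase once the resulting RDD is computed; PIOHBaseUtil.convertScanToString exists only because the equivalent HBase helper has restrictive scope in 0.98.x. Here is a self-contained sketch of that read path, assuming an active SparkContext; the object name TableReadSketch and the table name "pio_event:events_1" are invented for illustration.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.Base64
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    object TableReadSketch {
      def readRows(sc: SparkContext): RDD[Result] = {
        val conf = HBaseConfiguration.create()
        conf.set(TableInputFormat.INPUT_TABLE, "pio_event:events_1")

        val scan = new Scan()
        scan.setCaching(500)       // rows fetched per RPC, as in the removed code
        scan.setCacheBlocks(false) // keep one-off scans out of the block cache

        // TableInputFormat expects the Scan as a Base64-encoded protobuf string
        conf.set(TableInputFormat.SCAN,
          Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

        // lazy: HBase is not contacted until the RDD is actually used
        sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])
          .map { case (_, row) => row }
      }
    }

The removed code maps each Result to a domain Event at this point rather than returning raw rows, which also sidesteps the fact that Result is not serializable in this HBase generation.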
