http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala new file mode 100644 index 0000000..077168a --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.AccessKey +import org.apache.predictionio.data.storage.AccessKeys +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +import scala.util.Random + +/** Elasticsearch implementation of AccessKeys. */ +class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) + extends AccessKeys with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "accesskeys" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). 
+ setSource(compact(render(json))).get + } + + def insert(accessKey: AccessKey): Option[String] = { + val key = if (accessKey.key.isEmpty) generateKey else accessKey.key + update(accessKey.copy(key = key)) + Some(key) + } + + def get(key: String): Option[AccessKey] = { + try { + val response = client.prepareGet( + index, + estype, + key).get() + Some(read[AccessKey](response.getSourceAsString)) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + case e: NullPointerException => None + } + } + + def getAll(): Seq[AccessKey] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[AccessKey](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() + } + } + + def getByAppid(appid: Int): Seq[AccessKey] = { + try { + val builder = client.prepareSearch(index).setTypes(estype). + setPostFilter(termFilter("appid", appid)) + ESUtils.getAll[AccessKey](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() + } + } + + def update(accessKey: AccessKey): Unit = { + try { + client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get() + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } + + def delete(key: String): Unit = { + try { + client.prepareDelete(index, estype, key).get + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } +}
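A note on the pattern above: ESAccessKeys creates its index and a "not_analyzed" mapping on construction, then stores each AccessKey as a JSON document keyed by the access key string, with reads going through json4s deserialization. The following sketch shows one way the accessor might be driven; the index name "pio_meta", the already-connected Client, the StorageClientConfig instance, and the sample event names are illustrative assumptions, not part of this commit:

    // Hypothetical usage sketch; client, config, the "pio_meta" index name,
    // and the sample events are assumptions for illustration only.
    import org.apache.predictionio.data.storage.{AccessKey, StorageClientConfig}
    import org.elasticsearch.client.Client

    object ESAccessKeysSketch {
      def accessKeyRoundTrip(client: Client, config: StorageClientConfig): Unit = {
        val keys = new ESAccessKeys(client, config, "pio_meta")
        // An empty key string makes insert() mint a fresh key via generateKey.
        val created: Option[String] =
          keys.insert(AccessKey(key = "", appid = 1, events = Seq("rate", "buy")))
        // get() yields None for a missing document (the NullPointerException
        // thrown on a null source string is caught inside the accessor).
        created.flatMap(keys.get).foreach(k => println(s"key ${k.key} grants ${k.events}"))
      }
    }

The same construct-on-first-use pattern repeats in the ESApps, ESChannels, ESEngineInstances, and ESEvaluationInstances accessors that follow.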
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala new file mode 100644 index 0000000..3781a4b --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.App +import org.apache.predictionio.data.storage.Apps +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +/** Elasticsearch implementation of Apps. */ +class ESApps(client: Client, config: StorageClientConfig, index: String) + extends Apps with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "apps" + private val seq = new ESSequences(client, config, index) + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get + } + + def insert(app: App): Option[Int] = { + val id = + if (app.id == 0) { + var roll = seq.genNext("apps") + while (!get(roll).isEmpty) roll = seq.genNext("apps") + roll + } + else app.id + val realapp = app.copy(id = id) + update(realapp) + Some(id) + } + + def get(id: Int): Option[App] = { + try { + val response = client.prepareGet( + index, + estype, + id.toString).get() + Some(read[App](response.getSourceAsString)) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + case e: NullPointerException => None + } + } + + def getByName(name: String): Option[App] = { + try { + val response = client.prepareSearch(index).setTypes(estype). 
+ setPostFilter(termFilter("name", name)).get + val hits = response.getHits().hits() + if (hits.size > 0) { + Some(read[App](hits.head.getSourceAsString)) + } else { + None + } + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + } + } + + def getAll(): Seq[App] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[App](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[App]() + } + } + + def update(app: App): Unit = { + try { + val response = client.prepareIndex(index, estype, app.id.toString). + setSource(write(app)).get() + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } + + def delete(id: Int): Unit = { + try { + client.prepareDelete(index, estype, id.toString).get + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala new file mode 100644 index 0000000..52697fd --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Channel +import org.apache.predictionio.data.storage.Channels +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders.termFilter +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +class ESChannels(client: Client, config: StorageClientConfig, index: String) + extends Channels with Logging { + + implicit val formats = DefaultFormats.lossless + private val estype = "channels" + private val seq = new ESSequences(client, config, index) + private val seqName = "channels" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get + } + + def insert(channel: Channel): Option[Int] = { + val id = + if (channel.id == 0) { + var roll = seq.genNext(seqName) + while (!get(roll).isEmpty) roll = seq.genNext(seqName) + roll + } else channel.id + + val realChannel = channel.copy(id = id) + if (update(realChannel)) Some(id) else None + } + + def get(id: Int): Option[Channel] = { + try { + val response = client.prepareGet( + index, + estype, + id.toString).get() + Some(read[Channel](response.getSourceAsString)) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + case e: NullPointerException => None + } + } + + def getByAppid(appid: Int): Seq[Channel] = { + try { + val builder = client.prepareSearch(index).setTypes(estype). + setPostFilter(termFilter("appid", appid)) + ESUtils.getAll[Channel](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[Channel]() + } + } + + def update(channel: Channel): Boolean = { + try { + val response = client.prepareIndex(index, estype, channel.id.toString). 
+ setSource(write(channel)).get() + true + } catch { + case e: ElasticsearchException => + error(e.getMessage) + false + } + } + + def delete(id: Int): Unit = { + try { + client.prepareDelete(index, estype, id.toString).get + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala new file mode 100644 index 0000000..21690bf --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineInstance +import org.apache.predictionio.data.storage.EngineInstanceSerializer +import org.apache.predictionio.data.storage.EngineInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.elasticsearch.search.sort.SortOrder +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +class ESEngineInstances(client: Client, config: StorageClientConfig, index: String) + extends EngineInstances with Logging { + implicit val formats = DefaultFormats + new EngineInstanceSerializer + private val estype = "engine_instances" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineVersion" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineVariant" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineFactory" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("batch" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("dataSourceParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("preparatorParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("algorithmsParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("servingParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get + } + + def insert(i: EngineInstance): String = { + try { + val response = client.prepareIndex(index, estype). + setSource(write(i)).get + response.getId + } catch { + case e: ElasticsearchException => + error(e.getMessage) + "" + } + } + + def get(id: String): Option[EngineInstance] = { + try { + val response = client.prepareGet(index, estype, id).get + if (response.isExists) { + Some(read[EngineInstance](response.getSourceAsString)) + } else { + None + } + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + } + } + + def getAll(): Seq[EngineInstance] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[EngineInstance](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq() + } + } + + def getCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = { + try { + val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( + andFilter( + termFilter("status", "COMPLETED"), + termFilter("engineId", engineId), + termFilter("engineVersion", engineVersion), + termFilter("engineVariant", engineVariant))). 
+ addSort("startTime", SortOrder.DESC) + ESUtils.getAll[EngineInstance](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq() + } + } + + def getLatestCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = + getCompleted( + engineId, + engineVersion, + engineVariant).headOption + + def update(i: EngineInstance): Unit = { + try { + client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get + } catch { + case e: ElasticsearchException => error(e.getMessage) + } + } + + def delete(id: String): Unit = { + try { + val response = client.prepareDelete(index, estype, id).get + } catch { + case e: ElasticsearchException => error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala new file mode 100644 index 0000000..85bf820 --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EvaluationInstance +import org.apache.predictionio.data.storage.EvaluationInstanceSerializer +import org.apache.predictionio.data.storage.EvaluationInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.elasticsearch.search.sort.SortOrder +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String) + extends EvaluationInstances with Logging { + implicit val formats = DefaultFormats + new EvaluationInstanceSerializer + private val estype = "evaluation_instances" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("evaluationClass" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineParamsGeneratorClass" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("batch" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("evaluatorResults" -> + ("type" -> "string") ~ ("index" -> "no")) ~ + ("evaluatorResultsHTML" -> + ("type" -> "string") ~ ("index" -> "no")) ~ + ("evaluatorResultsJSON" -> + ("type" -> "string") ~ ("index" -> "no")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get + } + + def insert(i: EvaluationInstance): String = { + try { + val response = client.prepareIndex(index, estype). + setSource(write(i)).get + response.getId + } catch { + case e: ElasticsearchException => + error(e.getMessage) + "" + } + } + + def get(id: String): Option[EvaluationInstance] = { + try { + val response = client.prepareGet(index, estype, id).get + if (response.isExists) { + Some(read[EvaluationInstance](response.getSourceAsString)) + } else { + None + } + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + } + } + + def getAll(): Seq[EvaluationInstance] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[EvaluationInstance](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq() + } + } + + def getCompleted(): Seq[EvaluationInstance] = { + try { + val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( + termFilter("status", "EVALCOMPLETED")). 
+ addSort("startTime", SortOrder.DESC) + ESUtils.getAll[EvaluationInstance](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq() + } + } + + def update(i: EvaluationInstance): Unit = { + try { + client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get + } catch { + case e: ElasticsearchException => error(e.getMessage) + } + } + + def delete(id: String): Unit = { + try { + client.prepareDelete(index, estype, id).get + } catch { + case e: ElasticsearchException => error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala new file mode 100644 index 0000000..5c9e170 --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ + +class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging { + implicit val formats = DefaultFormats + private val estype = "sequences" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + // val settingsJson = + // ("number_of_shards" -> 1) ~ + // ("auto_expand_replicas" -> "0-all") + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val mappingJson = + (estype -> + ("_source" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> 0)) ~ + ("_type" -> ("index" -> "no")) ~ + ("enabled" -> 0)) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(mappingJson))).get + } + + def genNext(name: String): Int = { + try { + val response = client.prepareIndex(index, estype, name). 
+ setSource(compact(render("n" -> name))).get + response.getVersion().toInt + } catch { + case e: ElasticsearchException => + error(e.getMessage) + 0 + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala new file mode 100644 index 0000000..f5c99bf --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import org.elasticsearch.action.search.SearchRequestBuilder +import org.elasticsearch.client.Client +import org.elasticsearch.common.unit.TimeValue +import org.json4s.Formats +import org.json4s.native.Serialization.read + +import scala.collection.mutable.ArrayBuffer + +object ESUtils { + val scrollLife = new TimeValue(60000) + + def getAll[T : Manifest]( + client: Client, + builder: SearchRequestBuilder)( + implicit formats: Formats): Seq[T] = { + val results = ArrayBuffer[T]() + var response = builder.setScroll(scrollLife).get + var hits = response.getHits().hits() + results ++= hits.map(h => read[T](h.getSourceAsString)) + while (hits.size > 0) { + response = client.prepareSearchScroll(response.getScrollId). + setScroll(scrollLife).get + hits = response.getHits().hits() + results ++= hits.map(h => read[T](h.getSourceAsString)) + } + results + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala new file mode 100644 index 0000000..75ac2b0 --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.BaseStorageClient +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.StorageClientException +import org.elasticsearch.client.transport.TransportClient +import org.elasticsearch.common.settings.ImmutableSettings +import org.elasticsearch.common.transport.InetSocketTransportAddress +import org.elasticsearch.transport.ConnectTransportException + +class StorageClient(val config: StorageClientConfig) extends BaseStorageClient + with Logging { + override val prefix = "ES" + val client = try { + val hosts = config.properties.get("HOSTS"). + map(_.split(",").toSeq).getOrElse(Seq("localhost")) + val ports = config.properties.get("PORTS"). + map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300)) + val settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch")) + val transportClient = new TransportClient(settings) + (hosts zip ports) foreach { hp => + transportClient.addTransportAddress( + new InetSocketTransportAddress(hp._1, hp._2)) + } + transportClient + } catch { + case e: ConnectTransportException => + throw new StorageClientException(e.getMessage, e) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala new file mode 100644 index 0000000..0c549b8 --- /dev/null +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage + +/** Elasticsearch implementation of storage traits, supporting meta data only + * + * @group Implementation + */ +package object elasticsearch {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/test/resources/application.conf b/storage/elasticsearch1/src/test/resources/application.conf new file mode 100644 index 0000000..eecae44 --- /dev/null +++ b/storage/elasticsearch1/src/test/resources/application.conf @@ -0,0 +1,28 @@ +org.apache.predictionio.data.storage { + sources { + mongodb { + type = mongodb + hosts = [localhost] + ports = [27017] + } + elasticsearch { + type = elasticsearch + hosts = [localhost] + ports = [9300] + } + } + repositories { + # This section is dummy just to make storage happy. + # The actual testing will not bypass these repository settings completely. + # Please refer to StorageTestUtils.scala. + settings { + name = "test_predictionio" + source = mongodb + } + + appdata { + name = "test_predictionio_appdata" + source = mongodb + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/.gitignore ---------------------------------------------------------------------- diff --git a/storage/hbase/.gitignore b/storage/hbase/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/storage/hbase/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/build.sbt ---------------------------------------------------------------------- diff --git a/storage/hbase/build.sbt b/storage/hbase/build.sbt new file mode 100644 index 0000000..5856a5e --- /dev/null +++ b/storage/hbase/build.sbt @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +name := "apache-predictionio-data-hbase" + +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", + "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2", + "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2" + exclude("org.apache.zookeeper", "zookeeper"), + // added for Parallel storage interface + "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2" + exclude("org.apache.hbase", "hbase-client") + exclude("org.apache.zookeeper", "zookeeper") + exclude("javax.servlet", "servlet-api") + exclude("org.mortbay.jetty", "servlet-api-2.5") + exclude("org.mortbay.jetty", "jsp-api-2.1") + exclude("org.mortbay.jetty", "jsp-2.1"), + "org.scalatest" %% "scalatest" % "2.1.7" % "test", + "org.specs2" %% "specs2" % "2.3.13" % "test") + +parallelExecution in Test := false + +pomExtra := childrenPomExtra.value + +assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true) + +assemblyMergeStrategy in assembly := { + case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat + case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) +} + +// skip test in assembly +test in assembly := {} + +outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-hbase-assembly-" + version.value + ".jar") + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala new file mode 100644 index 0000000..2cdb734 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventValidation +import org.apache.predictionio.data.storage.DataMap + +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.filter.FilterList +import org.apache.hadoop.hbase.filter.RegexStringComparator +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp +import org.apache.hadoop.hbase.filter.BinaryComparator +import org.apache.hadoop.hbase.filter.QualifierFilter +import org.apache.hadoop.hbase.filter.SkipFilter + +import org.json4s.DefaultFormats +import org.json4s.JObject +import org.json4s.native.Serialization.{ read, write } + +import org.joda.time.DateTime +import org.joda.time.DateTimeZone + +import org.apache.commons.codec.binary.Base64 +import java.security.MessageDigest + +import java.util.UUID + +/* common utility functions for accessing EventsStore in HBase */ +object HBEventsUtil { + + implicit val formats = DefaultFormats + + def tableName(namespace: String, appId: Int, channelId: Option[Int] = None): String = { + channelId.map { ch => + s"${namespace}:events_${appId}_${ch}" + }.getOrElse { + s"${namespace}:events_${appId}" + } + } + + // column names for "e" column family + val colNames: Map[String, Array[Byte]] = Map( + "event" -> "e", + "entityType" -> "ety", + "entityId" -> "eid", + "targetEntityType" -> "tety", + "targetEntityId" -> "teid", + "properties" -> "p", + "prId" -> "prid", + "eventTime" -> "et", + "eventTimeZone" -> "etz", + "creationTime" -> "ct", + "creationTimeZone" -> "ctz" + ).mapValues(Bytes.toBytes(_)) + + def hash(entityType: String, entityId: String): Array[Byte] = { + val s = entityType + "-" + entityId + // get a new MessageDigest object each time for thread safety + val md5 = MessageDigest.getInstance("MD5") + md5.digest(Bytes.toBytes(s)) + } + + class RowKey( + val b: Array[Byte] + ) { + require((b.size == 32), s"Incorrect b size: ${b.size}") + lazy val entityHash: Array[Byte] = b.slice(0, 16) + lazy val millis: Long = Bytes.toLong(b.slice(16, 24)) + lazy val uuidLow: Long = Bytes.toLong(b.slice(24, 32)) + + lazy val toBytes: Array[Byte] = b + + override def toString: String = { + Base64.encodeBase64URLSafeString(toBytes) + } + } + + object RowKey { + def apply( + entityType: String, + entityId: String, + millis: Long, + uuidLow: Long): RowKey = { + // add UUID least significant bits for multiple actions at the same time + // (UUID's most significant bits are actually timestamp, + // use eventTime instead). + val b = hash(entityType, entityId) ++ + Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow) + new RowKey(b) + } + + // get RowKey from string representation + def apply(s: String): RowKey = { + try { + apply(Base64.decodeBase64(s)) + } catch { + case e: Exception => throw new RowKeyException( + s"Failed to convert String ${s} to RowKey because ${e}", e) + } + } + + def apply(b: Array[Byte]): RowKey = { + if (b.size != 32) { + val bString = b.mkString(",") + throw new RowKeyException( + s"Incorrect byte array size. " +
Bytes: ${bString}.") + } + new RowKey(b) + } + + } + + class RowKeyException(val msg: String, val cause: Exception) + extends Exception(msg, cause) { + def this(msg: String) = this(msg, null) + } + + case class PartialRowKey(entityType: String, entityId: String, + millis: Option[Long] = None) { + val toBytes: Array[Byte] = { + hash(entityType, entityId) ++ + (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]())) + } + } + + def eventToPut(event: Event, appId: Int): (Put, RowKey) = { + // generate new rowKey if eventId is None + val rowKey = event.eventId.map { id => + RowKey(id) // create rowKey from eventId + }.getOrElse { + // TOOD: use real UUID. not pseudo random + val uuidLow: Long = UUID.randomUUID().getLeastSignificantBits + RowKey( + entityType = event.entityType, + entityId = event.entityId, + millis = event.eventTime.getMillis, + uuidLow = uuidLow + ) + } + + val eBytes = Bytes.toBytes("e") + // use eventTime as HBase's cell timestamp + val put = new Put(rowKey.toBytes, event.eventTime.getMillis) + + def addStringToE(col: Array[Byte], v: String): Put = { + put.add(eBytes, col, Bytes.toBytes(v)) + } + + def addLongToE(col: Array[Byte], v: Long): Put = { + put.add(eBytes, col, Bytes.toBytes(v)) + } + + addStringToE(colNames("event"), event.event) + addStringToE(colNames("entityType"), event.entityType) + addStringToE(colNames("entityId"), event.entityId) + + event.targetEntityType.foreach { targetEntityType => + addStringToE(colNames("targetEntityType"), targetEntityType) + } + + event.targetEntityId.foreach { targetEntityId => + addStringToE(colNames("targetEntityId"), targetEntityId) + } + + // TODO: make properties Option[] + if (!event.properties.isEmpty) { + addStringToE(colNames("properties"), write(event.properties.toJObject)) + } + + event.prId.foreach { prId => + addStringToE(colNames("prId"), prId) + } + + addLongToE(colNames("eventTime"), event.eventTime.getMillis) + val eventTimeZone = event.eventTime.getZone + if (!eventTimeZone.equals(EventValidation.defaultTimeZone)) { + addStringToE(colNames("eventTimeZone"), eventTimeZone.getID) + } + + addLongToE(colNames("creationTime"), event.creationTime.getMillis) + val creationTimeZone = event.creationTime.getZone + if (!creationTimeZone.equals(EventValidation.defaultTimeZone)) { + addStringToE(colNames("creationTimeZone"), creationTimeZone.getID) + } + + // can use zero-length byte array for tag cell value + (put, rowKey) + } + + def resultToEvent(result: Result, appId: Int): Event = { + val rowKey = RowKey(result.getRow()) + + val eBytes = Bytes.toBytes("e") + // val e = result.getFamilyMap(eBytes) + + def getStringCol(col: String): String = { + val r = result.getValue(eBytes, colNames(col)) + require(r != null, + s"Failed to get value for column ${col}. " + + s"Rowkey: ${rowKey.toString} " + + s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.") + + Bytes.toString(r) + } + + def getLongCol(col: String): Long = { + val r = result.getValue(eBytes, colNames(col)) + require(r != null, + s"Failed to get value for column ${col}. 
" + + s"Rowkey: ${rowKey.toString} " + + s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.") + + Bytes.toLong(r) + } + + def getOptStringCol(col: String): Option[String] = { + val r = result.getValue(eBytes, colNames(col)) + if (r == null) { + None + } else { + Some(Bytes.toString(r)) + } + } + + def getTimestamp(col: String): Long = { + result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp() + } + + val event = getStringCol("event") + val entityType = getStringCol("entityType") + val entityId = getStringCol("entityId") + val targetEntityType = getOptStringCol("targetEntityType") + val targetEntityId = getOptStringCol("targetEntityId") + val properties: DataMap = getOptStringCol("properties") + .map(s => DataMap(read[JObject](s))).getOrElse(DataMap()) + val prId = getOptStringCol("prId") + val eventTimeZone = getOptStringCol("eventTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + val eventTime = new DateTime( + getLongCol("eventTime"), eventTimeZone) + val creationTimeZone = getOptStringCol("creationTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + val creationTime: DateTime = new DateTime( + getLongCol("creationTime"), creationTimeZone) + + Event( + eventId = Some(RowKey(result.getRow()).toString), + event = event, + entityType = entityType, + entityId = entityId, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + properties = properties, + eventTime = eventTime, + tags = Seq(), + prId = prId, + creationTime = creationTime + ) + } + + + // for mandatory field. None means don't care. + // for optional field. None means don't care. + // Some(None) means not exist. + // Some(Some(x)) means it should match x + def createScan( + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + reversed: Option[Boolean] = None): Scan = { + + val scan: Scan = new Scan() + + (entityType, entityId) match { + case (Some(et), Some(eid)) => { + val start = PartialRowKey(et, eid, + startTime.map(_.getMillis)).toBytes + // if no untilTime, stop when reach next bytes of entityTypeAndId + val stop = PartialRowKey(et, eid, + untilTime.map(_.getMillis).orElse(Some(-1))).toBytes + + if (reversed.getOrElse(false)) { + // Reversed order. + // If you specify a startRow and stopRow, + // to scan in reverse, the startRow needs to be lexicographically + // after the stopRow. 
+ scan.setStartRow(stop) + scan.setStopRow(start) + scan.setReversed(true) + } else { + scan.setStartRow(start) + scan.setStopRow(stop) + } + } + case (_, _) => { + val minTime: Long = startTime.map(_.getMillis).getOrElse(0) + val maxTime: Long = untilTime.map(_.getMillis).getOrElse(Long.MaxValue) + scan.setTimeRange(minTime, maxTime) + if (reversed.getOrElse(false)) { + scan.setReversed(true) + } + } + } + + val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL) + + val eBytes = Bytes.toBytes("e") + + def createBinaryFilter(col: String, value: Array[Byte]): SingleColumnValueFilter = { + val comp = new BinaryComparator(value) + new SingleColumnValueFilter( + eBytes, colNames(col), CompareOp.EQUAL, comp) + } + + // skip the row if the column exists + def createSkipRowIfColumnExistFilter(col: String): SkipFilter = { + val comp = new BinaryComparator(colNames(col)) + val q = new QualifierFilter(CompareOp.NOT_EQUAL, comp) + // filters an entire row if any of the Cell checks do not pass + new SkipFilter(q) + } + + entityType.foreach { et => + val compType = new BinaryComparator(Bytes.toBytes(et)) + val filterType = new SingleColumnValueFilter( + eBytes, colNames("entityType"), CompareOp.EQUAL, compType) + filters.addFilter(filterType) + } + + entityId.foreach { eid => + val compId = new BinaryComparator(Bytes.toBytes(eid)) + val filterId = new SingleColumnValueFilter( + eBytes, colNames("entityId"), CompareOp.EQUAL, compId) + filters.addFilter(filterId) + } + + eventNames.foreach { eventsList => + // match any of event in the eventsList + val eventFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE) + eventsList.foreach { e => + val compEvent = new BinaryComparator(Bytes.toBytes(e)) + val filterEvent = new SingleColumnValueFilter( + eBytes, colNames("event"), CompareOp.EQUAL, compEvent) + eventFilters.addFilter(filterEvent) + } + if (!eventFilters.getFilters().isEmpty) { + filters.addFilter(eventFilters) + } + } + + targetEntityType.foreach { tetOpt => + if (tetOpt.isEmpty) { + val filter = createSkipRowIfColumnExistFilter("targetEntityType") + filters.addFilter(filter) + } else { + tetOpt.foreach { tet => + val filter = createBinaryFilter( + "targetEntityType", Bytes.toBytes(tet)) + // the entire row will be skipped if the column is not found. + filter.setFilterIfMissing(true) + filters.addFilter(filter) + } + } + } + + targetEntityId.foreach { teidOpt => + if (teidOpt.isEmpty) { + val filter = createSkipRowIfColumnExistFilter("targetEntityId") + filters.addFilter(filter) + } else { + teidOpt.foreach { teid => + val filter = createBinaryFilter( + "targetEntityId", Bytes.toBytes(teid)) + // the entire row will be skipped if the column is not found. 
+ filter.setFilterIfMissing(true) + filters.addFilter(filter) + } + } + } + + if (!filters.getFilters().isEmpty) { + scan.setFilter(filters) + } + + scan + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala new file mode 100644 index 0000000..360b007 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hbase + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.LEvents +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.hbase.HBEventsUtil.RowKey +import org.apache.hadoop.hbase.HColumnDescriptor +import org.apache.hadoop.hbase.HTableDescriptor +import org.apache.hadoop.hbase.NamespaceDescriptor +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.joda.time.DateTime + +import scala.collection.JavaConversions._ +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String) + extends LEvents with Logging { + + // implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer + + def resultToEvent(result: Result, appId: Int): Event = + HBEventsUtil.resultToEvent(result, appId) + + def getTable(appId: Int, channelId: Option[Int] = None): HTableInterface = + client.connection.getTable(HBEventsUtil.tableName(namespace, appId, channelId)) + + override + def init(appId: Int, channelId: Option[Int] = None): Boolean = { + // check namespace exist + val existingNamespace = client.admin.listNamespaceDescriptors() + .map(_.getName) + if (!existingNamespace.contains(namespace)) { + val nameDesc = NamespaceDescriptor.create(namespace).build() + info(s"The namespace ${namespace} doesn't exist yet. Creating now...") + client.admin.createNamespace(nameDesc) + } + + val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId)) + if (!client.admin.tableExists(tableName)) { + info(s"The table ${tableName.getNameAsString()} doesn't exist yet." 
+ + " Creating now...") + val tableDesc = new HTableDescriptor(tableName) + tableDesc.addFamily(new HColumnDescriptor("e")) + tableDesc.addFamily(new HColumnDescriptor("r")) // reserved + client.admin.createTable(tableDesc) + } + true + } + + override + def remove(appId: Int, channelId: Option[Int] = None): Boolean = { + val tableName = TableName.valueOf(HBEventsUtil.tableName(namespace, appId, channelId)) + try { + if (client.admin.tableExists(tableName)) { + info(s"Removing table ${tableName.getNameAsString()}...") + client.admin.disableTable(tableName) + client.admin.deleteTable(tableName) + } else { + info(s"Table ${tableName.getNameAsString()} doesn't exist." + + s" Nothing is deleted.") + } + true + } catch { + case e: Exception => { + error(s"Fail to remove table for appId ${appId}. Exception: ${e}") + false + } + } + } + + override + def close(): Unit = { + client.admin.close() + client.connection.close() + } + + override + def futureInsert( + event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): + Future[String] = { + Future { + val table = getTable(appId, channelId) + val (put, rowKey) = HBEventsUtil.eventToPut(event, appId) + table.put(put) + table.flushCommits() + table.close() + rowKey.toString + } + } + + override + def futureGet( + eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): + Future[Option[Event]] = { + Future { + val table = getTable(appId, channelId) + val rowKey = RowKey(eventId) + val get = new Get(rowKey.toBytes) + + val result = table.get(get) + table.close() + + if (!result.isEmpty()) { + val event = resultToEvent(result, appId) + Some(event) + } else { + None + } + } + } + + override + def futureDelete( + eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): + Future[Boolean] = { + Future { + val table = getTable(appId, channelId) + val rowKey = RowKey(eventId) + val exists = table.exists(new Get(rowKey.toBytes)) + table.delete(new Delete(rowKey.toBytes)) + table.close() + exists + } + } + + override + def futureFind( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None)(implicit ec: ExecutionContext): + Future[Iterator[Event]] = { + Future { + + require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)), + "the parameter reversed can only be used with both entityType and entityId specified.") + + val table = getTable(appId, channelId) + + val scan = HBEventsUtil.createScan( + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = eventNames, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + reversed = reversed) + val scanner = table.getScanner(scan) + table.close() + + val eventsIter = scanner.iterator() + + // Get all events if None or Some(-1) + val results: Iterator[Result] = limit match { + case Some(-1) => eventsIter + case None => eventsIter + case Some(x) => eventsIter.take(x) + } + + val eventsIt = results.map { resultToEvent(_, appId) } + + eventsIt + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala new file mode 100644 index 0000000..7324fa6 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBPEvents.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.hbase + +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.client.{Delete, HTable, Result} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce.OutputFormat +import org.apache.predictionio.data.storage.{Event, PEvents, StorageClientConfig} +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.joda.time.DateTime + +class HBPEvents(client: HBClient, config: StorageClientConfig, namespace: String) extends PEvents { + + def checkTableExists(appId: Int, channelId: Option[Int]): Unit = { + if (!client.admin.tableExists(HBEventsUtil.tableName(namespace, appId, channelId))) { + if (channelId.nonEmpty) { + logger.error(s"The appId $appId with channelId $channelId does not exist." + + s" Please use a valid appId and channelId.") + throw new Exception(s"HBase table not found for appId $appId" + + s" with channelId $channelId.") + } else { + logger.error(s"The appId $appId does not exist.
Please use a valid appId.") + throw new Exception(s"HBase table not found for appId $appId.") + } + } + } + + override + def find( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None + )(sc: SparkContext): RDD[Event] = { + + checkTableExists(appId, channelId) + + val conf = HBaseConfiguration.create() + conf.set(TableInputFormat.INPUT_TABLE, + HBEventsUtil.tableName(namespace, appId, channelId)) + + val scan = HBEventsUtil.createScan( + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = eventNames, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + reversed = None) + scan.setCaching(500) // TODO + scan.setCacheBlocks(false) // TODO + + conf.set(TableInputFormat.SCAN, PIOHBaseUtil.convertScanToString(scan)) + + // HBase is not accessed until this rdd is actually used. + val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], + classOf[ImmutableBytesWritable], + classOf[Result]).map { + case (key, row) => HBEventsUtil.resultToEvent(row, appId) + } + + rdd + } + + override + def write( + events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { + + checkTableExists(appId, channelId) + + val conf = HBaseConfiguration.create() + conf.set(TableOutputFormat.OUTPUT_TABLE, + HBEventsUtil.tableName(namespace, appId, channelId)) + conf.setClass("mapreduce.outputformat.class", + classOf[TableOutputFormat[Object]], + classOf[OutputFormat[Object, Writable]]) + + events.map { event => + val (put, rowKey) = HBEventsUtil.eventToPut(event, appId) + (new ImmutableBytesWritable(rowKey.toBytes), put) + }.saveAsNewAPIHadoopDataset(conf) + + } + + def delete( + eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { + + checkTableExists(appId, channelId) + + val tableName = HBEventsUtil.tableName(namespace, appId, channelId) + + eventIds.foreachPartition{ iter => + val conf = HBaseConfiguration.create() + conf.set(TableOutputFormat.OUTPUT_TABLE, + tableName) + + val table = new HTable(conf, tableName) + iter.foreach { id => + val rowKey = HBEventsUtil.RowKey(id) + val delete = new Delete(rowKey.b) + table.delete(delete) + } + table.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala new file mode 100644 index 0000000..745fcb9 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/PIOHBaseUtil.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.hbase + +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.protobuf.ProtobufUtil +import org.apache.hadoop.hbase.util.Base64 + +object PIOHBaseUtil { + /* + * Copying this from Apache HBase because of its restrictive scope in 0.98.x + */ + def convertScanToString(scan: Scan): String = { + val proto = ProtobufUtil.toScan(scan) + Base64.encodeBytes(proto.toByteArray) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala new file mode 100644 index 0000000..1720410 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/StorageClient.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage.BaseStorageClient +import org.apache.predictionio.data.storage.StorageClientConfig + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.MasterNotRunningException +import org.apache.hadoop.hbase.ZooKeeperConnectionException +import org.apache.hadoop.hbase.client.HConnectionManager +import org.apache.hadoop.hbase.client.HConnection +import org.apache.hadoop.hbase.client.HBaseAdmin + +import grizzled.slf4j.Logging + +case class HBClient( + val conf: Configuration, + val connection: HConnection, + val admin: HBaseAdmin +) + +class StorageClient(val config: StorageClientConfig) + extends BaseStorageClient with Logging { + + val conf = HBaseConfiguration.create() + + if (config.test) { + // use fewer retries and shorter timeout for test mode + conf.set("hbase.client.retries.number", "1") + conf.set("zookeeper.session.timeout", "30000"); + conf.set("zookeeper.recovery.retry", "1") + } + + try { + HBaseAdmin.checkHBaseAvailable(conf) + } catch { + case e: MasterNotRunningException => + error("HBase master is not running (ZooKeeper ensemble: " + + conf.get("hbase.zookeeper.quorum") + "). Please make sure that HBase " + + "is running properly, and that the configuration is pointing at the " + + "correct ZooKeeper ensemble.") + throw e + case e: ZooKeeperConnectionException => + error("Cannot connect to ZooKeeper (ZooKeeper ensemble: " + + conf.get("hbase.zookeeper.quorum") + "). Please make sure that the " + + "configuration is pointing at the correct ZooKeeper ensemble. By " + + "default, HBase manages its own ZooKeeper, so if you have not " + + "configured HBase to use an external ZooKeeper, that means your " + + "HBase is not started or configured properly.") + throw e + case e: Exception => { + error("Failed to connect to HBase." + + " Please check if HBase is running properly.") + throw e + } + } + + val connection = HConnectionManager.createConnection(conf) + + val client = HBClient( + conf = conf, + connection = connection, + admin = new HBaseAdmin(connection) + ) + + override + val prefix = "HB" +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala new file mode 100644 index 0000000..49bf031 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage + +/** HBase implementation of storage traits, supporting event data only + * + * @group Implementation + */ +package object hbase {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala new file mode 100644 index 0000000..cc07fa4 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hbase.upgrade + +import org.apache.predictionio.annotation.Experimental + +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventValidation +import org.apache.predictionio.data.storage.DataMap + +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.client.HConnection +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.util.Bytes + +import org.joda.time.DateTime +import org.joda.time.DateTimeZone + +import org.json4s.DefaultFormats +import org.json4s.JObject +import org.json4s.native.Serialization.{ read, write } + +import org.apache.commons.codec.binary.Base64 + +import scala.collection.JavaConversions._ + +/** :: Experimental :: */ +@Experimental +object HB_0_8_0 { + + implicit val formats = DefaultFormats + + def getByAppId( + connection: HConnection, + namespace: String, + appId: Int): Iterator[Event] = { + val tableName = TableName.valueOf(namespace, "events") + val table = connection.getTable(tableName) + val start = PartialRowKey(appId) + val stop = PartialRowKey(appId + 1) + val scan = new Scan(start.toBytes, stop.toBytes) + val scanner = table.getScanner(scan) + table.close() + scanner.iterator().map { resultToEvent(_) } + } + + val colNames: Map[String, Array[Byte]] = Map( + "event" -> "e", + "entityType" -> "ety", + "entityId" -> "eid", + "targetEntityType" -> "tety", + "targetEntityId" -> "teid", + "properties" -> "p", + "prId" -> "pk", // column name is 'pk' in 0.8.0/0.8.1 + "eventTimeZone" -> "etz", + "creationTimeZone" -> "ctz" + ).mapValues(Bytes.toBytes(_)) + + + class RowKey( + val appId: Int, + val millis: Long, + val uuidLow: Long + ) { + lazy val toBytes: Array[Byte] = { + // add UUID
least significant bits for multiple actions at the same time + // (UUID's most significant bits are actually timestamp, + // use eventTime instead). + Bytes.toBytes(appId) ++ Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow) + } + override def toString: String = { + Base64.encodeBase64URLSafeString(toBytes) + } + } + + object RowKey { + // get RowKey from string representation + def apply(s: String): RowKey = { + try { + apply(Base64.decodeBase64(s)) + } catch { + case e: Exception => throw new RowKeyException( + s"Failed to convert String ${s} to RowKey because ${e}", e) + } + } + + def apply(b: Array[Byte]): RowKey = { + if (b.size != 20) { + val bString = b.mkString(",") + throw new RowKeyException( + s"Incorrect byte array size. Bytes: ${bString}.") + } + + new RowKey( + appId = Bytes.toInt(b.slice(0, 4)), + millis = Bytes.toLong(b.slice(4, 12)), + uuidLow = Bytes.toLong(b.slice(12, 20)) + ) + } + } + + class RowKeyException(msg: String, cause: Exception) + extends Exception(msg, cause) { + def this(msg: String) = this(msg, null) + } + + case class PartialRowKey(val appId: Int, val millis: Option[Long] = None) { + val toBytes: Array[Byte] = { + Bytes.toBytes(appId) ++ + (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]())) + } + } + + def resultToEvent(result: Result): Event = { + val rowKey = RowKey(result.getRow()) + + val eBytes = Bytes.toBytes("e") + // val e = result.getFamilyMap(eBytes) + + def getStringCol(col: String): String = { + val r = result.getValue(eBytes, colNames(col)) + require(r != null, + s"Failed to get value for column ${col}. " + + s"Rowkey: ${rowKey.toString} " + + s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.") + + Bytes.toString(r) + } + + def getOptStringCol(col: String): Option[String] = { + val r = result.getValue(eBytes, colNames(col)) + if (r == null) { + None + } else { + Some(Bytes.toString(r)) + } + } + + def getTimestamp(col: String): Long = { + result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp() + } + + val event = getStringCol("event") + val entityType = getStringCol("entityType") + val entityId = getStringCol("entityId") + val targetEntityType = getOptStringCol("targetEntityType") + val targetEntityId = getOptStringCol("targetEntityId") + val properties: DataMap = getOptStringCol("properties") + .map(s => DataMap(read[JObject](s))).getOrElse(DataMap()) + val prId = getOptStringCol("prId") + val eventTimeZone = getOptStringCol("eventTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + val creationTimeZone = getOptStringCol("creationTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + + val creationTime: DateTime = new DateTime( + getTimestamp("event"), creationTimeZone + ) + + Event( + eventId = Some(RowKey(result.getRow()).toString), + event = event, + entityType = entityType, + entityId = entityId, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + properties = properties, + eventTime = new DateTime(rowKey.millis, eventTimeZone), + tags = Seq(), + prId = prId, + creationTime = creationTime + ) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala 
b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala new file mode 100644 index 0000000..1759561 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hbase.upgrade + +import org.apache.predictionio.annotation.Experimental + +import org.apache.predictionio.data.storage.Storage +import org.apache.predictionio.data.storage.hbase.HBLEvents +import org.apache.predictionio.data.storage.hbase.HBEventsUtil + +import scala.collection.JavaConversions._ + +/** :: Experimental :: */ +@Experimental +object Upgrade { + + def main(args: Array[String]) { + val fromAppId = args(0).toInt + val toAppId = args(1).toInt + val batchSize = args.lift(2).map(_.toInt).getOrElse(100) + val fromNamespace = args.lift(3).getOrElse("predictionio_eventdata") + + upgrade(fromAppId, toAppId, batchSize, fromNamespace) + } + + /* For upgrade from 0.8.0 or 0.8.1 to 0.8.2 only */ + def upgrade( + fromAppId: Int, + toAppId: Int, + batchSize: Int, + fromNamespace: String) { + + val events = Storage.getLEvents().asInstanceOf[HBLEvents] + + // Assumes "pio app new <newapp>" has already been run (new app already created) + // TODO: check if new table empty and warn user if not + val newTable = events.getTable(toAppId) + + val newTableName = newTable.getName().getNameAsString() + println(s"Copying data from ${fromNamespace}:events for app ID ${fromAppId}" + + s" to new HBase table ${newTableName}...") + + HB_0_8_0.getByAppId( + events.client.connection, + fromNamespace, + fromAppId).grouped(batchSize).foreach { eventGroup => + val puts = eventGroup.map{ e => + val (put, rowkey) = HBEventsUtil.eventToPut(e, toAppId) + put + } + newTable.put(puts.toList) + } + + newTable.flushCommits() + newTable.close() + println("Done.") + } + +}
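
To make the tool's calling convention concrete, here is a hedged sketch of driving the migration programmatically rather than through main. The object name, app IDs, batch size, and namespace are illustrative; as the code above notes, the destination app must already exist (created with "pio app new").

package org.apache.predictionio.data.storage.hbase.upgrade

object UpgradeSketch extends App {
  // Equivalent to: Upgrade.main(Array("1", "2", "500", "predictionio_eventdata"))
  // Copies every 0.8.0/0.8.1 event of app 1 from the "predictionio_eventdata"
  // namespace into app 2's table, issuing puts in batches of 500.
  Upgrade.upgrade(
    fromAppId = 1,
    toAppId = 2,
    batchSize = 500,
    fromNamespace = "predictionio_eventdata")
}
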

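
One closing note on the legacy row-key format handled by HB_0_8_0 above: each key is a fixed 20-byte concatenation of appId (4 bytes), millis (8 bytes), and uuidLow (8 bytes), which is exactly why RowKey.apply rejects byte arrays of any other length. A self-contained sketch of that layout (object name illustrative):

import org.apache.hadoop.hbase.util.Bytes

object RowKeyLayoutSketch extends App {
  val appId = 42 // Int -> bytes 0 to 3
  val millis = 1234567890123L // Long -> bytes 4 to 11
  val uuidLow = 0x0123456789abcdefL // Long -> bytes 12 to 19

  // Same concatenation as HB_0_8_0.RowKey.toBytes.
  val key: Array[Byte] =
    Bytes.toBytes(appId) ++ Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow)
  assert(key.length == 20) // the length HB_0_8_0.RowKey.apply checks for

  // Decoding mirrors the slice offsets in HB_0_8_0.RowKey.apply(Array[Byte]).
  assert(Bytes.toInt(key.slice(0, 4)) == appId)
  assert(Bytes.toLong(key.slice(4, 12)) == millis)
  assert(Bytes.toLong(key.slice(12, 20)) == uuidLow)
}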