http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala deleted file mode 100644 index 7f50488..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.AccessKey -import org.apache.predictionio.data.storage.AccessKeys -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -import scala.util.Random - -/** Elasticsearch implementation of AccessKeys. */ -class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) - extends AccessKeys with Logging { - implicit val formats = DefaultFormats.lossless - private val estype = "accesskeys" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(accessKey: AccessKey): Option[String] = { - val key = if (accessKey.key.isEmpty) generateKey else accessKey.key - update(accessKey.copy(key = key)) - Some(key) - } - - def get(key: String): Option[AccessKey] = { - try { - val response = client.prepareGet( - index, - estype, - key).get() - Some(read[AccessKey](response.getSourceAsString)) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - case e: NullPointerException => None - } - } - - def getAll(): Seq[AccessKey] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[AccessKey](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[AccessKey]() - } - } - - def getByAppid(appid: Int): Seq[AccessKey] = { - try { - val builder = client.prepareSearch(index).setTypes(estype). - setPostFilter(termFilter("appid", appid)) - ESUtils.getAll[AccessKey](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[AccessKey]() - } - } - - def update(accessKey: AccessKey): Unit = { - try { - client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get() - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } - - def delete(key: String): Unit = { - try { - client.prepareDelete(index, estype, key).get - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala deleted file mode 100644 index af61e17..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.App -import org.apache.predictionio.data.storage.Apps -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -/** Elasticsearch implementation of Items. */ -class ESApps(client: Client, config: StorageClientConfig, index: String) - extends Apps with Logging { - implicit val formats = DefaultFormats.lossless - private val estype = "apps" - private val seq = new ESSequences(client, config, index) - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(app: App): Option[Int] = { - val id = - if (app.id == 0) { - var roll = seq.genNext("apps") - while (!get(roll).isEmpty) roll = seq.genNext("apps") - roll - } - else app.id - val realapp = app.copy(id = id) - update(realapp) - Some(id) - } - - def get(id: Int): Option[App] = { - try { - val response = client.prepareGet( - index, - estype, - id.toString).get() - Some(read[App](response.getSourceAsString)) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - case e: NullPointerException => None - } - } - - def getByName(name: String): Option[App] = { - try { - val response = client.prepareSearch(index).setTypes(estype). - setPostFilter(termFilter("name", name)).get - val hits = response.getHits().hits() - if (hits.size > 0) { - Some(read[App](hits.head.getSourceAsString)) - } else { - None - } - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - } - } - - def getAll(): Seq[App] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[App](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[App]() - } - } - - def update(app: App): Unit = { - try { - val response = client.prepareIndex(index, estype, app.id.toString). - setSource(write(app)).get() - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } - - def delete(id: Int): Unit = { - try { - client.prepareDelete(index, estype, id.toString).get - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala deleted file mode 100644 index f955bee..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.Channel -import org.apache.predictionio.data.storage.Channels -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders.termFilter -import org.json4s.DefaultFormats -import org.json4s.JsonDSL._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -class ESChannels(client: Client, config: StorageClientConfig, index: String) - extends Channels with Logging { - - implicit val formats = DefaultFormats.lossless - private val estype = "channels" - private val seq = new ESSequences(client, config, index) - private val seqName = "channels" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(channel: Channel): Option[Int] = { - val id = - if (channel.id == 0) { - var roll = seq.genNext(seqName) - while (!get(roll).isEmpty) roll = seq.genNext(seqName) - roll - } else channel.id - - val realChannel = channel.copy(id = id) - if (update(realChannel)) Some(id) else None - } - - def get(id: Int): Option[Channel] = { - try { - val response = client.prepareGet( - index, - estype, - id.toString).get() - Some(read[Channel](response.getSourceAsString)) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - case e: NullPointerException => None - } - } - - def getByAppid(appid: Int): Seq[Channel] = { - try { - val builder = client.prepareSearch(index).setTypes(estype). - setPostFilter(termFilter("appid", appid)) - ESUtils.getAll[Channel](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[Channel]() - } - } - - def update(channel: Channel): Boolean = { - try { - val response = client.prepareIndex(index, estype, channel.id.toString). - setSource(write(channel)).get() - true - } catch { - case e: ElasticsearchException => - error(e.getMessage) - false - } - } - - def delete(id: Int): Unit = { - try { - client.prepareDelete(index, estype, id.toString).get - } catch { - case e: ElasticsearchException => - error(e.getMessage) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala deleted file mode 100644 index cc10ff0..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EngineInstance -import org.apache.predictionio.data.storage.EngineInstanceSerializer -import org.apache.predictionio.data.storage.EngineInstances -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.elasticsearch.search.sort.SortOrder -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -class ESEngineInstances(client: Client, config: StorageClientConfig, index: String) - extends EngineInstances with Logging { - implicit val formats = DefaultFormats + new EngineInstanceSerializer - private val estype = "engine_instances" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineVersion" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineVariant" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineFactory" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("batch" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("dataSourceParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("preparatorParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("algorithmsParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("servingParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(i: EngineInstance): String = { - try { - val response = client.prepareIndex(index, estype). - setSource(write(i)).get - response.getId - } catch { - case e: ElasticsearchException => - error(e.getMessage) - "" - } - } - - def get(id: String): Option[EngineInstance] = { - try { - val response = client.prepareGet(index, estype, id).get - if (response.isExists) { - Some(read[EngineInstance](response.getSourceAsString)) - } else { - None - } - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - } - } - - def getAll(): Seq[EngineInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[EngineInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def getCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Seq[EngineInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( - andFilter( - termFilter("status", "COMPLETED"), - termFilter("engineId", engineId), - termFilter("engineVersion", engineVersion), - termFilter("engineVariant", engineVariant))). - addSort("startTime", SortOrder.DESC) - ESUtils.getAll[EngineInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def getLatestCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Option[EngineInstance] = - getCompleted( - engineId, - engineVersion, - engineVariant).headOption - - def update(i: EngineInstance): Unit = { - try { - client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } - - def delete(id: String): Unit = { - try { - val response = client.prepareDelete(index, estype, id).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala deleted file mode 100644 index 307b582..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EngineManifestSerializer -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.EngineManifest -import org.apache.predictionio.data.storage.EngineManifests -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.json4s._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -class ESEngineManifests(client: Client, config: StorageClientConfig, index: String) - extends EngineManifests with Logging { - implicit val formats = DefaultFormats + new EngineManifestSerializer - private val estype = "engine_manifests" - private def esid(id: String, version: String) = s"$id $version" - - def insert(engineManifest: EngineManifest): Unit = { - val json = write(engineManifest) - val response = client.prepareIndex( - index, - estype, - esid(engineManifest.id, engineManifest.version)). - setSource(json).execute().actionGet() - } - - def get(id: String, version: String): Option[EngineManifest] = { - try { - val response = client.prepareGet(index, estype, esid(id, version)). - execute().actionGet() - if (response.isExists) { - Some(read[EngineManifest](response.getSourceAsString)) - } else { - None - } - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - } - } - - def getAll(): Seq[EngineManifest] = { - try { - val builder = client.prepareSearch() - ESUtils.getAll[EngineManifest](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit = - insert(engineManifest) - - def delete(id: String, version: String): Unit = { - try { - client.prepareDelete(index, estype, esid(id, version)).execute().actionGet() - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala deleted file mode 100644 index b8d7056..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EvaluationInstance -import org.apache.predictionio.data.storage.EvaluationInstanceSerializer -import org.apache.predictionio.data.storage.EvaluationInstances -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.elasticsearch.search.sort.SortOrder -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - -class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String) - extends EvaluationInstances with Logging { - implicit val formats = DefaultFormats + new EvaluationInstanceSerializer - private val estype = "evaluation_instances" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("evaluationClass" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineParamsGeneratorClass" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("batch" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("evaluatorResults" -> - ("type" -> "string") ~ ("index" -> "no")) ~ - ("evaluatorResultsHTML" -> - ("type" -> "string") ~ ("index" -> "no")) ~ - ("evaluatorResultsJSON" -> - ("type" -> "string") ~ ("index" -> "no")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } - - def insert(i: EvaluationInstance): String = { - try { - val response = client.prepareIndex(index, estype). - setSource(write(i)).get - response.getId - } catch { - case e: ElasticsearchException => - error(e.getMessage) - "" - } - } - - def get(id: String): Option[EvaluationInstance] = { - try { - val response = client.prepareGet(index, estype, id).get - if (response.isExists) { - Some(read[EvaluationInstance](response.getSourceAsString)) - } else { - None - } - } catch { - case e: ElasticsearchException => - error(e.getMessage) - None - } - } - - def getAll(): Seq[EvaluationInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[EvaluationInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def getCompleted(): Seq[EvaluationInstance] = { - try { - val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( - termFilter("status", "EVALCOMPLETED")). - addSort("startTime", SortOrder.DESC) - ESUtils.getAll[EvaluationInstance](client, builder) - } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() - } - } - - def update(i: EvaluationInstance): Unit = { - try { - client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } - - def delete(id: String): Unit = { - try { - client.prepareDelete(index, estype, id).get - } catch { - case e: ElasticsearchException => error(e.getMessage) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala deleted file mode 100644 index 80247ec..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.json4s.JsonDSL._ -import org.json4s._ -import org.json4s.native.JsonMethods._ - -class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging { - implicit val formats = DefaultFormats - private val estype = "sequences" - - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - // val settingsJson = - // ("number_of_shards" -> 1) ~ - // ("auto_expand_replicas" -> "0-all") - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val mappingJson = - (estype -> - ("_source" -> ("enabled" -> 0)) ~ - ("_all" -> ("enabled" -> 0)) ~ - ("_type" -> ("index" -> "no")) ~ - ("enabled" -> 0)) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(mappingJson))).get - } - - def genNext(name: String): Int = { - try { - val response = client.prepareIndex(index, estype, name). - setSource(compact(render("n" -> name))).get - response.getVersion().toInt - } catch { - case e: ElasticsearchException => - error(e.getMessage) - 0 - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala deleted file mode 100644 index 5de2999..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import org.elasticsearch.action.search.SearchRequestBuilder -import org.elasticsearch.client.Client -import org.elasticsearch.common.unit.TimeValue -import org.json4s.Formats -import org.json4s.native.Serialization.read - -import scala.collection.mutable.ArrayBuffer - -object ESUtils { - val scrollLife = new TimeValue(60000) - - def getAll[T : Manifest]( - client: Client, - builder: SearchRequestBuilder)( - implicit formats: Formats): Seq[T] = { - val results = ArrayBuffer[T]() - var response = builder.setScroll(scrollLife).get - var hits = response.getHits().hits() - results ++= hits.map(h => read[T](h.getSourceAsString)) - while (hits.size > 0) { - response = client.prepareSearchScroll(response.getScrollId). - setScroll(scrollLife).get - hits = response.getHits().hits() - results ++= hits.map(h => read[T](h.getSourceAsString)) - } - results - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala deleted file mode 100644 index 6f6b1c9..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.elasticsearch1 - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.BaseStorageClient -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.StorageClientException -import org.elasticsearch.client.transport.TransportClient -import org.elasticsearch.common.settings.ImmutableSettings -import org.elasticsearch.common.transport.InetSocketTransportAddress -import org.elasticsearch.transport.ConnectTransportException - -class StorageClient(val config: StorageClientConfig) extends BaseStorageClient - with Logging { - override val prefix = "ES" - val client = try { - val hosts = config.properties.get("HOSTS"). - map(_.split(",").toSeq).getOrElse(Seq("localhost")) - val ports = config.properties.get("PORTS"). - map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300)) - val settings = ImmutableSettings.settingsBuilder() - .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch")) - val transportClient = new TransportClient(settings) - (hosts zip ports) foreach { hp => - transportClient.addTransportAddress( - new InetSocketTransportAddress(hp._1, hp._2)) - } - transportClient - } catch { - case e: ConnectTransportException => - throw new StorageClientException(e.getMessage, e) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala deleted file mode 100644 index d6aa24a..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage - -/** Elasticsearch implementation of storage traits, supporting meta data only - * - * @group Implementation - */ -package object elasticsearch1 {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala new file mode 100644 index 0000000..3cdfad9 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch5 + +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.AccessKey +import org.apache.predictionio.data.storage.AccessKeys +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +/** Elasticsearch implementation of AccessKeys. */ +class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) + extends AccessKeys with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "accesskeys" + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("key" -> ("type" -> "keyword")) ~ + ("events" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(accessKey: AccessKey): Option[String] = { + val key = if (accessKey.key.isEmpty) generateKey else accessKey.key + update(accessKey.copy(key = key)) + Some(key) + } + + def get(id: String): Option[AccessKey] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[AccessKey]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error("Failed to access to /$index/$estype/$key", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[AccessKey] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getByAppid(appid: Int): Seq[AccessKey] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(accessKey: AccessKey): Unit = { + val id = accessKey.key + val restClient = client.open() + try { + val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } + + def delete(id: String): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala new file mode 100644 index 0000000..99fa2f0 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch5 + +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.App +import org.apache.predictionio.data.storage.Apps +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +/** Elasticsearch implementation of Items. */ +class ESApps(client: ESClient, config: StorageClientConfig, index: String) + extends Apps with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "apps" + private val seq = new ESSequences(client, config, index) + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("id" -> ("type" -> "keyword")) ~ + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(app: App): Option[Int] = { + val id = + if (app.id == 0) { + var roll = seq.genNext(estype) + while (!get(roll).isEmpty) roll = seq.genNext(estype) + roll + } else app.id + update(app.copy(id = id)) + Some(id) + } + + def get(id: Int): Option[App] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[App]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getByName(name: String): Option[App] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("name" -> name))) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/_search", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "hits" \ "total").extract[Long] match { + case 0 => None + case _ => + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + val result = (results.head \ "_source").extract[App] + Some(result) + } + } catch { + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[App] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[App](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(app: App): Unit = { + val id = app.id.toString + val restClient = client.open() + try { + val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } + + def delete(id: Int): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala new file mode 100644 index 0000000..b8ae5b4 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch5 + +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.Channel +import org.apache.predictionio.data.storage.Channels +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESChannels(client: ESClient, config: StorageClientConfig, index: String) + extends Channels with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "channels" + private val seq = new ESSequences(client, config, index) + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(channel: Channel): Option[Int] = { + val id = + if (channel.id == 0) { + var roll = seq.genNext(estype) + while (!get(roll).isEmpty) roll = seq.genNext(estype) + roll + } else channel.id + + if (update(channel.copy(id = id))) Some(id) else None + } + + def get(id: Int): Option[Channel] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[Channel]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getByAppid(appid: Int): Seq[Channel] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + ESUtils.getAll[Channel](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(channel: Channel): Boolean = { + val id = channel.id.toString + val restClient = client.open() + try { + val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "created" => true + case "updated" => true + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + false + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + false + } finally { + restClient.close() + } + } + + def delete(id: Int): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala new file mode 100644 index 0000000..d8d3c75 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch5 + +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.EngineInstance +import org.apache.predictionio.data.storage.EngineInstanceSerializer +import org.apache.predictionio.data.storage.EngineInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String) + extends EngineInstances with Logging { + implicit val formats = DefaultFormats + new EngineInstanceSerializer + private val estype = "engine_instances" + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "keyword")) ~ + ("engineVersion" -> ("type" -> "keyword")) ~ + ("engineVariant" -> ("type" -> "keyword")) ~ + ("engineFactory" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("dataSourceParams" -> ("type" -> "keyword")) ~ + ("preparatorParams" -> ("type" -> "keyword")) ~ + ("algorithmsParams" -> ("type" -> "keyword")) ~ + ("servingParams" -> ("type" -> "keyword")) ~ + ("status" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(i: EngineInstance): String = { + val id = i.id match { + case x if x.isEmpty => + @scala.annotation.tailrec + def generateId(newId: Option[String]): String = { + newId match { + case Some(x) => x + case _ => generateId(preInsert()) + } + } + generateId(preInsert()) + case x => x + } + + update(i.copy(id = id)) + id + } + + def preInsert(): Option[String] = { + val restClient = client.open() + try { + val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + Some((jsonResponse \ "_id").extract[String]) + case _ => + error(s"[$result] Failed to create $index/$estype") + None + } + } catch { + case e: IOException => + error(s"Failed to create $index/$estype", e) + None + } finally { + restClient.close() + } + } + + def get(id: String): Option[EngineInstance] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[EngineInstance]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[EngineInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("bool" -> + ("must" -> List( + ("term" -> + ("status" -> "COMPLETED")), + ("term" -> + ("engineId" -> engineId)), + ("term" -> + ("engineVersion" -> engineVersion)), + ("term" -> + ("engineVariant" -> engineVariant)))))) ~ + ("sort" -> List( + ("startTime" -> + ("order" -> "desc")))) + ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getLatestCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = + getCompleted( + engineId, + engineVersion, + engineVariant).headOption + + def update(i: EngineInstance): Unit = { + val id = i.id + val restClient = client.open() + try { + val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } + + def delete(id: String): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala new file mode 100644 index 0000000..d2f8623 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch5 + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.EvaluationInstance +import org.apache.predictionio.data.storage.EvaluationInstanceSerializer +import org.apache.predictionio.data.storage.EvaluationInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.StorageClientException +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String) + extends EvaluationInstances with Logging { + implicit val formats = DefaultFormats + new EvaluationInstanceSerializer + private val estype = "evaluation_instances" + private val seq = new ESSequences(client, config, index) + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("evaluationClass" -> ("type" -> "keyword")) ~ + ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(i: EvaluationInstance): String = { + val id = i.id match { + case x if x.isEmpty => + var roll = seq.genNext(estype).toString + while (!get(roll).isEmpty) roll = seq.genNext(estype).toString + roll + case x => x + } + + update(i.copy(id = id)) + id + } + + def get(id: String): Option[EvaluationInstance] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[EvaluationInstance]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[EvaluationInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getCompleted(): Seq[EvaluationInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("status" -> "EVALCOMPLETED"))) ~ + ("sort" -> + ("startTime" -> + ("order" -> "desc"))) + ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(i: EvaluationInstance): Unit = { + val id = i.id + val restClient = client.open() + try { + val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } + + def delete(id: String): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala new file mode 100644 index 0000000..3f70266 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch5 + +import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.MapWritable +import org.apache.hadoop.io.Text +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventValidation +import org.joda.time.DateTime +import org.joda.time.DateTimeZone +import org.json4s._ + +object ESEventsUtil { + + implicit val formats = DefaultFormats + + def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = { + + def getStringCol(col: String): String = { + val r = result.get(new Text(col)).asInstanceOf[Text] + require(r != null, + s"Failed to get value for column ${col}. " + + s"StringBinary: ${r.getBytes()}.") + + r.toString() + } + + def getOptStringCol(col: String): Option[String] = { + val r = result.get(new Text(col)) + if (r == null) { + None + } else { + Some(r.asInstanceOf[Text].toString()) + } + } + + val tmp = result + .get(new Text("properties")).asInstanceOf[MapWritable] + .get(new Text("fields")).asInstanceOf[MapWritable] + .get(new Text("rating")) + + val rating = + if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable] + else if (tmp.isInstanceOf[LongWritable]) { + new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble) + } + else null + + val properties: DataMap = + if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""") + else DataMap() + + + val eventId = Some(getStringCol("eventId")) + val event = getStringCol("event") + val entityType = getStringCol("entityType") + val entityId = getStringCol("entityId") + val targetEntityType = getOptStringCol("targetEntityType") + val targetEntityId = getOptStringCol("targetEntityId") + val prId = getOptStringCol("prId") + val eventTimeZone = getOptStringCol("eventTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + val eventTime = new DateTime( + getStringCol("eventTime"), eventTimeZone) + val creationTimeZone = getOptStringCol("creationTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + val creationTime: DateTime = new DateTime( + getStringCol("creationTime"), creationTimeZone) + + + Event( + eventId = eventId, + event = event, + entityType = entityType, + entityId = entityId, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + properties = properties, + eventTime = eventTime, + tags = Seq(), + prId = prId, + creationTime = creationTime + ) + } + + def eventToPut(event: Event, appId: Int): Seq[Map[String, Any]] = { + Seq( + Map( + "eventId" -> event.eventId, + "event" -> event.event, + "entityType" -> event.entityType, + "entityId" -> event.entityId, + "targetEntityType" -> event.targetEntityType, + "targetEntityId" -> event.targetEntityId, + "properties" -> event.properties, + "eventTime" -> event.eventTime, + "tags" -> event.tags, + "prId" -> event.prId, + "creationTime" -> event.creationTime + ) + ) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala new file mode 100644 index 0000000..74aa4b5 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch5 + +import java.io.IOException + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.LEvents +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.joda.time.DateTime +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write +import org.json4s.ext.JodaTimeSerializers + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) + extends LEvents with Logging { + implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all + private val seq = new ESSequences(client, config, index) + private val seqName = "events" + + def getEsType(appId: Int, channelId: Option[Int] = None): String = { + channelId.map { ch => + s"${appId}_${ch}" + }.getOrElse { + s"${appId}" + } + } + + override def init(appId: Int, channelId: Option[Int] = None): Boolean = { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val json = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("name" -> ("type" -> "keyword")) ~ + ("eventId" -> ("type" -> "keyword")) ~ + ("event" -> ("type" -> "keyword")) ~ + ("entityType" -> ("type" -> "keyword")) ~ + ("entityId" -> ("type" -> "keyword")) ~ + ("targetEntityType" -> ("type" -> "keyword")) ~ + ("targetEntityId" -> ("type" -> "keyword")) ~ + ("properties" -> + ("type" -> "nested") ~ + ("properties" -> + ("fields" -> ("type" -> "nested") ~ + ("properties" -> + ("user" -> ("type" -> "long")) ~ + ("num" -> ("type" -> "long")))))) ~ + ("eventTime" -> ("type" -> "date")) ~ + ("tags" -> ("type" -> "keyword")) ~ + ("prId" -> ("type" -> "keyword")) ~ + ("creationTime" -> ("type" -> "date")))) + ESUtils.createMapping(restClient, index, estype, compact(render(json))) + } finally { + restClient.close() + } + true + } + + override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + restClient.performRequest( + "POST", + s"/$index/$estype/_delete_by_query", + Map.empty[String, String].asJava, + entity).getStatusLine.getStatusCode match { + case 200 => true + case _ => + error(s"Failed to remove $index/$estype") + false + } + } catch { + case e: Exception => + error(s"Failed to remove $index/$estype", e) + false + } finally { + restClient.close() + } + } + + override def close(): Unit = { + // nothing + } + + override def futureInsert( + event: Event, + appId: Int, + channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val id = event.eventId.getOrElse { + var roll = seq.genNext(seqName) + while (exists(restClient, estype, roll)) roll = seq.genNext(seqName) + roll.toString + } + val json = write(event.copy(eventId = Some(id))) + val entity = new NStringEntity(json, ContentType.APPLICATION_JSON); + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => id + case "updated" => id + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + "" + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/<id>", e) + "" + } finally { + restClient.close() + } + } + } + + private def exists(restClient: RestClient, estype: String, id: Int): Boolean = { + try { + restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava).getStatusLine.getStatusCode match { + case 200 => true + case _ => false + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => false + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + false + } + case e: IOException => + error(s"Failed to access to $index/$estype/$id", e) + false + } + } + + override def futureGet( + eventId: String, + appId: Int, + channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("eventId" -> eventId))) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/_search", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "hits" \ "total").extract[Long] match { + case 0 => None + case _ => + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + val result = (results.head \ "_source").extract[Event] + Some(result) + } + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + None + } finally { + restClient.close() + } + } + } + + override def futureDelete( + eventId: String, + appId: Int, + channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("eventId" -> eventId))) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/_delete_by_query", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "deleted" => true + case _ => + error(s"[$result] Failed to update $index/$estype:$eventId") + false + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype:$eventId", e) + false + } finally { + restClient.close() + } + } + } + + override def futureFind( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None) + (implicit ec: ExecutionContext): Future[Iterator[Event]] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val query = ESUtils.createEventQuery( + startTime, untilTime, entityType, entityId, + eventNames, targetEntityType, targetEntityId, None) + ESUtils.getAll[Event](restClient, index, estype, query).toIterator + } catch { + case e: IOException => + error(e.getMessage) + Iterator[Event]() + } finally { + restClient.close() + } + } + } + +}
