http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala deleted file mode 100644 index b453820..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.view - -import org.apache.predictionio.data.storage.hbase.HBPEvents -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventValidation -import org.apache.predictionio.data.storage.DataMap -import org.apache.predictionio.data.storage.Storage - -import org.joda.time.DateTime - -import org.json4s.JValue - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - - -// each JValue data associated with the time it is set -private[predictionio] case class PropTime(val d: JValue, val t: Long) extends Serializable - -private[predictionio] case class SetProp ( - val fields: Map[String, PropTime], - // last set time. 
Note: fields could be empty with valid set time - val t: Long) extends Serializable { - - def ++ (that: SetProp): SetProp = { - val commonKeys = fields.keySet.intersect(that.fields.keySet) - - val common: Map[String, PropTime] = commonKeys.map { k => - val thisData = this.fields(k) - val thatData = that.fields(k) - // only keep the value with latest time - val v = if (thisData.t > thatData.t) thisData else thatData - (k, v) - }.toMap - - val combinedFields = common ++ - (this.fields -- commonKeys) ++ (that.fields -- commonKeys) - - // keep the latest set time - val combinedT = if (this.t > that.t) this.t else that.t - - SetProp( - fields = combinedFields, - t = combinedT - ) - } -} - -private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable { - def ++ (that: UnsetProp): UnsetProp = { - val commonKeys = fields.keySet.intersect(that.fields.keySet) - - val common: Map[String, Long] = commonKeys.map { k => - val thisData = this.fields(k) - val thatData = that.fields(k) - // only keep the value with latest time - val v = if (thisData > thatData) thisData else thatData - (k, v) - }.toMap - - val combinedFields = common ++ - (this.fields -- commonKeys) ++ (that.fields -- commonKeys) - - UnsetProp( - fields = combinedFields - ) - } -} - -private[predictionio] case class DeleteEntity (t: Long) extends Serializable { - def ++ (that: DeleteEntity): DeleteEntity = { - if (this.t > that.t) this else that - } -} - -private[predictionio] case class EventOp ( - val setProp: Option[SetProp] = None, - val unsetProp: Option[UnsetProp] = None, - val deleteEntity: Option[DeleteEntity] = None -) extends Serializable { - - def ++ (that: EventOp): EventOp = { - EventOp( - setProp = (setProp ++ that.setProp).reduceOption(_ ++ _), - unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _), - deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _) - ) - } - - def toDataMap(): Option[DataMap] = { - setProp.flatMap { set => - - val unsetKeys: Set[String] = unsetProp.map( unset => - unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet - ).getOrElse(Set()) - - val combinedFields = deleteEntity.map { delete => - if (delete.t >= set.t) { - None - } else { - val deleteKeys: Set[String] = set.fields - .filter { case (k, PropTime(kv, t)) => - (delete.t >= t) - }.keySet - Some(set.fields -- unsetKeys -- deleteKeys) - } - }.getOrElse{ - Some(set.fields -- unsetKeys) - } - - // Note: mapValues() doesn't return concrete Map and causes - // NotSerializableException issue. Use map(identity) to work around this. 
- // see https://issues.scala-lang.org/browse/SI-7005 - combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity))) - } - } - -} - -private[predictionio] object EventOp { - def apply(e: Event): EventOp = { - val t = e.eventTime.getMillis - e.event match { - case "$set" => { - val fields = e.properties.fields.mapValues(jv => - PropTime(jv, t) - ).map(identity) - - EventOp( - setProp = Some(SetProp(fields = fields, t = t)) - ) - } - case "$unset" => { - val fields = e.properties.fields.mapValues(jv => t).map(identity) - EventOp( - unsetProp = Some(UnsetProp(fields = fields)) - ) - } - case "$delete" => { - EventOp( - deleteEntity = Some(DeleteEntity(t)) - ) - } - case _ => { - EventOp() - } - } - } -} - -@deprecated("Use PEvents or PEventStore instead.", "0.9.2") -class PBatchView( - val appId: Int, - val startTime: Option[DateTime], - val untilTime: Option[DateTime], - val sc: SparkContext) { - - // NOTE: parallel Events DB interface - @transient lazy val eventsDb = Storage.getPEvents() - - @transient lazy val _events: RDD[Event] = - eventsDb.getByAppIdAndTimeAndEntity( - appId = appId, - startTime = startTime, - untilTime = untilTime, - entityType = None, - entityId = None)(sc) - - // TODO: change to use EventSeq? - @transient lazy val events: RDD[Event] = _events - - def aggregateProperties( - entityType: String, - startTimeOpt: Option[DateTime] = None, - untilTimeOpt: Option[DateTime] = None - ): RDD[(String, DataMap)] = { - - _events - .filter( e => ((e.entityType == entityType) && - (EventValidation.isSpecialEvents(e.event))) ) - .map( e => (e.entityId, EventOp(e) )) - .aggregateByKey[EventOp](EventOp())( - // within same partition - seqOp = { case (u, v) => u ++ v }, - // across partition - combOp = { case (accu, u) => accu ++ u } - ) - .mapValues(_.toDataMap) - .filter{ case (k, v) => v.isDefined } - .map{ case (k, v) => (k, v.get) } - } - -}
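For readers tracking this removal: PBatchView has been deprecated since 0.9.2 and its aggregateProperties functionality now lives behind PEventStore. A minimal migration sketch, assuming the org.apache.predictionio.data.store.PEventStore API that the deprecation notice points to; the app name "MyApp" and entity type "user" are illustrative placeholders (PEventStore addresses apps by name rather than by appId):

import org.apache.predictionio.data.store.PEventStore
import org.apache.spark.SparkContext

object PBatchViewMigration {
  // Replaces: new PBatchView(appId, startTime, untilTime, sc).aggregateProperties("user")
  def aggregate(sc: SparkContext): Unit = {
    // Folds $set/$unset/$delete events into the current properties of each
    // entity, which is what PBatchView.aggregateProperties computed above.
    val props = PEventStore.aggregateProperties(
      appName = "MyApp",
      entityType = "user")(sc)
    props.take(5).foreach { case (entityId, properties) =>
      println(s"$entityId -> $properties")
    }
  }
}

startTime and untilTime carry over as the optional startTime/untilTime parameters of aggregateProperties.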
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/make-distribution.sh ---------------------------------------------------------------------- diff --git a/make-distribution.sh b/make-distribution.sh index b6c8ed3..31954c0 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -27,13 +27,15 @@ VERSION=$(grep version ${FWDIR}/build.sbt | grep ThisBuild | grep -o '".*"' | se echo "Building binary distribution for PredictionIO $VERSION..." cd ${FWDIR} -sbt/sbt common/publishLocal data/publishLocal core/publishLocal e2/publishLocal tools/assembly +sbt/sbt common/publishLocal data/publishLocal core/publishLocal e2/publishLocal dataElasticsearch1/assembly dataElasticsearch/assembly dataHbase/assembly dataHdfs/assembly dataJdbc/assembly dataLocalfs/assembly tools/assembly cd ${FWDIR} rm -rf ${DISTDIR} mkdir -p ${DISTDIR}/bin mkdir -p ${DISTDIR}/conf mkdir -p ${DISTDIR}/lib +mkdir -p ${DISTDIR}/lib/spark +mkdir -p ${DISTDIR}/lib/extra mkdir -p ${DISTDIR}/project mkdir -p ${DISTDIR}/sbt @@ -42,6 +44,8 @@ cp ${FWDIR}/conf/* ${DISTDIR}/conf cp ${FWDIR}/project/build.properties ${DISTDIR}/project cp ${FWDIR}/sbt/sbt ${DISTDIR}/sbt cp ${FWDIR}/assembly/*assembly*jar ${DISTDIR}/lib +cp ${FWDIR}/assembly/spark/*jar ${DISTDIR}/lib/spark +cp ${FWDIR}/assembly/extra/*jar ${DISTDIR}/lib/extra rm -f ${DISTDIR}/lib/*javadoc.jar rm -f ${DISTDIR}/lib/*sources.jar http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/.gitignore ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/.gitignore b/storage/elasticsearch/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/storage/elasticsearch/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/build.sbt ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt new file mode 100644 index 0000000..b22cbd8 --- /dev/null +++ b/storage/elasticsearch/build.sbt @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +name := "apache-predictionio-data-elasticsearch" + +elasticsearchVersion := "5.2.1" + +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", + "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, + "org.elasticsearch" %% "elasticsearch-spark-13" % elasticsearchVersion.value + exclude("org.apache.spark", "spark-sql_2.10") + exclude("org.apache.spark", "spark-streaming_2.10"), + "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value, + "org.scalatest" %% "scalatest" % "2.1.7" % "test", + "org.specs2" %% "specs2" % "2.3.13" % "test") + +parallelExecution in Test := false + +pomExtra := childrenPomExtra.value + +assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true) + +assemblyMergeStrategy in assembly := { + case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat + case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) +} + +assemblyShadeRules in assembly := Seq( + ShadeRule.rename("org.apache.http.**" -> "shadeio.data.http.@1").inAll +) + +// skip test in assembly +test in assembly := {} + +outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-elasticsearch-assembly-" + version.value + ".jar") + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala new file mode 100644 index 0000000..cb6d330 --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage.elasticsearch + +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.AccessKey +import org.apache.predictionio.data.storage.AccessKeys +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +/** Elasticsearch implementation of AccessKeys. */ +class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) + extends AccessKeys with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "accesskeys" + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("key" -> ("type" -> "keyword")) ~ + ("events" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(accessKey: AccessKey): Option[String] = { + val key = if (accessKey.key.isEmpty) generateKey else accessKey.key + update(accessKey.copy(key = key)) + Some(key) + } + + def get(id: String): Option[AccessKey] = { + if (id.isEmpty) { + return None + } + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[AccessKey]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[AccessKey] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getByAppid(appid: Int): Seq[AccessKey] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(accessKey: AccessKey): Unit = { + val id = accessKey.key + val restClient = client.open() + try { + val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + 
case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  def delete(id: String): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map("refresh" -> "true").asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
new file mode 100644
index 0000000..abea2b8
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/** Elasticsearch implementation of Apps.
*/ +class ESApps(client: ESClient, config: StorageClientConfig, index: String) + extends Apps with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "apps" + private val seq = new ESSequences(client, config, index) + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("id" -> ("type" -> "keyword")) ~ + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(app: App): Option[Int] = { + val id = + if (app.id == 0) { + var roll = seq.genNext(estype) + while (!get(roll).isEmpty) roll = seq.genNext(estype) + roll + } else app.id + update(app.copy(id = id)) + Some(id) + } + + def get(id: Int): Option[App] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[App]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getByName(name: String): Option[App] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("name" -> name))) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/_search", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "hits" \ "total").extract[Long] match { + case 0 => None + case _ => + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + val result = (results.head \ "_source").extract[App] + Some(result) + } + } catch { + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[App] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[App](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(app: App): Unit = { + val id = app.id.toString + val restClient = client.open() + try { + val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } + + def delete(id: Int): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + 
s"/$index/$estype/$id", + Map("refresh" -> "true").asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala new file mode 100644 index 0000000..f092cc7 --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage.elasticsearch + +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.Channel +import org.apache.predictionio.data.storage.Channels +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESChannels(client: ESClient, config: StorageClientConfig, index: String) + extends Channels with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "channels" + private val seq = new ESSequences(client, config, index) + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(channel: Channel): Option[Int] = { + val id = + if (channel.id == 0) { + var roll = seq.genNext(estype) + while (!get(roll).isEmpty) roll = seq.genNext(estype) + roll + } else channel.id + + if (update(channel.copy(id = id))) Some(id) else None + } + + def get(id: Int): Option[Channel] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[Channel]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getByAppid(appid: Int): Seq[Channel] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + ESUtils.getAll[Channel](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(channel: Channel): Boolean = { + val id = channel.id.toString + val restClient = client.open() + try { + val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava, + entity) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "created" => true + case "updated" => true + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + false + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + false + } finally { + restClient.close() + } + } + + def delete(id: Int): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + 
s"/$index/$estype/$id", + Map("refresh" -> "true").asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala new file mode 100644 index 0000000..4dbacb7 --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage.elasticsearch + +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.EngineInstance +import org.apache.predictionio.data.storage.EngineInstanceSerializer +import org.apache.predictionio.data.storage.EngineInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String) + extends EngineInstances with Logging { + implicit val formats = DefaultFormats + new EngineInstanceSerializer + private val estype = "engine_instances" + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "keyword")) ~ + ("engineVersion" -> ("type" -> "keyword")) ~ + ("engineVariant" -> ("type" -> "keyword")) ~ + ("engineFactory" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("dataSourceParams" -> ("type" -> "keyword")) ~ + ("preparatorParams" -> ("type" -> "keyword")) ~ + ("algorithmsParams" -> ("type" -> "keyword")) ~ + ("servingParams" -> ("type" -> "keyword")) ~ + ("status" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(i: EngineInstance): String = { + val id = i.id match { + case x if x.isEmpty => + @scala.annotation.tailrec + def generateId(newId: Option[String]): String = { + newId match { + case Some(x) => x + case _ => generateId(preInsert()) + } + } + generateId(preInsert()) + case x => x + } + + update(i.copy(id = id)) + id + } + + def preInsert(): Option[String] = { + val restClient = client.open() + try { + val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/", + Map("refresh" -> "true").asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + Some((jsonResponse \ "_id").extract[String]) + case _ => + error(s"[$result] Failed to create $index/$estype") + None + } + } catch { + case e: IOException => + error(s"Failed to create $index/$estype", e) + None + } finally { + restClient.close() + } + } + + def get(id: String): Option[EngineInstance] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[EngineInstance]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to 
/$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[EngineInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("bool" -> + ("must" -> List( + ("term" -> + ("status" -> "COMPLETED")), + ("term" -> + ("engineId" -> engineId)), + ("term" -> + ("engineVersion" -> engineVersion)), + ("term" -> + ("engineVariant" -> engineVariant)))))) ~ + ("sort" -> List( + ("startTime" -> + ("order" -> "desc")))) + ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getLatestCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = + getCompleted( + engineId, + engineVersion, + engineVariant).headOption + + def update(i: EngineInstance): Unit = { + val id = i.id + val restClient = client.open() + try { + val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } + + def delete(id: String): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala new file mode 100644 index 0000000..5bdc0fb --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.EvaluationInstance +import org.apache.predictionio.data.storage.EvaluationInstanceSerializer +import org.apache.predictionio.data.storage.EvaluationInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.StorageClientException +import org.elasticsearch.client.RestClient +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.write + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String) + extends EvaluationInstances with Logging { + implicit val formats = DefaultFormats + new EvaluationInstanceSerializer + private val estype = "evaluation_instances" + private val seq = new ESSequences(client, config, index) + + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("evaluationClass" -> ("type" -> "keyword")) ~ + ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } + + def insert(i: EvaluationInstance): String = { + val id = i.id match { + case x if x.isEmpty => + var roll = seq.genNext(estype).toString + while (!get(roll).isEmpty) roll = seq.genNext(estype).toString + roll + case x => x + } + + update(i.copy(id = id)) + id + } + + def get(id: String): Option[EvaluationInstance] = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[EvaluationInstance]) + case _ => + None + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case 
e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) + None + } finally { + restClient.close() + } + } + + def getAll(): Seq[EvaluationInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def getCompleted(): Seq[EvaluationInstance] = { + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("status" -> "EVALCOMPLETED"))) ~ + ("sort" -> + ("startTime" -> + ("order" -> "desc"))) + ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil + } finally { + restClient.close() + } + } + + def update(i: EvaluationInstance): Unit = { + val id = i.id + val restClient = client.open() + try { + val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava, + entity) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } + + def delete(id: String): Unit = { + val restClient = client.open() + try { + val response = restClient.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala new file mode 100644 index 0000000..56f47ab --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch + +import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.MapWritable +import org.apache.hadoop.io.Text +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventValidation +import org.joda.time.DateTime +import org.joda.time.DateTimeZone +import org.json4s._ + +object ESEventsUtil { + + implicit val formats = DefaultFormats + + def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = { + + def getStringCol(col: String): String = { + val r = result.get(new Text(col)).asInstanceOf[Text] + require(r != null, + s"Failed to get value for column ${col}. " + + s"StringBinary: ${r.getBytes()}.") + + r.toString() + } + + def getOptStringCol(col: String): Option[String] = { + val r = result.get(new Text(col)) + if (r == null) { + None + } else { + Some(r.asInstanceOf[Text].toString()) + } + } + + val tmp = result + .get(new Text("properties")).asInstanceOf[MapWritable] + .get(new Text("fields")).asInstanceOf[MapWritable] + .get(new Text("rating")) + + val rating = + if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable] + else if (tmp.isInstanceOf[LongWritable]) { + new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble) + } + else null + + val properties: DataMap = + if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""") + else DataMap() + + + val eventId = Some(getStringCol("eventId")) + val event = getStringCol("event") + val entityType = getStringCol("entityType") + val entityId = getStringCol("entityId") + val targetEntityType = getOptStringCol("targetEntityType") + val targetEntityId = getOptStringCol("targetEntityId") + val prId = getOptStringCol("prId") + val eventTimeZone = getOptStringCol("eventTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + val eventTime = new DateTime( + getStringCol("eventTime"), eventTimeZone) + val creationTimeZone = getOptStringCol("creationTimeZone") + .map(DateTimeZone.forID(_)) + .getOrElse(EventValidation.defaultTimeZone) + val creationTime: DateTime = new DateTime( + getStringCol("creationTime"), creationTimeZone) + + + Event( + eventId = eventId, + event = event, + entityType = entityType, + entityId = entityId, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + properties = properties, + eventTime = eventTime, + tags = Seq(), + prId = prId, + creationTime = creationTime + ) + } + + def eventToPut(event: Event, appId: Int): Map[String, Any] = { + Map( + "eventId" -> event.eventId, + "event" -> event.event, + "entityType" -> event.entityType, + "entityId" -> event.entityId, + "targetEntityType" -> event.targetEntityType, + "targetEntityId" -> event.targetEntityId, + "properties" -> event.properties.toJObject, + "eventTime" -> event.eventTime.toString, + "tags" -> event.tags, + "prId" -> event.prId, + "creationTime" -> event.creationTime.toString + ) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala new file mode 100644 index 0000000..fdd370a --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.elasticsearch + +import java.io.IOException + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.LEvents +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient +import org.joda.time.DateTime +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write +import org.json4s.ext.JodaTimeSerializers + +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException +import org.apache.http.entity.StringEntity + +class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) + extends LEvents with Logging { + implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all + private val seq = new ESSequences(client, config, index) + private val seqName = "events" + + def getEsType(appId: Int, channelId: Option[Int] = None): String = { + channelId.map { ch => + s"${appId}_${ch}" + }.getOrElse { + s"${appId}" + } + } + + override def init(appId: Int, channelId: Option[Int] = None): Boolean = { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val json = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("name" -> ("type" -> "keyword")) ~ + ("eventId" -> ("type" -> "keyword")) ~ + ("event" -> ("type" -> "keyword")) ~ + ("entityType" -> ("type" -> "keyword")) ~ + ("entityId" -> ("type" -> "keyword")) ~ + ("targetEntityType" -> ("type" -> "keyword")) ~ + ("targetEntityId" -> ("type" -> "keyword")) ~ + ("properties" -> + ("type" -> "nested") ~ + ("properties" -> + ("fields" -> ("type" -> "nested") ~ + ("properties" -> + ("user" -> ("type" -> "long")) ~ + ("num" -> ("type" -> "long")))))) ~ + ("eventTime" -> ("type" -> "date")) ~ + ("tags" -> ("type" -> "keyword")) ~ + ("prId" -> ("type" -> "keyword")) ~ + ("creationTime" -> ("type" -> "date")))) + 
ESUtils.createMapping(restClient, index, estype, compact(render(json))) + } finally { + restClient.close() + } + true + } + + override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val json = + ("query" -> + ("match_all" -> List.empty)) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + restClient.performRequest( + "POST", + s"/$index/$estype/_delete_by_query", + Map("refresh" -> "true").asJava, + entity).getStatusLine.getStatusCode match { + case 200 => true + case _ => + error(s"Failed to remove $index/$estype") + false + } + } catch { + case e: Exception => + error(s"Failed to remove $index/$estype", e) + false + } finally { + restClient.close() + } + } + + override def close(): Unit = { + // nothing + } + + override def futureInsert( + event: Event, + appId: Int, + channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val id = event.eventId.getOrElse { + var roll = seq.genNext(seqName) + while (exists(restClient, estype, roll)) roll = seq.genNext(seqName) + roll.toString + } + val json = write(event.copy(eventId = Some(id))) + val entity = new NStringEntity(json, ContentType.APPLICATION_JSON); + val response = restClient.performRequest( + "POST", + s"/$index/$estype/$id", + Map("refresh" -> "true").asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => id + case "updated" => id + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + "" + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/<id>", e) + "" + } finally { + restClient.close() + } + } + } + + private def exists(restClient: RestClient, estype: String, id: Int): Boolean = { + try { + restClient.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava).getStatusLine.getStatusCode match { + case 200 => true + case _ => false + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => false + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + false + } + case e: IOException => + error(s"Failed to access to $index/$estype/$id", e) + false + } + } + + override def futureGet( + eventId: String, + appId: Int, + channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("eventId" -> eventId))) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/_search", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "hits" \ "total").extract[Long] match { + case 0 => None + case _ => + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + val result = (results.head \ "_source").extract[Event] + Some(result) + } + } catch { + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + None + } finally { + restClient.close() + } + } + } + + override def futureDelete( + eventId: String, + appId: Int, + channelId: Option[Int])(implicit ec: 
ExecutionContext): Future[Boolean] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val json = + ("query" -> + ("term" -> + ("eventId" -> eventId))) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + val response = restClient.performRequest( + "POST", + s"/$index/$estype/_delete_by_query", + Map("refresh" -> "true").asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "deleted" => true + case _ => + error(s"[$result] Failed to update $index/$estype:$eventId") + false + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype:$eventId", e) + false + } finally { + restClient.close() + } + } + } + + override def futureFind( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None) + (implicit ec: ExecutionContext): Future[Iterator[Event]] = { + Future { + val estype = getEsType(appId, channelId) + val restClient = client.open() + try { + val query = ESUtils.createEventQuery( + startTime, untilTime, entityType, entityId, + eventNames, targetEntityType, targetEntityId, reversed) + limit.getOrElse(20) match { + case -1 => ESUtils.getAll[Event](restClient, index, estype, query).toIterator + case size => ESUtils.get[Event](restClient, index, estype, query, size).toIterator + } + } catch { + case e: IOException => + error(e.getMessage) + Iterator[Event]() + } finally { + restClient.close() + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala new file mode 100644 index 0000000..390e78c --- /dev/null +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
new file mode 100644
index 0000000..390e78c
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.mr.EsInputFormat
+import org.elasticsearch.spark._
+import org.joda.time.DateTime
+import java.io.IOException
+import org.apache.http.util.EntityUtils
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.entity.ContentType
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.ext.JodaTimeSerializers
+
+
+class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
+  extends PEvents {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  def getESNodes(): String = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    (hosts, ports).zipped.map(
+      (h, p) => s"$h:$p").mkString(",")
+  }
+
+  override def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+
+    val query = ESUtils.createEventQuery(
+      startTime, untilTime, entityType, entityId,
+      eventNames, targetEntityType, targetEntityId, None)
+
+    val estype = getEsType(appId, channelId)
+    val conf = new Configuration()
+    conf.set("es.resource", s"$index/$estype")
+    conf.set("es.query", query)
+    conf.set("es.nodes", getESNodes())
+
+    sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
+      classOf[Text], classOf[MapWritable]).map {
+      case (key, doc) =>
+        ESEventsUtil.resultToEvent(key, doc, appId)
+    }
+  }
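
find() delegates the actual scan to elasticsearch-hadoop's EsInputFormat, driven entirely by the es.resource/es.query/es.nodes settings built above. A driver-program sketch, again illustrative: Storage.getPEvents() is an assumed accessor that would resolve to this ESPEvents under an Elasticsearch-backed configuration.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.predictionio.data.storage.Storage

    object PEventsFindSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("espevents-find"))
        val pEvents = Storage.getPEvents() // assumed accessor
        val rdd = pEvents.find(appId = 1, eventNames = Some(Seq("rate")))(sc)
        println(s"matched ${rdd.count()} events")
        sc.stop()
      }
    }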
+  override def write(
+    events: RDD[Event],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes())
+    events.map { event =>
+      ESEventsUtil.eventToPut(event, appId)
+    }.saveToEs(conf)
+  }
+
+  override def delete(
+    eventIds: RDD[String],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    eventIds.foreachPartition { iter =>
+      // RestClient is not serializable, so open (and close) one per partition
+      // on the executor instead of capturing a driver-side client.
+      val restClient = client.open()
+      try {
+        iter.foreach { eventId =>
+          try {
+            val json =
+              ("query" ->
+                ("term" ->
+                  ("eventId" -> eventId)))
+            val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+            val response = restClient.performRequest(
+              "POST",
+              s"/$index/$estype/_delete_by_query",
+              Map("refresh" -> "true").asJava,
+              entity)
+            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+            if ((jsonResponse \ "deleted").extract[Int] == 0) {
+              logger.error(s"Failed to delete $index/$estype:$eventId")
+            }
+          } catch {
+            case e: IOException =>
+              logger.error(s"Failed to delete $index/$estype:$eventId", e)
+          }
+        }
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
new file mode 100644
index 0000000..e5264ae
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+
+class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  def genNext(name: String): Int = {
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$name",
+        Map("refresh" -> "true").asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" | "updated" =>
+          (jsonResponse \ "_version").extract[Int]
+        case _ =>
+          throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
+      }
+    } catch {
+      case e: IOException =>
+        throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}
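
The counter here is Elasticsearch's own optimistic-concurrency _version: re-indexing the same document id bumps it by one atomically on the ES side, so genNext returns a strictly increasing value per sequence name. A usage sketch (not a complete program) under the client/config wiring from this patch; "pio" and "events" are placeholder names:

    val sequences = new ESSequences(client, config, "pio")
    val first = sequences.genNext("events")  // e.g. 1 on a fresh index
    val second = sequences.genNext("events") // e.g. 2; _version grows by one per index op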

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
new file mode 100644
index 0000000..72f4dd6
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.HttpHost
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.joda.time.format.DateTimeFormat
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+
+object ESUtils {
+  val scrollLife = "1m"
+
+  def get[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String,
+    size: Int)(
+    implicit formats: Formats): Seq[T] = {
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("size" -> s"${size}").asJava,
+      new NStringEntity(query, ContentType.APPLICATION_JSON))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]]
+    hits.map(h => (h \ "_source").extract[T])
+  }
+
+  def getAll[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
+    implicit formats: Formats): Seq[T] = {
+
+    @scala.annotation.tailrec
+    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = {
+      if (hits.isEmpty) results
+      else {
+        val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
+        val scrollBody = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          "/_search/scroll",
+          Map.empty[String, String].asJava,
+          scrollBody)
+        val responseJValue = parse(EntityUtils.toString(response.getEntity))
+        // Append in page order so the sort order requested in the query is preserved.
+        scroll((responseJValue \ "_scroll_id").extract[String],
+          (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+          results ++ hits.map(h => (h \ "_source").extract[T]))
+      }
+    }
+
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("scroll" -> scrollLife).asJava,
+      new NStringEntity(query, ContentType.APPLICATION_JSON))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    scroll((responseJValue \ "_scroll_id").extract[String],
+      (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+      Nil)
+  }
+
+  def createIndex(
+    client: RestClient,
+    index: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          client.performRequest(
+            "PUT",
+            s"/$index",
+            Map.empty[String, String].asJava)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index is invalid.")
+      }
+  }
+
+  def createMapping(
+    client: RestClient,
+    index: String,
+    estype: String,
+    json: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index/_mapping/$estype",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+          client.performRequest(
+            "PUT",
+            s"/$index/_mapping/$estype",
+            Map.empty[String, String].asJava,
+            entity)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index/$estype is invalid: $json")
+      }
+  }
+
+  def createEventQuery(
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    reversed: Option[Boolean] = None): String = {
+    val mustQueries = Seq(
+      startTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
+      }),
+      untilTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"lt":"${v}"}}}"""
+      }),
+      entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),
+      entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""),
+      targetEntityType.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityType":"${x}"}}""")),
+      targetEntityId.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityId":"${x}"}}""")),
+      eventNames
+        .map { xx => xx.map(x => "\"%s\"".format(x)) }
+        .map(x => s"""{"terms":{"event":[${x.mkString(",")}]}}""")).flatten.mkString(",")
+    val query =
+      if (mustQueries.isEmpty) """{"match_all":{}}"""
+      else s"""{"bool":{"must":[${mustQueries}]}}"""
+    val sortOrder = if (reversed.getOrElse(false)) "desc" else "asc"
+    s"""{
+       |"query":${query},
+       |"sort":[{"eventTime":{"order":"${sortOrder}"}}]
+       |}""".stripMargin
+  }
+
+  def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
new file mode 100644
index 0000000..647d180
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.http.HttpHost
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+
+import grizzled.slf4j.Logging
+
+case class ESClient(hosts: Seq[HttpHost]) {
+  def open(): RestClient = {
+    try {
+      RestClient.builder(hosts: _*).build()
+    } catch {
+      case e: Throwable =>
+        throw new StorageClientException(e.getMessage, e)
+    }
+  }
+}
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+  with Logging {
+  override val prefix = "ES"
+
+  val client = ESClient(ESUtils.getHttpHosts(config))
+}
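
Note the design choice here: ESClient.open() builds a brand-new low-level RestClient on every call rather than pooling one, which is why each method in this patch brackets its work with open()/close():

    val restClient = client.open()
    try {
      // ... performRequest calls ...
    } finally {
      restClient.close()
    }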

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
new file mode 100644
index 0000000..fdc3b48
--- /dev/null
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** Elasticsearch implementation of storage traits, supporting meta data and
+ * event data
+ *
+ * @group Implementation
+ */
+package elasticsearch {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/resources/application.conf b/storage/elasticsearch/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/elasticsearch/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+  sources {
+    mongodb {
+      type = mongodb
+      hosts = [localhost]
+      ports = [27017]
+    }
+    elasticsearch {
+      type = elasticsearch
+      hosts = [localhost]
+      ports = [9300]
+    }
+  }
+  repositories {
+    # This section is dummy just to make storage happy.
+    # The actual testing will not bypass these repository settings completely.
+    # Please refer to StorageTestUtils.scala.
+    settings {
+      name = "test_predictionio"
+      source = mongodb
+    }
+
+    appdata {
+      name = "test_predictionio_appdata"
+      source = mongodb
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/.gitignore
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/.gitignore b/storage/elasticsearch1/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/elasticsearch1/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/elasticsearch1/build.sbt
----------------------------------------------------------------------
diff --git a/storage/elasticsearch1/build.sbt b/storage/elasticsearch1/build.sbt
new file mode 100644
index 0000000..8c29b84
--- /dev/null
+++ b/storage/elasticsearch1/build.sbt
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "apache-predictionio-data-elasticsearch1"
+
+elasticsearchVersion := "1.7.3"
+
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+  "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value,
+  "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+  "org.specs2" %% "specs2" % "2.3.13" % "test")
+
+parallelExecution in Test := false
+
+pomExtra := childrenPomExtra.value
+
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+  case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "extra" / ("pio-data-elasticsearch1-assembly-" + version.value + ".jar")
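
elasticsearchVersion is not a stock sbt key; it is presumably declared once in the shared PredictionIO build definition rather than in this file. A hypothetical declaration, for reference only:

    import sbt._

    // Hypothetical: a custom setting key like the one referenced above; the real
    // declaration would live in the shared build, not in this patch.
    val elasticsearchVersion = settingKey[String]("Elasticsearch version to build against")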
