http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala new file mode 100644 index 0000000..7f5dd9a --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.mr.EsInputFormat
+import org.elasticsearch.spark._
+import org.joda.time.DateTime
+import java.io.IOException
+import org.apache.http.util.EntityUtils
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.entity.ContentType
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.ext.JodaTimeSerializers
+
+
+/** Elasticsearch 5.x implementation of [[PEvents]] (event data as Spark RDDs).
+  *
+  * Events of an app, optionally scoped to a channel, are stored under the
+  * type returned by [[getEsType]] inside `index`.
+  *
+  * @param client factory for low-level REST clients (used by `delete`)
+  * @param config storage configuration; HOSTS, PORTS and SCHEMES are read from it
+  * @param index  Elasticsearch index that holds the event types
+  */
+class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
+  extends PEvents {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+
+  /** Type name for an app's events: `"<appId>"`, or `"<appId>_<channelId>"`
+    * when a channel is given.
+    */
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  /** Comma-separated `scheme://host:port` list for es-hadoop's `es.nodes`.
+    *
+    * Built from the comma-separated HOSTS, PORTS and SCHEMES properties
+    * (defaults `localhost`, `9200`, `http`). The three lists are zipped, so
+    * entries beyond the shortest list are ignored.
+    */
+  def getESNodes(): String = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    (hosts, ports, schemes).zipped.map(
+      (h, p, s) => s"$s://$h:$p").mkString(",")
+  }
+
+  /** Reads the matching events through es-hadoop's `EsInputFormat`.
+    *
+    * The filters are turned into a query by `ESUtils.createEventQuery`
+    * (no explicit `reversed` ordering is requested), and every document is
+    * converted with `ESEventsUtil.resultToEvent`.
+    */
+  override def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+
+    val query = ESUtils.createEventQuery(
+      startTime, untilTime, entityType, entityId,
+      eventNames, targetEntityType, targetEntityId, None)
+
+    val estype = getEsType(appId, channelId)
+    val conf = new Configuration()
+    conf.set("es.resource", s"$index/$estype")
+    conf.set("es.query", query)
+    conf.set("es.nodes", getESNodes())
+
+    val rdd = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
+      classOf[Text], classOf[MapWritable]).map {
+      case (key, doc) => {
+        ESEventsUtil.resultToEvent(key, doc, appId)
+      }
+    }
+
+    rdd
+  }
+
+  /** Indexes `events` into `<index>/<type>` with elasticsearch-spark's
+    * `saveToEs`, converting each event with `ESEventsUtil.eventToPut` first.
+    */
+  override def write(
+    events: RDD[Event],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    events.map { event =>
+      ESEventsUtil.eventToPut(event, appId)
+    }.saveToEs(s"$index/$estype")
+  }
+
+  /** Deletes the events with the given ids from the app/channel type.
+    *
+    * Issues one `_delete_by_query` request (a `term` query on `eventId`) per
+    * id. Request failures and ids that matched no document are logged, not
+    * thrown.
+    */
+  override def delete(
+    eventIds: RDD[String],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    eventIds.foreachPartition { iter =>
+      // RestClient is not Serializable, so a client opened on the driver
+      // cannot be shipped to executors; open (and close) one per partition.
+      // NOTE(review): the closure still captures this instance (for client,
+      // index and logger), so its fields must be serializable.
+      val restClient = client.open()
+      try {
+        iter.foreach { eventId =>
+          try {
+            val json =
+              ("query" ->
+                ("term" ->
+                  ("eventId" -> eventId)))
+            val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+            // The query has to be sent as the request body; without it the
+            // request does not identify the event to delete.
+            val response = restClient.performRequest(
+              "POST",
+              s"/$index/$estype/_delete_by_query",
+              Map.empty[String, String].asJava,
+              entity)
+            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+            // _delete_by_query reports a "deleted" count, not a "result" string.
+            (jsonResponse \ "deleted") match {
+              case JInt(n) if n > 0 =>
+              case _ =>
+                logger.error(s"No document deleted for $index/$estype:$eventId")
+            }
+          } catch {
+            case e: IOException =>
+              logger.error(s"Failed to delete $index/$estype:$eventId", e)
+          }
+        }
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala new file mode 100644 index 0000000..ae83e6a --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.Header
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+
+/** Named integer sequences backed by Elasticsearch document versions.
+  *
+  * Each sequence is the document `/<index>/sequences/<name>`; [[genNext]]
+  * re-indexes it and returns the resulting `_version`.
+  *
+  * Constructing an instance creates `index` and the `sequences` mapping when
+  * they do not exist yet.
+  */
+class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  // Public for compatibility only: this client is closed as soon as the
+  // index/mapping setup below finishes, so it must not be reused.
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    // A real JSON boolean: ES 5.x only accepts 0/1 here through deprecated
+    // lenient-boolean parsing, and ES 6.x rejects it.
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> false)))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /** Returns the next value of sequence `name` (the document's new `_version`).
+    *
+    * @throws StorageClientException if the request fails with an IOException
+    * @throws IllegalStateException if the response `result` is neither
+    *   "created" nor "updated"
+    */
+  def genNext(name: String): Int = {
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$name",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" | "updated" =>
+          (jsonResponse \ "_version").extract[Int]
+        case _ =>
+          throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
+      }
+    } catch {
+      case e: IOException =>
+        throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala new file mode 100644 index 0000000..ed46822 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.nio.entity.NStringEntity
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.apache.http.util.EntityUtils
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.DateTimeZone
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.http.HttpHost
+
+/** Helpers shared by the Elasticsearch 5.x storage classes. */
+object ESUtils {
+  /** Keep-alive sent with every scroll request. */
+  val scrollLife = "1m"
+
+  /** Runs `query` against `/<index>/<estype>/_search` and returns the
+    * `_source` of every hit extracted as `T`, following the scroll API until
+    * a page comes back empty.
+    *
+    * Documents are returned in the order Elasticsearch delivered them.
+    */
+  def getAll[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
+    implicit formats: Formats): Seq[T] = {
+
+    @scala.annotation.tailrec
+    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = {
+      if (hits.isEmpty) results
+      else {
+        val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
+        // Explicit JSON content type, as createMapping already sends.
+        val scrollBody = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          "/_search/scroll",
+          Map.empty[String, String].asJava,
+          scrollBody)
+        val responseJValue = parse(EntityUtils.toString(response.getEntity))
+        // Append (not prepend) so earlier pages stay ahead of later ones.
+        scroll((responseJValue \ "_scroll_id").extract[String],
+          (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+          results ++ hits.map(h => (h \ "_source").extract[T]))
+      }
+    }
+
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("scroll" -> scrollLife).asJava,
+      new NStringEntity(query, ContentType.APPLICATION_JSON))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    scroll((responseJValue \ "_scroll_id").extract[String],
+      (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+      Vector.empty[T])
+  }
+
+  /** Creates `index` with a PUT when a HEAD request for it returns 404.
+    *
+    * Does nothing on 200.
+    *
+    * @throws IllegalStateException for any other HEAD status
+    */
+  def createIndex(
+    client: RestClient,
+    index: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          client.performRequest(
+            "PUT",
+            s"/$index",
+            Map.empty[String, String].asJava)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index is invalid.")
+      }
+  }
+
+  /** PUTs `json` as the mapping of `estype` when a HEAD request for that
+    * mapping returns 404.
+    *
+    * Does nothing on 200.
+    *
+    * @throws IllegalStateException for any other HEAD status
+    */
+  def createMapping(
+    client: RestClient,
+    index: String,
+    estype: String,
+    json: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index/_mapping/$estype",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+          client.performRequest(
+            "PUT",
+            s"/$index/_mapping/$estype",
+            Map.empty[String, String].asJava,
+            entity)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index/$estype is invalid: $json")
+      }
+  }
+
+  /** Builds an Elasticsearch query-DSL string that ANDs (`bool.must`) every
+    * filter that is set, and sorts the hits by `eventTime`.
+    *
+    * String values are JSON-escaped before they are embedded.
+    *
+    * @param startTime        inclusive lower bound on `eventTime`
+    * @param untilTime        exclusive upper bound on `eventTime`
+    * @param eventNames       matches any of the given event names
+    * @param targetEntityType only `Some(Some(x))` adds a filter; `Some(None)` is
+    *                         ignored. NOTE(review): other event stores may treat
+    *                         `Some(None)` as "no target entity type" — confirm.
+    * @param targetEntityId   same convention as `targetEntityType`
+    * @param reversed         `Some(true)` sorts descending, otherwise ascending
+    */
+  def createEventQuery(
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    reversed: Option[Boolean] = None): String = {
+    val mustQueries = Seq(
+      startTime.map(x =>
+        s"""{"range":{"eventTime":{"gte":"${formatUTCDateTime(x)}"}}}"""),
+      // Upper bound: strictly before untilTime (previously a duplicated "gte").
+      untilTime.map(x =>
+        s"""{"range":{"eventTime":{"lt":"${formatUTCDateTime(x)}"}}}"""),
+      entityType.map(x => s"""{"term":{"entityType":${jsonString(x)}}}"""),
+      entityId.map(x => s"""{"term":{"entityId":${jsonString(x)}}}"""),
+      targetEntityType.flatMap(_.map(x =>
+        s"""{"term":{"targetEntityType":${jsonString(x)}}}""")),
+      targetEntityId.flatMap(_.map(x =>
+        s"""{"term":{"targetEntityId":${jsonString(x)}}}""")),
+      eventNames.map(xs =>
+        s"""{"terms":{"event":[${xs.map(jsonString).mkString(",")}]}}"""))
+      .flatten.mkString(",")
+    // A plain String: interpolating the Option produced "Some(desc)" / "None".
+    val sortOrder = if (reversed.getOrElse(false)) "desc" else "asc"
+    s"""{
+       |"query":{"bool":{"must":[${mustQueries}]}},
+       |"sort":[{"eventTime":{"order":"${sortOrder}"}}]
+       |}""".stripMargin
+  }
+
+  /** Hosts built from the comma-separated HOSTS, PORTS and SCHEMES properties
+    * (defaults `localhost`, `9200`, `http`). The three lists are zipped, so
+    * entries beyond the shortest list are ignored.
+    */
+  def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
+  }
+
+  /** Formats `dt` in UTC using the pattern `yyyy-MM-dd'T'HH:mm:ss.SSSZ`. */
+  private def formatUTCDateTime(dt: DateTime): String =
+    DateTimeFormat
+      .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(dt.withZone(DateTimeZone.UTC))
+
+  /** Renders `s` as a quoted, escaped JSON string literal. */
+  private def jsonString(s: String): String = compact(render(JString(s)))
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala new file mode 100644 index 0000000..540892b --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import scala.util.control.NonFatal
+
+import org.apache.http.HttpHost
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+
+import grizzled.slf4j.Logging
+
+/** Factory for Elasticsearch low-level REST clients talking to `hosts`.
+  *
+  * Every call to [[open]] builds a new [[RestClient]]; the caller is
+  * responsible for closing it.
+  */
+case class ESClient(hosts: Seq[HttpHost]) {
+  /** Builds a new client for `hosts`.
+    *
+    * @throws StorageClientException wrapping any non-fatal error raised while
+    *   building the client
+    */
+  def open(): RestClient = {
+    try {
+      RestClient.builder(hosts: _*).build()
+    } catch {
+      // NonFatal lets VirtualMachineError, InterruptedException and control
+      // throwables propagate instead of disguising them as storage errors.
+      case NonFatal(e) =>
+        throw new StorageClientException(e.getMessage, e)
+    }
+  }
+}
+
+/** Storage client for the Elasticsearch 5.x backend (`prefix` "ES").
+  *
+  * `client` targets the hosts described by the HOSTS, PORTS and SCHEMES
+  * properties of `config` (see `ESUtils.getHttpHosts`).
+  */
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+  with Logging {
+  override val prefix = "ES"
+
+  val client = ESClient(ESUtils.getHttpHosts(config))
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala new file mode 100644 index 0000000..5cb423a --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage + +/** Elasticsearch 5.x implementation of storage traits, supporting meta data and event data + * + * @group Implementation + */ +package object elasticsearch5 {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index a8d730b..5420004 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -19,11 +19,11 @@ import sbt._ import Keys._ object PIOBuild extends Build { + val elasticsearch5Version = SettingKey[String]( + "elasticsearch5-version", + "The version of Elasticsearch 5.x used for building.") val elasticsearchVersion = SettingKey[String]( "elasticsearch-version", - "The version of Elasticsearch used for building.") - val elasticsearch1Version = SettingKey[String]( - "elasticsearch1-version", "The version of Elasticsearch 1.x used for building.") val json4sVersion = SettingKey[String]( "json4s-version", http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala ---------------------------------------------------------------------- diff --git
a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala index 10aca41..877da06 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala @@ -108,7 +108,7 @@ object Management extends EitherLogging { info("Inspecting Apache Spark...") val sparkHomePath = Common.getSparkHome(sparkHome) if (new File(s"$sparkHomePath/bin/spark-submit").exists) { - info(s"Apache Spark is installed at $sparkHome") + info(s"Apache Spark is installed at $sparkHomePath") val sparkMinVersion = "1.3.0" pioStatus = pioStatus.copy( sparkHome = sparkHomePath,
