http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala
new file mode 100644
index 0000000..7f5dd9a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESPEvents.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.mr.EsInputFormat
+import org.elasticsearch.spark._
+import org.joda.time.DateTime
+import java.io.IOException
+import org.apache.http.util.EntityUtils
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.entity.ContentType
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.ext.JodaTimeSerializers
+
+
+class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
+    extends PEvents {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  def getESNodes(): String = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    (hosts, ports, schemes).zipped.map(
+      (h, p, s) => s"$s://$h:$p").mkString(",")
+  }
+
+  override def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+
+    val query = ESUtils.createEventQuery(
+      startTime, untilTime, entityType, entityId,
+      eventNames, targetEntityType, targetEntityId, None)
+
+    val estype = getEsType(appId, channelId)
+    val conf = new Configuration()
+    conf.set("es.resource", s"$index/$estype")
+    conf.set("es.query", query)
+    conf.set("es.nodes", getESNodes())
+
+    val rdd = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
+      classOf[Text], classOf[MapWritable]).map {
+        case (key, doc) => {
+          ESEventsUtil.resultToEvent(key, doc, appId)
+        }
+      }
+
+    rdd
+  }
+
+  override def write(
+    events: RDD[Event],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    events.map { event =>
+      ESEventsUtil.eventToPut(event, appId)
+    }.saveToEs(s"$index/$estype")
+  }
+
+  override def delete(
+    eventIds: RDD[String],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    val restClient = client.open()
+    try {
+      eventIds.foreachPartition { iter =>
+        iter.foreach { eventId =>
+          try {
+            val json =
+              ("query" ->
+                ("term" ->
+                  ("eventId" -> eventId)))
+            val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+            val response = restClient.performRequest(
+              "POST",
+              s"/$index/$estype/_delete_by_query",
+              Map.empty[String, String].asJava, entity)
+            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+            val result = (jsonResponse \ "result").extract[String]
+            result match {
+              case "deleted" => true
+              case _ =>
+                logger.error(s"[$result] Failed to delete $index/$estype:$eventId")
+                false
+            }
+          } catch {
+            case e: IOException =>
+              logger.error(s"Failed to delete $index/$estype:$eventId", e)
+              false
+          }
+        }
+      }
+    } finally {
+      restClient.close()
+    }
+  }
+
+}
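
A rough sketch of how ESPEvents would be driven from a Spark job. The
index name, app id, event name, and StorageClientConfig properties are
illustrative assumptions, not values taken from this patch:

    import org.apache.predictionio.data.storage.StorageClientConfig
    import org.apache.spark.{SparkConf, SparkContext}
    import org.joda.time.DateTime

    val sc = new SparkContext(new SparkConf().setAppName("espevents-sketch"))
    val config = StorageClientConfig(properties = Map(
      "HOSTS" -> "localhost", "PORTS" -> "9200", "SCHEMES" -> "http"))
    val events = new ESPEvents(
      ESClient(ESUtils.getHttpHosts(config)), config, "pio_event")
    // Reads ES type "7" of index "pio_event" (appId = 7, no channel),
    // restricted to "view" events at or after the given time.
    val rdd = events.find(
      appId = 7,
      startTime = Some(new DateTime(2017, 1, 1, 0, 0)),
      eventNames = Some(Seq("view")))(sc)
    println(rdd.count())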

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala
new file mode 100644
index 0000000..ae83e6a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESSequences.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.Header
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+
+class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  def genNext(name: String): Int = {
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$name",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+          (jsonResponse \ "_version").extract[Int]
+        case "updated" =>
+          (jsonResponse \ "_version").extract[Int]
+        case _ =>
+          throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
+      }
+    } catch {
+      case e: IOException =>
+        throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}
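
genNext uses Elasticsearch's optimistic-concurrency metadata instead of a
dedicated counter API: each index operation on the same document id bumps
"_version", and that version number is returned as the next value of the
sequence. A minimal usage sketch, assuming a local node and an
illustrative "pio_meta" index:

    import org.apache.predictionio.data.storage.StorageClientConfig

    val config = StorageClientConfig(properties = Map("HOSTS" -> "localhost"))
    val sequences = new ESSequences(
      ESClient(ESUtils.getHttpHosts(config)), config, "pio_meta")
    val first = sequences.genNext("events")   // 1 on the first call
    val second = sequences.genNext("events")  // 2, and so on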

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala
new file mode 100644
index 0000000..ed46822
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESUtils.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.nio.entity.NStringEntity
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.apache.http.util.EntityUtils
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.DateTimeZone
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.http.HttpHost
+
+object ESUtils {
+  val scrollLife = "1m"
+
+  def getAll[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
+      implicit formats: Formats): Seq[T] = {
+
+    @scala.annotation.tailrec
+    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = {
+      if (hits.isEmpty) results
+      else {
+        val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
+        val scrollBody = new StringEntity(compact(render(json)))
+        val response = client.performRequest(
+          "POST",
+          "/_search/scroll",
+          Map[String, String](),
+          scrollBody)
+        val responseJValue = parse(EntityUtils.toString(response.getEntity))
+        scroll((responseJValue \ "_scroll_id").extract[String],
+          (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+          results ++ hits.map(h => (h \ "_source").extract[T]))
+      }
+    }
+
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("scroll" -> scrollLife),
+      new StringEntity(query))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    scroll((responseJValue \ "_scroll_id").extract[String],
+        (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+        Nil)
+  }
+
+  def createIndex(
+    client: RestClient,
+    index: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          client.performRequest(
+            "PUT",
+            s"/$index",
+            Map.empty[String, String].asJava)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index is invalid.")
+      }
+  }
+
+  def createMapping(
+    client: RestClient,
+    index: String,
+    estype: String,
+    json: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index/_mapping/$estype",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 404 =>
+          val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+          client.performRequest(
+            "PUT",
+            s"/$index/_mapping/$estype",
+            Map.empty[String, String].asJava,
+            entity)
+        case 200 =>
+        case _ =>
+          throw new IllegalStateException(s"/$index/$estype is invalid: $json")
+      }
+  }
+
+  def createEventQuery(
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    reversed: Option[Boolean] = None): String = {
+    val mustQueries = Seq(
+      startTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
+      }),
+      untilTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"lt":"${v}"}}}"""
+      }),
+      entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),
+      entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""),
+      targetEntityType.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityType":"${x}"}}""")),
+      targetEntityId.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityId":"${x}"}}""")),
+      eventNames
+        .map { xx => xx.map(x => "\"%s\"".format(x)) }
+        .map(x => s"""{"terms":{"event":[${x.mkString(",")}]}}""")).flatten.mkString(",")
+    val sortOrder = reversed.map(x => x match {
+      case true => "desc"
+      case _ => "asc"
+    }).getOrElse("asc")
+    s"""{
+       |"query":{"bool":{"must":[${mustQueries}]}},
+       |"sort":[{"eventTime":{"order":"${sortOrder}"}}]
+       |}""".stripMargin
+  }
+
+  def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
+  }
+}
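
Because createEventQuery assembles the search body from string fragments,
one concrete input/output pair may help reviewers check it. A call such as

    val q = ESUtils.createEventQuery(
      entityType = Some("user"),
      eventNames = Some(Seq("view", "buy")),
      reversed = Some(true))

produces, modulo whitespace:

    {
    "query":{"bool":{"must":[{"term":{"entityType":"user"}},
                             {"terms":{"event":["view","buy"]}}]}},
    "sort":[{"eventTime":{"order":"desc"}}]
    }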

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala
new file mode 100644
index 0000000..540892b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/StorageClient.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import org.apache.http.HttpHost
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+
+import grizzled.slf4j.Logging
+
+case class ESClient(hosts: Seq[HttpHost]) {
+  def open(): RestClient = {
+    try {
+      RestClient.builder(hosts: _*).build()
+    } catch {
+      case e: Throwable =>
+        throw new StorageClientException(e.getMessage, e)
+    }
+  }
+}
+
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "ES"
+
+  val client = ESClient(ESUtils.getHttpHosts(config))
+}
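
A wiring sketch for a two-node setup; host names are illustrative, and the
health check just exercises the generic performRequest method of the
low-level REST client:

    import org.apache.predictionio.data.storage.StorageClientConfig

    val config = StorageClientConfig(properties = Map(
      "HOSTS" -> "es-1.internal,es-2.internal",
      "PORTS" -> "9200,9200",
      "SCHEMES" -> "http,http"))
    val restClient = new StorageClient(config).client.open()
    try {
      // Expect HTTP 200 once the cluster is reachable.
      val status = restClient.performRequest("GET", "/_cluster/health")
        .getStatusLine.getStatusCode
      println(status)
    } finally {
      restClient.close()
    }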

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala
new file mode 100644
index 0000000..5cb423a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** Elasticsearch 5.x implementation of storage traits, supporting meta data and event data
+  *
+  * @group Implementation
+  */
+package object elasticsearch5 {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index a8d730b..5420004 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -19,11 +19,11 @@ import sbt._
 import Keys._
 
 object PIOBuild extends Build {
+  val elasticsearch5Version = SettingKey[String](
+    "elasticsearch5-version",
+    "The version of Elasticsearch 5.x used for building.")
   val elasticsearchVersion = SettingKey[String](
     "elasticsearch-version",
-    "The version of Elasticsearch used for building.")
-  val elasticsearch1Version = SettingKey[String](
-    "elasticsearch1-version",
     "The version of Elasticsearch 1.x used for building.")
   val json4sVersion = SettingKey[String](
     "json4s-version",

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
index 10aca41..877da06 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
@@ -108,7 +108,7 @@ object Management extends EitherLogging {
     info("Inspecting Apache Spark...")
     val sparkHomePath = Common.getSparkHome(sparkHome)
     if (new File(s"$sparkHomePath/bin/spark-submit").exists) {
-      info(s"Apache Spark is installed at $sparkHome")
+      info(s"Apache Spark is installed at $sparkHomePath")
       val sparkMinVersion = "1.3.0"
       pioStatus = pioStatus.copy(
         sparkHome = sparkHomePath,
