Add support for Elasticsearch 5.x
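This change moves every Elasticsearch-backed storage implementation from the 1.x/2.x-era transport Client (port 9300) to the 5.x low-level REST client (port 9200), so each storage operation becomes a plain HTTP call whose JSON response is parsed with json4s. A minimal sketch of that access pattern, consistent with the call sites in the diff below — host, port, index, type, and document id here are illustrative, not taken from the commit:

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.apache.http.HttpHost
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.RestClient
import org.json4s._
import org.json4s.native.JsonMethods._

object RestClientSketch extends App {
  implicit val formats = DefaultFormats

  // Connect over HTTP (9200) instead of the binary transport protocol (9300).
  val client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()
  try {
    // A single document GET; the storage classes in this diff follow the same shape.
    val response = client.performRequest(
      "GET",
      "/pio_meta/apps/1",
      Map.empty[String, String].asJava)
    val json = parse(EntityUtils.toString(response.getEntity))
    println((json \ "found").extract[Boolean])
  } finally {
    client.close()
  }
}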
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/36b79d7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/36b79d7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/36b79d7d Branch: refs/heads/feature/es5 Commit: 36b79d7d029d086d14b74c675308fca948c6aa85 Parents: 09d04ed Author: takahiro-hagino <[email protected]> Authored: Mon Jan 16 11:50:48 2017 +0900 Committer: takahiro-hagino <[email protected]> Committed: Mon Jan 16 13:41:44 2017 +0900 ---------------------------------------------------------------------- bin/install.sh | 4 +- bin/pio-start-all | 94 ++++--- bin/pio-stop-all | 50 ++-- build.sbt | 4 +- conf/pio-env.sh.template | 8 +- core/build.sbt | 40 +-- .../predictionio/workflow/CreateWorkflow.scala | 74 ++--- data/build.sbt | 49 ++-- .../storage/elasticsearch/ESAccessKeys.scala | 140 ++++++---- .../data/storage/elasticsearch/ESApps.scala | 162 +++++++---- .../data/storage/elasticsearch/ESChannels.scala | 132 +++++---- .../elasticsearch/ESEngineInstances.scala | 233 ++++++++++------ .../elasticsearch/ESEngineManifests.scala | 111 +++++--- .../elasticsearch/ESEvaluationInstances.scala | 176 +++++++----- .../storage/elasticsearch/ESEventsUtil.scala | 125 +++++++++ .../data/storage/elasticsearch/ESLEvents.scala | 269 +++++++++++++++++++ .../data/storage/elasticsearch/ESPEvents.scala | 151 +++++++++++ .../storage/elasticsearch/ESSequences.scala | 69 ++--- .../data/storage/elasticsearch/ESUtils.scala | 159 +++++++++-- .../storage/elasticsearch/StorageClient.scala | 24 +- tools/build.sbt | 4 +- 21 files changed, 1518 insertions(+), 560 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/install.sh ---------------------------------------------------------------------- diff --git a/bin/install.sh b/bin/install.sh index f17cde5..e4fe220 100755 --- a/bin/install.sh +++ b/bin/install.sh @@ -19,9 +19,9 @@ OS=`uname` PIO_VERSION=0.11.0-SNAPSHOT -SPARK_VERSION=1.6.2 +SPARK_VERSION=1.6.3 # Looks like support for Elasticsearch 2.0 will require 2.0 so deferring -ELASTICSEARCH_VERSION=1.7.5 +ELASTICSEARCH_VERSION=5.1.2 HBASE_VERSION=1.2.2 POSTGRES_VERSION=9.4-1204.jdbc41 MYSQL_VERSION=5.1.37 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/pio-start-all ---------------------------------------------------------------------- diff --git a/bin/pio-start-all b/bin/pio-start-all index a78b0d2..03e10ae 100755 --- a/bin/pio-start-all +++ b/bin/pio-start-all @@ -25,63 +25,73 @@ export PIO_HOME="$(cd `dirname $0`/..; pwd)" . ${PIO_HOME}/bin/load-pio-env.sh +SOURCE_TYPE=$PIO_STORAGE_REPOSITORIES_METADATA_SOURCE +SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE +SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE + # Elasticsearch -echo "Starting Elasticsearch..." -if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then - if [ -n "$JAVA_HOME" ]; then - JPS=`$JAVA_HOME/bin/jps` +if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then + echo "Starting Elasticsearch..." + if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then + if [ -n "$JAVA_HOME" ]; then + JPS=`$JAVA_HOME/bin/jps` + else + JPS=`jps` + fi + if [[ ${JPS} =~ "Elasticsearch" ]]; then + echo -e "\033[0;31mElasticsearch is already running. 
Please use pio-stop-all to try stopping it first.\033[0m" + echo -e "\033[0;31mNote: If you started Elasticsearch manually, you will need to kill it manually.\033[0m" + echo -e "\033[0;31mAborting...\033[0m" + exit 1 + else + $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid + fi else - JPS=`jps` - fi - if [[ ${JPS} =~ "Elasticsearch" ]]; then - echo -e "\033[0;31mElasticsearch is already running. Please use pio-stop-all to try stopping it first.\033[0m" - echo -e "\033[0;31mNote: If you started Elasticsearch manually, you will need to kill it manually.\033[0m" - echo -e "\033[0;31mAborting...\033[0m" + echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m" + echo -e "\033[0;31mCannot start Elasticsearch. Aborting...\033[0m" exit 1 - else - $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid fi -else - echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m" - echo -e "\033[0;31mCannot start Elasticsearch. Aborting...\033[0m" - exit 1 fi # HBase -echo "Starting HBase..." -if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then - $PIO_STORAGE_SOURCES_HBASE_HOME/bin/start-hbase.sh -else - echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_HBASE_HOME in conf/pio-env.sh, or in your environment.\033[0m" - # Kill everything for cleanliness - echo -e "\033[0;31mCannot start HBase. Aborting...\033[0m" - sleep 3 - ${PIO_HOME}/bin/pio-stop-all - exit 1 +if [ `echo $SOURCE_TYPE | grep -i hbase | wc -l` != 0 ] ; then + echo "Starting HBase..." + if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then + $PIO_STORAGE_SOURCES_HBASE_HOME/bin/start-hbase.sh + else + echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_HBASE_HOME in conf/pio-env.sh, or in your environment.\033[0m" + # Kill everything for cleanliness + echo -e "\033[0;31mCannot start HBase. Aborting...\033[0m" + sleep 3 + ${PIO_HOME}/bin/pio-stop-all + exit 1 + fi fi #PGSQL -pgsqlStatus="$(ps auxwww | grep postgres | wc -l)" -if [[ "$pgsqlStatus" < 5 ]]; then - # Detect OS - OS=`uname` - if [[ "$OS" = "Darwin" ]]; then - pg_cmd=`which pg_ctl` - if [[ "$pg_cmd" != "" ]]; then - pg_ctl -D /usr/local/var/postgres -l /usr/local/var/postgres/server.log start +if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then + pgsqlStatus="$(ps auxwww | grep postgres | wc -l)" + if [[ "$pgsqlStatus" < 5 ]]; then + # Detect OS + OS=`uname` + if [[ "$OS" = "Darwin" ]]; then + pg_cmd=`which pg_ctl` + if [[ "$pg_cmd" != "" ]]; then + pg_ctl -D /usr/local/var/postgres -l /usr/local/var/postgres/server.log start + fi + elif [[ "$OS" = "Linux" ]]; then + sudo service postgresql start + else + echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m" + echo -e "\033[1;31mPlease do a manual startup!\033[0m" + ${PIO_HOME}/bin/pio-stop-all + exit 1 fi - elif [[ "$OS" = "Linux" ]]; then - sudo service postgresql start - else - echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m" - echo -e "\033[1;31mPlease do a manual startup!\033[0m" - ${PIO_HOME}/bin/pio-stop-all - exit 1 fi fi # PredictionIO Event Server -echo "Waiting 10 seconds for HBase to fully initialize..." +echo "Waiting 10 seconds for HBase/Elasticsearch to fully initialize..." sleep 10 echo "Starting PredictionIO Event Server..." 
${PIO_HOME}/bin/pio-daemon ${PIO_HOME}/eventserver.pid eventserver --ip 0.0.0.0 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/bin/pio-stop-all ---------------------------------------------------------------------- diff --git a/bin/pio-stop-all b/bin/pio-stop-all index 4aab5a3..dabad5d 100755 --- a/bin/pio-stop-all +++ b/bin/pio-stop-all @@ -25,6 +25,10 @@ export PIO_HOME="$(cd `dirname $0`/..; pwd)" . ${PIO_HOME}/bin/load-pio-env.sh +SOURCE_TYPE=$PIO_STORAGE_REPOSITORIES_METADATA_SOURCE +SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE +SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE + # PredictionIO Event Server echo "Stopping PredictionIO Event Server..." PIDFILE=${PIO_HOME}/eventserver.pid @@ -34,30 +38,38 @@ if [ -e ${PIDFILE} ]; then fi # HBase -echo "Stopping HBase..." -if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then - $PIO_STORAGE_SOURCES_HBASE_HOME/bin/stop-hbase.sh +if [ `echo $SOURCE_TYPE | grep -i hbase | wc -l` != 0 ] ; then + echo "Stopping HBase..." + if [ -n "$PIO_STORAGE_SOURCES_HBASE_HOME" ]; then + $PIO_STORAGE_SOURCES_HBASE_HOME/bin/stop-hbase.sh + fi fi # Elasticsearch -echo "Stopping Elasticsearch..." -PIDFILE=${PIO_HOME}/es.pid -if [ -e ${PIDFILE} ]; then - cat ${PIDFILE} | xargs kill - rm ${PIDFILE} +if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then + echo "Stopping Elasticsearch..." + PIDFILE=${PIO_HOME}/es.pid + if [ -e ${PIDFILE} ]; then + cat ${PIDFILE} | xargs kill + rm ${PIDFILE} + fi fi #PGSQL -OS=`uname` -if [[ "$OS" = "Darwin" ]]; then - pg_cmd=`which pg_ctl` - if [[ "$pg_cmd" != "" ]]; then - pg_ctl -D /usr/local/var/postgres stop -s -m fast +if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then + if [ -n "$PIO_STORAGE_SOURCES_PGSQL_TYPE" ]; then + OS=`uname` + if [[ "$OS" = "Darwin" ]]; then + pg_cmd=`which pg_ctl` + if [[ "$pg_cmd" != "" ]]; then + pg_ctl -D /usr/local/var/postgres stop -s -m fast + fi + elif [[ "$OS" = "Linux" ]]; then + sudo service postgresql stop + else + echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m" + echo -e "\033[1;31mPlease do a manual shutdown!\033[0m" + exit 1 + fi fi -elif [[ "$OS" = "Linux" ]]; then - sudo service postgresql stop -else - echo -e "\033[1;31mYour OS $OS is not yet supported for automatic postgresql startup:(\033[0m" - echo -e "\033[1;31mPlease do a manual shutdown!\033[0m" - exit 1 fi http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index 21c31c6..4d6a5b6 100644 --- a/build.sbt +++ b/build.sbt @@ -34,11 +34,11 @@ fork in (ThisBuild, run) := true javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint:deprecation", "-Xlint:unchecked") -elasticsearchVersion in ThisBuild := "1.4.4" +elasticsearchVersion in ThisBuild := "5.1.2" json4sVersion in ThisBuild := "3.2.10" -sparkVersion in ThisBuild := "1.4.0" +sparkVersion in ThisBuild := "1.6.3" lazy val pioBuildInfoSettings = buildInfoSettings ++ Seq( sourceGenerators in Compile <+= buildInfo, http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/conf/pio-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template index a06cd8e..a2841e3 100644 --- a/conf/pio-env.sh.template +++ b/conf/pio-env.sh.template @@ -24,7 +24,7 @@ # you need to 
change these to fit your site. # SPARK_HOME: Apache Spark is a hard dependency and must be configured. -SPARK_HOME=$PIO_HOME/vendors/spark-1.5.1-bin-hadoop2.6 +SPARK_HOME=$PIO_HOME/vendors/spark-1.6.3-bin-hadoop2.6 POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-9.4-1204.jdbc41.jar MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.37.jar @@ -85,10 +85,10 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # Elasticsearch Example # PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch -# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name> # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost -# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300 -# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.4.4 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2 # Local File System Example # PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/core/build.sbt ---------------------------------------------------------------------- diff --git a/core/build.sbt b/core/build.sbt index 637d4ea..abd8e07 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -18,27 +18,27 @@ name := "apache-predictionio-core" libraryDependencies ++= Seq( - "com.github.scopt" %% "scopt" % "3.3.0", - "com.google.code.gson" % "gson" % "2.5", - "com.google.guava" % "guava" % "18.0", - "com.twitter" %% "chill" % "0.7.2" + "com.github.scopt" %% "scopt" % "3.3.0", + "com.google.code.gson" % "gson" % "2.5", + "com.google.guava" % "guava" % "18.0", + "com.twitter" %% "chill" % "0.7.2" exclude("com.esotericsoftware.minlog", "minlog"), - "com.twitter" %% "chill-bijection" % "0.7.2", - "de.javakaffee" % "kryo-serializers" % "0.37", - "commons-io" % "commons-io" % "2.4", - "io.spray" %% "spray-can" % "1.3.3", - "io.spray" %% "spray-routing" % "1.3.3", - "net.jodah" % "typetools" % "0.3.1", - "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", - "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", - "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value, - "org.json4s" %% "json4s-native" % json4sVersion.value, - "org.json4s" %% "json4s-ext" % json4sVersion.value, - "org.scalaj" %% "scalaj-http" % "1.1.6", - "org.scalatest" %% "scalatest" % "2.1.7" % "test", - "org.slf4j" % "slf4j-log4j12" % "1.7.18", - "org.specs2" %% "specs2" % "2.3.13" % "test") + "com.twitter" %% "chill-bijection" % "0.7.2", + "de.javakaffee" % "kryo-serializers" % "0.37", + "commons-io" % "commons-io" % "2.4", + "io.spray" %% "spray-can" % "1.3.3", + "io.spray" %% "spray-routing" % "1.3.3", + "net.jodah" % "typetools" % "0.3.1", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "org.clapper" %% "grizzled-slf4j" % "1.0.2", + "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, + "org.json4s" %% "json4s-native" % json4sVersion.value, + "org.json4s" %% "json4s-ext" % json4sVersion.value, + "org.scalaj" %% "scalaj-http" % "1.1.6", + "org.scalatest" %% "scalatest" % "2.1.7" % "test", + "org.slf4j" % "slf4j-log4j12" % "1.7.18", + "org.specs2" %% "specs2" % "2.3.13" % "test") //testOptions := Seq(Tests.Filter(s => Seq("Dev").exists(s.contains(_)))) 
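Both core and data drop the direct org.elasticsearch:elasticsearch dependency in favor of the low-level REST client artifact; data additionally pulls in the elasticsearch-hadoop artifacts for the new parallel event store (see data/build.sbt below). For reference, the replacement coordinate as a build.sbt fragment, with the version pinned by the elasticsearchVersion setting above:

// build.sbt fragment: the transport client is replaced by the REST client
// used by the rewritten storage implementations.
libraryDependencies += "org.elasticsearch.client" % "rest" % elasticsearchVersion.value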
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala index 899ace2..edfc1b6 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala @@ -223,36 +223,40 @@ object CreateWorkflow extends Logging { engineFactoryObj.engineParams(wfc.engineParamsKey) } - val engineInstance = EngineInstance( - id = "", - status = "INIT", - startTime = DateTime.now, - endTime = DateTime.now, - engineId = wfc.engineId, - engineVersion = wfc.engineVersion, - engineVariant = variantId, - engineFactory = engineFactory, - batch = wfc.batch, - env = pioEnvVars, - sparkConf = workflowParams.sparkEnv, - dataSourceParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams), - preparatorParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams), - algorithmsParams = - JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList), - servingParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams)) + try { + val engineInstance = EngineInstance( + id = "", + status = "INIT", + startTime = DateTime.now, + endTime = DateTime.now, + engineId = wfc.engineId, + engineVersion = wfc.engineVersion, + engineVariant = variantId, + engineFactory = engineFactory, + batch = wfc.batch, + env = pioEnvVars, + sparkConf = workflowParams.sparkEnv, + dataSourceParams = + JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams), + preparatorParams = + JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams), + algorithmsParams = + JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList), + servingParams = + JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams)) - val engineInstanceId = Storage.getMetaDataEngineInstances.insert( - engineInstance) + val engineInstanceId = Storage.getMetaDataEngineInstances.insert( + engineInstance) - CoreWorkflow.runTrain( - env = pioEnvVars, - params = workflowParams, - engine = trainableEngine, - engineParams = engineParams, - engineInstance = engineInstance.copy(id = engineInstanceId)) + CoreWorkflow.runTrain( + env = pioEnvVars, + params = workflowParams, + engine = trainableEngine, + engineParams = engineParams, + engineInstance = engineInstance.copy(id = engineInstanceId)) + } finally { + Storage.getLEvents().close() + } } else { val workflowParams = WorkflowParams( verbose = wfc.verbosity, @@ -267,11 +271,15 @@ object CreateWorkflow extends Logging { env = pioEnvVars, sparkConf = workflowParams.sparkEnv ) - Workflow.runEvaluation( - evaluation = evaluation.get, - engineParamsGenerator = engineParamsGenerator.get, - evaluationInstance = evaluationInstance, - params = workflowParams) + try { + Workflow.runEvaluation( + evaluation = evaluation.get, + engineParamsGenerator = engineParamsGenerator.get, + evaluationInstance = evaluationInstance, + params = workflowParams) + } finally { + Storage.getLEvents().close() + } } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/build.sbt ---------------------------------------------------------------------- diff --git 
a/data/build.sbt b/data/build.sbt index 4526c39..4ae9b42 100644 --- a/data/build.sbt +++ b/data/build.sbt @@ -18,41 +18,44 @@ name := "apache-predictionio-data" libraryDependencies ++= Seq( - "com.github.nscala-time" %% "nscala-time" % "2.6.0", - "commons-codec" % "commons-codec" % "1.9", - "io.spray" %% "spray-can" % "1.3.3", - "io.spray" %% "spray-routing" % "1.3.3", - "io.spray" %% "spray-testkit" % "1.3.3" % "test", - "mysql" % "mysql-connector-java" % "5.1.37" % "optional", - "org.apache.hadoop" % "hadoop-common" % "2.6.2" + "com.github.nscala-time" %% "nscala-time" % "2.6.0", + "commons-codec" % "commons-codec" % "1.9", + "io.spray" %% "spray-can" % "1.3.3", + "io.spray" %% "spray-routing" % "1.3.3", + "io.spray" %% "spray-testkit" % "1.3.3" % "test", + "mysql" % "mysql-connector-java" % "5.1.37" % "optional", + "org.apache.hadoop" % "hadoop-common" % "2.6.2" exclude("javax.servlet", "servlet-api"), - "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2", - "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2" + "org.apache.hbase" % "hbase-common" % "0.98.5-hadoop2", + "org.apache.hbase" % "hbase-client" % "0.98.5-hadoop2" exclude("org.apache.zookeeper", "zookeeper"), // added for Parallel storage interface - "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2" + "org.apache.hbase" % "hbase-server" % "0.98.5-hadoop2" exclude("org.apache.hbase", "hbase-client") exclude("org.apache.zookeeper", "zookeeper") exclude("javax.servlet", "servlet-api") exclude("org.mortbay.jetty", "servlet-api-2.5") exclude("org.mortbay.jetty", "jsp-api-2.1") exclude("org.mortbay.jetty", "jsp-2.1"), - "org.apache.zookeeper" % "zookeeper" % "3.4.7" + "org.apache.zookeeper" % "zookeeper" % "3.4.7" exclude("org.slf4j", "slf4j-api") exclude("org.slf4j", "slf4j-log4j12"), - "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", - "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", - "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value, - "org.json4s" %% "json4s-native" % json4sVersion.value, - "org.json4s" %% "json4s-ext" % json4sVersion.value, - "org.postgresql" % "postgresql" % "9.4-1204-jdbc41", - "org.scalatest" %% "scalatest" % "2.1.7" % "test", - "org.scalikejdbc" %% "scalikejdbc" % "2.3.5", - "org.slf4j" % "slf4j-log4j12" % "1.7.18", - "org.spark-project.akka" %% "akka-actor" % "2.3.4-spark", - "org.specs2" %% "specs2" % "2.3.13" % "test") + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "org.clapper" %% "grizzled-slf4j" % "1.0.2", + "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, + "org.elasticsearch" % "elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided", + "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value, + "org.json4s" %% "json4s-native" % json4sVersion.value, + "org.json4s" %% "json4s-ext" % json4sVersion.value, + "org.postgresql" % "postgresql" % "9.4-1204-jdbc41", + "org.scalatest" %% "scalatest" % "2.1.7" % "test", + "org.scalikejdbc" %% "scalikejdbc" % "2.3.5", + "org.slf4j" % "slf4j-log4j12" % "1.7.18", + "org.spark-project.akka" %% "akka-actor" % "2.3.4-spark", + "org.specs2" %% "specs2" % "2.3.13" % "test") parallelExecution in Test := false pomExtra := childrenPomExtra.value + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala 
---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 077168a..2c69cf4 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -15,45 +15,41 @@ * limitations under the License. */ - package org.apache.predictionio.data.storage.elasticsearch -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.AccessKey import org.apache.predictionio.data.storage.AccessKeys -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.json4s.JsonDSL._ +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient import org.json4s._ +import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -import scala.util.Random +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException /** Elasticsearch implementation of AccessKeys. */ -class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) +class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String) extends AccessKeys with Logging { implicit val formats = DefaultFormats.lossless private val estype = "accesskeys" - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). 
- setSource(compact(render(json))).get - } + ESUtils.createIndex(client, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("key" -> ("type" -> "keyword")) ~ + ("events" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(accessKey: AccessKey): Option[String] = { val key = if (accessKey.key.isEmpty) generateKey else accessKey.key @@ -61,59 +57,99 @@ class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) Some(key) } - def get(key: String): Option[AccessKey] = { + def get(id: String): Option[AccessKey] = { try { - val response = client.prepareGet( - index, - estype, - key).get() - Some(read[AccessKey](response.getSourceAsString)) + val response = client.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[AccessKey]) + case _ => + None + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error("Failed to access to /$index/$estype/$key", e) None - case e: NullPointerException => None } } def getAll(): Seq[AccessKey] = { try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[AccessKey](client, builder) + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[AccessKey]() + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil } } def getByAppid(appid: Int): Seq[AccessKey] = { try { - val builder = client.prepareSearch(index).setTypes(estype). 
- setPostFilter(termFilter("appid", appid)) - ESUtils.getAll[AccessKey](client, builder) + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[AccessKey]() + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil } } def update(accessKey: AccessKey): Unit = { + val id = accessKey.key try { - client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get() + val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) + val response = client.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) } } - def delete(key: String): Unit = { + def delete(id: String): Unit = { try { - client.prepareDelete(index, estype, key).get + val response = client.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/id") + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/id", e) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index 3781a4b..7a65379 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -15,116 +15,160 @@ * limitations under the License. */ - package org.apache.predictionio.data.storage.elasticsearch -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.StorageClientConfig +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.App import org.apache.predictionio.data.storage.Apps -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.json4s.JsonDSL._ +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient import org.json4s._ +import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + /** Elasticsearch implementation of Items. 
*/ -class ESApps(client: Client, config: StorageClientConfig, index: String) - extends Apps with Logging { +class ESApps(client: RestClient, config: StorageClientConfig, index: String) + extends Apps with Logging { implicit val formats = DefaultFormats.lossless private val estype = "apps" private val seq = new ESSequences(client, config, index) - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } + ESUtils.createIndex(client, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0))~ + ("properties" -> + ("id" -> ("type" -> "keyword")) ~ + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(app: App): Option[Int] = { val id = if (app.id == 0) { - var roll = seq.genNext("apps") - while (!get(roll).isEmpty) roll = seq.genNext("apps") + var roll = seq.genNext(estype) + while (!get(roll).isEmpty) roll = seq.genNext(estype) roll - } - else app.id - val realapp = app.copy(id = id) - update(realapp) + } else app.id + update(app.copy(id = id)) Some(id) } def get(id: Int): Option[App] = { try { - val response = client.prepareGet( - index, - estype, - id.toString).get() - Some(read[App](response.getSourceAsString)) + val response = client.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[App]) + case _ => + None + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) None - case e: NullPointerException => None } } def getByName(name: String): Option[App] = { try { - val response = client.prepareSearch(index).setTypes(estype). 
- setPostFilter(termFilter("name", name)).get - val hits = response.getHits().hits() - if (hits.size > 0) { - Some(read[App](hits.head.getSourceAsString)) - } else { - None + val json = + ("query" -> + ("term" -> + ("name" -> name))) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) + val response = client.performRequest( + "POST", + s"/$index/$estype/_search", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "hits" \ "total").extract[Long] match { + case 0 => None + case _ => + val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] + val result = (results.head \ "_source").extract[App] + Some(result) } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) None } } def getAll(): Seq[App] = { try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[App](client, builder) + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[App](client, index, estype, compact(render(json))) } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[App]() + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil } } def update(app: App): Unit = { + val id = app.id.toString try { - val response = client.prepareIndex(index, estype, app.id.toString). - setSource(write(app)).get() + val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); + val response = client.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) } } def delete(id: Int): Unit = { try { - client.prepareDelete(index, estype, id.toString).get + val response = client.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/id", e) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index 52697fd..c90d668 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -15,102 +15,134 @@ * limitations under the License. 
*/ - package org.apache.predictionio.data.storage.elasticsearch -import grizzled.slf4j.Logging +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.Channel import org.apache.predictionio.data.storage.Channels import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders.termFilter -import org.json4s.DefaultFormats +import org.elasticsearch.client.RestClient +import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -class ESChannels(client: Client, config: StorageClientConfig, index: String) - extends Channels with Logging { +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException +class ESChannels(client: RestClient, config: StorageClientConfig, index: String) + extends Channels with Logging { implicit val formats = DefaultFormats.lossless private val estype = "channels" private val seq = new ESSequences(client, config, index) - private val seqName = "channels" - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } + ESUtils.createIndex(client, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0))~ + ("properties" -> + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(channel: Channel): Option[Int] = { val id = if (channel.id == 0) { - var roll = seq.genNext(seqName) - while (!get(roll).isEmpty) roll = seq.genNext(seqName) + var roll = seq.genNext(estype) + while (!get(roll).isEmpty) roll = seq.genNext(estype) roll } else channel.id - val realChannel = channel.copy(id = id) - if (update(realChannel)) Some(id) else None + if (update(channel.copy(id = id))) Some(id) else None } def get(id: Int): Option[Channel] = { try { - val response = client.prepareGet( - index, - estype, - id.toString).get() - Some(read[Channel](response.getSourceAsString)) + val response = client.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[Channel]) + case _ => + None + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) None - case e: NullPointerException => None } } def getByAppid(appid: Int): Seq[Channel] = { try { - val builder = client.prepareSearch(index).setTypes(estype). 
- setPostFilter(termFilter("appid", appid)) - ESUtils.getAll[Channel](client, builder) + val json = + ("query" -> + ("term" -> + ("appid" -> appid))) + ESUtils.getAll[Channel](client, index, estype, compact(render(json))) } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq[Channel]() + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + Nil } } def update(channel: Channel): Boolean = { + val id = channel.id.toString try { - val response = client.prepareIndex(index, estype, channel.id.toString). - setSource(write(channel)).get() - true + val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) + val response = client.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "created" => true + case "updated" => true + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + false + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) false } } def delete(id: Int): Unit = { try { - client.prepareDelete(index, estype, id.toString).get + val response = client.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 21690bf..08f87f3 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -15,144 +15,211 @@ * limitations under the License. 
*/ - package org.apache.predictionio.data.storage.elasticsearch -import grizzled.slf4j.Logging +import java.io.IOException + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.EngineInstance import org.apache.predictionio.data.storage.EngineInstanceSerializer import org.apache.predictionio.data.storage.EngineInstances import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.elasticsearch.search.sort.SortOrder -import org.json4s.JsonDSL._ +import org.elasticsearch.client.RestClient import org.json4s._ +import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -class ESEngineInstances(client: Client, config: StorageClientConfig, index: String) - extends EngineInstances with Logging { +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String) + extends EngineInstances with Logging { implicit val formats = DefaultFormats + new EngineInstanceSerializer private val estype = "engine_instances" - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineVersion" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineVariant" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineFactory" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("batch" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("dataSourceParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("preparatorParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("algorithmsParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("servingParams" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) - indices.preparePutMapping(index).setType(estype). 
- setSource(compact(render(json))).get - } + ESUtils.createIndex(client, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0))~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "keyword")) ~ + ("engineVersion" -> ("type" -> "keyword")) ~ + ("engineVariant" -> ("type" -> "keyword")) ~ + ("engineFactory" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("dataSourceParams" -> ("type" -> "keyword")) ~ + ("preparatorParams" -> ("type" -> "keyword")) ~ + ("algorithmsParams" -> ("type" -> "keyword")) ~ + ("servingParams" -> ("type" -> "keyword")) ~ + ("status" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(i: EngineInstance): String = { + val id = i.id match { + case x if x.isEmpty => + @scala.annotation.tailrec + def generateId(newId: Option[String]): String = { + newId match { + case Some(x) => x + case _ => generateId(preInsert()) + } + } + generateId(preInsert()) + case x => x + } + + update(i.copy(id = id)) + id + } + + def preInsert(): Option[String] = { try { - val response = client.prepareIndex(index, estype). - setSource(write(i)).get - response.getId + val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) + val response = client.performRequest( + "POST", + s"/$index/$estype/", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + Some((jsonResponse \ "_id").extract[String]) + case _ => + error(s"[$result] Failed to create $index/$estype") + None + } } catch { - case e: ElasticsearchException => - error(e.getMessage) - "" + case e: IOException => + error(s"Failed to create $index/$estype", e) + None } } def get(id: String): Option[EngineInstance] = { try { - val response = client.prepareGet(index, estype, id).get - if (response.isExists) { - Some(read[EngineInstance](response.getSourceAsString)) - } else { - None + val response = client.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[EngineInstance]) + case _ => + None } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$id", e) None } } def getAll(): Seq[EngineInstance] = { try { - val builder = client.prepareSearch(index).setTypes(estype) - ESUtils.getAll[EngineInstance](client, builder) + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil } } def getCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Seq[EngineInstance] = { + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = { try { - val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( - 
andFilter( - termFilter("status", "COMPLETED"), - termFilter("engineId", engineId), - termFilter("engineVersion", engineVersion), - termFilter("engineVariant", engineVariant))). - addSort("startTime", SortOrder.DESC) - ESUtils.getAll[EngineInstance](client, builder) + val json = + ("query" -> + ("bool" -> + ("must" -> List( + ("term" -> + ("status" -> "COMPLETED")), + ("term" -> + ("engineId" -> engineId)), + ("term" -> + ("engineVersion" -> engineVersion)), + ("term" -> + ("engineVariant" -> engineVariant)))))) ~ + ("sort" -> List( + ("startTime" -> + ("order" -> "desc")))) + ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() + case e: IOException => + error(s"Failed to access to /$index/$estype/_search", e) + Nil } } def getLatestCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Option[EngineInstance] = + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = getCompleted( engineId, engineVersion, engineVariant).headOption def update(i: EngineInstance): Unit = { + val id = i.id try { - client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get + val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) + val response = client.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } } catch { - case e: ElasticsearchException => error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) } } def delete(id: String): Unit = { try { - val response = client.prepareDelete(index, estype, id).get + val response = client.performRequest( + "DELETE", + s"/$index/$estype/$id", + Map.empty[String, String].asJava) + val json = parse(EntityUtils.toString(response.getEntity)) + val result = (json \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } } catch { - case e: ElasticsearchException => error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala index 65b6691..a965c71 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala @@ -15,59 +15,96 @@ * limitations under the License. 
*/ - package org.apache.predictionio.data.storage.elasticsearch -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EngineManifestSerializer -import org.apache.predictionio.data.storage.StorageClientConfig +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.data.storage.EngineManifestSerializer import org.apache.predictionio.data.storage.EngineManifests -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.RestClient import org.json4s._ -import org.json4s.native.Serialization.read +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write -class ESEngineManifests(client: Client, config: StorageClientConfig, index: String) - extends EngineManifests with Logging { +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: String) + extends EngineManifests with Logging { implicit val formats = DefaultFormats + new EngineManifestSerializer private val estype = "engine_manifests" - private def esid(id: String, version: String) = s"$id $version" + private def esid(id: String, version: String) = s"$id-$version" def insert(engineManifest: EngineManifest): Unit = { - val json = write(engineManifest) - val response = client.prepareIndex( - index, - estype, - esid(engineManifest.id, engineManifest.version)). - setSource(json).execute().actionGet() + val id = esid(engineManifest.id, engineManifest.version) + try { + val entity = new NStringEntity(write(engineManifest), ContentType.APPLICATION_JSON) + val response = client.performRequest( + "POST", + s"/$index/$estype/$id", + Map.empty[String, String].asJava, + entity) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "created" => + case "updated" => + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/$id", e) + } } def get(id: String, version: String): Option[EngineManifest] = { + val esId = esid(id, version) try { - val response = client.prepareGet(index, estype, esid(id, version)). 
- execute().actionGet() - if (response.isExists) { - Some(read[EngineManifest](response.getSourceAsString)) - } else { - None + val response = client.performRequest( + "GET", + s"/$index/$estype/$esId", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + (jsonResponse \ "found").extract[Boolean] match { + case true => + Some((jsonResponse \ "_source").extract[EngineManifest]) + case _ => + None } } catch { - case e: ElasticsearchException => - error(e.getMessage) + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => None + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + None + } + case e: IOException => + error(s"Failed to access to /$index/$estype/$esId", e) None } + } def getAll(): Seq[EngineManifest] = { try { - val builder = client.prepareSearch() - ESUtils.getAll[EngineManifest](client, builder) + val json = + ("query" -> + ("match_all" -> List.empty)) + ESUtils.getAll[EngineManifest](client, index, estype, compact(render(json))) } catch { - case e: ElasticsearchException => - error(e.getMessage) - Seq() + case e: IOException => + error("Failed to access to /$index/$estype/_search", e) + Nil } } @@ -75,10 +112,22 @@ class ESEngineManifests(client: Client, config: StorageClientConfig, index: Stri insert(engineManifest) def delete(id: String, version: String): Unit = { + val esId = esid(id, version) try { - client.prepareDelete(index, estype, esid(id, version)).execute().actionGet() + val response = client.performRequest( + "DELETE", + s"/$index/$estype/$esId", + Map.empty[String, String].asJava) + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "deleted" => + case _ => + error(s"[$result] Failed to update $index/$estype/$esId") + } } catch { - case e: ElasticsearchException => error(e.getMessage) + case e: IOException => + error(s"Failed to update $index/$estype/$esId", e) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 85bf820..0e71f79 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -15,122 +15,160 @@ * limitations under the License. 
*/ - package org.apache.predictionio.data.storage.elasticsearch -import grizzled.slf4j.Logging +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.http.entity.ContentType +import org.apache.http.nio.entity.NStringEntity +import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.EvaluationInstance import org.apache.predictionio.data.storage.EvaluationInstanceSerializer import org.apache.predictionio.data.storage.EvaluationInstances import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.ElasticsearchException -import org.elasticsearch.client.Client -import org.elasticsearch.index.query.FilterBuilders._ -import org.elasticsearch.search.sort.SortOrder -import org.json4s.JsonDSL._ +import org.apache.predictionio.data.storage.StorageClientException +import org.elasticsearch.client.RestClient import org.json4s._ +import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String) - extends EvaluationInstances with Logging { +import grizzled.slf4j.Logging +import org.elasticsearch.client.ResponseException + +class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String) + extends EvaluationInstances with Logging { implicit val formats = DefaultFormats + new EvaluationInstanceSerializer private val estype = "evaluation_instances" + private val seq = new ESSequences(client, config, index) - val indices = client.admin.indices - val indexExistResponse = indices.prepareExists(index).get - if (!indexExistResponse.isExists) { - indices.prepareCreate(index).get - } - val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get - if (!typeExistResponse.isExists) { - val json = - (estype -> - ("properties" -> - ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("evaluationClass" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("engineParamsGeneratorClass" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("batch" -> - ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ - ("evaluatorResults" -> - ("type" -> "string") ~ ("index" -> "no")) ~ - ("evaluatorResultsHTML" -> - ("type" -> "string") ~ ("index" -> "no")) ~ - ("evaluatorResultsJSON" -> - ("type" -> "string") ~ ("index" -> "no")))) - indices.preparePutMapping(index).setType(estype). - setSource(compact(render(json))).get - } + ESUtils.createIndex(client, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0))~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("evaluationClass" -> ("type" -> "keyword")) ~ + ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(i: EvaluationInstance): String = { - try { - val response = client.prepareIndex(index, estype). 
-        setSource(write(i)).get
-      response.getId
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        ""
+    val id = i.id match {
+      case x if x.isEmpty =>
+        var roll = seq.genNext(estype).toString
+        while (get(roll).isDefined) roll = seq.genNext(estype).toString
+        roll
+      case x => x
     }
+
+    update(i.copy(id = id))
+    id
   }

   def get(id: String): Option[EvaluationInstance] = {
     try {
-      val response = client.prepareGet(index, estype, id).get
-      if (response.isExists) {
-        Some(read[EvaluationInstance](response.getSourceAsString))
-      } else {
-        None
+      val response = client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "found").extract[Boolean] match {
+        case true =>
+          Some((jsonResponse \ "_source").extract[EvaluationInstance])
+        case _ =>
+          None
       }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => None
+          case _ =>
+            error(s"Failed to access /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access /$index/$estype/$id", e)
         None
     }
   }

   def getAll(): Seq[EvaluationInstance] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[EvaluationInstance](client, builder)
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
+      case e: IOException =>
+        error(s"Failed to access /$index/$estype/_search", e)
+        Nil
     }
   }

   def getCompleted(): Seq[EvaluationInstance] = {
     try {
-      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
-        termFilter("status", "EVALCOMPLETED")).
-        addSort("startTime", SortOrder.DESC)
-      ESUtils.getAll[EvaluationInstance](client, builder)
+      val json =
+        ("query" ->
+          ("term" ->
+            ("status" -> "EVALCOMPLETED"))) ~
+        ("sort" ->
+          ("startTime" ->
+            ("order" -> "desc")))
+      ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
+      case e: IOException =>
+        error(s"Failed to access /$index/$estype/_search", e)
+        Nil
     }
   }

   def update(i: EvaluationInstance): Unit = {
+    val id = i.id
     try {
-      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException => error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
     }
   }

   def delete(id: String): Unit = {
     try {
-      client.prepareDelete(index, estype, id).get
+      val response = client.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
     } catch {
-      case e: ElasticsearchException => error(e.getMessage)
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
     }
   }
 }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
new file mode 100644
index 0000000..f2ab7c2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s._
+
+object ESEventsUtil {
+
+  implicit val formats = DefaultFormats
+
+  def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = {
+
+    def getStringCol(col: String): String = {
+      val r = result.get(new Text(col)).asInstanceOf[Text]
+      require(r != null,
+        s"Failed to get value for column ${col}. " +
+        s"The column is missing or is not a Text value.")
+
+      r.toString()
+    }
+
+    def getOptStringCol(col: String): Option[String] = {
+      val r = result.get(new Text(col))
+      if (r == null) {
+        None
+      } else {
+        Some(r.asInstanceOf[Text].toString())
+      }
+    }
+
+    val tmp = result
+      .get(new Text("properties")).asInstanceOf[MapWritable]
+      .get(new Text("fields")).asInstanceOf[MapWritable]
+      .get(new Text("rating"))
+
+    val rating = tmp match {
+      case d: DoubleWritable => d
+      case l: LongWritable =>
+        new DoubleWritable(l.get().toDouble)
+      case _ => null
+    }
+
+    val properties: DataMap =
+      if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""")
+      else DataMap()
+
+
+    val eventId = Some(getStringCol("eventId"))
+    val event = getStringCol("event")
+    val entityType = getStringCol("entityType")
+    val entityId = getStringCol("entityId")
+    val targetEntityType = getOptStringCol("targetEntityType")
+    val targetEntityId = getOptStringCol("targetEntityId")
+    val prId = getOptStringCol("prId")
+    val eventTimeZone = getOptStringCol("eventTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val eventTime = new DateTime(
+      getStringCol("eventTime"), eventTimeZone)
+    val creationTimeZone = getOptStringCol("creationTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val creationTime: DateTime = new DateTime(
+      getStringCol("creationTime"), creationTimeZone)
+
+
+    Event(
+      eventId = eventId,
+      event = event,
+      entityType = entityType,
+      entityId = entityId,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      properties = properties,
+      eventTime = eventTime,
+      tags = Seq(),
+      prId = prId,
+      creationTime = creationTime
+    )
+  }
+
+  def eventToPut(event: Event, appId: Int): Seq[Map[String, Any]] = {
+    Seq(
+      Map(
+        "eventId" -> event.eventId,
+        "event" -> event.event,
+        "entityType" -> event.entityType,
+        "entityId" -> event.entityId,
+        "targetEntityType" -> event.targetEntityType,
+        "targetEntityId" -> event.targetEntityId,
+        "properties" -> event.properties,
+        "eventTime" -> event.eventTime,
+        "tags" -> event.tags,
+        "prId" -> event.prId,
+        "creationTime" -> event.creationTime
+      )
+    )
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
new file mode 100644
index 0000000..ef25204
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.joda.time.DateTime
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+import org.json4s.ext.JodaTimeSerializers
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+  extends LEvents with Logging {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+  private val seq = new ESSequences(client, config, index)
+  private val seqName = "events"
+
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    ESUtils.createIndex(client, index)
+    val json =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("name" -> ("type" -> "keyword")) ~
+          ("eventId" -> ("type" -> "keyword")) ~
+          ("event" -> ("type" -> "keyword")) ~
+          ("entityType" -> ("type" -> "keyword")) ~
+          ("entityId" -> ("type" -> "keyword")) ~
+          ("targetEntityType" -> ("type" -> "keyword")) ~
+          ("targetEntityId" -> ("type" -> "keyword")) ~
+          ("properties" ->
+            ("type" -> "nested") ~
+            ("properties" ->
+              ("fields" -> ("type" -> "nested") ~
+                ("properties" ->
+                  ("user" -> ("type" -> "long")) ~
+                  ("num" -> ("type" -> "long")))))) ~
+          ("eventTime" -> ("type" -> "date")) ~
+          ("tags" -> ("type" -> "keyword")) ~
+          ("prId" -> ("type" -> "keyword")) ~
+          ("creationTime" -> ("type" -> "date"))))
+    ESUtils.createMapping(client, index, estype, compact(render(json)))
+    true
+  }
+
+  override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+      client.performRequest(
+        "POST",
+        s"/$index/$estype/_delete_by_query",
+        Map.empty[String, String].asJava,
+        entity).getStatusLine.getStatusCode match {
+        case 200 => true
+        case _ =>
+          error(s"Failed to remove $index/$estype")
+          false
+      }
+    } catch {
+      case e: Exception =>
+        error(s"Failed to remove $index/$estype", e)
+        false
+    }
+  }
+
+  override def close(): Unit = {
+    try client.close() catch {
+      case e: Exception =>
+        error("Failed to close client.", e)
+    }
+  }
+
+  override def futureInsert(
+    event: Event,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val id = event.eventId.getOrElse {
+        var roll = seq.genNext(seqName)
+        while (exists(estype, roll)) roll = seq.genNext(seqName)
+        roll.toString
+      }
+      val json = write(event.copy(eventId = Some(id)))
+      try {
+        val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          s"/$index/$estype/$id",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        val result = (jsonResponse \ "result").extract[String]
+        result match {
+          case "created" => id
+          case "updated" => id
+          case _ =>
+            error(s"[$result] Failed to index $index/$estype/$id")
+            ""
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to index $index/$estype/$id: $json", e)
+          ""
+      }
+    }
+  }
+
+  private def exists(estype: String, id: Int): Boolean = {
+    try {
+      client.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+        case 200 => true
+        case _ => false
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => false
+          case _ =>
+            error(s"Failed to access /$index/$estype/$id", e)
+            false
+        }
+      case e: IOException =>
+        error(s"Failed to access /$index/$estype/$id", e)
+        false
+    }
+  }
+
+  override def futureGet(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          s"/$index/$estype/_search",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        (jsonResponse \ "hits" \ "total").extract[Long] match {
+          case 0 => None
+          case _ =>
+            val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+            val result = (results.head \ "_source").extract[Event]
+            Some(result)
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to access /$index/$estype/_search", e)
+          None
+      }
+    }
+  }
+
+  override def futureDelete(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = client.performRequest(
+          "POST",
+          s"/$index/$estype/_delete_by_query",
+          Map.empty[String, String].asJava, entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        val deleted = (jsonResponse \ "deleted").extract[Long]
+        deleted match {
+          case n if n > 0 => true
+          case _ =>
+            error(s"Failed to delete $index/$estype:$eventId (deleted: $deleted)")
+            false
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to delete $index/$estype:$eventId", e)
+          false
+      }
+    }
+  }
+
+  override def futureFind(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    limit: Option[Int] = None,
+    reversed: Option[Boolean] = None)
+    (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      try {
+        val query = ESUtils.createEventQuery(
+          startTime, untilTime, entityType, entityId,
+          eventNames, targetEntityType, targetEntityId, None)
+        ESUtils.getAll[Event](client, index, estype, query).toIterator
+      } catch {
+        case e: IOException =>
+          error(s"Failed to access /$index/$estype/_search", e)
+          Iterator[Event]()
+      }
+    }
+  }
+
+}
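The stores above all follow the same low-level access pattern: build the request body with the json4s DSL, send it through RestClient.performRequest, and parse the response entity back through json4s. Below is a minimal, self-contained sketch of that pattern, not part of this commit; the localhost:9200 endpoint and the "pio"/"evaluation_instances" index and type names are illustrative assumptions only.

import scala.collection.JavaConverters._

import org.apache.http.HttpHost
import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.RestClient
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._

object RestPatternSketch extends App {
  implicit val formats = DefaultFormats

  // Assumed endpoint; in PredictionIO this would come from pio-env.sh / StorageClientConfig.
  val client = RestClient.builder(new HttpHost("localhost", 9200)).build()
  try {
    // Build the query body with the json4s DSL, as the stores above do.
    val query = ("query" -> ("term" -> ("status" -> "EVALCOMPLETED")))
    val entity = new NStringEntity(compact(render(query)), ContentType.APPLICATION_JSON)
    // POST /{index}/{type}/_search; both names here are hypothetical.
    val response = client.performRequest(
      "POST",
      "/pio/evaluation_instances/_search",
      Map.empty[String, String].asJava,
      entity)
    // Parse the HTTP entity back into a JValue and navigate it with \.
    val json = parse(EntityUtils.toString(response.getEntity))
    val total = (json \ "hits" \ "total").extract[Long]
    println(s"$total completed evaluation instances")
  } finally {
    client.close()
  }
}

Unlike the transport Client used against Elasticsearch 1.x, the low-level REST client only speaks HTTP, which is presumably why every store now serializes its queries to JSON strings instead of using request builders.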

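The bodies of ESUtils.createIndex and ESUtils.createMapping are not shown in this diff, but the mapping JSON each store renders matches what Elasticsearch 5.x expects on its mapping endpoint. The following is an assumption-level sketch of the raw call such a helper might make, again with hypothetical host, index, and type names, and assuming the index already exists:

import scala.collection.JavaConverters._

import org.apache.http.HttpHost
import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.elasticsearch.client.RestClient
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._

object MappingSketch extends App {
  val client = RestClient.builder(new HttpHost("localhost", 9200)).build()
  try {
    // A cut-down version of the event mapping above: ES 5.x "keyword" and "date"
    // types with _all disabled. The type name "14" (appId 14, no channel) is hypothetical.
    val mapping =
      ("14" ->
        ("_all" -> ("enabled" -> 0)) ~
        ("properties" ->
          ("event" -> ("type" -> "keyword")) ~
          ("eventTime" -> ("type" -> "date"))))
    val entity = new NStringEntity(compact(render(mapping)), ContentType.APPLICATION_JSON)
    // ES 5.x mapping endpoint: PUT /{index}/_mapping/{type}.
    client.performRequest(
      "PUT",
      "/pio_event/_mapping/14",
      Map.empty[String, String].asJava,
      entity)
  } finally {
    client.close()
  }
}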