Add Elasticsearch 1.x support
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bad2f038 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bad2f038 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bad2f038 Branch: refs/heads/feature/es5 Commit: bad2f0384d6d892eb576a6c2267bf88612a8eb66 Parents: 48e18b5 Author: Shinsuke Sugaya <[email protected]> Authored: Wed Jan 18 16:29:15 2017 +0900 Committer: Shinsuke Sugaya <[email protected]> Committed: Wed Jan 18 16:29:15 2017 +0900 ---------------------------------------------------------------------- .travis.yml | 4 +- bin/install.sh | 4 +- bin/pio-start-all | 9 +- build.sbt | 2 + conf/pio-env.sh.template | 6 + core/build.sbt | 1 + data/build.sbt | 1 + .../storage/elasticsearch1/ESAccessKeys.scala | 119 ++++++++++++++ .../data/storage/elasticsearch1/ESApps.scala | 130 +++++++++++++++ .../storage/elasticsearch1/ESChannels.scala | 117 ++++++++++++++ .../elasticsearch1/ESEngineInstances.scala | 158 +++++++++++++++++++ .../elasticsearch1/ESEngineManifests.scala | 84 ++++++++++ .../elasticsearch1/ESEvaluationInstances.scala | 136 ++++++++++++++++ .../storage/elasticsearch1/ESSequences.scala | 64 ++++++++ .../data/storage/elasticsearch1/ESUtils.scala | 48 ++++++ .../storage/elasticsearch1/StorageClient.scala | 50 ++++++ .../data/storage/elasticsearch1/package.scala | 25 +++ project/Build.scala | 3 + tests/Dockerfile | 2 + tests/docker-files/env-conf/pio-env.sh | 16 +- tools/build.sbt | 1 - 21 files changed, 969 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 68dee42..4d62999 100644 --- a/.travis.yml +++ b/.travis.yml @@ -63,8 +63,8 @@ env: matrix: - 
BUILD_TYPE=Unit - BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL - - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS - - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS + - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS + - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS before_install: - unset SBT_OPTS JVM_OPTS http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/bin/install.sh ---------------------------------------------------------------------- diff --git a/bin/install.sh b/bin/install.sh index e4fe220..2f7c162 100755 --- a/bin/install.sh +++ b/bin/install.sh @@ -21,6 +21,7 @@ OS=`uname` PIO_VERSION=0.11.0-SNAPSHOT SPARK_VERSION=1.6.3 # Looks like support for Elasticsearch 2.0 will require 2.0 so deferring +#ELASTICSEARCH_VERSION=1.7.6 ELASTICSEARCH_VERSION=5.1.2 HBASE_VERSION=1.2.2 POSTGRES_VERSION=9.4-1204.jdbc41 @@ -352,7 +353,8 @@ installES() { fi if [[ ! -e elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz ]]; then echo "Downloading Elasticsearch..." 
- curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz + #curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz + curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz fi tar zxf elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz rm -rf ${elasticsearch_dir} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/bin/pio-start-all ---------------------------------------------------------------------- diff --git a/bin/pio-start-all b/bin/pio-start-all index 03e10ae..87130fa 100755 --- a/bin/pio-start-all +++ b/bin/pio-start-all @@ -33,6 +33,11 @@ SOURCE_TYPE=$SOURCE_TYPE$PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then echo "Starting Elasticsearch..." if [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME" ]; then + ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME + elif [ -n "$PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME" ]; then + ELASTICSEARCH_HOME=$PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME + fi + if [ -n "$ELASTICSEARCH_HOME" ]; then if [ -n "$JAVA_HOME" ]; then JPS=`$JAVA_HOME/bin/jps` else @@ -44,7 +49,7 @@ if [ `echo $SOURCE_TYPE | grep -i elasticsearch | wc -l` != 0 ] ; then echo -e "\033[0;31mAborting...\033[0m" exit 1 else - $PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid + $ELASTICSEARCH_HOME/bin/elasticsearch -d -p $PIO_HOME/es.pid fi else echo -e "\033[0;31mPlease set PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME in conf/pio-env.sh, or in your environment.\033[0m" @@ -91,7 +96,7 @@ if [ `echo $SOURCE_TYPE | grep -i pgsql | wc -l` != 0 ] ; then fi # PredictionIO Event Server -echo "Waiting 10 seconds for HBase/Elasticsearch to fully initialize..." +echo "Waiting 10 seconds for Storage Repositories to fully initialize..." sleep 10 echo "Starting PredictionIO Event Server..." 
${PIO_HOME}/bin/pio-daemon ${PIO_HOME}/eventserver.pid eventserver --ip 0.0.0.0 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index 4d6a5b6..776b2ad 100644 --- a/build.sbt +++ b/build.sbt @@ -36,6 +36,8 @@ javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7", elasticsearchVersion in ThisBuild := "5.1.2" +elasticsearch1Version in ThisBuild := "1.7.6" + json4sVersion in ThisBuild := "3.2.10" sparkVersion in ThisBuild := "1.6.3" http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/conf/pio-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template index a2841e3..8f5d7b1 100644 --- a/conf/pio-env.sh.template +++ b/conf/pio-env.sh.template @@ -89,6 +89,12 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 # PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2 +# Elasticsearch 1.x Example +# PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1 +# PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=<elasticsearch_cluster_name> +# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost +# PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300 +# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6 # Local File System Example # PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/core/build.sbt ---------------------------------------------------------------------- diff --git a/core/build.sbt b/core/build.sbt index abd8e07..305075e 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -33,6 +33,7 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.clapper" %% 
"grizzled-slf4j" % "1.0.2", "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, + "org.elasticsearch" % "elasticsearch" % elasticsearch1Version.value, "org.json4s" %% "json4s-native" % json4sVersion.value, "org.json4s" %% "json4s-ext" % json4sVersion.value, "org.scalaj" %% "scalaj-http" % "1.1.6", http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/build.sbt ---------------------------------------------------------------------- diff --git a/data/build.sbt b/data/build.sbt index 4ae9b42..75d3c09 100644 --- a/data/build.sbt +++ b/data/build.sbt @@ -44,6 +44,7 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.clapper" %% "grizzled-slf4j" % "1.0.2", "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, + "org.elasticsearch" % "elasticsearch" % elasticsearch1Version.value, "org.elasticsearch" % "elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided", "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value, "org.json4s" %% "json4s-native" % json4sVersion.value, http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala new file mode 100644 index 0000000..7f50488 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch1 + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.AccessKey +import org.apache.predictionio.data.storage.AccessKeys +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +import scala.util.Random + +/** Elasticsearch implementation of AccessKeys. */ +class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) + extends AccessKeys with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "accesskeys" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). 
+ setSource(compact(render(json))).get + } + + def insert(accessKey: AccessKey): Option[String] = { + val key = if (accessKey.key.isEmpty) generateKey else accessKey.key + update(accessKey.copy(key = key)) + Some(key) + } + + def get(key: String): Option[AccessKey] = { + try { + val response = client.prepareGet( + index, + estype, + key).get() + Some(read[AccessKey](response.getSourceAsString)) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + case e: NullPointerException => None + } + } + + def getAll(): Seq[AccessKey] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[AccessKey](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() + } + } + + def getByAppid(appid: Int): Seq[AccessKey] = { + try { + val builder = client.prepareSearch(index).setTypes(estype). + setPostFilter(termFilter("appid", appid)) + ESUtils.getAll[AccessKey](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() + } + } + + def update(accessKey: AccessKey): Unit = { + try { + client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get() + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } + + def delete(key: String): Unit = { + try { + client.prepareDelete(index, estype, key).get + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala new file mode 100644 index 0000000..af61e17 --- /dev/null +++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch1 + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.App +import org.apache.predictionio.data.storage.Apps +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +/** Elasticsearch implementation of Items. 
*/ +class ESApps(client: Client, config: StorageClientConfig, index: String) + extends Apps with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "apps" + private val seq = new ESSequences(client, config, index) + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get + } + + def insert(app: App): Option[Int] = { + val id = + if (app.id == 0) { + var roll = seq.genNext("apps") + while (!get(roll).isEmpty) roll = seq.genNext("apps") + roll + } + else app.id + val realapp = app.copy(id = id) + update(realapp) + Some(id) + } + + def get(id: Int): Option[App] = { + try { + val response = client.prepareGet( + index, + estype, + id.toString).get() + Some(read[App](response.getSourceAsString)) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + case e: NullPointerException => None + } + } + + def getByName(name: String): Option[App] = { + try { + val response = client.prepareSearch(index).setTypes(estype). 
+ setPostFilter(termFilter("name", name)).get + val hits = response.getHits().hits() + if (hits.size > 0) { + Some(read[App](hits.head.getSourceAsString)) + } else { + None + } + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + } + } + + def getAll(): Seq[App] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[App](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[App]() + } + } + + def update(app: App): Unit = { + try { + val response = client.prepareIndex(index, estype, app.id.toString). + setSource(write(app)).get() + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } + + def delete(id: Int): Unit = { + try { + client.prepareDelete(index, estype, id.toString).get + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala new file mode 100644 index 0000000..f955bee --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch1 + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Channel +import org.apache.predictionio.data.storage.Channels +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders.termFilter +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +class ESChannels(client: Client, config: StorageClientConfig, index: String) + extends Channels with Logging { + + implicit val formats = DefaultFormats.lossless + private val estype = "channels" + private val seq = new ESSequences(client, config, index) + private val seqName = "channels" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). 
+ setSource(compact(render(json))).get + } + + def insert(channel: Channel): Option[Int] = { + val id = + if (channel.id == 0) { + var roll = seq.genNext(seqName) + while (!get(roll).isEmpty) roll = seq.genNext(seqName) + roll + } else channel.id + + val realChannel = channel.copy(id = id) + if (update(realChannel)) Some(id) else None + } + + def get(id: Int): Option[Channel] = { + try { + val response = client.prepareGet( + index, + estype, + id.toString).get() + Some(read[Channel](response.getSourceAsString)) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + case e: NullPointerException => None + } + } + + def getByAppid(appid: Int): Seq[Channel] = { + try { + val builder = client.prepareSearch(index).setTypes(estype). + setPostFilter(termFilter("appid", appid)) + ESUtils.getAll[Channel](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[Channel]() + } + } + + def update(channel: Channel): Boolean = { + try { + val response = client.prepareIndex(index, estype, channel.id.toString). 
+ setSource(write(channel)).get() + true + } catch { + case e: ElasticsearchException => + error(e.getMessage) + false + } + } + + def delete(id: Int): Unit = { + try { + client.prepareDelete(index, estype, id.toString).get + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala new file mode 100644 index 0000000..cc10ff0 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.elasticsearch1 + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineInstance +import org.apache.predictionio.data.storage.EngineInstanceSerializer +import org.apache.predictionio.data.storage.EngineInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.elasticsearch.search.sort.SortOrder +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +class ESEngineInstances(client: Client, config: StorageClientConfig, index: String) + extends EngineInstances with Logging { + implicit val formats = DefaultFormats + new EngineInstanceSerializer + private val estype = "engine_instances" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineVersion" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineVariant" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineFactory" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("batch" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("dataSourceParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("preparatorParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + 
("algorithmsParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("servingParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get + } + + def insert(i: EngineInstance): String = { + try { + val response = client.prepareIndex(index, estype). + setSource(write(i)).get + response.getId + } catch { + case e: ElasticsearchException => + error(e.getMessage) + "" + } + } + + def get(id: String): Option[EngineInstance] = { + try { + val response = client.prepareGet(index, estype, id).get + if (response.isExists) { + Some(read[EngineInstance](response.getSourceAsString)) + } else { + None + } + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + } + } + + def getAll(): Seq[EngineInstance] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[EngineInstance](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq() + } + } + + def getCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = { + try { + val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( + andFilter( + termFilter("status", "COMPLETED"), + termFilter("engineId", engineId), + termFilter("engineVersion", engineVersion), + termFilter("engineVariant", engineVariant))). 
+ addSort("startTime", SortOrder.DESC) + ESUtils.getAll[EngineInstance](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq() + } + } + + def getLatestCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = + getCompleted( + engineId, + engineVersion, + engineVariant).headOption + + def update(i: EngineInstance): Unit = { + try { + client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get + } catch { + case e: ElasticsearchException => error(e.getMessage) + } + } + + def delete(id: String): Unit = { + try { + val response = client.prepareDelete(index, estype, id).get + } catch { + case e: ElasticsearchException => error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala new file mode 100644 index 0000000..307b582 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.elasticsearch1 + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineManifestSerializer +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.data.storage.EngineManifests +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.json4s._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +class ESEngineManifests(client: Client, config: StorageClientConfig, index: String) + extends EngineManifests with Logging { + implicit val formats = DefaultFormats + new EngineManifestSerializer + private val estype = "engine_manifests" + private def esid(id: String, version: String) = s"$id $version" + + def insert(engineManifest: EngineManifest): Unit = { + val json = write(engineManifest) + val response = client.prepareIndex( + index, + estype, + esid(engineManifest.id, engineManifest.version)). + setSource(json).execute().actionGet() + } + + def get(id: String, version: String): Option[EngineManifest] = { + try { + val response = client.prepareGet(index, estype, esid(id, version)). 
+ execute().actionGet() + if (response.isExists) { + Some(read[EngineManifest](response.getSourceAsString)) + } else { + None + } + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + } + } + + def getAll(): Seq[EngineManifest] = { + try { + val builder = client.prepareSearch() + ESUtils.getAll[EngineManifest](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq() + } + } + + def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit = + insert(engineManifest) + + def delete(id: String, version: String): Unit = { + try { + client.prepareDelete(index, estype, esid(id, version)).execute().actionGet() + } catch { + case e: ElasticsearchException => error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala new file mode 100644 index 0000000..b8d7056 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+/** Stores [[EvaluationInstance]] records as JSON documents of type
+  * `evaluation_instances` in the given Elasticsearch index.
+  *
+  * Constructing an instance talks to the cluster: the index is created when
+  * it does not exist, and the type mapping is installed when the type does
+  * not exist.
+  *
+  * Every operation logs an `ElasticsearchException` and returns a neutral
+  * value (`""`, `None`, an empty sequence, or nothing) instead of rethrowing.
+  */
+class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
+  extends EvaluationInstances with Logging {
+  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
+  private val estype = "evaluation_instances"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    // Reusable field definitions for the mapping below.
+    val exactString = ("type" -> "string") ~ ("index" -> "not_analyzed")
+    val unindexedString = ("type" -> "string") ~ ("index" -> "no")
+    val date = "type" -> "date"
+    val properties =
+      ("status" -> exactString) ~
+      ("startTime" -> date) ~
+      ("endTime" -> date) ~
+      ("evaluationClass" -> exactString) ~
+      ("engineParamsGeneratorClass" -> exactString) ~
+      ("batch" -> exactString) ~
+      ("evaluatorResults" -> unindexedString) ~
+      ("evaluatorResultsHTML" -> unindexedString) ~
+      ("evaluatorResultsJSON" -> unindexedString)
+    val mapping = estype -> ("properties" -> properties)
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(mapping))).get
+  }
+
+  /** Runs `op`; on an `ElasticsearchException` logs its message and yields
+    * `fallback` instead.
+    */
+  private def guarded[A](fallback: => A)(op: => A): A = {
+    try {
+      op
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        fallback
+    }
+  }
+
+  /** Indexes `i` without an explicit document id.
+    *
+    * @return the id reported by Elasticsearch, or `""` on error
+    */
+  def insert(i: EvaluationInstance): String = guarded[String]("") {
+    client.prepareIndex(index, estype).setSource(write(i)).get.getId
+  }
+
+  /** Fetches the instance stored under `id`.
+    *
+    * @return `None` when no such document exists or on error
+    */
+  def get(id: String): Option[EvaluationInstance] =
+    guarded[Option[EvaluationInstance]](None) {
+      val found = client.prepareGet(index, estype, id).get
+      if (found.isExists) Some(read[EvaluationInstance](found.getSourceAsString)) else None
+    }
+
+  /** Returns every stored instance, or an empty sequence on error. */
+  def getAll(): Seq[EvaluationInstance] =
+    guarded[Seq[EvaluationInstance]](Seq()) {
+      ESUtils.getAll[EvaluationInstance](
+        client, client.prepareSearch(index).setTypes(estype))
+    }
+
+  /** Returns the instances whose `status` is `EVALCOMPLETED`, requested in
+    * descending `startTime` order; an empty sequence on error.
+    */
+  def getCompleted(): Seq[EvaluationInstance] =
+    guarded[Seq[EvaluationInstance]](Seq()) {
+      val completedNewestFirst = client.prepareSearch(index).
+        setTypes(estype).
+        setPostFilter(termFilter("status", "EVALCOMPLETED")).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EvaluationInstance](client, completedNewestFirst)
+    }
+
+  /** Sends a partial-document update for the document with id `i.id`. */
+  def update(i: EvaluationInstance): Unit = guarded[Unit](()) {
+    client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
+  }
+
+  /** Deletes the document stored under `id`. */
+  def delete(id: String): Unit = guarded[Unit](()) {
+    client.prepareDelete(index, estype, id).get
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
new file mode 100644
index 0000000..80247ec
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+/** Hands out integers per sequence name, backed by documents of type
+  * `sequences` in the given Elasticsearch index.
+  *
+  * Constructing an instance talks to the cluster: the index is created when
+  * it does not exist, and the type mapping is installed when the type does
+  * not exist.
+  */
+class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
+  implicit val formats = DefaultFormats
+  private val estype = "sequences"
+
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    // The index is created without explicit settings; the block below is
+    // kept disabled exactly as in the original source.
+    // val settingsJson =
+    //   ("number_of_shards" -> 1) ~
+    //   ("auto_expand_replicas" -> "0-all")
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val typeMapping =
+      ("_source" -> ("enabled" -> 0)) ~
+      ("_all" -> ("enabled" -> 0)) ~
+      ("_type" -> ("index" -> "no")) ~
+      ("enabled" -> 0)
+    val mappingJson = estype -> typeMapping
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(mappingJson))).get
+  }
+
+  /** Indexes a document under id `name` and returns the version number
+    * Elasticsearch reports for that write, narrowed to `Int`.
+    *
+    * @param name sequence name, used as the document id
+    * @return the reported document version, or `0` when Elasticsearch
+    *         reports an error (the message is logged)
+    */
+  def genNext(name: String): Int = {
+    try {
+      val payload = compact(render("n" -> name))
+      val written = client.prepareIndex(index, estype, name).setSource(payload).get
+      written.getVersion().toInt
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        0
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
new file mode 100644
index 0000000..5de2999
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import org.elasticsearch.action.search.SearchRequestBuilder
+import org.elasticsearch.client.Client
+import org.elasticsearch.common.unit.TimeValue
+import org.json4s.Formats
+import org.json4s.native.Serialization.read
+
+import scala.collection.mutable.ArrayBuffer
+
+/** Helpers shared by the Elasticsearch 1.x metadata stores. */
+object ESUtils {
+  /** Keep-alive attached to every scroll request issued by [[getAll]]
+    * (`TimeValue` built from 60000).
+    */
+  val scrollLife = new TimeValue(60000)
+
+  /** Runs `builder` as a scroll search and deserializes the source of every
+    * hit into `T`.
+    *
+    * Pages are fetched until one comes back empty. The scroll context is then
+    * cleared explicitly, also when fetching or deserializing fails, so the
+    * cluster does not keep it open until `scrollLife` expires. A failure of
+    * the clean-up itself is ignored so it cannot mask the result or the
+    * original exception.
+    *
+    * @param client  client used for the follow-up scroll requests
+    * @param builder prepared search; its scroll keep-alive is set here
+    * @param formats json4s formats used to read each hit
+    * @tparam T target type of each hit
+    * @return every hit in the order pages were returned
+    */
+  def getAll[T : Manifest](
+      client: Client,
+      builder: SearchRequestBuilder)(
+      implicit formats: Formats): Seq[T] = {
+    val results = ArrayBuffer[T]()
+    var response = builder.setScroll(scrollLife).get
+    try {
+      var hits = response.getHits().hits()
+      while (hits.nonEmpty) {
+        results ++= hits.map(h => read[T](h.getSourceAsString))
+        response = client.prepareSearchScroll(response.getScrollId).
+          setScroll(scrollLife).get
+        hits = response.getHits().hits()
+      }
+      results
+    } finally {
+      // Best-effort release of the server-side search context.
+      Option(response.getScrollId).foreach { scrollId =>
+        try {
+          client.prepareClearScroll().addScrollId(scrollId).get
+        } catch {
+          case scala.util.control.NonFatal(_) => ()
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
new file mode 100644
index 0000000..6f6b1c9
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch1
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.transport.TransportClient
+import org.elasticsearch.common.settings.ImmutableSettings
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.transport.ConnectTransportException
+
+/** Builds the Elasticsearch 1.x `TransportClient` used by the metadata stores.
+  *
+  * Properties read from `config.properties`:
+  *  - `HOSTS`: comma-separated host names; defaults to `localhost`
+  *  - `PORTS`: comma-separated transport ports, paired with `HOSTS` by
+  *    position; when absent every host uses 9300
+  *  - `CLUSTERNAME`: value for `cluster.name`; defaults to `elasticsearch`
+  *
+  * Entries are trimmed, so `"a, b"` and `"a,b"` are equivalent. When `PORTS`
+  * is given but its length differs from `HOSTS`, only the leading pairs are
+  * used and a warning is logged.
+  *
+  * A `ConnectTransportException` raised while building the client is
+  * rethrown as a `StorageClientException`.
+  */
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+    with Logging {
+  override val prefix = "ES"
+  val client = try {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq.map(_.trim)).getOrElse(Seq("localhost"))
+    // Without an explicit PORTS list every host gets the default port;
+    // a single-element default would drop all hosts but the first.
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.trim.toInt)).
+      getOrElse(Seq.fill(hosts.size)(9300))
+    if (hosts.size != ports.size) {
+      warn("Elasticsearch HOSTS has " + hosts.size + " entries but PORTS has " +
+        ports.size + "; only the first " + math.min(hosts.size, ports.size) +
+        " host/port pairs will be used")
+    }
+    val settings = ImmutableSettings.settingsBuilder()
+      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
+    val transportClient = new TransportClient(settings)
+    (hosts zip ports) foreach { case (host, port) =>
+      transportClient.addTransportAddress(
+        new InetSocketTransportAddress(host, port))
+    }
+    transportClient
+  } catch {
+    case e: ConnectTransportException =>
+      throw new StorageClientException(e.getMessage, e)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
new file mode 100644
index 0000000..d6aa24a
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + + +package org.apache.predictionio.data.storage + +/** Elasticsearch implementation of storage traits, supporting meta data only + * + * @group Implementation + */ +package object elasticsearch1 {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 885073a..a8d730b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -22,6 +22,9 @@ object PIOBuild extends Build { val elasticsearchVersion = SettingKey[String]( "elasticsearch-version", "The version of Elasticsearch used for building.") + val elasticsearch1Version = SettingKey[String]( + "elasticsearch1-version", + "The version of Elasticsearch 1.x used for building.") val json4sVersion = SettingKey[String]( "json4s-version", "The version of JSON4S used for building.") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tests/Dockerfile ---------------------------------------------------------------------- diff --git a/tests/Dockerfile b/tests/Dockerfile index 1f87554..0fe7187 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -18,6 +18,7 @@ from ubuntu ENV SPARK_VERSION 1.4.0 +#ENV ELASTICSEARCH_VERSION 1.4.4 ENV ELASTICSEARCH_VERSION 5.1.2 ENV HBASE_VERSION 1.0.0 @@ -48,6 +49,7 @@ RUN rm spark-${SPARK_VERSION}-bin-hadoop2.6.tgz ENV SPARK_HOME /vendors/spark-${SPARK_VERSION}-bin-hadoop2.6 RUN echo "== Installing Elasticsearch ==" +#RUN wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz RUN wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz RUN tar zxvfC elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz /vendors RUN rm elasticsearch-${ELASTICSEARCH_VERSION}.tar.gz http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tests/docker-files/env-conf/pio-env.sh 
---------------------------------------------------------------------- diff --git a/tests/docker-files/env-conf/pio-env.sh b/tests/docker-files/env-conf/pio-env.sh index 7ea2164..aa18ff8 100644 --- a/tests/docker-files/env-conf/pio-env.sh +++ b/tests/docker-files/env-conf/pio-env.sh @@ -84,11 +84,17 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # PIO_STORAGE_SOURCES_MYSQL_PASSWORD=pio # Elasticsearch Example -PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch -PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost -PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 -PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http -PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$ELASTICSEARCH_HOME +# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost +# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2 +# Elasticsearch 1.x Example +PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1 +#PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=pio +PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost +PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300 +PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$ELASTICSEARCH_HOME # Local File System Example PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bad2f038/tools/build.sbt ---------------------------------------------------------------------- diff --git a/tools/build.sbt b/tools/build.sbt index 4e2b266..108e2fb 100644 --- a/tools/build.sbt +++ b/tools/build.sbt @@ -42,7 +42,6 @@ dependencyOverrides += "org.slf4j" % "slf4j-log4j12" % "1.7.18" assemblyMergeStrategy in assembly := { case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat - case PathList("org", "joda", "time", "base", "BaseDateTime.class") => MergeStrategy.first case x => val oldStrategy = (assemblyMergeStrategy 
in assembly).value oldStrategy(x)
