This is an automated email from the ASF dual-hosted git repository. shimamoto pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/predictionio.git
The following commit(s) were added to refs/heads/develop by this push: new 52eb306 [PIO-203] Fixes pio status warnings in ES storage (#507) 52eb306 is described below commit 52eb306bfba86fd1c732f7f36d28c24dbfffed2f Author: takako shimamoto <chiboch...@gmail.com> AuthorDate: Mon Jan 21 14:21:34 2019 +0900 [PIO-203] Fixes pio status warnings in ES storage (#507) --- .../data/storage/elasticsearch/ESAccessKeys.scala | 5 +-- .../data/storage/elasticsearch/ESApps.scala | 8 ++--- .../data/storage/elasticsearch/ESChannels.scala | 7 ++-- .../storage/elasticsearch/ESEngineInstances.scala | 42 +++++++++++----------- .../elasticsearch/ESEvaluationInstances.scala | 8 ++--- .../data/storage/elasticsearch/ESLEvents.scala | 5 +-- .../data/storage/elasticsearch/ESSequences.scala | 6 +--- .../data/storage/elasticsearch/ESUtils.scala | 20 ++--------- 8 files changed, 31 insertions(+), 70 deletions(-) diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 15f223f..eef83e4 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -42,12 +42,9 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin private val estype = "accesskeys" private val internalIndex = index + "_" + estype - ESUtils.createIndex(client, internalIndex, - ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), - ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) + ESUtils.createIndex(client, internalIndex) val mappingJson = (estype -> - ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("key" -> ("type" -> "keyword")) ~ ("events" -> ("type" -> "keyword")))) diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index cb17af8..26621cf 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -40,16 +40,12 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) extends Apps with Logging { implicit val formats = DefaultFormats.lossless private val estype = "apps" + private val seq = new ESSequences(client, config, index) private val internalIndex = index + "_" + estype - private val seq = new ESSequences(client, config, internalIndex) - - ESUtils.createIndex(client, internalIndex, - ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), - ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) + ESUtils.createIndex(client, internalIndex) val mappingJson = (estype -> - ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("id" -> ("type" -> "keyword")) ~ ("name" -> ("type" -> "keyword")))) diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index 63b108f..ac248de 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -39,15 +39,12 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) extends Channels with Logging { implicit val formats = DefaultFormats.lossless private val estype = "channels" - private val seq = new ESSequences(client, config, internalIndex) + private val seq = new ESSequences(client, config, index) private val internalIndex = index + "_" + estype - ESUtils.createIndex(client, internalIndex, - ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), - ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) + ESUtils.createIndex(client, internalIndex) val mappingJson = (estype -> - ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("name" -> ("type" -> "keyword")))) ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 02f7b98..96f8a67 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -40,13 +40,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: extends EngineInstances with Logging { implicit val formats = DefaultFormats + new EngineInstanceSerializer private val estype = "engine_instances" - - ESUtils.createIndex(client, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + private val internalIndex = index + "_" + estype + + ESUtils.createIndex(client, internalIndex) val mappingJson = (estype -> - ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("status" -> ("type" -> "keyword")) ~ ("startTime" -> ("type" -> "date")) ~ @@ -61,7 +59,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: ("algorithmsParams" -> ("type" -> "keyword")) ~ ("servingParams" -> ("type" -> "keyword")) )) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) def insert(i: EngineInstance): String = { val id = i.id match { @@ -86,7 +84,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/", + s"/$internalIndex/$estype/", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -95,12 +93,12 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: case "created" => Some((jsonResponse \ "_id").extract[String]) case _ => - error(s"[$result] Failed to create $index/$estype") + error(s"[$result] Failed to create $internalIndex/$estype") None } } catch { case e: IOException => - error(s"Failed to create $index/$estype", e) + error(s"Failed to create $internalIndex/$estype", e) None } } @@ -109,7 +107,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: try { val response = client.performRequest( "GET", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -123,11 +121,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } } @@ -137,10 +135,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$index/$estype/_search", e) + error(s"Failed to access to /$internalIndex/$estype/_search", e) Nil } } @@ -165,10 +163,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: ("sort" -> List( ("startTime" -> ("order" -> "desc")))) - ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error(s"Failed to access to /$index/$estype/_search", e) + error(s"Failed to access to /$internalIndex/$estype/_search", e) Nil } } @@ -188,7 +186,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -197,11 +195,11 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update $internalIndex/$estype/$id", e) } } @@ -209,18 +207,18 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: try { val response = client.performRequest( "DELETE", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update $internalIndex/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 03b851d..0025950 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -28,7 +28,6 @@ import org.apache.predictionio.data.storage.EvaluationInstance import org.apache.predictionio.data.storage.EvaluationInstanceSerializer import org.apache.predictionio.data.storage.EvaluationInstances import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.StorageClientException import org.elasticsearch.client.{ResponseException, RestClient} import org.json4s._ import org.json4s.JsonDSL._ @@ -41,15 +40,12 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind extends EvaluationInstances with Logging { implicit val formats = DefaultFormats + new EvaluationInstanceSerializer private val estype = "evaluation_instances" - private val seq = new ESSequences(client, config, internalIndex) + private val seq = new ESSequences(client, config, index) private val internalIndex = index + "_" + estype - ESUtils.createIndex(client, internalIndex, - ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), - ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) + ESUtils.createIndex(client, internalIndex) val mappingJson = (estype -> - ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("status" -> ("type" -> "keyword")) ~ ("startTime" -> ("type" -> "date")) ~ diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index f275ec9..708d3d3 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -53,12 +53,9 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseInd override def init(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) val index = baseIndex + "_" + estype - ESUtils.createIndex(client, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + ESUtils.createIndex(client, index) val json = (estype -> - ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("name" -> ("type" -> "keyword")) ~ ("eventId" -> ("type" -> "keyword")) ~ diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index d43ecc6..ade0f40 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -21,7 +21,6 @@ import java.io.IOException import scala.collection.JavaConverters._ -import org.apache.http.Header import org.apache.http.entity.ContentType import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils @@ -40,12 +39,9 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String private val estype = "sequences" private val internalIndex = index + "_" + estype - ESUtils.createIndex(client, internalIndex, - ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), - ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) + ESUtils.createIndex(client, internalIndex) val mappingJson = (estype -> - ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("n" -> ("enabled" -> false)))) ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index cd9aa53..93d5d94 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -21,7 +21,6 @@ import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import org.apache.http.entity.ContentType -import org.apache.http.entity.StringEntity import org.apache.http.nio.entity.NStringEntity import org.elasticsearch.client.RestClient import org.json4s._ @@ -165,23 +164,16 @@ object ESUtils { def createIndex( client: RestClient, - index: String, - numberOfShards: Option[Int], - numberOfReplicas: Option[Int]): Unit = { + index: String): Unit = { client.performRequest( "HEAD", s"/$index", Map.empty[String, String].asJava).getStatusLine.getStatusCode match { case 404 => - val json = ("settings" -> - ("number_of_shards" -> numberOfShards) ~ - ("number_of_replicas" -> numberOfReplicas)) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) client.performRequest( "PUT", s"/$index", - Map.empty[String, String].asJava, - entity) + Map.empty[String, String].asJava) case 200 => case _ => throw new IllegalStateException(s"/$index is invalid.") @@ -269,14 +261,6 @@ object ESUtils { (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s)) } - def getNumberOfShards(config: StorageClientConfig, index: String): Option[Int] = { - config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt) - } - - def getNumberOfReplicas(config: StorageClientConfig, index: String): Option[Int] = { - config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt) - } - def getEventDataRefresh(config: StorageClientConfig): String = { config.properties.getOrElse("EVENTDATA_REFRESH", "true") }