This is an automated email from the ASF dual-hosted git repository.
shimamoto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/predictionio.git
The following commit(s) were added to refs/heads/develop by this push:
new 52eb306 [PIO-203] Fixes pio status warnings in ES storage (#507)
52eb306 is described below
commit 52eb306bfba86fd1c732f7f36d28c24dbfffed2f
Author: takako shimamoto <[email protected]>
AuthorDate: Mon Jan 21 14:21:34 2019 +0900
[PIO-203] Fixes pio status warnings in ES storage (#507)
---
.../data/storage/elasticsearch/ESAccessKeys.scala | 5 +--
.../data/storage/elasticsearch/ESApps.scala | 8 ++---
.../data/storage/elasticsearch/ESChannels.scala | 7 ++--
.../storage/elasticsearch/ESEngineInstances.scala | 42 +++++++++++-----------
.../elasticsearch/ESEvaluationInstances.scala | 8 ++---
.../data/storage/elasticsearch/ESLEvents.scala | 5 +--
.../data/storage/elasticsearch/ESSequences.scala | 6 +---
.../data/storage/elasticsearch/ESUtils.scala | 20 ++---------
8 files changed, 31 insertions(+), 70 deletions(-)
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 15f223f..eef83e4 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -42,12 +42,9 @@ class ESAccessKeys(client: RestClient, config:
StorageClientConfig, index: Strin
private val estype = "accesskeys"
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("key" -> ("type" -> "keyword")) ~
("events" -> ("type" -> "keyword"))))
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index cb17af8..26621cf 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -40,16 +40,12 @@ class ESApps(client: RestClient, config:
StorageClientConfig, index: String)
extends Apps with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "apps"
+ private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype
- private val seq = new ESSequences(client, config, internalIndex)
-
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("id" -> ("type" -> "keyword")) ~
("name" -> ("type" -> "keyword"))))
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index 63b108f..ac248de 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -39,15 +39,12 @@ class ESChannels(client: RestClient, config:
StorageClientConfig, index: String)
extends Channels with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "channels"
- private val seq = new ESSequences(client, config, internalIndex)
+ private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("name" -> ("type" -> "keyword"))))
ESUtils.createMapping(client, internalIndex, estype,
compact(render(mappingJson)))
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 02f7b98..96f8a67 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -40,13 +40,11 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
extends EngineInstances with Logging {
implicit val formats = DefaultFormats + new EngineInstanceSerializer
private val estype = "engine_instances"
-
- ESUtils.createIndex(client, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ private val internalIndex = index + "_" + estype
+
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
@@ -61,7 +59,7 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
("algorithmsParams" -> ("type" -> "keyword")) ~
("servingParams" -> ("type" -> "keyword"))
))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ ESUtils.createMapping(client, internalIndex, estype,
compact(render(mappingJson)))
def insert(i: EngineInstance): String = {
val id = i.id match {
@@ -86,7 +84,7 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
- s"/$index/$estype/",
+ s"/$internalIndex/$estype/",
Map("refresh" -> "true").asJava,
entity)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -95,12 +93,12 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
case "created" =>
Some((jsonResponse \ "_id").extract[String])
case _ =>
- error(s"[$result] Failed to create $index/$estype")
+ error(s"[$result] Failed to create $internalIndex/$estype")
None
}
} catch {
case e: IOException =>
- error(s"Failed to create $index/$estype", e)
+ error(s"Failed to create $internalIndex/$estype", e)
None
}
}
@@ -109,7 +107,7 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
try {
val response = client.performRequest(
"GET",
- s"/$index/$estype/$id",
+ s"/$internalIndex/$estype/$id",
Map.empty[String, String].asJava)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
(jsonResponse \ "found").extract[Boolean] match {
@@ -123,11 +121,11 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
e.getResponse.getStatusLine.getStatusCode match {
case 404 => None
case _ =>
- error(s"Failed to access to /$index/$estype/$id", e)
+ error(s"Failed to access to /$internalIndex/$estype/$id", e)
None
}
case e: IOException =>
- error(s"Failed to access to /$index/$estype/$id", e)
+ error(s"Failed to access to /$internalIndex/$estype/$id", e)
None
}
}
@@ -137,10 +135,10 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[EngineInstance](client, index, estype,
compact(render(json)))
+ ESUtils.getAll[EngineInstance](client, internalIndex, estype,
compact(render(json)))
} catch {
case e: IOException =>
- error("Failed to access to /$index/$estype/_search", e)
+ error(s"Failed to access to /$internalIndex/$estype/_search", e)
Nil
}
}
@@ -165,10 +163,10 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
("sort" -> List(
("startTime" ->
("order" -> "desc"))))
- ESUtils.getAll[EngineInstance](client, index, estype,
compact(render(json)))
+ ESUtils.getAll[EngineInstance](client, internalIndex, estype,
compact(render(json)))
} catch {
case e: IOException =>
- error(s"Failed to access to /$index/$estype/_search", e)
+ error(s"Failed to access to /$internalIndex/$estype/_search", e)
Nil
}
}
@@ -188,7 +186,7 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
- s"/$index/$estype/$id",
+ s"/$internalIndex/$estype/$id",
Map("refresh" -> "true").asJava,
entity)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -197,11 +195,11 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
case "created" =>
case "updated" =>
case _ =>
- error(s"[$result] Failed to update $index/$estype/$id")
+ error(s"[$result] Failed to update $internalIndex/$estype/$id")
}
} catch {
case e: IOException =>
- error(s"Failed to update $index/$estype/$id", e)
+ error(s"Failed to update $internalIndex/$estype/$id", e)
}
}
@@ -209,18 +207,18 @@ class ESEngineInstances(client: RestClient, config:
StorageClientConfig, index:
try {
val response = client.performRequest(
"DELETE",
- s"/$index/$estype/$id",
+ s"/$internalIndex/$estype/$id",
Map("refresh" -> "true").asJava)
val json = parse(EntityUtils.toString(response.getEntity))
val result = (json \ "result").extract[String]
result match {
case "deleted" =>
case _ =>
- error(s"[$result] Failed to update $index/$estype/$id")
+ error(s"[$result] Failed to update $internalIndex/$estype/$id")
}
} catch {
case e: IOException =>
- error(s"Failed to update $index/$estype/$id", e)
+ error(s"Failed to update $internalIndex/$estype/$id", e)
}
}
}
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 03b851d..0025950 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -28,7 +28,6 @@ import org.apache.predictionio.data.storage.EvaluationInstance
import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
import org.apache.predictionio.data.storage.EvaluationInstances
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
@@ -41,15 +40,12 @@ class ESEvaluationInstances(client: RestClient, config:
StorageClientConfig, ind
extends EvaluationInstances with Logging {
implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
private val estype = "evaluation_instances"
- private val seq = new ESSequences(client, config, internalIndex)
+ private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index f275ec9..708d3d3 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -53,12 +53,9 @@ class ESLEvents(val client: RestClient, config:
StorageClientConfig, val baseInd
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
val index = baseIndex + "_" + estype
- ESUtils.createIndex(client, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ ESUtils.createIndex(client, index)
val json =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("name" -> ("type" -> "keyword")) ~
("eventId" -> ("type" -> "keyword")) ~
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index d43ecc6..ade0f40 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -21,7 +21,6 @@ import java.io.IOException
import scala.collection.JavaConverters._
-import org.apache.http.Header
import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
@@ -40,12 +39,9 @@ class ESSequences(client: RestClient, config:
StorageClientConfig, index: String
private val estype = "sequences"
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("n" -> ("enabled" -> false))))
ESUtils.createMapping(client, internalIndex, estype,
compact(render(mappingJson)))
diff --git
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index cd9aa53..93d5d94 100644
---
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import org.apache.http.entity.ContentType
-import org.apache.http.entity.StringEntity
import org.apache.http.nio.entity.NStringEntity
import org.elasticsearch.client.RestClient
import org.json4s._
@@ -165,23 +164,16 @@ object ESUtils {
def createIndex(
client: RestClient,
- index: String,
- numberOfShards: Option[Int],
- numberOfReplicas: Option[Int]): Unit = {
+ index: String): Unit = {
client.performRequest(
"HEAD",
s"/$index",
Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
case 404 =>
- val json = ("settings" ->
- ("number_of_shards" -> numberOfShards) ~
- ("number_of_replicas" -> numberOfReplicas))
- val entity = new NStringEntity(compact(render(json)),
ContentType.APPLICATION_JSON)
client.performRequest(
"PUT",
s"/$index",
- Map.empty[String, String].asJava,
- entity)
+ Map.empty[String, String].asJava)
case 200 =>
case _ =>
throw new IllegalStateException(s"/$index is invalid.")
@@ -269,14 +261,6 @@ object ESUtils {
(hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
}
- def getNumberOfShards(config: StorageClientConfig, index: String):
Option[Int] = {
- config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt)
- }
-
- def getNumberOfReplicas(config: StorageClientConfig, index: String):
Option[Int] = {
- config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt)
- }
-
def getEventDataRefresh(config: StorageClientConfig): String = {
config.properties.getOrElse("EVENTDATA_REFRESH", "true")
}