Repository: incubator-predictionio Updated Branches: refs/heads/develop 6faba78dd -> 7289174fc
Add parameters for the number of shards/replicas Closes #369 Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/7289174f Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/7289174f Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/7289174f Branch: refs/heads/develop Commit: 7289174fc04c8aaf050bdd43f537cfb77a64cda0 Parents: 6faba78 Author: Shinsuke Sugaya <[email protected]> Authored: Fri Apr 21 12:03:29 2017 -0700 Committer: Donald Szeto <[email protected]> Committed: Fri Apr 21 12:03:29 2017 -0700 ---------------------------------------------------------------------- .../storage/elasticsearch/ESAccessKeys.scala | 4 +++- .../data/storage/elasticsearch/ESApps.scala | 4 +++- .../data/storage/elasticsearch/ESChannels.scala | 4 +++- .../elasticsearch/ESEngineInstances.scala | 4 +++- .../elasticsearch/ESEvaluationInstances.scala | 4 +++- .../data/storage/elasticsearch/ESLEvents.scala | 4 +++- .../data/storage/elasticsearch/ESSequences.scala | 4 +++- .../data/storage/elasticsearch/ESUtils.scala | 19 +++++++++++++++++-- 8 files changed, 38 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index cb6d330..9278366 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -44,7 +44,9 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) val restClient = client.open() try { - ESUtils.createIndex(restClient, index) + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> 0)) ~ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index abea2b8..e7fe4af 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -45,7 +45,9 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) val restClient = client.open() try { - ESUtils.createIndex(restClient, index) + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> 0)) ~ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index f092cc7..a173c59 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -44,7 +44,9 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String) val restClient = client.open() try { - ESUtils.createIndex(restClient, index) + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> 0)) ~ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 4dbacb7..e123744 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -44,7 +44,9 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St val restClient = client.open() try { - ESUtils.createIndex(restClient, index) + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> 0)) ~ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 5bdc0fb..48f191a 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -46,7 +46,9 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index val restClient = client.open() try { - ESUtils.createIndex(restClient, index) + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> 0)) ~ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index 809a064..b57ed03 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -60,7 +60,9 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St val estype = getEsType(appId, channelId) val restClient = client.open() try { - ESUtils.createIndex(restClient, index) + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val json = (estype -> ("_all" -> ("enabled" -> 0)) ~ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index e5264ae..29c8c33 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -41,7 +41,9 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String) val restClient = client.open() try { - ESUtils.createIndex(restClient, index) + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> 0))) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index 4eb117e..4711e80 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -166,16 +166,23 @@ object ESUtils { def createIndex( client: RestClient, - index: String): Unit = { + index: String, + numberOfShards: Option[Int], + numberOfReplicas: Option[Int]): Unit = { client.performRequest( "HEAD", s"/$index", Map.empty[String, String].asJava).getStatusLine.getStatusCode match { case 404 => + val json = ("settings" -> + ("number_of_shards" -> numberOfShards) ~ + ("number_of_replicas" -> numberOfReplicas)) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) client.performRequest( "PUT", s"/$index", - Map.empty[String, String].asJava) + Map.empty[String, String].asJava, + entity) case 200 => case _ => throw new IllegalStateException(s"/$index is invalid.") @@ -262,4 +269,12 @@ object ESUtils { map(_.split(",").toSeq).getOrElse(Seq("http")) (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s)) } + + def getNumberOfShards(config: StorageClientConfig, index: String): Option[Int] = { + config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt) + } + + def getNumberOfReplicas(config: StorageClientConfig, index: String): Option[Int] = { + config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt) + } }
