Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop 6faba78dd -> 7289174fc


Add parameters for the number of shards/replicas

Closes #369


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/7289174f
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/7289174f
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/7289174f

Branch: refs/heads/develop
Commit: 7289174fc04c8aaf050bdd43f537cfb77a64cda0
Parents: 6faba78
Author: Shinsuke Sugaya <[email protected]>
Authored: Fri Apr 21 12:03:29 2017 -0700
Committer: Donald Szeto <[email protected]>
Committed: Fri Apr 21 12:03:29 2017 -0700

----------------------------------------------------------------------
 .../storage/elasticsearch/ESAccessKeys.scala     |  4 +++-
 .../data/storage/elasticsearch/ESApps.scala      |  4 +++-
 .../data/storage/elasticsearch/ESChannels.scala  |  4 +++-
 .../elasticsearch/ESEngineInstances.scala        |  4 +++-
 .../elasticsearch/ESEvaluationInstances.scala    |  4 +++-
 .../data/storage/elasticsearch/ESLEvents.scala   |  4 +++-
 .../data/storage/elasticsearch/ESSequences.scala |  4 +++-
 .../data/storage/elasticsearch/ESUtils.scala     | 19 +++++++++++++++++--
 8 files changed, 38 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index cb6d330..9278366 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -44,7 +44,9 @@ class ESAccessKeys(client: ESClient, config: 
StorageClientConfig, index: String)
 
   val restClient = client.open()
   try {
-    ESUtils.createIndex(restClient, index)
+    ESUtils.createIndex(restClient, index,
+      ESUtils.getNumberOfShards(config, index.toUpperCase),
+      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
         ("_all" -> ("enabled" -> 0)) ~

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index abea2b8..e7fe4af 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -45,7 +45,9 @@ class ESApps(client: ESClient, config: StorageClientConfig, 
index: String)
 
   val restClient = client.open()
   try {
-    ESUtils.createIndex(restClient, index)
+    ESUtils.createIndex(restClient, index,
+      ESUtils.getNumberOfShards(config, index.toUpperCase),
+      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
         ("_all" -> ("enabled" -> 0)) ~

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index f092cc7..a173c59 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -44,7 +44,9 @@ class ESChannels(client: ESClient, config: 
StorageClientConfig, index: String)
 
   val restClient = client.open()
   try {
-    ESUtils.createIndex(restClient, index)
+    ESUtils.createIndex(restClient, index,
+      ESUtils.getNumberOfShards(config, index.toUpperCase),
+      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
         ("_all" -> ("enabled" -> 0)) ~

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 4dbacb7..e123744 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -44,7 +44,9 @@ class ESEngineInstances(client: ESClient, config: 
StorageClientConfig, index: St
 
   val restClient = client.open()
   try {
-    ESUtils.createIndex(restClient, index)
+    ESUtils.createIndex(restClient, index,
+      ESUtils.getNumberOfShards(config, index.toUpperCase),
+      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
         ("_all" -> ("enabled" -> 0)) ~

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 5bdc0fb..48f191a 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -46,7 +46,9 @@ class ESEvaluationInstances(client: ESClient, config: 
StorageClientConfig, index
 
   val restClient = client.open()
   try {
-    ESUtils.createIndex(restClient, index)
+    ESUtils.createIndex(restClient, index,
+      ESUtils.getNumberOfShards(config, index.toUpperCase),
+      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
         ("_all" -> ("enabled" -> 0)) ~

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 809a064..b57ed03 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -60,7 +60,9 @@ class ESLEvents(val client: ESClient, config: 
StorageClientConfig, val index: St
     val estype = getEsType(appId, channelId)
     val restClient = client.open()
     try {
-      ESUtils.createIndex(restClient, index)
+      ESUtils.createIndex(restClient, index,
+        ESUtils.getNumberOfShards(config, index.toUpperCase),
+        ESUtils.getNumberOfReplicas(config, index.toUpperCase))
       val json =
         (estype ->
           ("_all" -> ("enabled" -> 0)) ~

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index e5264ae..29c8c33 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -41,7 +41,9 @@ class ESSequences(client: ESClient, config: 
StorageClientConfig, index: String)
 
   val restClient = client.open()
   try {
-    ESUtils.createIndex(restClient, index)
+    ESUtils.createIndex(restClient, index,
+      ESUtils.getNumberOfShards(config, index.toUpperCase),
+      ESUtils.getNumberOfReplicas(config, index.toUpperCase))
     val mappingJson =
       (estype ->
         ("_all" -> ("enabled" -> 0)))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/7289174f/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 4eb117e..4711e80 100644
--- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -166,16 +166,23 @@ object ESUtils {
 
   def createIndex(
     client: RestClient,
-    index: String): Unit = {
+    index: String,
+    numberOfShards: Option[Int],
+    numberOfReplicas: Option[Int]): Unit = {
     client.performRequest(
       "HEAD",
       s"/$index",
       Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
         case 404 =>
+          val json = ("settings" ->
+            ("number_of_shards" -> numberOfShards) ~
+            ("number_of_replicas" -> numberOfReplicas))
+          val entity = new NStringEntity(compact(render(json)), 
ContentType.APPLICATION_JSON)
           client.performRequest(
             "PUT",
             s"/$index",
-            Map.empty[String, String].asJava)
+            Map.empty[String, String].asJava,
+            entity)
         case 200 =>
         case _ =>
           throw new IllegalStateException(s"/$index is invalid.")
@@ -262,4 +269,12 @@ object ESUtils {
       map(_.split(",").toSeq).getOrElse(Seq("http"))
     (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
   }
+
+  def getNumberOfShards(config: StorageClientConfig, index: String): 
Option[Int] = {
+    config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt)
+  }
+
+  def getNumberOfReplicas(config: StorageClientConfig, index: String): 
Option[Int] = {
+    config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt)
+  }
 }

Reply via email to