add ESClient to close RestClient
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/d4e75ab5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/d4e75ab5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/d4e75ab5 Branch: refs/heads/feature/es5 Commit: d4e75ab5441c2d7278c0cc55c6f1d3b51c9479a0 Parents: 36b79d7 Author: Shinsuke Sugaya <[email protected]> Authored: Mon Jan 16 17:28:54 2017 +0900 Committer: Shinsuke Sugaya <[email protected]> Committed: Mon Jan 16 17:28:54 2017 +0900 ---------------------------------------------------------------------- .../predictionio/workflow/CreateWorkflow.scala | 74 +++++++-------- .../storage/elasticsearch/ESAccessKeys.scala | 48 +++++++--- .../data/storage/elasticsearch/ESApps.scala | 48 +++++++--- .../data/storage/elasticsearch/ESChannels.scala | 42 ++++++--- .../elasticsearch/ESEngineInstances.scala | 75 ++++++++++----- .../elasticsearch/ESEngineManifests.scala | 23 +++-- .../elasticsearch/ESEvaluationInstances.scala | 62 +++++++----- .../data/storage/elasticsearch/ESLEvents.scala | 99 ++++++++++++-------- .../data/storage/elasticsearch/ESPEvents.scala | 12 +-- .../storage/elasticsearch/ESSequences.scala | 22 +++-- .../data/storage/elasticsearch/ESUtils.scala | 6 +- .../storage/elasticsearch/StorageClient.scala | 24 +++-- 12 files changed, 330 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala index edfc1b6..899ace2 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala @@ -223,40 +223,36 @@ object CreateWorkflow extends Logging { engineFactoryObj.engineParams(wfc.engineParamsKey) } - try { - val engineInstance = EngineInstance( - id = "", - status = "INIT", - startTime = DateTime.now, - endTime = DateTime.now, - engineId = wfc.engineId, - engineVersion = wfc.engineVersion, - engineVariant = variantId, - engineFactory = engineFactory, - batch = wfc.batch, - env = pioEnvVars, - sparkConf = workflowParams.sparkEnv, - dataSourceParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams), - preparatorParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams), - algorithmsParams = - JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList), - servingParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams)) + val engineInstance = EngineInstance( + id = "", + status = "INIT", + startTime = DateTime.now, + endTime = DateTime.now, + engineId = wfc.engineId, + engineVersion = wfc.engineVersion, + engineVariant = variantId, + engineFactory = engineFactory, + batch = wfc.batch, + env = pioEnvVars, + sparkConf = workflowParams.sparkEnv, + dataSourceParams = + JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams), + preparatorParams = + JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams), + algorithmsParams = + JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList), + servingParams = + JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams)) - val engineInstanceId = Storage.getMetaDataEngineInstances.insert( - engineInstance) + val engineInstanceId = Storage.getMetaDataEngineInstances.insert( + engineInstance) - CoreWorkflow.runTrain( - env = pioEnvVars, - params = workflowParams, - engine = trainableEngine, - engineParams = engineParams, - engineInstance = engineInstance.copy(id = engineInstanceId)) - } finally { - Storage.getLEvents().close() - } + CoreWorkflow.runTrain( + env = pioEnvVars, + params = workflowParams, + engine = trainableEngine, + engineParams = engineParams, + engineInstance = engineInstance.copy(id = engineInstanceId)) } else { val workflowParams = WorkflowParams( verbose = wfc.verbosity, @@ -271,15 +267,11 @@ object CreateWorkflow extends Logging { env = pioEnvVars, sparkConf = workflowParams.sparkEnv ) - try { - Workflow.runEvaluation( - evaluation = evaluation.get, - engineParamsGenerator = engineParamsGenerator.get, - evaluationInstance = evaluationInstance, - params = workflowParams) - } finally { - Storage.getLEvents().close() - } + Workflow.runEvaluation( + evaluation = evaluation.get, + engineParamsGenerator = engineParamsGenerator.get, + evaluationInstance = evaluationInstance, + params = workflowParams) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 2c69cf4..9156fab 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -37,19 +37,24 @@ import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException /** Elasticsearch implementation of AccessKeys. */ -class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String) +class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) extends AccessKeys with Logging { implicit val formats = DefaultFormats.lossless private val estype = "accesskeys" - ESUtils.createIndex(client, index) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> 0)) ~ - ("properties" -> - ("key" -> ("type" -> "keyword")) ~ - ("events" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("key" -> ("type" -> "keyword")) ~ + ("events" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } def insert(accessKey: AccessKey): Option[String] = { val key = if (accessKey.key.isEmpty) generateKey else accessKey.key @@ -58,8 +63,9 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin } def get(id: String): Option[AccessKey] = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -81,41 +87,50 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin case e: IOException => error("Failed to access to /$index/$estype/$key", e) None + } finally { + restClient.close() } } def getAll(): Seq[AccessKey] = { + val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) + ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } def getByAppid(appid: Int): Seq[AccessKey] = { + val restClient = client.open() try { val json = ("query" -> ("term" -> ("appid" -> appid))) - ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) + ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } def update(accessKey: AccessKey): Unit = { val id = accessKey.key + val restClient = client.open() try { val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$id", Map.empty[String, String].asJava, @@ -131,12 +146,15 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } def delete(id: String): Unit = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "DELETE", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -150,6 +168,8 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin } catch { case e: IOException => error(s"Failed to update $index/$estype/id", e) + } finally { + restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index 7a65379..0379c90 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -37,20 +37,25 @@ import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException /** Elasticsearch implementation of Items. */ -class ESApps(client: RestClient, config: StorageClientConfig, index: String) +class ESApps(client: ESClient, config: StorageClientConfig, index: String) extends Apps with Logging { implicit val formats = DefaultFormats.lossless private val estype = "apps" private val seq = new ESSequences(client, config, index) - ESUtils.createIndex(client, index) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> 0))~ - ("properties" -> - ("id" -> ("type" -> "keyword")) ~ - ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("id" -> ("type" -> "keyword")) ~ + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } def insert(app: App): Option[Int] = { val id = @@ -64,8 +69,9 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) } def get(id: Int): Option[App] = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -87,17 +93,20 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None + } finally { + restClient.close() } } def getByName(name: String): Option[App] = { + val restClient = client.open() try { val json = ("query" -> ("term" -> ("name" -> name))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/_search", Map.empty[String, String].asJava, @@ -114,27 +123,33 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to access to /$index/$estype/_search", e) None + } finally { + restClient.close() } } def getAll(): Seq[App] = { + val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[App](client, index, estype, compact(render(json))) + ESUtils.getAll[App](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } def update(app: App): Unit = { val id = app.id.toString + val restClient = client.open() try { val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$id", Map.empty[String, String].asJava, @@ -150,12 +165,15 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } def delete(id: Int): Unit = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "DELETE", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -169,6 +187,8 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) } catch { case e: IOException => error(s"Failed to update $index/$estype/id", e) + } finally { + restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index c90d668..b319c26 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -36,19 +36,24 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException -class ESChannels(client: RestClient, config: StorageClientConfig, index: String) +class ESChannels(client: ESClient, config: StorageClientConfig, index: String) extends Channels with Logging { implicit val formats = DefaultFormats.lossless private val estype = "channels" private val seq = new ESSequences(client, config, index) - ESUtils.createIndex(client, index) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> 0))~ - ("properties" -> - ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } def insert(channel: Channel): Option[Int] = { val id = @@ -62,8 +67,9 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) } def get(id: Int): Option[Channel] = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -85,28 +91,34 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None + } finally { + restClient.close() } } def getByAppid(appid: Int): Seq[Channel] = { + val restClient = client.open() try { val json = ("query" -> ("term" -> ("appid" -> appid))) - ESUtils.getAll[Channel](client, index, estype, compact(render(json))) + ESUtils.getAll[Channel](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error(s"Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } def update(channel: Channel): Boolean = { val id = channel.id.toString + val restClient = client.open() try { val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$id", Map.empty[String, String].asJava, @@ -124,12 +136,15 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to update $index/$estype/$id", e) false + } finally { + restClient.close() } } def delete(id: Int): Unit = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "DELETE", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -143,7 +158,8 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } - } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 08f87f3..68cdeac 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -37,30 +37,35 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException -class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String) +class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String) extends EngineInstances with Logging { implicit val formats = DefaultFormats + new EngineInstanceSerializer private val estype = "engine_instances" - ESUtils.createIndex(client, index) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> 0))~ - ("properties" -> - ("status" -> ("type" -> "keyword")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("engineId" -> ("type" -> "keyword")) ~ - ("engineVersion" -> ("type" -> "keyword")) ~ - ("engineVariant" -> ("type" -> "keyword")) ~ - ("engineFactory" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("dataSourceParams" -> ("type" -> "keyword")) ~ - ("preparatorParams" -> ("type" -> "keyword")) ~ - ("algorithmsParams" -> ("type" -> "keyword")) ~ - ("servingParams" -> ("type" -> "keyword")) ~ - ("status" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "keyword")) ~ + ("engineVersion" -> ("type" -> "keyword")) ~ + ("engineVariant" -> ("type" -> "keyword")) ~ + ("engineFactory" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("dataSourceParams" -> ("type" -> "keyword")) ~ + ("preparatorParams" -> ("type" -> "keyword")) ~ + ("algorithmsParams" -> ("type" -> "keyword")) ~ + ("servingParams" -> ("type" -> "keyword")) ~ + ("status" -> ("type" -> "keyword")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } def insert(i: EngineInstance): String = { val id = i.id match { @@ -81,9 +86,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: } def preInsert(): Option[String] = { + val restClient = client.open() try { val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/", Map.empty[String, String].asJava, @@ -101,12 +107,15 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: case e: IOException => error(s"Failed to create $index/$estype", e) None + } finally { + restClient.close() } } def get(id: String): Option[EngineInstance] = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -128,19 +137,24 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None + } finally { + restClient.close() } } def getAll(): Seq[EngineInstance] = { + val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } @@ -148,6 +162,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: engineId: String, engineVersion: String, engineVariant: String): Seq[EngineInstance] = { + val restClient = client.open() try { val json = ("query" -> @@ -164,11 +179,13 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: ("sort" -> List( ("startTime" -> ("order" -> "desc")))) - ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error(s"Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } @@ -183,9 +200,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: def update(i: EngineInstance): Unit = { val id = i.id + val restClient = client.open() try { val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$id", Map.empty[String, String].asJava, @@ -201,12 +219,15 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } def delete(id: String): Unit = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "DELETE", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -220,6 +241,8 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala index a965c71..ae4d86b 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala @@ -37,7 +37,7 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException -class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: String) +class ESEngineManifests(client: ESClient, config: StorageClientConfig, index: String) extends EngineManifests with Logging { implicit val formats = DefaultFormats + new EngineManifestSerializer private val estype = "engine_manifests" @@ -45,9 +45,10 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: def insert(engineManifest: EngineManifest): Unit = { val id = esid(engineManifest.id, engineManifest.version) + val restClient = client.open() try { val entity = new NStringEntity(write(engineManifest), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$id", Map.empty[String, String].asJava, @@ -63,13 +64,16 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } def get(id: String, version: String): Option[EngineManifest] = { val esId = esid(id, version) + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "GET", s"/$index/$estype/$esId", Map.empty[String, String].asJava) @@ -91,20 +95,24 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: case e: IOException => error(s"Failed to access to /$index/$estype/$esId", e) None + } finally { + restClient.close() } - } def getAll(): Seq[EngineManifest] = { + val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EngineManifest](client, index, estype, compact(render(json))) + ESUtils.getAll[EngineManifest](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } @@ -113,8 +121,9 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: def delete(id: String, version: String): Unit = { val esId = esid(id, version) + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "DELETE", s"/$index/$estype/$esId", Map.empty[String, String].asJava) @@ -128,6 +137,8 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: } catch { case e: IOException => error(s"Failed to update $index/$estype/$esId", e) + } finally { + restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 0e71f79..1f798f0 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -38,27 +38,32 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException -class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String) +class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String) extends EvaluationInstances with Logging { implicit val formats = DefaultFormats + new EvaluationInstanceSerializer private val estype = "evaluation_instances" private val seq = new ESSequences(client, config, index) - ESUtils.createIndex(client, index) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> 0))~ - ("properties" -> - ("status" -> ("type" -> "keyword")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("evaluationClass" -> ("type" -> "keyword")) ~ - ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~ - ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~ - ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("evaluationClass" -> ("type" -> "keyword")) ~ + ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~ + ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no")))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } def insert(i: EvaluationInstance): String = { val id = i.id match { @@ -74,8 +79,9 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind } def get(id: String): Option[EvaluationInstance] = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -97,23 +103,29 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None + } finally { + restClient.close() } } def getAll(): Seq[EvaluationInstance] = { + val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } def getCompleted(): Seq[EvaluationInstance] = { + val restClient = client.open() try { val json = ("query" -> @@ -122,19 +134,22 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind ("sort" -> ("startTime" -> ("order" -> "desc"))) - ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil + } finally { + restClient.close() } } def update(i: EvaluationInstance): Unit = { val id = i.id + val restClient = client.open() try { val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$id", Map.empty[String, String].asJava, @@ -150,12 +165,15 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } def delete(id: String): Unit = { + val restClient = client.open() try { - val response = client.performRequest( + val response = restClient.performRequest( "DELETE", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -169,6 +187,8 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) + } finally { + restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index ef25204..b4f7dc5 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -40,7 +40,7 @@ import org.json4s.ext.JodaTimeSerializers import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException -class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String) +class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) extends LEvents with Logging { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all private val seq = new ESSequences(client, config, index) @@ -56,41 +56,47 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: override def init(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) - ESUtils.createIndex(client, index) - val json = - (estype -> - ("_all" -> ("enabled" -> 0)) ~ - ("properties" -> - ("name" -> ("type" -> "keyword")) ~ - ("eventId" -> ("type" -> "keyword")) ~ - ("event" -> ("type" -> "keyword")) ~ - ("entityType" -> ("type" -> "keyword")) ~ - ("entityId" -> ("type" -> "keyword")) ~ - ("targetEntityType" -> ("type" -> "keyword")) ~ - ("targetEntityId" -> ("type" -> "keyword")) ~ + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val json = + (estype -> + ("_all" -> ("enabled" -> 0)) ~ ("properties" -> - ("type" -> "nested") ~ + ("name" -> ("type" -> "keyword")) ~ + ("eventId" -> ("type" -> "keyword")) ~ + ("event" -> ("type" -> "keyword")) ~ + ("entityType" -> ("type" -> "keyword")) ~ + ("entityId" -> ("type" -> "keyword")) ~ + ("targetEntityType" -> ("type" -> "keyword")) ~ + ("targetEntityId" -> ("type" -> "keyword")) ~ ("properties" -> - ("fields" -> ("type" -> "nested") ~ - ("properties" -> - ("user" -> ("type" -> "long")) ~ - ("num" -> ("type" -> "long")))))) ~ - ("eventTime" -> ("type" -> "date")) ~ - ("tags" -> ("type" -> "keyword")) ~ - ("prId" -> ("type" -> "keyword")) ~ - ("creationTime" -> ("type" -> "date")))) - ESUtils.createMapping(client, index, estype, compact(render(json))) + ("type" -> "nested") ~ + ("properties" -> + ("fields" -> ("type" -> "nested") ~ + ("properties" -> + ("user" -> ("type" -> "long")) ~ + ("num" -> ("type" -> "long")))))) ~ + ("eventTime" -> ("type" -> "date")) ~ + ("tags" -> ("type" -> "keyword")) ~ + ("prId" -> ("type" -> "keyword")) ~ + ("creationTime" -> ("type" -> "date")))) + ESUtils.createMapping(restClient, index, estype, compact(render(json))) + } finally { + restClient.close() + } true } override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) + val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - client.performRequest( + restClient.performRequest( "POST", s"/$index/$estype/_delete_by_query", Map.empty[String, String].asJava, @@ -104,14 +110,13 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: case e: Exception => error(s"Failed to remove $index/$estype", e) false + } finally { + restClient.close() } } override def close(): Unit = { - try client.close() catch { - case e: Exception => - error("Failed to close client.", e) - } + // nothing } override def futureInsert( @@ -120,15 +125,16 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { Future { val estype = getEsType(appId, channelId) - val id = event.eventId.getOrElse { - var roll = seq.genNext(seqName) - while (exists(estype, roll)) roll = seq.genNext(seqName) - roll.toString - } - val json = write(event.copy(eventId = Some(id))) + val restClient = client.open() try { + val id = event.eventId.getOrElse { + var roll = seq.genNext(seqName) + while (exists(restClient, estype, roll)) roll = seq.genNext(seqName) + roll.toString + } + val json = write(event.copy(eventId = Some(id))) val entity = new NStringEntity(json, ContentType.APPLICATION_JSON); - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$id", Map.empty[String, String].asJava, @@ -144,15 +150,17 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id: $json", e) + error(s"Failed to update $index/$estype/<id>", e) "" + } finally { + restClient.close() } } } - private def exists(estype: String, id: Int): Boolean = { + private def exists(restClient: RestClient, estype: String, id: Int): Boolean = { try { - client.performRequest( + restClient.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava).getStatusLine.getStatusCode match { @@ -179,13 +187,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { Future { val estype = getEsType(appId, channelId) + val restClient = client.open() try { val json = ("query" -> ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/_search", Map.empty[String, String].asJava, @@ -202,6 +211,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: case e: IOException => error("Failed to access to /$index/$estype/_search", e) None + } finally { + restClient.close() } } } @@ -212,13 +223,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { Future { val estype = getEsType(appId, channelId) + val restClient = client.open() try { val json = ("query" -> ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/_delete_by_query", Map.empty[String, String].asJava) @@ -234,6 +246,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: case e: IOException => error(s"Failed to update $index/$estype:$eventId", e) false + } finally { + restClient.close() } } } @@ -253,15 +267,18 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: (implicit ec: ExecutionContext): Future[Iterator[Event]] = { Future { val estype = getEsType(appId, channelId) + val restClient = client.open() try { val query = ESUtils.createEventQuery( startTime, untilTime, entityType, entityId, eventNames, targetEntityType, targetEntityId, None) - ESUtils.getAll[Event](client, index, estype, query).toIterator + ESUtils.getAll[Event](restClient, index, estype, query).toIterator } catch { case e: IOException => error(e.getMessage) Iterator[Event]() + } finally { + restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala index 0e3eec8..5784b3f 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala @@ -41,16 +41,10 @@ import org.json4s.native.JsonMethods._ import org.json4s.ext.JodaTimeSerializers -class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) +class ESPEvents(client: ESClient, config: StorageClientConfig, index: String) extends PEvents { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all - // client is not used. - try client.close() catch { - case e: Exception => - logger.error("Failed to close client.", e) - } - def getEsType(appId: Int, channelId: Option[Int] = None): String = { channelId.map { ch => s"${appId}_${ch}" @@ -114,7 +108,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { val estype = getEsType(appId, channelId) - val restClient = ESUtils.createRestClient(config) + val restClient = client.open() try { eventIds.foreachPartition { iter => iter.foreach { eventId => @@ -124,7 +118,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/_delete_by_query", Map.empty[String, String].asJava) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index c067f3a..4eb8cd7 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -35,20 +35,26 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging { +class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging { implicit val formats = DefaultFormats private val estype = "sequences" - ESUtils.createIndex(client, index) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> 0))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + val restClient = client.open() + try { + ESUtils.createIndex(restClient, index) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> 0))) + ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) + } finally { + restClient.close() + } def genNext(name: String): Int = { + val restClient = client.open() try { val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON) - val response = client.performRequest( + val response = restClient.performRequest( "POST", s"/$index/$estype/$name", Map.empty[String, String].asJava, @@ -66,6 +72,8 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String } catch { case e: IOException => throw new StorageClientException(s"Failed to update $index/$estype/$name", e) + } finally { + restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index 68e3f57..db841b6 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -151,15 +151,13 @@ object ESUtils { |}""".stripMargin } - def createRestClient(config: StorageClientConfig): RestClient = { + def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = { val hosts = config.properties.get("HOSTS"). map(_.split(",").toSeq).getOrElse(Seq("localhost")) val ports = config.properties.get("PORTS"). map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200)) val schemes = config.properties.get("SCHEMES"). map(_.split(",").toSeq).getOrElse(Seq("http")) - val httpHosts = (hosts, ports, schemes).zipped.map( - (h, p, s) => new HttpHost(h, p, s)) - RestClient.builder(httpHosts: _*).build() + (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s)) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala index 912d467..647d180 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala @@ -17,22 +17,28 @@ package org.apache.predictionio.data.storage.elasticsearch -import grizzled.slf4j.Logging +import org.apache.http.HttpHost import org.apache.predictionio.data.storage.BaseStorageClient import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.predictionio.data.storage.StorageClientException -import java.net.InetAddress import org.elasticsearch.client.RestClient -import org.apache.http.HttpHost + +import grizzled.slf4j.Logging + +case class ESClient(hosts: Seq[HttpHost]) { + def open(): RestClient = { + try { + RestClient.builder(hosts: _*).build() + } catch { + case e: Throwable => + throw new StorageClientException(e.getMessage, e) + } + } +} class StorageClient(val config: StorageClientConfig) extends BaseStorageClient with Logging { override val prefix = "ES" - val client = try { - ESUtils.createRestClient(config) - } catch { - case e: Throwable => - throw new StorageClientException(e.getMessage, e) - } + val client = ESClient(ESUtils.getHttpHosts(config)) }
