[PIO-106,PIO-114] Elasticsearch 5.x singleton client with authentication Closes #421
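A minimal sketch of the client lifecycle this commit introduces, based on the ESClient and CleanupFunctions APIs in the diff below (the localhost:9200 host and the my-name/my-secret credentials are placeholders mirroring conf/pio-env.sh.template):

    import org.apache.http.HttpHost
    import org.apache.predictionio.data.storage.elasticsearch.ESClient
    import org.apache.predictionio.workflow.CleanupFunctions

    // ESClient.open now returns a process-wide singleton RestClient,
    // reusing the existing instance on subsequent calls. Basic HTTP auth
    // is optional and is driven by the new
    // PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME/PASSWORD settings.
    val restClient = ESClient.open(
      Seq(new HttpHost("localhost", 9200, "http")),
      basicAuth = Some(("my-name", "my-secret")))

    try {
      // ... work against the metadata or event stores ...
    } finally {
      // StorageClient registers `ESClient.close` with CleanupFunctions at
      // construction time, so a single run() closes the shared connection
      // pool and lets the JVM exit.
      CleanupFunctions.run()
    }

Entry points such as BatchPredict and CoreWorkflow now wrap their work in try/finally blocks that call CleanupFunctions.run(), as shown in the diff.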
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bf84ede6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bf84ede6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bf84ede6 Branch: refs/heads/livedoc Commit: bf84ede6fe475ec591e784eb453c6194befb8515 Parents: eb61358 Author: Mars Hall <[email protected]> Authored: Tue Aug 29 11:38:34 2017 -0700 Committer: Mars Hall <[email protected]> Committed: Tue Aug 29 11:38:34 2017 -0700 ---------------------------------------------------------------------- bin/compute-classpath.sh | 21 ++- conf/pio-env.sh.template | 3 + .../predictionio/workflow/BatchPredict.scala | 140 ++++++++++--------- .../workflow/CleanupFunctions.scala | 65 +++++++++ .../predictionio/workflow/CoreWorkflow.scala | 66 +++++---- .../predictionio/data/storage/Storage.scala | 8 +- storage/elasticsearch/build.sbt | 8 +- .../storage/elasticsearch/ESAccessKeys.scala | 55 +++----- .../data/storage/elasticsearch/ESApps.scala | 55 +++----- .../data/storage/elasticsearch/ESChannels.scala | 50 +++---- .../elasticsearch/ESEngineInstances.scala | 84 ++++------- .../elasticsearch/ESEvaluationInstances.scala | 71 ++++------ .../data/storage/elasticsearch/ESLEvents.scala | 49 +++++-- .../data/storage/elasticsearch/ESPEvents.scala | 29 ++-- .../storage/elasticsearch/ESSequences.scala | 30 ++-- .../storage/elasticsearch/StorageClient.scala | 69 ++++++++- .../elasticsearch/StorageClientSpec.scala | 67 +++++++++ .../elasticsearch/StorageTestUtils.scala | 28 ++++ storage/hdfs/project/build.properties | 1 + .../tools/export/EventsToFile.scala | 62 ++++---- .../predictionio/tools/imprt/FileToEvents.scala | 64 +++++---- 21 files changed, 596 insertions(+), 429 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/bin/compute-classpath.sh ---------------------------------------------------------------------- diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 3e59ca7..7a38e0b 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -42,8 +42,25 @@ ASSEMBLY_JARS=$(printf "${MAIN_JAR}\n${DATA_JARS}" | paste -sd "," -) # Build up classpath CLASSPATH="${PIO_CONF_DIR}" -CLASSPATH="$CLASSPATH:${FWDIR}/plugins/*" -CLASSPATH="$CLASSPATH:${assembly_folder}/spark/*" + +# stable classpath for plugin JARs +if [ -d "${FWDIR}/plugins" ]; then + lib_plugin_jars=`ls "${FWDIR}"/plugins/*` + lib_plugin_classpath='' + for J in $lib_plugin_jars; do + lib_plugin_classpath="${lib_plugin_classpath}:${J}" + done + CLASSPATH="$CLASSPATH${lib_plugin_classpath}" +fi + +# stable classpath for Spark JARs +lib_spark_jars=`ls "${assembly_folder}"/spark/*.jar` +lib_spark_classpath='' +for J in $lib_spark_jars; do + lib_spark_classpath="${lib_spark_classpath}:${J}" +done +CLASSPATH="$CLASSPATH${lib_spark_classpath}" + CLASSPATH="$CLASSPATH:${MAIN_JAR}" # Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail ! 
Note, this http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/conf/pio-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template index 832b422..0b6b5b9 100644 --- a/conf/pio-env.sh.template +++ b/conf/pio-env.sh.template @@ -90,6 +90,9 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 # PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.4.1 +# Optional basic HTTP auth +# PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name +# PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret # Elasticsearch 1.x Example # PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch # PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name> http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala index 5420638..69525b1 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala @@ -28,6 +28,7 @@ import org.apache.predictionio.controller.{Engine, Utils} import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer} import org.apache.predictionio.data.storage.{EngineInstance, Storage} import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption +import org.apache.predictionio.workflow.CleanupFunctions import org.apache.spark.rdd.RDD import org.json4s._ import org.json4s.native.JsonMethods._ @@ -146,84 +147,89 @@ object BatchPredict extends Logging { engineInstance: EngineInstance, engine: Engine[_, _, _, Q, P, _]): Unit = { - val engineParams = engine.engineInstanceToEngineParams( - engineInstance, config.jsonExtractor) + try { + val engineParams = engine.engineInstanceToEngineParams( + engineInstance, config.jsonExtractor) - val kryo = KryoInstantiator.newKryoInjection + val kryo = KryoInstantiator.newKryoInjection - val modelsFromEngineInstance = - kryo.invert(modeldata.get(engineInstance.id).get.models).get. - asInstanceOf[Seq[Any]] + val modelsFromEngineInstance = + kryo.invert(modeldata.get(engineInstance.id).get.models).get. 
+ asInstanceOf[Seq[Any]] - val prepareSparkContext = WorkflowContext( - batch = engineInstance.engineFactory, - executorEnv = engineInstance.env, - mode = "Batch Predict (model)", - sparkEnv = engineInstance.sparkConf) + val prepareSparkContext = WorkflowContext( + batch = engineInstance.engineFactory, + executorEnv = engineInstance.env, + mode = "Batch Predict (model)", + sparkEnv = engineInstance.sparkConf) - val models = engine.prepareDeploy( - prepareSparkContext, - engineParams, - engineInstance.id, - modelsFromEngineInstance, - params = WorkflowParams() - ) + val models = engine.prepareDeploy( + prepareSparkContext, + engineParams, + engineInstance.id, + modelsFromEngineInstance, + params = WorkflowParams() + ) - val algorithms = engineParams.algorithmParamsList.map { case (n, p) => - Doer(engine.algorithmClassMap(n), p) - } + val algorithms = engineParams.algorithmParamsList.map { case (n, p) => + Doer(engine.algorithmClassMap(n), p) + } - val servingParamsWithName = engineParams.servingParams + val servingParamsWithName = engineParams.servingParams - val serving = Doer(engine.servingClassMap(servingParamsWithName._1), - servingParamsWithName._2) + val serving = Doer(engine.servingClassMap(servingParamsWithName._1), + servingParamsWithName._2) - val runSparkContext = WorkflowContext( - batch = engineInstance.engineFactory, - executorEnv = engineInstance.env, - mode = "Batch Predict (runner)", - sparkEnv = engineInstance.sparkConf) + val runSparkContext = WorkflowContext( + batch = engineInstance.engineFactory, + executorEnv = engineInstance.env, + mode = "Batch Predict (runner)", + sparkEnv = engineInstance.sparkConf) - val inputRDD: RDD[String] = runSparkContext. - textFile(config.inputFilePath). - filter(_.trim.nonEmpty) - val queriesRDD: RDD[String] = config.queryPartitions match { - case Some(p) => inputRDD.repartition(p) - case None => inputRDD - } + val inputRDD: RDD[String] = runSparkContext. + textFile(config.inputFilePath). + filter(_.trim.nonEmpty) + val queriesRDD: RDD[String] = config.queryPartitions match { + case Some(p) => inputRDD.repartition(p) + case None => inputRDD + } - val predictionsRDD: RDD[String] = queriesRDD.map { queryString => - val jsonExtractorOption = config.jsonExtractor - // Extract Query from Json - val query = JsonExtractor.extract( - jsonExtractorOption, - queryString, - algorithms.head.queryClass, - algorithms.head.querySerializer, - algorithms.head.gsonTypeAdapterFactories - ) - // Deploy logic. First call Serving.supplement, then Algo.predict, - // finally Serving.serve. - val supplementedQuery = serving.supplementBase(query) - // TODO: Parallelize the following. - val predictions = algorithms.zip(models).map { case (a, m) => - a.predictBase(m, supplementedQuery) + val predictionsRDD: RDD[String] = queriesRDD.map { queryString => + val jsonExtractorOption = config.jsonExtractor + // Extract Query from Json + val query = JsonExtractor.extract( + jsonExtractorOption, + queryString, + algorithms.head.queryClass, + algorithms.head.querySerializer, + algorithms.head.gsonTypeAdapterFactories + ) + // Deploy logic. First call Serving.supplement, then Algo.predict, + // finally Serving.serve. + val supplementedQuery = serving.supplementBase(query) + // TODO: Parallelize the following. + val predictions = algorithms.zip(models).map { case (a, m) => + a.predictBase(m, supplementedQuery) + } + // Notice that it is by design to call Serving.serve with the + // *original* query. 
+ val prediction = serving.serveBase(query, predictions) + // Combine query with prediction, so the batch results are + // self-descriptive. + val predictionJValue = JsonExtractor.toJValue( + jsonExtractorOption, + Map("query" -> query, + "prediction" -> prediction), + algorithms.head.querySerializer, + algorithms.head.gsonTypeAdapterFactories) + // Return JSON string + compact(render(predictionJValue)) } - // Notice that it is by design to call Serving.serve with the - // *original* query. - val prediction = serving.serveBase(query, predictions) - // Combine query with prediction, so the batch results are - // self-descriptive. - val predictionJValue = JsonExtractor.toJValue( - jsonExtractorOption, - Map("query" -> query, - "prediction" -> prediction), - algorithms.head.querySerializer, - algorithms.head.gsonTypeAdapterFactories) - // Return JSON string - compact(render(predictionJValue)) - } - predictionsRDD.saveAsTextFile(config.outputFilePath) + predictionsRDD.saveAsTextFile(config.outputFilePath) + + } finally { + CleanupFunctions.run() + } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala b/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala new file mode 100644 index 0000000..bdd8b01 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.workflow + +/** :: DeveloperApi :: + * Singleton object that collects anonymous functions to be + * executed to allow the process to end gracefully. + * + * For example, the Elasticsearch REST storage client + * maintains an internal connection pool that must + * be closed to allow the process to exit. + */ +object CleanupFunctions { + @volatile private var functions: Seq[() => Unit] = Seq.empty[() => Unit] + + /** Add a function to be called during cleanup. + * + * {{{ + * import org.apache.predictionio.workflow.CleanupFunctions + * + * CleanupFunctions.add { MyStorageClass.close } + * }}} + * + * @param f anonymous function containing cleanup code. + */ + def add(f: () => Unit): Seq[() => Unit] = { + functions = functions :+ f + functions + } + + /** Call all cleanup functions in order added. + * + * {{{ + * import org.apache.predictionio.workflow.CleanupFunctions + * + * try { + * // Much code that needs cleanup + * // whether successful or error thrown. + * } finally { + * CleanupFunctions.run() + * } + * }}}
+ */ + def run(): Unit = { + functions.foreach { f => f() } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala index d956fd7..65270a2 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala @@ -96,6 +96,7 @@ object CoreWorkflow { } } finally { logger.debug("Stopping SparkContext") + CleanupFunctions.run() sc.stop() } } @@ -123,40 +124,43 @@ object CoreWorkflow { env, params.sparkEnv, mode.capitalize) - val evaluationInstanceId = evaluationInstances.insert(evaluationInstance) - logger.info(s"Starting evaluation instance ID: $evaluationInstanceId") - - val evaluatorResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation( - sc, - evaluation, - engine, - engineParamsList, - evaluator, - params) - - if (evaluatorResult.noSave) { - logger.info(s"This evaluation result is not inserted into database: $evaluatorResult") - } else { - val evaluatedEvaluationInstance = evaluationInstance.copy( - status = "EVALCOMPLETED", - id = evaluationInstanceId, - endTime = DateTime.now, - evaluatorResults = evaluatorResult.toOneLiner, - evaluatorResultsHTML = evaluatorResult.toHTML, - evaluatorResultsJSON = evaluatorResult.toJSON - ) - - logger.info(s"Updating evaluation instance with result: $evaluatorResult") + try { + val evaluationInstanceId = evaluationInstances.insert(evaluationInstance) + + logger.info(s"Starting evaluation instance ID: $evaluationInstanceId") + + val evaluatorResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation( + sc, + evaluation, + engine, + engineParamsList, + evaluator, + params) + + if (evaluatorResult.noSave) { + logger.info(s"This evaluation result is not inserted into database: $evaluatorResult") + } else { + val evaluatedEvaluationInstance = evaluationInstance.copy( + status = "EVALCOMPLETED", + id = evaluationInstanceId, + endTime = DateTime.now, + evaluatorResults = evaluatorResult.toOneLiner, + evaluatorResultsHTML = evaluatorResult.toHTML, + evaluatorResultsJSON = evaluatorResult.toJSON + ) + + logger.info(s"Updating evaluation instance with result: $evaluatorResult") + + evaluationInstances.update(evaluatedEvaluationInstance) + } + logger.info("runEvaluation completed") - evaluationInstances.update(evaluatedEvaluationInstance) + } finally { + logger.debug("Stop SparkContext") + CleanupFunctions.run() + sc.stop() } - - logger.debug("Stop SparkContext") - - sc.stop() - - logger.info("runEvaluation completed") } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala index 52442a6..70e1973 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala @@ -19,6 +19,7 @@ package org.apache.predictionio.data.storage import grizzled.slf4j.Logging +import org.apache.commons.lang3.exception.ExceptionUtils import 
org.apache.predictionio.annotation.DeveloperApi import scala.concurrent.ExecutionContext.Implicits.global @@ -214,7 +215,8 @@ object Storage extends Logging { } } catch { case e: Throwable => - error(e.getMessage) + val stackTrace = ExceptionUtils.getStackTrace(e) + error(s"${e.getMessage}\n${stackTrace}\n\n") errors += 1 r -> DataObjectMeta("", "") } @@ -282,7 +284,9 @@ object Storage extends Logging { Some(ClientMeta(sourceType, client, clientConfig)) } catch { case e: Throwable => - error(s"Error initializing storage client for source ${k}", e) + val stackTrace = ExceptionUtils.getStackTrace(e) + error(s"Error initializing storage client for source ${k}.\n" + + s"${stackTrace}\n\n") errors += 1 None } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/build.sbt ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt index da4842e..b60e86e 100644 --- a/storage/elasticsearch/build.sbt +++ b/storage/elasticsearch/build.sbt @@ -30,7 +30,7 @@ libraryDependencies ++= Seq( "org.elasticsearch" %% elasticsearchSparkArtifact.value % elasticsearchVersion.value exclude("org.apache.spark", "*"), "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value, - "org.scalatest" %% "scalatest" % "2.1.7" % "test") + "org.specs2" %% "specs2" % "2.3.13" % "test") parallelExecution in Test := false @@ -39,8 +39,10 @@ pomExtra := childrenPomExtra.value assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) assemblyShadeRules in assembly := Seq( - ShadeRule.rename("org.apache.http.**" -> "shadeio.data.http.@1").inAll -) + ShadeRule.rename("org.apache.http.**" -> + "org.apache.predictionio.shaded.org.apache.http.@1").inAll, + ShadeRule.rename("org.elasticsearch.client.**" -> + "org.apache.predictionio.shaded.org.elasticsearch.client.@1").inAll) // skip test in assembly test in assembly := {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 98c2781..73ef1d0 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -27,36 +27,30 @@ import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.AccessKey import org.apache.predictionio.data.storage.AccessKeys import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient +import org.elasticsearch.client.{ResponseException, RestClient} import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException /** Elasticsearch implementation of AccessKeys. 
*/ -class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) +class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String) extends AccessKeys with Logging { implicit val formats = DefaultFormats.lossless private val estype = "accesskeys" - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> false)) ~ - ("properties" -> - ("key" -> ("type" -> "keyword")) ~ - ("events" -> ("type" -> "keyword")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() - } + ESUtils.createIndex(client, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> false)) ~ + ("properties" -> + ("key" -> ("type" -> "keyword")) ~ + ("events" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(accessKey: AccessKey): Option[String] = { val key = if (accessKey.key.isEmpty) generateKey else accessKey.key @@ -68,9 +62,8 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) if (id.isEmpty) { return None } - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -92,50 +85,41 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None - } finally { - restClient.close() } } def getAll(): Seq[AccessKey] = { - val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } def getByAppid(appid: Int): Seq[AccessKey] = { - val restClient = client.open() try { val json = ("query" -> ("term" -> ("appid" -> appid))) - ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } def update(accessKey: AccessKey): Unit = { val id = accessKey.key - val restClient = client.open() try { val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/$id", Map("refresh" -> "true").asJava, @@ -151,15 +135,12 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() } } def delete(id: String): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "DELETE", s"/$index/$estype/$id", Map("refresh" -> "true").asJava) @@ -173,8 +154,6 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) } catch { case e: 
IOException => error(s"Failed to update $index/$estype/id", e) - } finally { - restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index 6afed12..ba48065 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -27,37 +27,31 @@ import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.App import org.apache.predictionio.data.storage.Apps import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient +import org.elasticsearch.client.{ResponseException, RestClient} import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException /** Elasticsearch implementation of Items. */ -class ESApps(client: ESClient, config: StorageClientConfig, index: String) +class ESApps(client: RestClient, config: StorageClientConfig, index: String) extends Apps with Logging { implicit val formats = DefaultFormats.lossless private val estype = "apps" private val seq = new ESSequences(client, config, index) - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> false)) ~ - ("properties" -> - ("id" -> ("type" -> "keyword")) ~ - ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() - } + ESUtils.createIndex(client, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> false)) ~ + ("properties" -> + ("id" -> ("type" -> "keyword")) ~ + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(app: App): Option[Int] = { val id = app.id match { @@ -77,9 +71,8 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) } def get(id: Int): Option[App] = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -101,20 +94,17 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None - } finally { - restClient.close() } } def getByName(name: String): Option[App] = { - val restClient = client.open() try { val json = ("query" -> ("term" -> ("name" -> name))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/_search", Map.empty[String, 
String].asJava, @@ -131,33 +121,27 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to access to /$index/$estype/_search", e) None - } finally { - restClient.close() } } def getAll(): Seq[App] = { - val restClient = client.open() try { val json = ("query" -> ("match_all" -> Nil)) - ESUtils.getAll[App](restClient, index, estype, compact(render(json))) + ESUtils.getAll[App](client, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } def update(app: App): Unit = { val id = app.id.toString - val restClient = client.open() try { val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/$id", Map("refresh" -> "true").asJava, @@ -173,15 +157,12 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() } } def delete(id: Int): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "DELETE", s"/$index/$estype/$id", Map("refresh" -> "true").asJava) @@ -195,8 +176,6 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) } catch { case e: IOException => error(s"Failed to update $index/$estype/id", e) - } finally { - restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index c142beb..b5eb5c8 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -27,35 +27,29 @@ import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.Channel import org.apache.predictionio.data.storage.Channels import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient +import org.elasticsearch.client.{ResponseException, RestClient} import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException -class ESChannels(client: ESClient, config: StorageClientConfig, index: String) +class ESChannels(client: RestClient, config: StorageClientConfig, index: String) extends Channels with Logging { implicit val formats = DefaultFormats.lossless private val estype = "channels" private val seq = new ESSequences(client, config, index) - - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> false)) ~ - ("properties" -> - ("name" -> ("type" -> "keyword")))) - 
ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() - } + + ESUtils.createIndex(client, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> false)) ~ + ("properties" -> + ("name" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(channel: Channel): Option[Int] = { val id = channel.id match { @@ -75,9 +69,8 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String) } def get(id: Int): Option[Channel] = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -99,34 +92,28 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None - } finally { - restClient.close() } } def getByAppid(appid: Int): Seq[Channel] = { - val restClient = client.open() try { val json = ("query" -> ("term" -> ("appid" -> appid))) - ESUtils.getAll[Channel](restClient, index, estype, compact(render(json))) + ESUtils.getAll[Channel](client, index, estype, compact(render(json))) } catch { case e: IOException => error(s"Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } def update(channel: Channel): Boolean = { val id = channel.id.toString - val restClient = client.open() try { val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/$id", Map("refresh" -> "true").asJava, @@ -144,15 +131,12 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String) case e: IOException => error(s"Failed to update $index/$estype/$id", e) false - } finally { - restClient.close() } } def delete(id: Int): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "DELETE", s"/$index/$estype/$id", Map("refresh" -> "true").asJava) @@ -166,8 +150,6 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String) } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index de474c1..eec5b64 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -28,46 +28,40 @@ import org.apache.predictionio.data.storage.EngineInstance import org.apache.predictionio.data.storage.EngineInstanceSerializer import org.apache.predictionio.data.storage.EngineInstances import org.apache.predictionio.data.storage.StorageClientConfig -import 
org.elasticsearch.client.RestClient +import org.elasticsearch.client.{ResponseException, RestClient} import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException -class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String) +class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String) extends EngineInstances with Logging { implicit val formats = DefaultFormats + new EngineInstanceSerializer private val estype = "engine_instances" - - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> false)) ~ - ("properties" -> - ("status" -> ("type" -> "keyword")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("engineId" -> ("type" -> "keyword")) ~ - ("engineVersion" -> ("type" -> "keyword")) ~ - ("engineVariant" -> ("type" -> "keyword")) ~ - ("engineFactory" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("dataSourceParams" -> ("type" -> "keyword")) ~ - ("preparatorParams" -> ("type" -> "keyword")) ~ - ("algorithmsParams" -> ("type" -> "keyword")) ~ - ("servingParams" -> ("type" -> "keyword")) ~ - ("status" -> ("type" -> "keyword")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() - } + + ESUtils.createIndex(client, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> false)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("engineId" -> ("type" -> "keyword")) ~ + ("engineVersion" -> ("type" -> "keyword")) ~ + ("engineVariant" -> ("type" -> "keyword")) ~ + ("engineFactory" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("dataSourceParams" -> ("type" -> "keyword")) ~ + ("preparatorParams" -> ("type" -> "keyword")) ~ + ("algorithmsParams" -> ("type" -> "keyword")) ~ + ("servingParams" -> ("type" -> "keyword")) ~ + ("status" -> ("type" -> "keyword")))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(i: EngineInstance): String = { val id = i.id match { @@ -88,10 +82,9 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St } def preInsert(): Option[String] = { - val restClient = client.open() try { val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/", Map("refresh" -> "true").asJava, @@ -109,15 +102,12 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St case e: IOException => error(s"Failed to create $index/$estype", e) None - } finally { - restClient.close() } } def get(id: String): Option[EngineInstance] = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -139,24 +129,19 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St case e: IOException => 
error(s"Failed to access to /$index/$estype/$id", e) None - } finally { - restClient.close() } } def getAll(): Seq[EngineInstance] = { - val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } @@ -164,7 +149,6 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St engineId: String, engineVersion: String, engineVariant: String): Seq[EngineInstance] = { - val restClient = client.open() try { val json = ("query" -> @@ -181,13 +165,11 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St ("sort" -> List( ("startTime" -> ("order" -> "desc")))) - ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json))) } catch { case e: IOException => error(s"Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } @@ -202,10 +184,9 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St def update(i: EngineInstance): Unit = { val id = i.id - val restClient = client.open() try { val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/$id", Map("refresh" -> "true").asJava, @@ -221,15 +202,12 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() } } def delete(id: String): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "DELETE", s"/$index/$estype/$id", Map("refresh" -> "true").asJava) @@ -243,8 +221,6 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 9b19cf4..1706583 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -29,43 +29,37 @@ import org.apache.predictionio.data.storage.EvaluationInstanceSerializer import org.apache.predictionio.data.storage.EvaluationInstances import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.predictionio.data.storage.StorageClientException -import org.elasticsearch.client.RestClient +import org.elasticsearch.client.{ResponseException, RestClient} import org.json4s._ import org.json4s.JsonDSL._ import 
org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException -class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String) +class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String) extends EvaluationInstances with Logging { implicit val formats = DefaultFormats + new EvaluationInstanceSerializer private val estype = "evaluation_instances" private val seq = new ESSequences(client, config, index) - - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> false)) ~ - ("properties" -> - ("status" -> ("type" -> "keyword")) ~ - ("startTime" -> ("type" -> "date")) ~ - ("endTime" -> ("type" -> "date")) ~ - ("evaluationClass" -> ("type" -> "keyword")) ~ - ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("evaluatorResults" -> ("type" -> "text")) ~ - ("evaluatorResultsHTML" -> ("enabled" -> false)) ~ - ("evaluatorResultsJSON" -> ("enabled" -> false)))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() - } + + ESUtils.createIndex(client, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> false)) ~ + ("properties" -> + ("status" -> ("type" -> "keyword")) ~ + ("startTime" -> ("type" -> "date")) ~ + ("endTime" -> ("type" -> "date")) ~ + ("evaluationClass" -> ("type" -> "keyword")) ~ + ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ + ("batch" -> ("type" -> "keyword")) ~ + ("evaluatorResults" -> ("type" -> "text")) ~ + ("evaluatorResultsHTML" -> ("enabled" -> false)) ~ + ("evaluatorResultsJSON" -> ("enabled" -> false)))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(i: EvaluationInstance): String = { val id = i.id match { @@ -85,9 +79,8 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index } def get(id: String): Option[EvaluationInstance] = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "GET", s"/$index/$estype/$id", Map.empty[String, String].asJava) @@ -109,29 +102,23 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index case e: IOException => error(s"Failed to access to /$index/$estype/$id", e) None - } finally { - restClient.close() } } def getAll(): Seq[EvaluationInstance] = { - val restClient = client.open() try { val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } def getCompleted(): Seq[EvaluationInstance] = { - val restClient = client.open() try { val json = ("query" -> @@ -140,22 +127,19 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index ("sort" -> ("startTime" -> ("order" -> "desc"))) - ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + 
ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json))) } catch { case e: IOException => error("Failed to access to /$index/$estype/_search", e) Nil - } finally { - restClient.close() } } def update(i: EvaluationInstance): Unit = { val id = i.id - val restClient = client.open() try { val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/$id", Map("refresh" -> "true").asJava, @@ -171,15 +155,12 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() } } def delete(id: String): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( + val response = client.performRequest( "DELETE", s"/$index/$estype/$id", Map("refresh" -> "true").asJava) @@ -193,8 +174,6 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index } catch { case e: IOException => error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index 6c0c4a7..5e1f4c1 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -28,6 +28,7 @@ import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.storage.LEvents import org.apache.predictionio.data.storage.StorageClientConfig +import org.elasticsearch.client.{ResponseException, RestClient} import org.joda.time.DateTime import org.json4s._ import org.json4s.JsonDSL._ @@ -37,10 +38,9 @@ import org.json4s.ext.JodaTimeSerializers import grizzled.slf4j.Logging import org.apache.http.message.BasicHeader -class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) +class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String) extends LEvents with Logging { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all - val restClient = client.open() def getEsType(appId: Int, channelId: Option[Int] = None): String = { channelId.map { ch => @@ -52,7 +52,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St override def init(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) - ESUtils.createIndex(restClient, index, + ESUtils.createIndex(client, index, ESUtils.getNumberOfShards(config, index.toUpperCase), ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val json = @@ -71,7 +71,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St ("tags" -> ("type" -> "keyword")) ~ ("prId" -> ("type" -> "keyword")) ~ ("creationTime" -> ("type" -> "date")))) - ESUtils.createMapping(restClient, index, estype, compact(render(json))) + 
ESUtils.createMapping(client, index, estype, compact(render(json))) true } @@ -82,7 +82,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St ("query" -> ("match_all" -> List.empty)) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - restClient.performRequest( + client.performRequest( "POST", s"/$index/$estype/_delete_by_query", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, @@ -99,9 +99,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St } } - override def close(): Unit = { - restClient.close() - } + override def close(): Unit = {} override def futureInsert( event: Event, @@ -126,7 +124,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~ ("properties" -> write(event.properties.toJObject)) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/$id", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, @@ -185,7 +183,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St }.mkString("", "\n", "\n") val entity = new StringEntity(json) - val response = restClient.performRequest( + val response = client.performRequest( "POST", "/_bulk", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, @@ -215,6 +213,29 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St } } + private def exists(client: RestClient, estype: String, id: Int): Boolean = { + try { + client.performRequest( + "GET", + s"/$index/$estype/$id", + Map.empty[String, String].asJava).getStatusLine.getStatusCode match { + case 200 => true + case _ => false + } + } catch { + case e: ResponseException => + e.getResponse.getStatusLine.getStatusCode match { + case 404 => false + case _ => + error(s"Failed to access to /$index/$estype/$id", e) + false + } + case e: IOException => + error(s"Failed to access to $index/$estype/$id", e) + false + } + } + override def futureGet( eventId: String, appId: Int, @@ -227,7 +248,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/_search", Map.empty[String, String].asJava, @@ -260,7 +281,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/_delete_by_query", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, @@ -301,8 +322,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St startTime, untilTime, entityType, entityId, eventNames, targetEntityType, targetEntityId, reversed) limit.getOrElse(20) match { - case -1 => ESUtils.getEventAll(restClient, index, estype, query).toIterator - case size => ESUtils.getEvents(restClient, index, estype, query, size).toIterator + case -1 => ESUtils.getEventAll(client, index, estype, query).toIterator + case size => ESUtils.getEvents(client, index, estype, query, size).toIterator } } 
catch { case e: IOException => http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala index 9f0a188..75f7639 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala @@ -41,7 +41,7 @@ import org.json4s.native.JsonMethods._ import org.json4s.ext.JodaTimeSerializers -class ESPEvents(client: ESClient, config: StorageClientConfig, index: String) +class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) extends PEvents { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all @@ -107,8 +107,6 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String) eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { val estype = getEsType(appId, channelId) - val restClient = client.open() - try { eventIds.foreachPartition { iter => iter.foreach { eventId => try { @@ -117,28 +115,23 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String) ("term" -> ("eventId" -> eventId))) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/_delete_by_query", Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "deleted" => true - case _ => - logger.error(s"[$result] Failed to update $index/$estype:$eventId") - false - } - } catch { - case e: IOException => - logger.error(s"Failed to update $index/$estype:$eventId", e) - false + val jsonResponse = parse(EntityUtils.toString(response.getEntity)) + val result = (jsonResponse \ "result").extract[String] + result match { + case "deleted" => + case _ => + logger.error(s"[$result] Failed to update $index/$estype:$eventId") } + } catch { + case e: IOException => + logger.error(s"Failed to update $index/$estype:$eventId", e) } } - } finally { - restClient.close() } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index 9fd31a3..018ef85 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -35,30 +35,24 @@ import org.json4s.native.Serialization.write import grizzled.slf4j.Logging -class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging { +class 
ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging { implicit val formats = DefaultFormats private val estype = "sequences" - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) - val mappingJson = - (estype -> - ("_all" -> ("enabled" -> false)) ~ - ("properties" -> - ("n" -> ("enabled" -> false)))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() - } + ESUtils.createIndex(client, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + val mappingJson = + (estype -> + ("_all" -> ("enabled" -> false)) ~ + ("properties" -> + ("n" -> ("enabled" -> false)))) + ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def genNext(name: String): Long = { - val restClient = client.open() try { val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( + val response = client.performRequest( "POST", s"/$index/$estype/$name", Map("refresh" -> "false").asJava, @@ -76,8 +70,6 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String) } catch { case e: IOException => throw new StorageClientException(s"Failed to update $index/$estype/$name", e) - } finally { - restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala index 647d180..d2c69b9 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala @@ -18,27 +18,84 @@ package org.apache.predictionio.data.storage.elasticsearch import org.apache.http.HttpHost +import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} +import org.apache.http.impl.client.BasicCredentialsProvider +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder import org.apache.predictionio.data.storage.BaseStorageClient import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.predictionio.data.storage.StorageClientException +import org.apache.predictionio.workflow.CleanupFunctions import org.elasticsearch.client.RestClient +import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback import grizzled.slf4j.Logging -case class ESClient(hosts: Seq[HttpHost]) { - def open(): RestClient = { +object ESClient extends Logging { + private var _sharedRestClient: Option[RestClient] = None + + def open( + hosts: Seq[HttpHost], + basicAuth: Option[(String, String)] = None): RestClient = { try { - RestClient.builder(hosts: _*).build() + val newClient = _sharedRestClient match { + case Some(c) => c + case None => { + var builder = RestClient.builder(hosts: _*) + builder = basicAuth match { + case Some((username, password)) => builder.setHttpClientConfigCallback( + new BasicAuthProvider(username, password)) + 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
new file mode 100644
index 0000000..91cc4d6
--- /dev/null
+++ b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.predictionio.data.storage.{App, Apps, Storage, StorageClientConfig}
+import org.elasticsearch.client.{RestClient, Response}
+import scala.collection.JavaConverters._
+
+import org.specs2._
+import org.specs2.specification.Step
+
+class ElasticsearchStorageClientSpec extends Specification {
+  def is = s2"""
+
+  PredictionIO Storage Elasticsearch REST Client Specification ${getESClient}
+
+  """
+
+  def getESClient = sequential ^ s2"""
+
+  StorageClient should
+  - initialize metadata store ${initMetadataStore(appsDO)}
+
+  """
+
+  def initMetadataStore(appsDO: Apps) = sequential ^ s2"""
+
+  creates an app ${createsApp(appsDO)}
+  gets apps ${getApps(appsDO)}
+
+  """
+
+  val indexName = "test_pio_storage_meta_" + hashCode
+
+  def appsDO: Apps = Storage.getDataObject[Apps](StorageTestUtils.elasticsearchSourceName, indexName)
+
+  def createsApp(appsDO: Apps) = {
+    val newId: Int = 123
+    val newApp: App = App(newId, "test1", Some("App for ElasticsearchStorageClientSpec"))
+    val id: Option[Int] = appsDO.insert(newApp)
+    val createdApp: Option[App] = appsDO.get(id.get)
+    createdApp.get.id mustEqual newId
+  }
+
+  def getApps(appsDO: Apps) = {
+    val apps: Seq[App] = appsDO.getAll()
+    println(s"Storage.config ${Storage.config}")
+    println(s"getApps ${apps}")
+    apps must beAnInstanceOf[Seq[App]]
+  }
+}
\ No newline at end of file
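Note: the spec resolves its Apps data object through Storage, so it assumes a running Elasticsearch reachable via a storage source named ELASTICSEARCH. For local experimentation, a config can also be built by hand; a minimal sketch, assuming StorageClientConfig's properties map as read by StorageClient above and the HOSTS/PORTS keys consumed by ESUtils.getHttpHosts (all values are placeholders):

    // Hand-built config; StorageClient reads the USERNAME/PASSWORD keys
    // introduced by this commit from config.properties.
    val config = StorageClientConfig(
      properties = Map(
        "HOSTS" -> "localhost",
        "PORTS" -> "9200",
        "USERNAME" -> "my-name",
        "PASSWORD" -> "my-secret"))

    val storageClient = new StorageClient(config)  // opens the shared, authenticated client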
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
new file mode 100644
index 0000000..f891c58
--- /dev/null
+++ b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+object StorageTestUtils {
+  val elasticsearchSourceName = "ELASTICSEARCH"
+
+  def dropESIndex(namespace: String): Unit = {
+    // TODO
+  }
+
+}
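Note: dropESIndex is committed as a TODO. A hypothetical implementation (not part of this commit; the signature taking a client is illustrative) could delete the test index through the ES 5.x low-level REST client:

    import org.elasticsearch.client.RestClient

    // Illustrative only: remove a test index so specs can clean up after themselves.
    def dropESIndex(client: RestClient, index: String): Unit = {
      client.performRequest("DELETE", s"/$index")
    }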
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/hdfs/project/build.properties
----------------------------------------------------------------------
diff --git a/storage/hdfs/project/build.properties b/storage/hdfs/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/storage/hdfs/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
index c101d3f..0372a44 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -25,6 +25,7 @@ import org.apache.predictionio.data.SparkVersionDependent
 import org.apache.predictionio.tools.Runner
 import org.apache.predictionio.workflow.WorkflowContext
 import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.CleanupFunctions
 import grizzled.slf4j.Logging
 import org.apache.spark.sql.SaveMode

@@ -69,40 +70,45 @@ object EventsToFile extends Logging {
       }
     }
     parser.parse(args, EventsToFileArgs()) map { args =>
-      // get channelId
-      val channels = Storage.getMetaDataChannels
-      val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+      try {
+        // get channelId
+        val channels = Storage.getMetaDataChannels
+        val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap

-      val channelId: Option[Int] = args.channel.map { ch =>
-        if (!channelMap.contains(ch)) {
-          error(s"Channel ${ch} doesn't exist in this app.")
-          sys.exit(1)
+        val channelId: Option[Int] = args.channel.map { ch =>
+          if (!channelMap.contains(ch)) {
+            error(s"Channel ${ch} doesn't exist in this app.")
+            sys.exit(1)
+          }
+
+          channelMap(ch)
         }

-        channelMap(ch)
-      }
+        val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")

-      val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+        WorkflowUtils.modifyLogging(verbose = args.verbose)
+        @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+          new EventJson4sSupport.APISerializer
+        val sc = WorkflowContext(
+          mode = "Export",
+          batch = "App ID " + args.appId + channelStr,
+          executorEnv = Runner.envStringToMap(args.env))
+        val sqlSession = SparkVersionDependent.sqlSession(sc)
+        val events = Storage.getPEvents()
+        val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
+        val jsonStringRdd = eventsRdd.map(write(_))
+        if (args.format == "json") {
+          jsonStringRdd.saveAsTextFile(args.outputPath)
+        } else {
+          val jsonDf = sqlSession.read.json(jsonStringRdd)
+          jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(args.outputPath)
+        }
+        info(s"Events are exported to ${args.outputPath}/.")
+        info("Done.")

-      WorkflowUtils.modifyLogging(verbose = args.verbose)
-      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
-        new EventJson4sSupport.APISerializer
-      val sc = WorkflowContext(
-        mode = "Export",
-        batch = "App ID " + args.appId + channelStr,
-        executorEnv = Runner.envStringToMap(args.env))
-      val sqlSession = SparkVersionDependent.sqlSession(sc)
-      val events = Storage.getPEvents()
-      val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
-      val jsonStringRdd = eventsRdd.map(write(_))
-      if (args.format == "json") {
-        jsonStringRdd.saveAsTextFile(args.outputPath)
-      } else {
-        val jsonDf = sqlSession.read.json(jsonStringRdd)
-        jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(args.outputPath)
+      } finally {
+        CleanupFunctions.run()
       }
-      info(s"Events are exported to ${args.outputPath}/.")
-      info("Done.")
     }
   }
 }
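Note: the try/finally added above is the shutdown pattern this commit threads through each driver-style entry point: resource owners register teardown callbacks (see CleanupFunctions.add { ESClient.close } in StorageClient) and the entry point drains them on exit, so the shared REST client no longer leaks. A sketch of that contract, assuming a simple list of thunks (the committed implementation lives in workflow/CleanupFunctions.scala, which is not shown in this excerpt):

    // Illustrative register/drain contract, not the committed code.
    object CleanupFunctionsSketch {
      private var functions: Seq[() => Unit] = Seq.empty

      // Called by resource owners, e.g. add { ESClient.close }
      def add(f: => Unit): Unit = functions = functions :+ (() => f)

      // Called once from the entry point's finally block.
      def run(): Unit = {
        functions.foreach(f => f())
        functions = Seq.empty
      }
    }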
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
index 4b333ab..11f5a52 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
@@ -25,6 +25,7 @@ import org.apache.predictionio.data.storage.Storage
 import org.apache.predictionio.tools.Runner
 import org.apache.predictionio.workflow.WorkflowContext
 import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.CleanupFunctions
 import grizzled.slf4j.Logging
 import org.json4s.native.Serialization._

@@ -66,41 +67,46 @@ object FileToEvents extends Logging {
       }
     }
     parser.parse(args, FileToEventsArgs()) map { args =>
-      // get channelId
-      val channels = Storage.getMetaDataChannels
-      val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+      try {
+        // get channelId
+        val channels = Storage.getMetaDataChannels
+        val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap

-      val channelId: Option[Int] = args.channel.map { ch =>
-        if (!channelMap.contains(ch)) {
-          error(s"Channel ${ch} doesn't exist in this app.")
-          sys.exit(1)
+        val channelId: Option[Int] = args.channel.map { ch =>
+          if (!channelMap.contains(ch)) {
+            error(s"Channel ${ch} doesn't exist in this app.")
+            sys.exit(1)
+          }
+
+          channelMap(ch)
         }

-        channelMap(ch)
-      }
+        val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")

-      val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+        WorkflowUtils.modifyLogging(verbose = args.verbose)
+        @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+          new EventJson4sSupport.APISerializer
+        val sc = WorkflowContext(
+          mode = "Import",
+          batch = "App ID " + args.appId + channelStr,
+          executorEnv = Runner.envStringToMap(args.env))
+        val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
+          Try(read[Event](json)).recoverWith {
+            case e: Throwable =>
+              error(s"\nmalformed json => $json")
+              Failure(e)
+          }.get
+        }
+        val events = Storage.getPEvents()
+        events.write(events = rdd,
+          appId = args.appId,
+          channelId = channelId)(sc)
+        info("Events are imported.")
+        info("Done.")

-      WorkflowUtils.modifyLogging(verbose = args.verbose)
-      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
-        new EventJson4sSupport.APISerializer
-      val sc = WorkflowContext(
-        mode = "Import",
-        batch = "App ID " + args.appId + channelStr,
-        executorEnv = Runner.envStringToMap(args.env))
-      val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
-        Try(read[Event](json)).recoverWith {
-          case e: Throwable =>
-            error(s"\nmalformed json => $json")
-            Failure(e)
-        }.get
+      } finally {
+        CleanupFunctions.run()
       }
-      val events = Storage.getPEvents()
-      events.write(events = rdd,
-        appId = args.appId,
-        channelId = channelId)(sc)
-      info("Events are imported.")
-      info("Done.")
     }
   }
 }
