This is an automated email from the ASF dual-hosted git repository.

wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6977d1d  Support http remote conf
6977d1d is described below

commit 6977d1dbb5997122d4458ce9e3ac0423dd62be7d
Author: yuxiaoyu <[email protected]>
AuthorDate: Mon Dec 7 11:59:14 2020 +0800

    Support http remote conf
    
    In our production practice, many Griffin jobs run on YARN in cluster mode.
We upload different conf files to an HTTP file server, and we also provide
services that generate specific configurations based on different HTTP URLs.
    So this PR adds support for passing an HTTP URL as the conf when submitting
Griffin jobs. It has no effect on the JSON or file conf modes, and it has worked
well in our production environment for a long time.
    
    Author: yuxiaoyu <[email protected]>
    
    Closes #587 from XiaoyuBD/support_http_url_conf.
---
 ...amReaderFactory.scala => ParamHttpReader.scala} | 39 +++++-------
 .../dqdefinition/reader/ParamReaderFactory.scala   | 11 +++-
 .../griffin/measure/sink/ElasticSearchSink.scala   |  3 +-
 .../apache/griffin/measure/utils/HttpUtil.scala    | 73 +++++++---------------
 4 files changed, 48 insertions(+), 78 deletions(-)

diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
similarity index 57%
copy from 
measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
copy to 
measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
index 5067a9d..e7c19a8 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
@@ -17,31 +17,26 @@
 
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
-import org.apache.griffin.measure.utils.JsonUtil
+import scala.reflect.ClassTag
+import scala.util.Try
 
-object ParamReaderFactory {
+import org.apache.griffin.measure.configuration.dqdefinition.Param
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
 
-  val json = "json"
-  val file = "file"
-
-  /**
-   * parse string content to get param reader
-   * @param pathOrJson
-   * @return
-   */
-  def getParamReader(pathOrJson: String): ParamReader = {
-    val strType = paramStrType(pathOrJson)
-    if (json.equals(strType)) ParamJsonReader(pathOrJson)
-    else ParamFileReader(pathOrJson)
-  }
+/**
+ * read params by http url
+ *
+ * @param httpUrl
+ */
+case class ParamHttpReader(httpUrl: String) extends ParamReader {
 
-  private def paramStrType(str: String): String = {
-    try {
-      JsonUtil.toAnyMap(str)
-      json
-    } catch {
-      case _: Throwable => file
+  def readConfig[T <: Param](implicit m: ClassTag[T]): Try[T] = {
+    Try {
+      val params = Map[String, Object]()
+      val headers = Map[String, Object](("Content-Type", "application/json"))
+      val jsonString = HttpUtil.doHttpRequest(httpUrl, "get", params, headers, 
null)._2
+      val param = JsonUtil.fromJson[T](jsonString)
+      validate(param)
     }
   }
-
 }
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
index 5067a9d..f4bfd28 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
@@ -23,6 +23,7 @@ object ParamReaderFactory {
 
   val json = "json"
   val file = "file"
+  val httpRegex = "^http[s]?://.*"
 
   /**
    * parse string content to get param reader
@@ -30,9 +31,13 @@ object ParamReaderFactory {
    * @return
    */
   def getParamReader(pathOrJson: String): ParamReader = {
-    val strType = paramStrType(pathOrJson)
-    if (json.equals(strType)) ParamJsonReader(pathOrJson)
-    else ParamFileReader(pathOrJson)
+    if (pathOrJson.matches(httpRegex)) {
+      ParamHttpReader(pathOrJson)
+    } else {
+      val strType = paramStrType(pathOrJson)
+      if (json.equals(strType)) ParamJsonReader(pathOrJson)
+      else ParamFileReader(pathOrJson)
+    }
   }
 
   private def paramStrType(str: String): String = {
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index d8cce41..3134b44 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -63,7 +63,8 @@ case class ElasticSearchSink(
 
       def func(): (Long, Future[Boolean]) = {
         import scala.concurrent.ExecutionContext.Implicits.global
-        (timeStamp, Future(HttpUtil.doHttpRequest(api, method, params, header, 
data)))
+        val code = HttpUtil.doHttpRequest(api, method, params, header, data)._1
+        (timeStamp, Future(code >= 200 && code < 300))
       }
       if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
       else SinkTaskRunner.addNonBlockTask(func _, retry)
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index 66648d0..652bbf7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -19,10 +19,10 @@ package org.apache.griffin.measure.utils
 
 import scala.util.matching.Regex
 
-import org.apache.http.client.methods.{HttpGet, HttpPost}
+import org.apache.http.client.methods.{HttpDelete, HttpGet, HttpPost, HttpPut}
+import org.apache.http.client.utils.URIBuilder
 import org.apache.http.entity.{ContentType, StringEntity}
-import org.apache.http.impl.client.HttpClientBuilder
-import scalaj.http._
+import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
 
 object HttpUtil {
 
@@ -31,65 +31,34 @@ object HttpUtil {
   val PUT_REGEX: Regex = """^(?i)put$""".r
   val DELETE_REGEX: Regex = """^(?i)delete$""".r
 
-  def postData(
-      url: String,
-      params: Map[String, Object],
-      headers: Map[String, Object],
-      data: String): Boolean = {
-    val response = Http(url)
-      .params(convertObjMap2StrMap(params))
-      .headers(convertObjMap2StrMap(headers))
-      .postData(data)
-      .asString
-
-    response.isSuccess
-  }
-
   def doHttpRequest(
       url: String,
       method: String,
       params: Map[String, Object],
       headers: Map[String, Object],
-      data: String): Boolean = {
+      data: String): (Integer, String) = {
     val client = HttpClientBuilder.create.build
-    method match {
+    val uriBuilder = new URIBuilder(url)
+    convertObjMap2StrMap(params) foreach (param => 
uriBuilder.setParameter(param._1, param._2))
+    val handler = new BasicResponseHandler()
+    val request = method match {
       case POST_REGEX() =>
-        val post = new HttpPost(url)
-        convertObjMap2StrMap(headers) foreach (header => 
post.addHeader(header._1, header._2))
+        val post = new HttpPost(uriBuilder.build())
         post.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
-
-        // send the post request
-        val response = client.execute(post)
-        val code = response.getStatusLine.getStatusCode
-        code >= 200 && code < 300
-      case PUT_REGEX() =>
-        val get = new HttpGet(url)
-        convertObjMap2StrMap(headers) foreach (header => 
get.addHeader(header._1, header._2))
-        val response = client.execute(get)
-        val code = response.getStatusLine.getStatusCode
-        code >= 200 && code < 300
-      case _ => false
-    }
-  }
-
-  def httpRequest(
-      url: String,
-      method: String,
-      params: Map[String, Object],
-      headers: Map[String, Object],
-      data: String): Boolean = {
-    val httpReq = Http(url)
-      .params(convertObjMap2StrMap(params))
-      .headers(convertObjMap2StrMap(headers))
-    method match {
-      case POST_REGEX() =>
-        val res = httpReq.postData(data).asString
-        res.isSuccess
+        post
       case PUT_REGEX() =>
-        val res = httpReq.put(data).asString
-        res.isSuccess
-      case _ => false
+        val put = new HttpPut(uriBuilder.build())
+        put.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
+        put
+      case GET_REGEX() =>
+        new HttpGet(uriBuilder.build())
+      case DELETE_REGEX() =>
+        new HttpDelete(uriBuilder.build())
+      case _ => throw new UnsupportedOperationException("Unsupported http 
method error!")
     }
+    convertObjMap2StrMap(headers) foreach (header => 
request.addHeader(header._1, header._2))
+    val response = client.execute(request)
+    (response.getStatusLine.getStatusCode, 
handler.handleResponse(response).trim)
   }
 
   private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, 
String] = {

Reply via email to