This is an automated email from the ASF dual-hosted git repository.
wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 6977d1d Support http remote conf
6977d1d is described below
commit 6977d1dbb5997122d4458ce9e3ac0423dd62be7d
Author: yuxiaoyu <[email protected]>
AuthorDate: Mon Dec 7 11:59:14 2020 +0800
Support http remote conf
In our production practice, many Griffin jobs run on yarn in cluster mode.
We upload different conf files to the http file server and we also provide
services that generate specific configurations based on different HTTP URLs.
So we supports setting HTTP URLs as conf in submitting Griffin jobs in this
PR. There is no effect on JSON or File conf mode. And it works well in our
production environment for a long time.
Author: yuxiaoyu <[email protected]>
Closes #587 from XiaoyuBD/support_http_url_conf.
---
...amReaderFactory.scala => ParamHttpReader.scala} | 39 +++++-------
.../dqdefinition/reader/ParamReaderFactory.scala | 11 +++-
.../griffin/measure/sink/ElasticSearchSink.scala | 3 +-
.../apache/griffin/measure/utils/HttpUtil.scala | 73 +++++++---------------
4 files changed, 48 insertions(+), 78 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
similarity index 57%
copy from
measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
copy to
measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
index 5067a9d..e7c19a8 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
@@ -17,31 +17,26 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
-import org.apache.griffin.measure.utils.JsonUtil
+import scala.reflect.ClassTag
+import scala.util.Try
-object ParamReaderFactory {
+import org.apache.griffin.measure.configuration.dqdefinition.Param
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
- val json = "json"
- val file = "file"
-
- /**
- * parse string content to get param reader
- * @param pathOrJson
- * @return
- */
- def getParamReader(pathOrJson: String): ParamReader = {
- val strType = paramStrType(pathOrJson)
- if (json.equals(strType)) ParamJsonReader(pathOrJson)
- else ParamFileReader(pathOrJson)
- }
+/**
+ * read params by http url
+ *
+ * @param httpUrl
+ */
+case class ParamHttpReader(httpUrl: String) extends ParamReader {
- private def paramStrType(str: String): String = {
- try {
- JsonUtil.toAnyMap(str)
- json
- } catch {
- case _: Throwable => file
+ def readConfig[T <: Param](implicit m: ClassTag[T]): Try[T] = {
+ Try {
+ val params = Map[String, Object]()
+ val headers = Map[String, Object](("Content-Type", "application/json"))
+ val jsonString = HttpUtil.doHttpRequest(httpUrl, "get", params, headers,
null)._2
+ val param = JsonUtil.fromJson[T](jsonString)
+ validate(param)
}
}
-
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
index 5067a9d..f4bfd28 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
@@ -23,6 +23,7 @@ object ParamReaderFactory {
val json = "json"
val file = "file"
+ val httpRegex = "^http[s]?://.*"
/**
* parse string content to get param reader
@@ -30,9 +31,13 @@ object ParamReaderFactory {
* @return
*/
def getParamReader(pathOrJson: String): ParamReader = {
- val strType = paramStrType(pathOrJson)
- if (json.equals(strType)) ParamJsonReader(pathOrJson)
- else ParamFileReader(pathOrJson)
+ if (pathOrJson.matches(httpRegex)) {
+ ParamHttpReader(pathOrJson)
+ } else {
+ val strType = paramStrType(pathOrJson)
+ if (json.equals(strType)) ParamJsonReader(pathOrJson)
+ else ParamFileReader(pathOrJson)
+ }
}
private def paramStrType(str: String): String = {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index d8cce41..3134b44 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -63,7 +63,8 @@ case class ElasticSearchSink(
def func(): (Long, Future[Boolean]) = {
import scala.concurrent.ExecutionContext.Implicits.global
- (timeStamp, Future(HttpUtil.doHttpRequest(api, method, params, header,
data)))
+ val code = HttpUtil.doHttpRequest(api, method, params, header, data)._1
+ (timeStamp, Future(code >= 200 && code < 300))
}
if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
else SinkTaskRunner.addNonBlockTask(func _, retry)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index 66648d0..652bbf7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -19,10 +19,10 @@ package org.apache.griffin.measure.utils
import scala.util.matching.Regex
-import org.apache.http.client.methods.{HttpGet, HttpPost}
+import org.apache.http.client.methods.{HttpDelete, HttpGet, HttpPost, HttpPut}
+import org.apache.http.client.utils.URIBuilder
import org.apache.http.entity.{ContentType, StringEntity}
-import org.apache.http.impl.client.HttpClientBuilder
-import scalaj.http._
+import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
object HttpUtil {
@@ -31,65 +31,34 @@ object HttpUtil {
val PUT_REGEX: Regex = """^(?i)put$""".r
val DELETE_REGEX: Regex = """^(?i)delete$""".r
- def postData(
- url: String,
- params: Map[String, Object],
- headers: Map[String, Object],
- data: String): Boolean = {
- val response = Http(url)
- .params(convertObjMap2StrMap(params))
- .headers(convertObjMap2StrMap(headers))
- .postData(data)
- .asString
-
- response.isSuccess
- }
-
def doHttpRequest(
url: String,
method: String,
params: Map[String, Object],
headers: Map[String, Object],
- data: String): Boolean = {
+ data: String): (Integer, String) = {
val client = HttpClientBuilder.create.build
- method match {
+ val uriBuilder = new URIBuilder(url)
+ convertObjMap2StrMap(params) foreach (param =>
uriBuilder.setParameter(param._1, param._2))
+ val handler = new BasicResponseHandler()
+ val request = method match {
case POST_REGEX() =>
- val post = new HttpPost(url)
- convertObjMap2StrMap(headers) foreach (header =>
post.addHeader(header._1, header._2))
+ val post = new HttpPost(uriBuilder.build())
post.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
-
- // send the post request
- val response = client.execute(post)
- val code = response.getStatusLine.getStatusCode
- code >= 200 && code < 300
- case PUT_REGEX() =>
- val get = new HttpGet(url)
- convertObjMap2StrMap(headers) foreach (header =>
get.addHeader(header._1, header._2))
- val response = client.execute(get)
- val code = response.getStatusLine.getStatusCode
- code >= 200 && code < 300
- case _ => false
- }
- }
-
- def httpRequest(
- url: String,
- method: String,
- params: Map[String, Object],
- headers: Map[String, Object],
- data: String): Boolean = {
- val httpReq = Http(url)
- .params(convertObjMap2StrMap(params))
- .headers(convertObjMap2StrMap(headers))
- method match {
- case POST_REGEX() =>
- val res = httpReq.postData(data).asString
- res.isSuccess
+ post
case PUT_REGEX() =>
- val res = httpReq.put(data).asString
- res.isSuccess
- case _ => false
+ val put = new HttpPut(uriBuilder.build())
+ put.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
+ put
+ case GET_REGEX() =>
+ new HttpGet(uriBuilder.build())
+ case DELETE_REGEX() =>
+ new HttpDelete(uriBuilder.build())
+ case _ => throw new UnsupportedOperationException("Unsupported http
method error!")
}
+ convertObjMap2StrMap(headers) foreach (header =>
request.addHeader(header._1, header._2))
+ val response = client.execute(request)
+ (response.getStatusLine.getStatusCode,
handler.handleResponse(response).trim)
}
private def convertObjMap2StrMap(map: Map[String, Object]): Map[String,
String] = {