This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new a5baa27 Bug Fix
a5baa27 is described below
commit a5baa276296a952e7c1c47f1294fa68919e80293
Author: Eugene <[email protected]>
AuthorDate: Wed Mar 20 09:48:07 2019 +0800
Bug Fix
scalaj.http can only call a remote web API through HttpURLConnection,
but on a Spark cluster the connection that is returned is an
org.apache.hadoop.fs.FsUrlConnection rather than an HttpURLConnection,
so the REST POST request is never actually sent.
Use Apache HttpComponents (HttpClient) to execute the REST call instead.
GRIFFIN-238
Author: Eugene <[email protected]>
Closes #487 from toyboxman/http-bug.
---
.../griffin/measure/sink/ElasticSearchSink.scala | 2 +-
.../apache/griffin/measure/utils/HttpUtil.scala | 30 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index 745f760..c63fd09 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -64,7 +64,7 @@ case class ElasticSearchSink(config: Map[String, Any],
metricName: String,
def func(): (Long, Future[Boolean]) = {
import scala.concurrent.ExecutionContext.Implicits.global
- (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header,
data)))
+ (timeStamp, Future(HttpUtil.doHttpRequest(api, method, params, header,
data)))
}
if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
else SinkTaskRunner.addNonBlockTask(func _, retry)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index 4949642..e679c52 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -18,6 +18,9 @@ under the License.
*/
package org.apache.griffin.measure.utils
+import org.apache.http.client.methods.{HttpGet, HttpPost}
+import org.apache.http.entity.{ContentType, StringEntity}
+import org.apache.http.impl.client.HttpClientBuilder
import scalaj.http._
object HttpUtil {
@@ -37,6 +40,32 @@ object HttpUtil {
response.isSuccess
}
+ def doHttpRequest(url: String,
+ method: String,
+ params: Map[String, Object],
+ headers: Map[String, Object],
+ data: String): Boolean = {
+ val client = HttpClientBuilder.create.build
+ method match {
+ case POST_REGEX() =>
+ val post = new HttpPost(url)
+ convertObjMap2StrMap(headers) foreach (header =>
post.addHeader(header._1, header._2))
+ post.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON));
+
+ // send the post request
+ val response = client.execute(post)
+ val code = response.getStatusLine.getStatusCode
+ code >= 200 && code < 300
+ case PUT_REGEX() =>
+ val get = new HttpGet(url)
+ convertObjMap2StrMap(headers) foreach (header =>
get.addHeader(header._1, header._2))
+ val response = client.execute(get)
+ val code = response.getStatusLine.getStatusCode
+ code >= 200 && code < 300
+ case _ => false
+ }
+ }
+
def httpRequest(url: String,
method: String,
params: Map[String, Object],
@@ -58,5 +87,4 @@ object HttpUtil {
private def convertObjMap2StrMap(map: Map[String, Object]): Map[String,
String] = {
map.map(pair => pair._1 -> pair._2.toString)
}
-
}