This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new a5baa27  Bug Fix
a5baa27 is described below

commit a5baa276296a952e7c1c47f1294fa68919e80293
Author: Eugene <[email protected]>
AuthorDate: Wed Mar 20 09:48:07 2019 +0800

    Bug Fix
    
    scalaj.http can only call a remote web API through an HttpURLConnection,
    but on a Spark cluster the connection returned is an
    org.apache.hadoop.fs.FsUrlConnection instead of an HttpURLConnection,
    so the REST POST call is never actually sent.
    Use Apache HttpComponents to execute the REST call instead.
    
    GRIFFIN-238
    
    Author: Eugene <[email protected]>
    
    Closes #487 from toyboxman/http-bug.
---
 .../griffin/measure/sink/ElasticSearchSink.scala   |  2 +-
 .../apache/griffin/measure/utils/HttpUtil.scala    | 30 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index 745f760..c63fd09 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -64,7 +64,7 @@ case class ElasticSearchSink(config: Map[String, Any], 
metricName: String,
 
       def func(): (Long, Future[Boolean]) = {
         import scala.concurrent.ExecutionContext.Implicits.global
-        (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, 
data)))
+        (timeStamp, Future(HttpUtil.doHttpRequest(api, method, params, header, 
data)))
       }
       if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
       else SinkTaskRunner.addNonBlockTask(func _, retry)
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index 4949642..e679c52 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -18,6 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
+import org.apache.http.client.methods.{HttpGet, HttpPost}
+import org.apache.http.entity.{ContentType, StringEntity}
+import org.apache.http.impl.client.HttpClientBuilder
 import scalaj.http._
 
 object HttpUtil {
@@ -37,6 +40,32 @@ object HttpUtil {
     response.isSuccess
   }
 
+  def doHttpRequest(url: String,
+                  method: String,
+                  params: Map[String, Object],
+                  headers: Map[String, Object],
+                  data: String): Boolean = {
+    val client = HttpClientBuilder.create.build
+    method match {
+      case POST_REGEX() =>
+        val post = new HttpPost(url)
+        convertObjMap2StrMap(headers) foreach (header => 
post.addHeader(header._1, header._2))
+        post.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON));
+
+        // send the post request
+        val response = client.execute(post)
+        val code = response.getStatusLine.getStatusCode
+        code >= 200 && code < 300
+      case PUT_REGEX() =>
+        val get = new HttpGet(url)
+        convertObjMap2StrMap(headers) foreach (header => 
get.addHeader(header._1, header._2))
+        val response = client.execute(get)
+        val code = response.getStatusLine.getStatusCode
+        code >= 200 && code < 300
+      case _ => false
+    }
+  }
+
   def httpRequest(url: String,
                   method: String,
                   params: Map[String, Object],
@@ -58,5 +87,4 @@ object HttpUtil {
   private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, 
String] = {
     map.map(pair => pair._1 -> pair._2.toString)
   }
-
 }

Reply via email to