This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b7549be Introduce `httpclient` for sending requests in DorisStreamLoad (#79)
b7549be is described below
commit b7549be2fb0a146ea496bdf760ecba9271bb630d
Author: Bowen Liang <[email protected]>
AuthorDate: Wed Mar 29 09:50:38 2023 +0800
Introduce `httpclient` for sending requests in DorisStreamLoad (#79)
* refactor DorisStreamLoad to use httpclient
* fix return empty response message
---
.../org/apache/doris/spark/DorisStreamLoad.java | 112 ++++++++++-----------
1 file changed, 51 insertions(+), 61 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 2a89858..522e791 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
@@ -31,12 +31,21 @@ import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.RespContent;
import org.apache.doris.spark.util.ListUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.io.IOException;
+import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -130,38 +139,33 @@ public class DorisStreamLoad implements Serializable {
}
return loadUrlStr;
}
+ private CloseableHttpClient getHttpClient() {
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create()
+ .disableRedirectHandling();
+ return httpClientBuilder.build();
+ }
- private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
- URL url = new URL(urlStr);
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setInstanceFollowRedirects(false);
- conn.setRequestMethod("PUT");
- conn.setRequestProperty("Authorization", "Basic " + authEncoded);
- conn.addRequestProperty("Expect", "100-continue");
- conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
- conn.addRequestProperty("label", label);
- if (columns != null && !columns.equals("")) {
- conn.addRequestProperty("columns", columns);
+ private HttpPut getHttpPut(String label, String loadUrlStr) {
+ HttpPut httpPut = new HttpPut(loadUrlStr);
+ httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
+ httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
+ httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
+ httpPut.setHeader("label", label);
+ if (StringUtils.isNotBlank(columns)) {
+ httpPut.setHeader("columns", columns);
}
-
- if (maxFilterRatio != null && !maxFilterRatio.equals("")) {
- conn.addRequestProperty("max_filter_ratio", maxFilterRatio);
+ if (StringUtils.isNotBlank(maxFilterRatio)) {
+ httpPut.setHeader("max_filter_ratio", maxFilterRatio);
}
-
- conn.setDoOutput(true);
- conn.setDoInput(true);
- if (streamLoadProp != null) {
- streamLoadProp.forEach((k, v) -> {
- if ("read_json_by_line".equals(k)) {
- return;
- }
- conn.addRequestProperty(k, v);
- });
+ if (MapUtils.isNotEmpty(streamLoadProp)) {
+ streamLoadProp.entrySet().stream()
+ .filter(entry -> !"read_json_by_line".equals(entry.getKey()))
+ .forEach(entry -> httpPut.setHeader(entry.getKey(), entry.getValue()));
}
if (fileType.equals("json")) {
- conn.addRequestProperty("strip_outer_array", "true");
+ httpPut.setHeader("strip_outer_array", "true");
}
- return conn;
+ return httpPut;
}
public static class LoadResponse {
@@ -222,7 +226,7 @@ public class DorisStreamLoad implements Serializable {
public void load(String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(value);
- if (loadResponse.status != 200) {
+ if (loadResponse.status != HttpStatus.SC_OK) {
LOG.info("Streamload Response HTTP Status Error:{}", loadResponse);
throw new StreamLoadException("stream load error: " + loadResponse.respContent);
} else {
@@ -247,39 +251,25 @@ public class DorisStreamLoad implements Serializable {
calendar.get(Calendar.HOUR_OF_DAY),
calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));
- String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
- LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
- //only to record the BE node in case of an exception
- this.loadUrlStr = loadUrlStr;
-
- HttpURLConnection beConn = null;
- int status = -1;
- try {
- // build request and send to new be location
- beConn = getConnection(loadUrlStr, label);
- // send data to be
- try (OutputStream beConnOutputStream = new BufferedOutputStream(beConn.getOutputStream())) {
- IOUtils.write(value, beConnOutputStream, StandardCharsets.UTF_8);
- }
-
- // get respond
- status = beConn.getResponseCode();
- String respMsg = beConn.getResponseMessage();
- String response;
- try (InputStream beConnInputStream = beConn.getInputStream()) {
- response = IOUtils.toString(beConnInputStream, StandardCharsets.UTF_8);
- }
- return new LoadResponse(status, respMsg, response);
-
- } catch (Exception e) {
+ int responseHttpStatus = -1;
+ try (CloseableHttpClient httpClient = getHttpClient()) {
+ String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
+ LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
+ //only to record the BE node in case of an exception
+ this.loadUrlStr = loadUrlStr;
+
+ HttpPut httpPut = getHttpPut(label, loadUrlStr);
+ httpPut.setEntity(new StringEntity(value, StandardCharsets.UTF_8));
+ HttpResponse httpResponse = httpClient.execute(httpPut);
+ responseHttpStatus = httpResponse.getStatusLine().getStatusCode();
+ String respMsg = httpResponse.getStatusLine().getReasonPhrase();
+ String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
+ return new LoadResponse(responseHttpStatus, respMsg, response);
+ } catch (IOException e) {
e.printStackTrace();
String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark streamload with label: " + label;
LOG.warn(err, e);
- return new LoadResponse(status, e.getMessage(), err);
- } finally {
- if (beConn != null) {
- beConn.disconnect();
- }
+ return new LoadResponse(responseHttpStatus, e.getMessage(), err);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]