This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 89e72fd [Improve] Code improvement in loadBatch method of DorisStreamLoad (#78)
89e72fd is described below
commit 89e72fd22c89de0a1fbf6241b08947ef831907c0
Author: Bowen Liang <[email protected]>
AuthorDate: Tue Mar 28 13:49:30 2023 +0800
[Improve] Code improvement in loadBatch method of DorisStreamLoad (#78)
* update response handling
* use stream in listToString method
* fix typo in error message for unsupported file format
* extract getAuthEncoded method
---
.../org/apache/doris/spark/DorisStreamLoad.java | 69 +++++++++-------------
1 file changed, 28 insertions(+), 41 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index bda1c47..2a89858 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
@@ -33,24 +34,20 @@ import org.apache.doris.spark.util.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
+import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* DorisStreamLoad
**/
-public class DorisStreamLoad implements Serializable{
+public class DorisStreamLoad implements Serializable {
private String FIELD_DELIMITER;
private String LINE_DELIMITER;
private String NULL_VALUE = "\\N";
@@ -64,7 +61,7 @@ public class DorisStreamLoad implements Serializable{
private String loadUrlStr;
private String db;
private String tbl;
- private String authEncoding;
+ private String authEncoded;
private String columns;
private String[] dfColumns;
private String maxFilterRatio;
@@ -79,7 +76,7 @@ public class DorisStreamLoad implements Serializable{
this.user = user;
this.passwd = passwd;
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
- this.authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
+ this.authEncoded = getAuthEncoded(user, passwd);
}
public DorisStreamLoad(SparkSettings settings) throws IOException,
DorisException {
@@ -88,7 +85,7 @@ public class DorisStreamLoad implements Serializable{
this.tbl = dbTable[1];
this.user =
settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
this.passwd =
settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
- this.authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
+ this.authEncoded = getAuthEncoded(user, passwd);
this.columns =
settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.maxFilterRatio =
settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
@@ -111,7 +108,7 @@ public class DorisStreamLoad implements Serializable{
this.passwd =
settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
- this.authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
+ this.authEncoded = getAuthEncoded(user, passwd);
this.columns =
settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.dfColumns = dfColumns;
@@ -121,7 +118,7 @@ public class DorisStreamLoad implements Serializable{
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
fileType = this.streamLoadProp.get("format") == null ? "csv" :
this.streamLoadProp.get("format");
- if ("csv".equals(fileType)){
+ if ("csv".equals(fileType)) {
FIELD_DELIMITER = this.streamLoadProp.get("column_separator") ==
null ? "\t" : this.streamLoadProp.get("column_separator");
LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null
? "\n" : this.streamLoadProp.get("line_delimiter");
}
@@ -139,7 +136,7 @@ public class DorisStreamLoad implements Serializable{
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
- conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+ conn.setRequestProperty("Authorization", "Basic " + authEncoded);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
@@ -187,19 +184,11 @@ public class DorisStreamLoad implements Serializable{
}
public String listToString(List<List<Object>> rows) {
- StringJoiner lines = new StringJoiner(LINE_DELIMITER);
- for (List<Object> row : rows) {
- StringJoiner line = new StringJoiner(FIELD_DELIMITER);
- for (Object field : row) {
- if (field == null) {
- line.add(NULL_VALUE);
- } else {
- line.add(field.toString());
- }
- }
- lines.add(line.toString());
- }
- return lines.toString();
+ return rows.stream().map(row ->
+ row.stream().map(field ->
+ (field == null) ? NULL_VALUE : field.toString()
+ ).collect(Collectors.joining(FIELD_DELIMITER))
+ ).collect(Collectors.joining(LINE_DELIMITER));
}
@@ -227,7 +216,7 @@ public class DorisStreamLoad implements Serializable{
load(serializedRows);
}
} else {
- throw new StreamLoadException("Not supoort the file format in
stream load.");
+ throw new StreamLoadException(String.format("Unsupported file
format in stream load: %s.", fileType));
}
}
@@ -263,28 +252,24 @@ public class DorisStreamLoad implements Serializable{
//only to record the BE node in case of an exception
this.loadUrlStr = loadUrlStr;
- HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
int status = -1;
try {
// build request and send to new be location
beConn = getConnection(loadUrlStr, label);
// send data to be
- BufferedOutputStream bos = new
BufferedOutputStream(beConn.getOutputStream());
- bos.write(value.getBytes("UTF-8"));
- bos.close();
+ try (OutputStream beConnOutputStream = new
BufferedOutputStream(beConn.getOutputStream())) {
+ IOUtils.write(value, beConnOutputStream,
StandardCharsets.UTF_8);
+ }
// get respond
status = beConn.getResponseCode();
String respMsg = beConn.getResponseMessage();
- InputStream stream = (InputStream) beConn.getContent();
- BufferedReader br = new BufferedReader(new
InputStreamReader(stream));
- StringBuilder response = new StringBuilder();
- String line;
- while ((line = br.readLine()) != null) {
- response.append(line);
+ String response;
+ try (InputStream beConnInputStream = beConn.getInputStream()) {
+ response = IOUtils.toString(beConnInputStream,
StandardCharsets.UTF_8);
}
- return new LoadResponse(status, respMsg, response.toString());
+ return new LoadResponse(status, respMsg, response);
} catch (Exception e) {
e.printStackTrace();
@@ -292,9 +277,6 @@ public class DorisStreamLoad implements Serializable{
LOG.warn(err, e);
return new LoadResponse(status, e.getMessage(), err);
} finally {
- if (feConn != null) {
- feConn.disconnect();
- }
if (beConn != null) {
beConn.disconnect();
}
@@ -325,6 +307,11 @@ public class DorisStreamLoad implements Serializable{
}
}
+ private static String getAuthEncoded(String user, String passwd) {
+ String raw = String.format("%s:%s", user, passwd);
+ return
Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
+ }
+
/**
* serializable be cache loader
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]