This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 89e72fd  [Improve] Code improvement in loadBatch method of DorisStreamLoad (#78)
89e72fd is described below

commit 89e72fd22c89de0a1fbf6241b08947ef831907c0
Author: Bowen Liang <[email protected]>
AuthorDate: Tue Mar 28 13:49:30 2023 +0800

    [Improve] Code improvement in loadBatch method of DorisStreamLoad (#78)
    
    * update response handling
    * use stream in listToString method
    * fix typo in error message for unsupported file format
    * extract getAuthEncoded method
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 69 +++++++++-------------
 1 file changed, 28 insertions(+), 41 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index bda1c47..2a89858 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.SparkSettings;
@@ -33,24 +34,20 @@ import org.apache.doris.spark.util.ListUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
+import java.io.*;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 
 /**
  * DorisStreamLoad
  **/
-public class DorisStreamLoad implements Serializable{
+public class DorisStreamLoad implements Serializable {
     private String FIELD_DELIMITER;
     private String LINE_DELIMITER;
     private String NULL_VALUE = "\\N";
@@ -64,7 +61,7 @@ public class DorisStreamLoad implements Serializable{
     private String loadUrlStr;
     private String db;
     private String tbl;
-    private String authEncoding;
+    private String authEncoded;
     private String columns;
     private String[] dfColumns;
     private String maxFilterRatio;
@@ -79,7 +76,7 @@ public class DorisStreamLoad implements Serializable{
         this.user = user;
         this.passwd = passwd;
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.authEncoded = getAuthEncoded(user, passwd);
     }
 
     public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
@@ -88,7 +85,7 @@ public class DorisStreamLoad implements Serializable{
         this.tbl = dbTable[1];
         this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
-        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.authEncoded = getAuthEncoded(user, passwd);
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
@@ -111,7 +108,7 @@ public class DorisStreamLoad implements Serializable{
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
 
 
-        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.authEncoded = getAuthEncoded(user, passwd);
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
         this.dfColumns = dfColumns;
 
@@ -121,7 +118,7 @@ public class DorisStreamLoad implements Serializable{
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
         fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
-        if ("csv".equals(fileType)){
+        if ("csv".equals(fileType)) {
             FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
             LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
         }
@@ -139,7 +136,7 @@ public class DorisStreamLoad implements Serializable{
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod("PUT");
-        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+        conn.setRequestProperty("Authorization", "Basic " + authEncoded);
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
         conn.addRequestProperty("label", label);
@@ -187,19 +184,11 @@ public class DorisStreamLoad implements Serializable{
     }
 
     public String listToString(List<List<Object>> rows) {
-        StringJoiner lines = new StringJoiner(LINE_DELIMITER);
-        for (List<Object> row : rows) {
-            StringJoiner line = new StringJoiner(FIELD_DELIMITER);
-            for (Object field : row) {
-                if (field == null) {
-                    line.add(NULL_VALUE);
-                } else {
-                    line.add(field.toString());
-                }
-            }
-            lines.add(line.toString());
-        }
-        return lines.toString();
+        return rows.stream().map(row ->
+                row.stream().map(field ->
+                        (field == null) ? NULL_VALUE : field.toString()
+                ).collect(Collectors.joining(FIELD_DELIMITER))
+        ).collect(Collectors.joining(LINE_DELIMITER));
     }
 
 
@@ -227,7 +216,7 @@ public class DorisStreamLoad implements Serializable{
                 load(serializedRows);
             }
         } else {
-            throw new StreamLoadException("Not supoort the file format in stream load.");
+            throw new StreamLoadException(String.format("Unsupported file format in stream load: %s.", fileType));
         }
     }
 
@@ -263,28 +252,24 @@ public class DorisStreamLoad implements Serializable{
         //only to record the BE node in case of an exception
         this.loadUrlStr = loadUrlStr;
 
-        HttpURLConnection feConn = null;
         HttpURLConnection beConn = null;
         int status = -1;
         try {
             // build request and send to new be location
             beConn = getConnection(loadUrlStr, label);
             // send data to be
-            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
-            bos.write(value.getBytes("UTF-8"));
-            bos.close();
+            try (OutputStream beConnOutputStream = new BufferedOutputStream(beConn.getOutputStream())) {
+                IOUtils.write(value, beConnOutputStream, StandardCharsets.UTF_8);
+            }
 
             // get respond
             status = beConn.getResponseCode();
             String respMsg = beConn.getResponseMessage();
-            InputStream stream = (InputStream) beConn.getContent();
-            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
-            StringBuilder response = new StringBuilder();
-            String line;
-            while ((line = br.readLine()) != null) {
-                response.append(line);
+            String response;
+            try (InputStream beConnInputStream = beConn.getInputStream()) {
+                response = IOUtils.toString(beConnInputStream, StandardCharsets.UTF_8);
             }
-            return new LoadResponse(status, respMsg, response.toString());
+            return new LoadResponse(status, respMsg, response);
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -292,9 +277,6 @@ public class DorisStreamLoad implements Serializable{
             LOG.warn(err, e);
             return new LoadResponse(status, e.getMessage(), err);
         } finally {
-            if (feConn != null) {
-                feConn.disconnect();
-            }
             if (beConn != null) {
                 beConn.disconnect();
             }
@@ -325,6 +307,11 @@ public class DorisStreamLoad implements Serializable{
         }
     }
 
+    private static String getAuthEncoded(String user, String passwd) {
+        String raw = String.format("%s:%s", user, passwd);
+        return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
+    }
+
     /**
      * serializable be cache loader
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to