This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a9725bf  [feature] support fe https request (#189)
a9725bf is described below

commit a9725bfc80e915a76476f32207518478da8a5848
Author: gnehil <[email protected]>
AuthorDate: Mon Feb 5 18:06:58 2024 +0800

    [feature] support fe https request (#189)
---
 .../doris/spark/cfg/ConfigurationOptions.java      |  12 +
 .../org/apache/doris/spark/rest/RestService.java   | 328 +++++++--------------
 .../org/apache/doris/spark/load/StreamLoader.scala |  41 +--
 .../org/apache/doris/spark/util/HttpUtil.scala     |  70 +++++
 .../scala/org/apache/doris/spark/util/URLs.scala   |  42 +++
 .../apache/doris/spark/load/StreamLoaderTest.java  |  54 ++++
 6 files changed, 304 insertions(+), 243 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 6728906..437eabe 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -132,4 +132,16 @@ public interface ConfigurationOptions {
      */
     String DORIS_SINK_DATA_COMPRESS_TYPE = 
"doris.sink.properties.compress_type";
 
+    String DORIS_ENABLE_HTTPS = "doris.enable.https";
+
+    boolean DORIS_ENABLE_HTTPS_DEFAULT = false;
+
+    String DORIS_HTTPS_KEY_STORE_PATH = "doris.https.key-store-path";
+
+    String DORIS_HTTPS_KEY_STORE_TYPE = "doris.https.key-store-type";
+
+    String DORIS_HTTPS_KEY_STORE_TYPE_DEFAULT = "JKS";
+
+    String DORIS_HTTPS_KEY_STORE_PASSWORD = "doris.https.key-store-password";
+
 }
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 86e370c..c838830 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -50,13 +50,19 @@ import java.util.List;
 import java.util.ArrayList;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.json.JsonMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.cfg.SparkSettings;
@@ -70,15 +76,23 @@ import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
+import org.apache.doris.spark.util.HttpUtil;
+import org.apache.doris.spark.util.URLs;
+import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
+import scala.collection.JavaConverters;
 
 /**
  * Service for communicate with Doris FE.
@@ -86,126 +100,9 @@ import com.google.common.annotations.VisibleForTesting;
 public class RestService implements Serializable {
     public final static int REST_RESPONSE_STATUS_OK = 200;
     private static final String API_PREFIX = "/api";
-    private static final String SCHEMA = "_schema";
-    private static final String QUERY_PLAN = "_query_plan";
-    @Deprecated
-    private static final String BACKENDS = "/rest/v1/system?path=//backends";
-    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
-
-    /**
-     * send request to Doris FE and get response json string.
-     * @param cfg configuration of request
-     * @param request {@link HttpRequestBase} real request
-     * @param logger {@link Logger}
-     * @return Doris FE response in json string
-     * @throws ConnectedFailedException throw when cannot connect to Doris FE
-     */
-    private static String send(Settings cfg, HttpRequestBase request, Logger 
logger) throws
-            ConnectedFailedException {
-        int connectTimeout = 
cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
-                ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT);
-        int socketTimeout = 
cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
-                ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT);
-        int retries = 
cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
-                ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
-        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. 
retries set to '{}'.",
-                connectTimeout, socketTimeout, retries);
-
-        RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout(connectTimeout)
-                .setSocketTimeout(socketTimeout)
-                .build();
-
-        request.setConfig(requestConfig);
-        String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
-        String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
-        logger.info("Send request to Doris FE '{}' with user '{}'.", 
request.getURI(), user);
-        IOException ex = null;
-        int statusCode = -1;
-
-        for (int attempt = 0; attempt < retries; attempt++) {
-            logger.debug("Attempt {} to request {}.", attempt, 
request.getURI());
-            try {
-                String response;
-                if (request instanceof HttpGet){
-                    response = getConnectionGet(request.getURI().toString(), 
user, password,logger);
-                } else {
-                    response = getConnectionPost(request,user, 
password,logger);
-                }
-                if (response == null) {
-                    logger.warn("Failed to get response from Doris FE {}, http 
code is {}",
-                            request.getURI(), statusCode);
-                    continue;
-                }
-                logger.trace("Success get response from Doris FE: {}, response 
is: {}.",
-                        request.getURI(), response);
-                ObjectMapper mapper = new ObjectMapper();
-                Map map = mapper.readValue(response, Map.class);
-                //Handle the problem of inconsistent data format returned by 
http v1 and v2
-                if (map.containsKey("code") && map.containsKey("msg")) {
-                    Object data = map.get("data");
-                    return mapper.writeValueAsString(data);
-                } else {
-                    return response;
-                }
-            } catch (IOException e) {
-                ex = e;
-                logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
-            }
-        }
-
-        logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
-        throw new ConnectedFailedException(request.getURI().toString(), 
statusCode, ex);
-    }
+    private static final ObjectMapper MAPPER = JsonMapper.builder().build();
 
-    private static String getConnectionGet(String request,String user, String 
passwd,Logger logger) throws IOException {
-        URL realUrl = new URL(request);
-        // open connection
-        HttpURLConnection connection = 
(HttpURLConnection)realUrl.openConnection();
-        String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
-        connection.setRequestProperty("Authorization", "Basic " + 
authEncoding);
 
-        connection.connect();
-        return parseResponse(connection,logger);
-    }
-
-    private static String parseResponse(HttpURLConnection connection,Logger 
logger) throws IOException {
-        if (connection.getResponseCode() != HttpStatus.SC_OK) {
-            logger.warn("Failed to get response from Doris  {}, http code is 
{}",
-                    connection.getURL(), connection.getResponseCode());
-            throw new IOException("Failed to get response from Doris");
-        }
-        StringBuilder result = new StringBuilder("");
-        BufferedReader in = new BufferedReader(new 
InputStreamReader(connection.getInputStream(), "utf-8"));
-        String line;
-        while ((line = in.readLine()) != null) {
-            result.append(line);
-        }
-        if (in != null) {
-            in.close();
-        }
-        return result.toString();
-    }
-
-    private static String getConnectionPost(HttpRequestBase request,String 
user, String passwd,Logger logger) throws IOException {
-        URL url = new URL(request.getURI().toString());
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod(request.getMethod());
-        String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
-        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
-        InputStream content = ((HttpPost)request).getEntity().getContent();
-        String res = IOUtils.toString(content);
-        conn.setDoOutput(true);
-        conn.setDoInput(true);
-        PrintWriter out = new PrintWriter(conn.getOutputStream());
-        // send request params
-        out.print(res);
-        // flush
-        out.flush();
-        // read response
-        return parseResponse(conn,logger);
-    }
     /**
      * parse table identifier to array.
      * @param tableIdentifier table identifier string
@@ -264,6 +161,7 @@ public class RestService implements Serializable {
                 "/";
     }
 
+    @Deprecated
     @VisibleForTesting
     static String getUriStr(String feNode,Settings cfg, Logger logger) throws 
IllegalArgumentException {
         String[] identifier = 
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
@@ -285,20 +183,10 @@ public class RestService implements Serializable {
     public static Schema getSchema(Settings cfg, Logger logger)
             throws DorisException {
         logger.trace("Finding schema.");
-        List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), 
logger);
-        for (String feNode: feNodeList) {
-            try {
-                HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) + 
SCHEMA);
-                String response = send(cfg, httpGet, logger);
-                logger.debug("Find schema response is '{}'.", response);
-                return parseSchema(response, logger);
-            } catch (ConnectedFailedException e) {
-                logger.info("Doris FE node {} is unavailable: {}, Request the 
next Doris FE node", feNode, e.getMessage());
-            }
-        }
-        String errMsg = "No Doris FE is available, please check configuration";
-        logger.error(errMsg);
-        throw new DorisException(errMsg);
+        String[] identifier = 
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
+        String response = queryAllFrontends((SparkSettings) cfg, (frontend, 
enableHttps) ->
+                new HttpGet(URLs.tableSchema(frontend, identifier[0], 
identifier[1], enableHttps)), logger);
+        return parseSchema(response, logger);
     }
 
     /**
@@ -311,10 +199,9 @@ public class RestService implements Serializable {
     @VisibleForTesting
     public static Schema parseSchema(String response, Logger logger) throws 
DorisException {
         logger.trace("Parse response '{}' to schema.", response);
-        ObjectMapper mapper = new ObjectMapper();
         Schema schema;
         try {
-            schema = mapper.readValue(response, Schema.class);
+            schema = MAPPER.readValue(response, Schema.class);
         } catch (JsonParseException e) {
             String errMsg = "Doris FE's response is not a json. res: " + 
response;
             logger.error(errMsg, e);
@@ -359,35 +246,27 @@ public class RestService implements Serializable {
         }
         logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
 
-        List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), 
logger);
-        for (String feNode: feNodeList) {
-            try {
-                HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger) 
+ QUERY_PLAN);
-                String entity = "{\"sql\": \""+ sql +"\"}";
-                logger.debug("Post body Sending to Doris FE is: '{}'.", 
entity);
-                StringEntity stringEntity = new StringEntity(entity, 
StandardCharsets.UTF_8);
-                stringEntity.setContentEncoding("UTF-8");
-                stringEntity.setContentType("application/json");
-                httpPost.setEntity(stringEntity);
-
-                String resStr = send(cfg, httpPost, logger);
-                logger.debug("Find partition response is '{}'.", resStr);
-                QueryPlan queryPlan = getQueryPlan(resStr, logger);
-                Map<String, List<Long>> be2Tablets = 
selectBeForTablet(queryPlan, logger);
-                return tabletsMapToPartition(
-                        cfg,
-                        be2Tablets,
-                        queryPlan.getOpaqued_query_plan(),
-                        tableIdentifiers[0],
-                        tableIdentifiers[1],
-                        logger);
-            } catch (ConnectedFailedException e) {
-                logger.info("Doris FE node {} is unavailable: {}, Request the 
next Doris FE node", feNode, e.getMessage());
-            }
-        }
-        String errMsg = "No Doris FE is available, please check configuration";
-        logger.error(errMsg);
-        throw new DorisException(errMsg);
+        String finalSql = sql;
+        String response = queryAllFrontends((SparkSettings) cfg, (frontend, 
enableHttps) -> {
+            HttpPost httpPost = new HttpPost(URLs.queryPlan(frontend, 
tableIdentifiers[0], tableIdentifiers[1], enableHttps));
+            String entity = "{\"sql\": \""+ finalSql +"\"}";
+            logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
+            StringEntity stringEntity = new StringEntity(entity, 
StandardCharsets.UTF_8);
+            stringEntity.setContentEncoding("UTF-8");
+            stringEntity.setContentType("application/json");
+            httpPost.setEntity(stringEntity);
+            return httpPost;
+        }, logger);
+        logger.debug("Find partition response is '{}'.", response);
+        QueryPlan queryPlan = getQueryPlan(response, logger);
+        Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, 
logger);
+        return tabletsMapToPartition(
+                cfg,
+                be2Tablets,
+                queryPlan.getOpaqued_query_plan(),
+                tableIdentifiers[0],
+                tableIdentifiers[1],
+                logger);
 
     }
 
@@ -400,10 +279,9 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     static QueryPlan getQueryPlan(String response, Logger logger) throws 
DorisException {
-        ObjectMapper mapper = new ObjectMapper();
         QueryPlan queryPlan;
         try {
-            queryPlan = mapper.readValue(response, QueryPlan.class);
+            queryPlan = MAPPER.readValue(response, QueryPlan.class);
         } catch (JsonParseException e) {
             String errMsg = "Doris FE's response is not a json. res: " + 
response;
             logger.error(errMsg, e);
@@ -527,17 +405,6 @@ public class RestService implements Serializable {
         return getBackend(sparkSettings, logger);
     }
 
-    /**
-     * choice a Doris BE node to request.
-     * @param logger slf4j logger
-     * @return the chosen one Doris BE node
-     * @throws IllegalArgumentException BE nodes is illegal
-     */
-    @VisibleForTesting
-    public static String randomBackendV2(SparkSettings sparkSettings, Logger 
logger) throws DorisException {
-        return getBackend(sparkSettings, logger);
-    }
-
     private static String getBackend(SparkSettings sparkSettings, Logger 
logger) throws DorisException {
         List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, 
logger);
         Collections.shuffle(backends);
@@ -555,10 +422,9 @@ public class RestService implements Serializable {
     @Deprecated
     @VisibleForTesting
     static List<BackendRow> parseBackend(String response, Logger logger) 
throws DorisException, IOException {
-        com.fasterxml.jackson.databind.ObjectMapper mapper = new 
com.fasterxml.jackson.databind.ObjectMapper();
         Backend backend;
         try {
-            backend = mapper.readValue(response, Backend.class);
+            backend = MAPPER.readValue(response, Backend.class);
         } catch (com.fasterxml.jackson.core.JsonParseException e) {
             String errMsg = "Doris BE's response is not a json. res: " + 
response;
             logger.error(errMsg, e);
@@ -577,7 +443,7 @@ public class RestService implements Serializable {
             logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
             throw new ShouldNeverHappenException();
         }
-        List<BackendRow> backendRows = backend.getRows().stream().filter(v -> 
v.getAlive()).collect(Collectors.toList());
+        List<BackendRow> backendRows = 
backend.getRows().stream().filter(BackendRow::getAlive).collect(Collectors.toList());
         logger.debug("Parsing schema result is '{}'.", backendRows);
         return backendRows;
     }
@@ -595,15 +461,17 @@ public class RestService implements Serializable {
             logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", backends);
             throw new IllegalArgumentException("benodes", 
String.valueOf(backends));
         }
-        BackendV2.BackendRowV2 backendRowV2 = new BackendV2.BackendRowV2();
-        for (int i = 0; i < backends.size(); i++) {
-            String ip = 
backends.get(i).substring(0,backends.get(i).indexOf(":"));
+        /*
+         * By default, the BE port you enter is is_alive=true
+         */
+        for (String s : backends) {
+            String ip = s.substring(0, s.indexOf(":"));
             try {
-                Integer port = 
Integer.valueOf(backends.get(i).substring(backends.get(i).indexOf(":")+1,backends.get(i).length()));
-                /**
+                int port = Integer.parseInt(s.substring(s.indexOf(":") + 1));
+                /*
                  * By default, the BE port you enter is is_alive=true
                  */
-                BackendV2.BackendRowV2 backend =  
backendRowV2.of(ip,port,true);
+                BackendV2.BackendRowV2 backend = BackendV2.BackendRowV2.of(ip, 
port, true);
                 backendRowV2s.add(backend);
             } catch (NumberFormatException e) {
                 logger.error("Doris BE is port error, please check 
configuration");
@@ -621,41 +489,26 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings 
sparkSettings,  Logger logger) throws DorisException {
-        /**
-         * If the specified BE does not exist, the FE mode is used
-         */
-        if(notBeNode(sparkSettings,logger)){
-            List<String> feNodeList = 
allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
-            for (String feNode : feNodeList){
-                try {
-                    String beUrl =   String.format("http://%s" + BACKENDS_V2, 
feNode);
-                    HttpGet httpGet = new HttpGet(beUrl);
-                    String response = send(sparkSettings, httpGet, logger);
-                    logger.info("Backend Info:{}", response);
-                    List<BackendV2.BackendRowV2> backends = 
parseBackendV2(response, logger);
-                    logger.trace("Parse benodes '{}'.", backends);
-                    if (backends == null || backends.isEmpty()) {
-                        logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", 
backends);
-                        throw new IllegalArgumentException("benodes", 
String.valueOf(backends));
-                    }
-                    return backends;
-                } catch (ConnectedFailedException e) {
-                    logger.info("Doris FE node {} is unavailable: {}, Request 
the next Doris FE node", feNode, e.getMessage());
-                }
+        if 
(StringUtils.isNoneBlank(sparkSettings.getProperty(sparkSettings.getProperty(DORIS_BENODES))))
 {
+            return getBeNodes(sparkSettings, logger);
+        } else { // If the specified BE does not exist, the FE mode is used
+            String response = queryAllFrontends(sparkSettings, (frontend, 
enableHttps) ->
+                    new HttpGet(URLs.aliveBackend(frontend, enableHttps)), 
logger);
+            logger.info("Backend Info:{}", response);
+            List<BackendV2.BackendRowV2> backends = parseBackendV2(response, 
logger);
+            logger.trace("Parse benodes '{}'.", backends);
+            if (backends == null || backends.isEmpty()) {
+                logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", backends);
+                throw new IllegalArgumentException("benodes", 
String.valueOf(backends));
             }
-            String errMsg = "No Doris FE is available, please check 
configuration";
-            logger.error(errMsg);
-            throw new DorisException(errMsg);
-        }else {
-            return  getBeNodes(sparkSettings, logger);
+            return backends;
         }
     }
 
     static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger 
logger) throws DorisException {
-        com.fasterxml.jackson.databind.ObjectMapper mapper = new 
com.fasterxml.jackson.databind.ObjectMapper();
         BackendV2 backend;
         try {
-            backend = mapper.readValue(response, BackendV2.class);
+            backend = MAPPER.readValue(response, BackendV2.class);
         } catch (com.fasterxml.jackson.core.JsonParseException e) {
             String errMsg = "Doris BE's response is not a json. res: " + 
response;
             logger.error(errMsg, e);
@@ -756,20 +609,47 @@ public class RestService implements Serializable {
         return nodes;
     }
 
-
     /**
-     * Doris BE node is not
-     * @param logger slf4j logger
-     * @return  Doris BE node Yes or no
+     * query all frontend
+     *
+     * @param settings doris config
+     * @param func request provider
+     * @param logger logger
+     * @return http response string
+     * @throws DorisException
      */
-    public static Boolean notBeNode(SparkSettings sparkSettings, Logger 
logger){
-        String beNodes = sparkSettings.getProperty(DORIS_BENODES);
-        if(null == beNodes){
-            return true;
-        }else {
-            return false;
+    private static String queryAllFrontends(SparkSettings settings, 
BiFunction<String, Boolean, HttpUriRequest> func,
+                                          Logger logger) throws DorisException 
{
+        List<String> frontends = 
allEndpoints(settings.getProperty(DORIS_FENODES), logger);
+        boolean enableHttps = 
settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
+                ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT);
+        CloseableHttpClient client = HttpUtil.getHttpClient(settings);
+        for (String frontend : frontends) {
+            try {
+                HttpUriRequest request = func.apply(frontend, enableHttps);
+                String user = settings.getProperty(DORIS_REQUEST_AUTH_USER, 
"");
+                String password = 
settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
+                logger.info("Send request to Doris FE '{}' with user '{}'.", 
request.getURI(), user);
+                request.setHeader(HttpHeaders.AUTHORIZATION, "Basic " +
+                        Base64.getEncoder().encodeToString((user + ":" + 
password).getBytes(StandardCharsets.UTF_8)));
+                CloseableHttpResponse response = client.execute(request);
+                if (response.getStatusLine().getStatusCode() == 
HttpStatus.SC_OK) {
+                    String resStr = EntityUtils.toString(response.getEntity());
+                    Map<String, Object> resMap = MAPPER.readValue(resStr,
+                            new TypeReference<Map<String, Object>>() {
+                            });
+                    if (resMap.containsKey("msg") && 
resMap.containsKey("data")) {
+                        return MAPPER.writeValueAsString(resMap.get("data"));
+                    }
+                    return resStr;
+                }
+            } catch (IOException e) {
+                logger.error("Doris FE node {} is unavailable, Request the 
next Doris FE node. Err: {}", frontend, e.getMessage());
+            }
         }
+        String errMsg = "No Doris FE is available, please check configuration";
+        logger.error(errMsg);
+        throw new DorisException(errMsg);
     }
 
-
 }
\ No newline at end of file
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 8cb4942..57cacd6 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -28,7 +28,7 @@ import org.apache.doris.spark.rest.RestService
 import org.apache.doris.spark.rest.models.BackendV2.BackendRowV2
 import org.apache.doris.spark.rest.models.RespContent
 import org.apache.doris.spark.sql.Utils
-import org.apache.doris.spark.util.ResponseUtil
+import org.apache.doris.spark.util.{HttpUtil, ResponseUtil, URLs}
 import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut, 
HttpRequestBase, HttpUriRequest}
 import org.apache.http.entity.{BufferedHttpEntity, ByteArrayEntity, 
InputStreamEntity}
 import org.apache.http.impl.client.{CloseableHttpClient, 
DefaultRedirectStrategy, HttpClients}
@@ -56,10 +56,6 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
 
   private final val MAPPER: ObjectMapper = JsonMapper.builder().build()
 
-  private val LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"
-
-  private val LOAD_2PC_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"
-
   private val database: String = 
settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.")(0)
 
   private val table: String = 
settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.")(1)
@@ -75,11 +71,20 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
 
   private var currentLoadUrl: String = _
 
+  private val autoRedirect: Boolean = 
settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
+    ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT)
+
+  require(if 
(settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
+    ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT)) autoRedirect else true, 
"https must open with auto redirect")
+
+  private val enableHttps: Boolean = 
settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
+    ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT) && autoRedirect
+
   /**
    * execute stream load
    *
    * @param iterator row data iterator
-   * @param schema row schema
+   * @param schema   row schema
    * @throws stream load exception
    * @return transaction id
    */
@@ -88,7 +93,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
 
     var msg: Option[CommitMessage] = None
 
-    val client: CloseableHttpClient = getHttpClient
+    val client: CloseableHttpClient = HttpUtil.getHttpClient(settings)
     val label: String = generateLoadLabel()
 
     Try {
@@ -122,8 +127,8 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
 
     Try {
 
-      val address = getAddress
-      val abortUrl = String.format(LOAD_2PC_URL_PATTERN, address, database)
+      val node = getNode
+      val abortUrl = URLs.streamLoad2PC(node, database, enableHttps)
       val httpPut = new HttpPut(abortUrl)
       addCommonHeader(httpPut)
       httpPut.setHeader("txn_operation", "commit")
@@ -139,7 +144,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
       statusCode = response.getStatusLine.getStatusCode
       val reasonPhrase = response.getStatusLine.getReasonPhrase
       if (statusCode != 200) {
-        LOG.warn(s"commit failed with $address, reason $reasonPhrase")
+        LOG.warn(s"commit failed with $node, reason $reasonPhrase")
         throw new StreamLoadException("stream load error: " + reasonPhrase)
       }
 
@@ -188,7 +193,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
    */
   private def getStreamLoadProps: Map[String, String] = {
     val props = 
settings.asProperties().asScala.filter(_._1.startsWith(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX))
-      .map { case (k,v) => 
(k.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length), v)}
+      .map { case (k, v) => 
(k.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length), v) }
     if (props.getOrElse("add_double_quotes", "false").toBoolean) {
       LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to 
true for prop.")
       props.put("trim_double_quotes", "true")
@@ -227,13 +232,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
    * build load request, set params as request header
    *
    * @param iterator row data iterator
-   * @param schema row data schema
-   * @param label load label
+   * @param schema   row data schema
+   * @param label    load label
    * @return http request
    */
   private def buildLoadRequest(iterator: Iterator[InternalRow], schema: 
StructType, label: String): HttpUriRequest = {
 
-    currentLoadUrl = String.format(LOAD_URL_PATTERN, getAddress, database, 
table)
+    currentLoadUrl = URLs.streamLoad(getNode, database, table, enableHttps)
     val put = new HttpPut(currentLoadUrl)
     addCommonHeader(put)
 
@@ -269,18 +274,16 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
    *
    * if load data to be directly, check node available will be done before 
return.
    *
-   * @throws [[org.apache.doris.spark.exception.StreamLoadException]]
+   * @throws [ [ org.apache.doris.spark.exception.StreamLoadException]]
    * @return address
    */
   @throws[StreamLoadException]
-  private def getAddress: String = {
+  private def getNode: String = {
 
     var address: Option[String] = None
 
     Try {
 
-      val autoRedirect: Boolean = 
settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
-        ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT)
       if (autoRedirect) {
         val feNodes = settings.getProperty(ConfigurationOptions.DORIS_FENODES)
         address = Some(RestService.randomEndpoint(feNodes, LOG))
@@ -501,7 +504,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
 
     Try {
 
-      val abortUrl = String.format(LOAD_2PC_URL_PATTERN, getAddress, database)
+      val abortUrl = URLs.streamLoad2PC(getNode, database, enableHttps)
       val httpPut = new HttpPut(abortUrl)
       addCommonHeader(httpPut)
       httpPut.setHeader("txn_operation", "abort")
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/HttpUtil.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/HttpUtil.scala
new file mode 100644
index 0000000..5afb457
--- /dev/null
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/HttpUtil.scala
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
+import org.apache.http.impl.client.{CloseableHttpClient, 
DefaultRedirectStrategy, HttpClients}
+import org.apache.http.ssl.SSLContexts
+
+import java.io.{File, FileInputStream}
+import java.security.KeyStore
+import scala.util.{Failure, Success, Try}
+
object HttpUtil {

  /**
   * Build a [[CloseableHttpClient]] configured from the connector settings.
   *
   * The client always follows redirects for every HTTP method (Doris FE redirects
   * stream-load PUT requests to a BE node). When `doris.enable.https` is set, an
   * SSL socket factory is installed whose trust material is loaded from the
   * configured key store.
   *
   * @param settings connector settings holding timeout and HTTPS options
   * @return a configured HTTP client; caller is responsible for closing it
   * @throws IllegalArgumentException if HTTPS is enabled but the key store path
   *                                  is missing or does not point to an existing file
   */
  def getHttpClient(settings: SparkSettings): CloseableHttpClient = {
    val connectTimeout = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
      ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
    val socketTimeout = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
      ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
    val requestConfig = RequestConfig.custom()
      .setConnectTimeout(connectTimeout)
      .setSocketTimeout(socketTimeout)
      .build()
    val clientBuilder = HttpClients.custom()
      .setRedirectStrategy(new DefaultRedirectStrategy {
        // Follow redirects for all methods, including PUT, so stream loads
        // redirected from FE to BE are retried transparently.
        override def isRedirectable(method: String): Boolean = true
      })
      .setDefaultRequestConfig(requestConfig)

    // Use the declared constants (not literal strings) so the config key and its
    // default stay consistent with ConfigurationOptions.
    val enableHttps = settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
      ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT)
    if (enableHttps) {
      val props = settings.asProperties()
      require(props.containsKey(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_PATH),
        s"${ConfigurationOptions.DORIS_HTTPS_KEY_STORE_PATH} must be set when " +
          s"${ConfigurationOptions.DORIS_ENABLE_HTTPS} is true")
      val keyStorePath: String = props.getProperty(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_PATH)
      val keyStoreFile = new File(keyStorePath)
      if (!keyStoreFile.exists()) {
        throw new IllegalArgumentException(s"key store file does not exist: $keyStorePath")
      }
      val keyStoreType: String = props.getProperty(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_TYPE,
        ConfigurationOptions.DORIS_HTTPS_KEY_STORE_TYPE_DEFAULT)
      val keyStore = KeyStore.getInstance(keyStoreType)
      // try/finally guarantees the stream is closed on both success and failure,
      // replacing the manual Try/match + var bookkeeping.
      val fis = new FileInputStream(keyStoreFile)
      try {
        val password = props.getProperty(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_PASSWORD)
        keyStore.load(fis, if (password == null) null else password.toCharArray)
      } finally {
        fis.close()
      }
      // NOTE(review): TrustAllStrategy accepts ANY server certificate, so the key
      // store loaded above is effectively ignored for trust decisions. Confirm
      // whether strict certificate validation is intended here.
      val sslContext = SSLContexts.custom().loadTrustMaterial(keyStore, new TrustAllStrategy).build()
      clientBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext))
    }
    clientBuilder.build()
  }

}
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/URLs.scala 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/URLs.scala
new file mode 100644
index 0000000..a2335e1
--- /dev/null
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/URLs.scala
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util
+
object URLs {

  /** Build the shared "scheme://node/api" prefix for every FE/BE endpoint. */
  private def apiBase(node: String, enableHttps: Boolean): String = {
    val scheme = if (enableHttps) "https" else "http"
    s"$scheme://$node/api"
  }

  /** Endpoint listing backends that are currently alive. */
  def aliveBackend(feNode: String, enableHttps: Boolean = false) =
    apiBase(feNode, enableHttps) + "/backends?is_alive=true"

  /** Endpoint returning the schema of `database`.`table`. */
  def tableSchema(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
    apiBase(feNode, enableHttps) + s"/$database/$table/_schema"

  /** Endpoint returning the query plan for `database`.`table`. */
  def queryPlan(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
    apiBase(feNode, enableHttps) + s"/$database/$table/_query_plan"

  /** Stream-load ingestion endpoint for `database`.`table`. */
  def streamLoad(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
    apiBase(feNode, enableHttps) + s"/$database/$table/_stream_load"

  /** Two-phase-commit endpoint for stream loads into `database`. */
  def streamLoad2PC(feNode: String, database: String, enableHttps: Boolean = false) =
    apiBase(feNode, enableHttps) + s"/$database/_stream_load_2pc"

}
diff --git 
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/load/StreamLoaderTest.java
 
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/load/StreamLoaderTest.java
new file mode 100644
index 0000000..65320a7
--- /dev/null
+++ 
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/load/StreamLoaderTest.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.spark.SparkConf;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class StreamLoaderTest {
+
+    @Rule
+    public ExpectedException exception = ExpectedException.none();
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testEnableHttpsWithoutAutoRedirect() {
+        SparkConf sparkConf = new SparkConf();
+        sparkConf.set(ConfigurationOptions.DORIS_ENABLE_HTTPS, "true");
+        sparkConf.set(ConfigurationOptions.DORIS_TABLE_IDENTIFIER, "db.table");
+        sparkConf.set(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT, "false");
+        new StreamLoader(new SparkSettings(sparkConf), false);
+        sparkConf.set(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT, "true");
+        new StreamLoader(new SparkSettings(sparkConf), false);
+
+    }
+
+    @Test
+    public void testEnableHttpsWithAutoRedirect() {
+        SparkConf sparkConf = new SparkConf();
+        sparkConf.set(ConfigurationOptions.DORIS_ENABLE_HTTPS, "true");
+        sparkConf.set(ConfigurationOptions.DORIS_TABLE_IDENTIFIER, "db.table");
+        sparkConf.set(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT, "true");
+        new StreamLoader(new SparkSettings(sparkConf), false);
+
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to