This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a9725bf [feature] support fe https request (#189)
a9725bf is described below
commit a9725bfc80e915a76476f32207518478da8a5848
Author: gnehil <[email protected]>
AuthorDate: Mon Feb 5 18:06:58 2024 +0800
[feature] support fe https request (#189)
---
.../doris/spark/cfg/ConfigurationOptions.java | 12 +
.../org/apache/doris/spark/rest/RestService.java | 328 +++++++--------------
.../org/apache/doris/spark/load/StreamLoader.scala | 41 +--
.../org/apache/doris/spark/util/HttpUtil.scala | 70 +++++
.../scala/org/apache/doris/spark/util/URLs.scala | 42 +++
.../apache/doris/spark/load/StreamLoaderTest.java | 54 ++++
6 files changed, 304 insertions(+), 243 deletions(-)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 6728906..437eabe 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -132,4 +132,16 @@ public interface ConfigurationOptions {
*/
String DORIS_SINK_DATA_COMPRESS_TYPE =
"doris.sink.properties.compress_type";
+ String DORIS_ENABLE_HTTPS = "doris.enable.https";
+
+ boolean DORIS_ENABLE_HTTPS_DEFAULT = false;
+
+ String DORIS_HTTPS_KEY_STORE_PATH = "doris.https.key-store-path";
+
+ String DORIS_HTTPS_KEY_STORE_TYPE = "doris.https.key-store-type";
+
+ String DORIS_HTTPS_KEY_STORE_TYPE_DEFAULT = "JKS";
+
+ String DORIS_HTTPS_KEY_STORE_PASSWORD = "doris.https.key-store-password";
+
}
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 86e370c..c838830 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -50,13 +50,19 @@ import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.cfg.SparkSettings;
@@ -70,15 +76,23 @@ import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
+import org.apache.doris.spark.util.HttpUtil;
+import org.apache.doris.spark.util.URLs;
+import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
+import scala.collection.JavaConverters;
/**
* Service for communicate with Doris FE.
@@ -86,126 +100,9 @@ import com.google.common.annotations.VisibleForTesting;
public class RestService implements Serializable {
public final static int REST_RESPONSE_STATUS_OK = 200;
private static final String API_PREFIX = "/api";
- private static final String SCHEMA = "_schema";
- private static final String QUERY_PLAN = "_query_plan";
- @Deprecated
- private static final String BACKENDS = "/rest/v1/system?path=//backends";
- private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
-
- /**
- * send request to Doris FE and get response json string.
- * @param cfg configuration of request
- * @param request {@link HttpRequestBase} real request
- * @param logger {@link Logger}
- * @return Doris FE response in json string
- * @throws ConnectedFailedException throw when cannot connect to Doris FE
- */
- private static String send(Settings cfg, HttpRequestBase request, Logger
logger) throws
- ConnectedFailedException {
- int connectTimeout =
cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
- ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT);
- int socketTimeout =
cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
- ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT);
- int retries =
cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
- ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
- logger.trace("connect timeout set to '{}'. socket timeout set to '{}'.
retries set to '{}'.",
- connectTimeout, socketTimeout, retries);
-
- RequestConfig requestConfig = RequestConfig.custom()
- .setConnectTimeout(connectTimeout)
- .setSocketTimeout(socketTimeout)
- .build();
-
- request.setConfig(requestConfig);
- String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
- String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
- logger.info("Send request to Doris FE '{}' with user '{}'.",
request.getURI(), user);
- IOException ex = null;
- int statusCode = -1;
-
- for (int attempt = 0; attempt < retries; attempt++) {
- logger.debug("Attempt {} to request {}.", attempt,
request.getURI());
- try {
- String response;
- if (request instanceof HttpGet){
- response = getConnectionGet(request.getURI().toString(),
user, password,logger);
- } else {
- response = getConnectionPost(request,user,
password,logger);
- }
- if (response == null) {
- logger.warn("Failed to get response from Doris FE {}, http
code is {}",
- request.getURI(), statusCode);
- continue;
- }
- logger.trace("Success get response from Doris FE: {}, response
is: {}.",
- request.getURI(), response);
- ObjectMapper mapper = new ObjectMapper();
- Map map = mapper.readValue(response, Map.class);
- //Handle the problem of inconsistent data format returned by
http v1 and v2
- if (map.containsKey("code") && map.containsKey("msg")) {
- Object data = map.get("data");
- return mapper.writeValueAsString(data);
- } else {
- return response;
- }
- } catch (IOException e) {
- ex = e;
- logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
- }
- }
-
- logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
- throw new ConnectedFailedException(request.getURI().toString(),
statusCode, ex);
- }
+ private static final ObjectMapper MAPPER = JsonMapper.builder().build();
- private static String getConnectionGet(String request,String user, String
passwd,Logger logger) throws IOException {
- URL realUrl = new URL(request);
- // open connection
- HttpURLConnection connection =
(HttpURLConnection)realUrl.openConnection();
- String authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
- connection.setRequestProperty("Authorization", "Basic " +
authEncoding);
- connection.connect();
- return parseResponse(connection,logger);
- }
-
- private static String parseResponse(HttpURLConnection connection,Logger
logger) throws IOException {
- if (connection.getResponseCode() != HttpStatus.SC_OK) {
- logger.warn("Failed to get response from Doris {}, http code is
{}",
- connection.getURL(), connection.getResponseCode());
- throw new IOException("Failed to get response from Doris");
- }
- StringBuilder result = new StringBuilder("");
- BufferedReader in = new BufferedReader(new
InputStreamReader(connection.getInputStream(), "utf-8"));
- String line;
- while ((line = in.readLine()) != null) {
- result.append(line);
- }
- if (in != null) {
- in.close();
- }
- return result.toString();
- }
-
- private static String getConnectionPost(HttpRequestBase request,String
user, String passwd,Logger logger) throws IOException {
- URL url = new URL(request.getURI().toString());
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setInstanceFollowRedirects(false);
- conn.setRequestMethod(request.getMethod());
- String authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
- conn.setRequestProperty("Authorization", "Basic " + authEncoding);
- InputStream content = ((HttpPost)request).getEntity().getContent();
- String res = IOUtils.toString(content);
- conn.setDoOutput(true);
- conn.setDoInput(true);
- PrintWriter out = new PrintWriter(conn.getOutputStream());
- // send request params
- out.print(res);
- // flush
- out.flush();
- // read response
- return parseResponse(conn,logger);
- }
/**
* parse table identifier to array.
* @param tableIdentifier table identifier string
@@ -264,6 +161,7 @@ public class RestService implements Serializable {
"/";
}
+ @Deprecated
@VisibleForTesting
static String getUriStr(String feNode,Settings cfg, Logger logger) throws
IllegalArgumentException {
String[] identifier =
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
@@ -285,20 +183,10 @@ public class RestService implements Serializable {
public static Schema getSchema(Settings cfg, Logger logger)
throws DorisException {
logger.trace("Finding schema.");
- List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES),
logger);
- for (String feNode: feNodeList) {
- try {
- HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) +
SCHEMA);
- String response = send(cfg, httpGet, logger);
- logger.debug("Find schema response is '{}'.", response);
- return parseSchema(response, logger);
- } catch (ConnectedFailedException e) {
- logger.info("Doris FE node {} is unavailable: {}, Request the
next Doris FE node", feNode, e.getMessage());
- }
- }
- String errMsg = "No Doris FE is available, please check configuration";
- logger.error(errMsg);
- throw new DorisException(errMsg);
+ String[] identifier =
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
+ String response = queryAllFrontends((SparkSettings) cfg, (frontend,
enableHttps) ->
+ new HttpGet(URLs.tableSchema(frontend, identifier[0],
identifier[1], enableHttps)), logger);
+ return parseSchema(response, logger);
}
/**
@@ -311,10 +199,9 @@ public class RestService implements Serializable {
@VisibleForTesting
public static Schema parseSchema(String response, Logger logger) throws
DorisException {
logger.trace("Parse response '{}' to schema.", response);
- ObjectMapper mapper = new ObjectMapper();
Schema schema;
try {
- schema = mapper.readValue(response, Schema.class);
+ schema = MAPPER.readValue(response, Schema.class);
} catch (JsonParseException e) {
String errMsg = "Doris FE's response is not a json. res: " +
response;
logger.error(errMsg, e);
@@ -359,35 +246,27 @@ public class RestService implements Serializable {
}
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
- List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES),
logger);
- for (String feNode: feNodeList) {
- try {
- HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger)
+ QUERY_PLAN);
- String entity = "{\"sql\": \""+ sql +"\"}";
- logger.debug("Post body Sending to Doris FE is: '{}'.",
entity);
- StringEntity stringEntity = new StringEntity(entity,
StandardCharsets.UTF_8);
- stringEntity.setContentEncoding("UTF-8");
- stringEntity.setContentType("application/json");
- httpPost.setEntity(stringEntity);
-
- String resStr = send(cfg, httpPost, logger);
- logger.debug("Find partition response is '{}'.", resStr);
- QueryPlan queryPlan = getQueryPlan(resStr, logger);
- Map<String, List<Long>> be2Tablets =
selectBeForTablet(queryPlan, logger);
- return tabletsMapToPartition(
- cfg,
- be2Tablets,
- queryPlan.getOpaqued_query_plan(),
- tableIdentifiers[0],
- tableIdentifiers[1],
- logger);
- } catch (ConnectedFailedException e) {
- logger.info("Doris FE node {} is unavailable: {}, Request the
next Doris FE node", feNode, e.getMessage());
- }
- }
- String errMsg = "No Doris FE is available, please check configuration";
- logger.error(errMsg);
- throw new DorisException(errMsg);
+ String finalSql = sql;
+ String response = queryAllFrontends((SparkSettings) cfg, (frontend,
enableHttps) -> {
+ HttpPost httpPost = new HttpPost(URLs.queryPlan(frontend,
tableIdentifiers[0], tableIdentifiers[1], enableHttps));
+ String entity = "{\"sql\": \""+ finalSql +"\"}";
+ logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
+ StringEntity stringEntity = new StringEntity(entity,
StandardCharsets.UTF_8);
+ stringEntity.setContentEncoding("UTF-8");
+ stringEntity.setContentType("application/json");
+ httpPost.setEntity(stringEntity);
+ return httpPost;
+ }, logger);
+ logger.debug("Find partition response is '{}'.", response);
+ QueryPlan queryPlan = getQueryPlan(response, logger);
+ Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan,
logger);
+ return tabletsMapToPartition(
+ cfg,
+ be2Tablets,
+ queryPlan.getOpaqued_query_plan(),
+ tableIdentifiers[0],
+ tableIdentifiers[1],
+ logger);
}
@@ -400,10 +279,9 @@ public class RestService implements Serializable {
*/
@VisibleForTesting
static QueryPlan getQueryPlan(String response, Logger logger) throws
DorisException {
- ObjectMapper mapper = new ObjectMapper();
QueryPlan queryPlan;
try {
- queryPlan = mapper.readValue(response, QueryPlan.class);
+ queryPlan = MAPPER.readValue(response, QueryPlan.class);
} catch (JsonParseException e) {
String errMsg = "Doris FE's response is not a json. res: " +
response;
logger.error(errMsg, e);
@@ -527,17 +405,6 @@ public class RestService implements Serializable {
return getBackend(sparkSettings, logger);
}
- /**
- * choice a Doris BE node to request.
- * @param logger slf4j logger
- * @return the chosen one Doris BE node
- * @throws IllegalArgumentException BE nodes is illegal
- */
- @VisibleForTesting
- public static String randomBackendV2(SparkSettings sparkSettings, Logger
logger) throws DorisException {
- return getBackend(sparkSettings, logger);
- }
-
private static String getBackend(SparkSettings sparkSettings, Logger
logger) throws DorisException {
List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings,
logger);
Collections.shuffle(backends);
@@ -555,10 +422,9 @@ public class RestService implements Serializable {
@Deprecated
@VisibleForTesting
static List<BackendRow> parseBackend(String response, Logger logger)
throws DorisException, IOException {
- com.fasterxml.jackson.databind.ObjectMapper mapper = new
com.fasterxml.jackson.databind.ObjectMapper();
Backend backend;
try {
- backend = mapper.readValue(response, Backend.class);
+ backend = MAPPER.readValue(response, Backend.class);
} catch (com.fasterxml.jackson.core.JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " +
response;
logger.error(errMsg, e);
@@ -577,7 +443,7 @@ public class RestService implements Serializable {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
- List<BackendRow> backendRows = backend.getRows().stream().filter(v ->
v.getAlive()).collect(Collectors.toList());
+ List<BackendRow> backendRows =
backend.getRows().stream().filter(BackendRow::getAlive).collect(Collectors.toList());
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
@@ -595,15 +461,17 @@ public class RestService implements Serializable {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", backends);
throw new IllegalArgumentException("benodes",
String.valueOf(backends));
}
- BackendV2.BackendRowV2 backendRowV2 = new BackendV2.BackendRowV2();
- for (int i = 0; i < backends.size(); i++) {
- String ip =
backends.get(i).substring(0,backends.get(i).indexOf(":"));
+ /*
+ * By default, the BE port you enter is is_alive=true
+ */
+ for (String s : backends) {
+ String ip = s.substring(0, s.indexOf(":"));
try {
- Integer port =
Integer.valueOf(backends.get(i).substring(backends.get(i).indexOf(":")+1,backends.get(i).length()));
- /**
+ int port = Integer.parseInt(s.substring(s.indexOf(":") + 1));
+ /*
* By default, the BE port you enter is is_alive=true
*/
- BackendV2.BackendRowV2 backend =
backendRowV2.of(ip,port,true);
+ BackendV2.BackendRowV2 backend = BackendV2.BackendRowV2.of(ip,
port, true);
backendRowV2s.add(backend);
} catch (NumberFormatException e) {
logger.error("Doris BE is port error, please check
configuration");
@@ -621,41 +489,26 @@ public class RestService implements Serializable {
*/
@VisibleForTesting
public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings
sparkSettings, Logger logger) throws DorisException {
- /**
- * If the specified BE does not exist, the FE mode is used
- */
- if(notBeNode(sparkSettings,logger)){
- List<String> feNodeList =
allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
- for (String feNode : feNodeList){
- try {
- String beUrl = String.format("http://%s" + BACKENDS_V2,
feNode);
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(sparkSettings, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List<BackendV2.BackendRowV2> backends =
parseBackendV2(response, logger);
- logger.trace("Parse benodes '{}'.", backends);
- if (backends == null || backends.isEmpty()) {
- logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes",
backends);
- throw new IllegalArgumentException("benodes",
String.valueOf(backends));
- }
- return backends;
- } catch (ConnectedFailedException e) {
- logger.info("Doris FE node {} is unavailable: {}, Request
the next Doris FE node", feNode, e.getMessage());
- }
+ if
(StringUtils.isNoneBlank(sparkSettings.getProperty(sparkSettings.getProperty(DORIS_BENODES))))
{
+ return getBeNodes(sparkSettings, logger);
+ } else { // If the specified BE does not exist, the FE mode is used
+ String response = queryAllFrontends(sparkSettings, (frontend,
enableHttps) ->
+ new HttpGet(URLs.aliveBackend(frontend, enableHttps)),
logger);
+ logger.info("Backend Info:{}", response);
+ List<BackendV2.BackendRowV2> backends = parseBackendV2(response,
logger);
+ logger.trace("Parse benodes '{}'.", backends);
+ if (backends == null || backends.isEmpty()) {
+ logger.error(ILLEGAL_ARGUMENT_MESSAGE, "benodes", backends);
+ throw new IllegalArgumentException("benodes",
String.valueOf(backends));
}
- String errMsg = "No Doris FE is available, please check
configuration";
- logger.error(errMsg);
- throw new DorisException(errMsg);
- }else {
- return getBeNodes(sparkSettings, logger);
+ return backends;
}
}
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger
logger) throws DorisException {
- com.fasterxml.jackson.databind.ObjectMapper mapper = new
com.fasterxml.jackson.databind.ObjectMapper();
BackendV2 backend;
try {
- backend = mapper.readValue(response, BackendV2.class);
+ backend = MAPPER.readValue(response, BackendV2.class);
} catch (com.fasterxml.jackson.core.JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " +
response;
logger.error(errMsg, e);
@@ -756,20 +609,47 @@ public class RestService implements Serializable {
return nodes;
}
-
/**
- * Doris BE node is not
- * @param logger slf4j logger
- * @return Doris BE node Yes or no
+ * query all frontend
+ *
+ * @param settings doris config
+ * @param func request provider
+ * @param logger logger
+ * @return http response string
+ * @throws DorisException
*/
- public static Boolean notBeNode(SparkSettings sparkSettings, Logger
logger){
- String beNodes = sparkSettings.getProperty(DORIS_BENODES);
- if(null == beNodes){
- return true;
- }else {
- return false;
+ private static String queryAllFrontends(SparkSettings settings,
BiFunction<String, Boolean, HttpUriRequest> func,
+ Logger logger) throws DorisException
{
+ List<String> frontends =
allEndpoints(settings.getProperty(DORIS_FENODES), logger);
+ boolean enableHttps =
settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
+ ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT);
+ CloseableHttpClient client = HttpUtil.getHttpClient(settings);
+ for (String frontend : frontends) {
+ try {
+ HttpUriRequest request = func.apply(frontend, enableHttps);
+ String user = settings.getProperty(DORIS_REQUEST_AUTH_USER,
"");
+ String password =
settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
+ logger.info("Send request to Doris FE '{}' with user '{}'.",
request.getURI(), user);
+ request.setHeader(HttpHeaders.AUTHORIZATION, "Basic " +
+ Base64.getEncoder().encodeToString((user + ":" +
password).getBytes(StandardCharsets.UTF_8)));
+ CloseableHttpResponse response = client.execute(request);
+ if (response.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
+ String resStr = EntityUtils.toString(response.getEntity());
+ Map<String, Object> resMap = MAPPER.readValue(resStr,
+ new TypeReference<Map<String, Object>>() {
+ });
+ if (resMap.containsKey("msg") &&
resMap.containsKey("data")) {
+ return MAPPER.writeValueAsString(resMap.get("data"));
+ }
+ return resStr;
+ }
+ } catch (IOException e) {
+ logger.error("Doris FE node {} is unavailable, Request the
next Doris FE node. Err: {}", frontend, e.getMessage());
+ }
}
+ String errMsg = "No Doris FE is available, please check configuration";
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
}
-
}
\ No newline at end of file
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 8cb4942..57cacd6 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -28,7 +28,7 @@ import org.apache.doris.spark.rest.RestService
import org.apache.doris.spark.rest.models.BackendV2.BackendRowV2
import org.apache.doris.spark.rest.models.RespContent
import org.apache.doris.spark.sql.Utils
-import org.apache.doris.spark.util.ResponseUtil
+import org.apache.doris.spark.util.{HttpUtil, ResponseUtil, URLs}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPut,
HttpRequestBase, HttpUriRequest}
import org.apache.http.entity.{BufferedHttpEntity, ByteArrayEntity,
InputStreamEntity}
import org.apache.http.impl.client.{CloseableHttpClient,
DefaultRedirectStrategy, HttpClients}
@@ -56,10 +56,6 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
private final val MAPPER: ObjectMapper = JsonMapper.builder().build()
- private val LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"
-
- private val LOAD_2PC_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"
-
private val database: String =
settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.")(0)
private val table: String =
settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.")(1)
@@ -75,11 +71,20 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
private var currentLoadUrl: String = _
+ private val autoRedirect: Boolean =
settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
+ ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT)
+
+ require(if
(settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
+ ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT)) autoRedirect else true,
"https must open with auto redirect")
+
+ private val enableHttps: Boolean =
settings.getBooleanProperty(ConfigurationOptions.DORIS_ENABLE_HTTPS,
+ ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT) && autoRedirect
+
/**
* execute stream load
*
* @param iterator row data iterator
- * @param schema row schema
+ * @param schema row schema
* @throws stream load exception
* @return transaction id
*/
@@ -88,7 +93,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
var msg: Option[CommitMessage] = None
- val client: CloseableHttpClient = getHttpClient
+ val client: CloseableHttpClient = HttpUtil.getHttpClient(settings)
val label: String = generateLoadLabel()
Try {
@@ -122,8 +127,8 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
Try {
- val address = getAddress
- val abortUrl = String.format(LOAD_2PC_URL_PATTERN, address, database)
+ val node = getNode
+ val abortUrl = URLs.streamLoad2PC(node, database, enableHttps)
val httpPut = new HttpPut(abortUrl)
addCommonHeader(httpPut)
httpPut.setHeader("txn_operation", "commit")
@@ -139,7 +144,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
statusCode = response.getStatusLine.getStatusCode
val reasonPhrase = response.getStatusLine.getReasonPhrase
if (statusCode != 200) {
- LOG.warn(s"commit failed with $address, reason $reasonPhrase")
+ LOG.warn(s"commit failed with $node, reason $reasonPhrase")
throw new StreamLoadException("stream load error: " + reasonPhrase)
}
@@ -188,7 +193,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
*/
private def getStreamLoadProps: Map[String, String] = {
val props =
settings.asProperties().asScala.filter(_._1.startsWith(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX))
- .map { case (k,v) =>
(k.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length), v)}
+ .map { case (k, v) =>
(k.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length), v) }
if (props.getOrElse("add_double_quotes", "false").toBoolean) {
LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to
true for prop.")
props.put("trim_double_quotes", "true")
@@ -227,13 +232,13 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
* build load request, set params as request header
*
* @param iterator row data iterator
- * @param schema row data schema
- * @param label load label
+ * @param schema row data schema
+ * @param label load label
* @return http request
*/
private def buildLoadRequest(iterator: Iterator[InternalRow], schema:
StructType, label: String): HttpUriRequest = {
- currentLoadUrl = String.format(LOAD_URL_PATTERN, getAddress, database,
table)
+ currentLoadUrl = URLs.streamLoad(getNode, database, table, enableHttps)
val put = new HttpPut(currentLoadUrl)
addCommonHeader(put)
@@ -269,18 +274,16 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
*
* if load data to be directly, check node available will be done before
return.
*
- * @throws [[org.apache.doris.spark.exception.StreamLoadException]]
+ * @throws [ [ org.apache.doris.spark.exception.StreamLoadException]]
* @return address
*/
@throws[StreamLoadException]
- private def getAddress: String = {
+ private def getNode: String = {
var address: Option[String] = None
Try {
- val autoRedirect: Boolean =
settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
- ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT)
if (autoRedirect) {
val feNodes = settings.getProperty(ConfigurationOptions.DORIS_FENODES)
address = Some(RestService.randomEndpoint(feNodes, LOG))
@@ -501,7 +504,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
Try {
- val abortUrl = String.format(LOAD_2PC_URL_PATTERN, getAddress, database)
+ val abortUrl = URLs.streamLoad2PC(getNode, database, enableHttps)
val httpPut = new HttpPut(abortUrl)
addCommonHeader(httpPut)
httpPut.setHeader("txn_operation", "abort")
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/HttpUtil.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/HttpUtil.scala
new file mode 100644
index 0000000..5afb457
--- /dev/null
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/HttpUtil.scala
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
+import org.apache.http.impl.client.{CloseableHttpClient,
DefaultRedirectStrategy, HttpClients}
+import org.apache.http.ssl.SSLContexts
+
+import java.io.{File, FileInputStream}
+import java.security.KeyStore
+import scala.util.{Failure, Success, Try}
+
+object HttpUtil {
+
+ def getHttpClient(settings: SparkSettings): CloseableHttpClient = {
+ val connectTimeout =
settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
+ ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+ val socketTimeout =
settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
+ ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+ val requestConfig =
RequestConfig.custom().setConnectTimeout(connectTimeout).setSocketTimeout(socketTimeout).build()
+ val clientBuilder = HttpClients.custom()
+ .setRedirectStrategy(new DefaultRedirectStrategy {
+ override def isRedirectable(method: String): Boolean = true
+ })
+ .setDefaultRequestConfig(requestConfig)
+ val enableHttps = settings.getBooleanProperty("doris.enable.https", false)
+ if (enableHttps) {
+ val props = settings.asProperties()
+
require(props.containsKey(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_PATH))
+ val keyStorePath: String =
props.getProperty(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_PATH)
+ val keyStoreFile = new File(keyStorePath)
+ if (!keyStoreFile.exists()) throw new IllegalArgumentException()
+ val keyStoreType: String =
props.getProperty(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_TYPE,
+ ConfigurationOptions.DORIS_HTTPS_KEY_STORE_TYPE_DEFAULT)
+ val keyStore = KeyStore.getInstance(keyStoreType)
+ var fis: FileInputStream = null
+ Try {
+ fis = new FileInputStream(keyStoreFile)
+ val password =
props.getProperty(ConfigurationOptions.DORIS_HTTPS_KEY_STORE_PASSWORD)
+ keyStore.load(fis, if (password == null) null else
password.toCharArray)
+ } match {
+ case Success(_) => if (fis != null) fis.close()
+ case Failure(e) =>
+ if (fis != null) fis.close()
+ throw e
+ }
+ val sslContext = SSLContexts.custom().loadTrustMaterial(keyStore, new
TrustAllStrategy).build()
+ clientBuilder.setSSLSocketFactory(new
SSLConnectionSocketFactory(sslContext))
+ }
+ clientBuilder.build()
+ }
+
+}
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/URLs.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/URLs.scala
new file mode 100644
index 0000000..a2335e1
--- /dev/null
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/URLs.scala
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util
+
/**
 * Builders for the Doris FE HTTP API endpoints used by the connector.
 *
 * Every builder takes the FE node address ("host:port") and an optional
 * `enableHttps` flag (default `false`) that selects the URL scheme.
 */
object URLs {

  // Select the URL scheme based on whether HTTPS is enabled.
  private def scheme(enableHttps: Boolean): String = if (enableHttps) "https" else "http"

  /** Endpoint listing the currently alive backends. */
  def aliveBackend(feNode: String, enableHttps: Boolean = false): String =
    scheme(enableHttps) + "://" + feNode + "/api/backends?is_alive=true"

  /** Endpoint returning the schema of `database`.`table`. */
  def tableSchema(feNode: String, database: String, table: String, enableHttps: Boolean = false): String =
    scheme(enableHttps) + "://" + feNode + "/api/" + database + "/" + table + "/_schema"

  /** Endpoint returning the query plan for `database`.`table`. */
  def queryPlan(feNode: String, database: String, table: String, enableHttps: Boolean = false): String =
    scheme(enableHttps) + "://" + feNode + "/api/" + database + "/" + table + "/_query_plan"

  /** Stream-load ingestion endpoint for `database`.`table`. */
  def streamLoad(feNode: String, database: String, table: String, enableHttps: Boolean = false): String =
    scheme(enableHttps) + "://" + feNode + "/api/" + database + "/" + table + "/_stream_load"

  /** Two-phase-commit endpoint for stream loads into `database`. */
  def streamLoad2PC(feNode: String, database: String, enableHttps: Boolean = false): String =
    scheme(enableHttps) + "://" + feNode + "/api/" + database + "/_stream_load_2pc"

}
diff --git
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/load/StreamLoaderTest.java
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/load/StreamLoaderTest.java
new file mode 100644
index 0000000..65320a7
--- /dev/null
+++
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/load/StreamLoaderTest.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load;
+
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.spark.SparkConf;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class StreamLoaderTest {
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEnableHttpsWithoutAutoRedirect() {
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.set(ConfigurationOptions.DORIS_ENABLE_HTTPS, "true");
+ sparkConf.set(ConfigurationOptions.DORIS_TABLE_IDENTIFIER, "db.table");
+ sparkConf.set(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT, "false");
+ new StreamLoader(new SparkSettings(sparkConf), false);
+ sparkConf.set(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT, "true");
+ new StreamLoader(new SparkSettings(sparkConf), false);
+
+ }
+
+ @Test
+ public void testEnableHttpsWithAutoRedirect() {
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.set(ConfigurationOptions.DORIS_ENABLE_HTTPS, "true");
+ sparkConf.set(ConfigurationOptions.DORIS_TABLE_IDENTIFIER, "db.table");
+ sparkConf.set(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT, "true");
+ new StreamLoader(new SparkSettings(sparkConf), false);
+
+ }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]