This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-for-flink-before-1.13
in repository
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13
by this push:
new 3ebb83a [improvement] Supports traversal of Doris FE nodes when
searching for Doris BE (#11)
3ebb83a is described below
commit 3ebb83a9c65b275b538c1458be8cf4c0a77a68c1
Author: Jiangqiao Xu <[email protected]>
AuthorDate: Sun Feb 27 11:08:39 2022 +0800
[improvement] Supports traversal of Doris FE nodes when searching for Doris
BE (#11)
---
.../org/apache/doris/flink/rest/RestService.java | 45 ++++++++++++++++++----
1 file changed, 37 insertions(+), 8 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 0c4264f..c229cf0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -244,6 +244,26 @@ public class RestService implements Serializable {
}
/**
+ * choice a Doris FE node to request.
+ *
+ * @param feNodes Doris FE node list, separate be comma
+ * @param logger slf4j logger
+ * @return the array of Doris FE nodes
+ * @throws IllegalArgumentException fe nodes is illegal
+ */
+ @VisibleForTesting
+ static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalArgumentException {
+ logger.trace("Parse fenodes '{}'.", feNodes);
+ if (StringUtils.isEmpty(feNodes)) {
+ logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+ throw new IllegalArgumentException("fenodes", feNodes);
+ }
+ List<String> nodes = Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
+ Collections.shuffle(nodes);
+ return nodes;
+ }
+
+ /**
* choice a Doris BE node to request.
*
* @param options configuration of request
@@ -322,19 +342,28 @@ public class RestService implements Serializable {
*
* @param options configuration of request
* @param logger slf4j logger
- * @return the chosen one Doris BE node
+ * @return all Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
- String feNode = randomEndpoint(feNodes, logger);
- String beUrl = "http://" + feNode + BACKENDS_V2;
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(options, readOptions, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
- return backends;
+ List<String> feNodeList = allEndpoints(feNodes, logger);
+ for (String feNode: feNodeList) {
+ try {
+ String beUrl = "http://" + feNode + BACKENDS_V2;
+ HttpGet httpGet = new HttpGet(beUrl);
+ String response = send(options, readOptions, httpGet, logger);
+ logger.info("Backend Info:{}", response);
+ List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+ return backends;
+ } catch (ConnectedFailedException e) {
+ logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
+ }
+ }
+ String errMsg = "No Doris FE is available, please check configuration";
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
}
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]