This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7effefa [improvement] Supports traversal of Doris FE nodes when
searching for Doris BE (#11)
7effefa is described below
commit 7effefa160f7b6a8e9753ad07c516f0ee160906e
Author: Jiangqiao Xu <[email protected]>
AuthorDate: Sun Feb 27 11:08:53 2022 +0800
[improvement] Supports traversal of Doris FE nodes when searching for Doris
BE (#11)
---
.../org/apache/doris/flink/rest/RestService.java | 43 ++++++++++++++++++----
1 file changed, 36 insertions(+), 7 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 9812840..f1d0512 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -244,6 +244,26 @@ public class RestService implements Serializable {
}
/**
+ * choice a Doris FE node to request.
+ *
+ * @param feNodes Doris FE node list, separate be comma
+ * @param logger slf4j logger
+ * @return the array of Doris FE nodes
+ * @throws IllegalArgumentException fe nodes is illegal
+ */
+ @VisibleForTesting
+ static List<String> allEndpoints(String feNodes, Logger logger) throws
IllegalArgumentException {
+ logger.trace("Parse fenodes '{}'.", feNodes);
+ if (StringUtils.isEmpty(feNodes)) {
+ logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+ throw new IllegalArgumentException("fenodes", feNodes);
+ }
+ List<String> nodes =
Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
+ Collections.shuffle(nodes);
+ return nodes;
+ }
+
+ /**
* choice a Doris BE node to request.
*
* @param options configuration of request
@@ -328,13 +348,22 @@ public class RestService implements Serializable {
@VisibleForTesting
static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options,
DorisReadOptions readOptions, Logger logger) throws DorisException, IOException
{
String feNodes = options.getFenodes();
- String feNode = randomEndpoint(feNodes, logger);
- String beUrl = "http://" + feNode + BACKENDS_V2;
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(options, readOptions, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List<BackendV2.BackendRowV2> backends = parseBackendV2(response,
logger);
- return backends;
+ List<String> feNodeList = allEndpoints(feNodes, logger);
+ for (String feNode: feNodeList) {
+ try {
+ String beUrl = "http://" + feNode + BACKENDS_V2;
+ HttpGet httpGet = new HttpGet(beUrl);
+ String response = send(options, readOptions, httpGet, logger);
+ logger.info("Backend Info:{}", response);
+ List<BackendV2.BackendRowV2> backends =
parseBackendV2(response, logger);
+ return backends;
+ } catch (ConnectedFailedException e) {
+ logger.info("Doris FE node {} is unavailable: {}, Request the
next Doris FE node", feNode, e.getMessage());
+ }
+ }
+ String errMsg = "No Doris FE is available, please check configuration";
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
}
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger
logger) throws DorisException, IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]