This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e86102f47 [Feature][Connector-V2]Support Doris Fe Node HA (#8311)
3e86102f47 is described below
commit 3e86102f47a2da80b3d1161d56293bc0ec6c70c6
Author: xiaochen <[email protected]>
AuthorDate: Wed Dec 18 19:13:07 2024 +0800
[Feature][Connector-V2]Support Doris Fe Node HA (#8311)
---
.../connectors/doris/rest/RestService.java | 48 +++++++++++--------
.../doris/sink/writer/DorisSinkWriter.java | 42 ++++++++++------
.../doris/rest/models/RestServiceTest.java | 56 ----------------------
3 files changed, 55 insertions(+), 91 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index 77c23f7341..cb2f33df93 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -240,22 +240,7 @@ public class RestService implements Serializable {
}
@VisibleForTesting
- public static String randomEndpoint(String feNodes, Logger logger)
- throws DorisConnectorException {
- logger.trace("Parse fenodes '{}'.", feNodes);
- if (StringUtils.isEmpty(feNodes)) {
- String errMsg =
- String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
"fenodes", feNodes);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- List<String> nodes = Arrays.asList(feNodes.split(","));
- Collections.shuffle(nodes);
- return nodes.get(0).trim();
- }
-
- @VisibleForTesting
- static String getUriStr(
- DorisSourceConfig dorisSourceConfig, DorisSourceTable
dorisSourceTable, Logger logger)
+ static String getUriStr(String node, DorisSourceTable dorisSourceTable,
Logger logger)
throws DorisConnectorException {
String tableIdentifier =
dorisSourceTable.getTablePath().getDatabaseName()
@@ -263,7 +248,7 @@ public class RestService implements Serializable {
+ dorisSourceTable.getTablePath().getTableName();
String[] identifier = parseIdentifier(tableIdentifier, logger);
return "http://"
- + randomEndpoint(dorisSourceConfig.getFrontends(), logger)
+ + node.trim()
+ API_PREFIX
+ "/"
+ identifier[0]
@@ -298,16 +283,37 @@ public class RestService implements Serializable {
}
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
- HttpPost httpPost =
- new HttpPost(getUriStr(dorisSourceConfig, dorisSourceTable,
logger) + QUERY_PLAN);
String entity = "{\"sql\": \"" + sql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity,
StandardCharsets.UTF_8);
stringEntity.setContentEncoding("UTF-8");
stringEntity.setContentType("application/json");
- httpPost.setEntity(stringEntity);
- String resStr = send(dorisSourceConfig, httpPost, logger);
+ List<String> feNodes =
Arrays.asList(dorisSourceConfig.getFrontends().split(","));
+ Collections.shuffle(feNodes);
+ int feNodesNum = feNodes.size();
+ String resStr = null;
+
+ for (int i = 0; i < feNodesNum; i++) {
+ try {
+ HttpPost httpPost =
+ new HttpPost(
+ getUriStr(feNodes.get(i), dorisSourceTable,
logger) + QUERY_PLAN);
+ httpPost.setEntity(stringEntity);
+ resStr = send(dorisSourceConfig, httpPost, logger);
+ break;
+ } catch (Exception e) {
+ if (i == feNodesNum - 1) {
+ throw new DorisConnectorException(
+ DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
+ }
+ log.error(
+ "Find partition error for feNode: {} with exception:
{}",
+ feNodes.get(i),
+ e.getMessage());
+ }
+ }
+
logger.debug("Find partition response is '{}'.", resStr);
QueryPlan queryPlan = getQueryPlan(resStr, logger);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan,
logger);
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index fa0d671e82..2d40ea7bdd 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.RestService;
import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
@@ -98,21 +97,36 @@ public class DorisSinkWriter
}
private void initializeLoad() {
- String backend =
RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log);
- try {
- this.dorisStreamLoad =
- new DorisStreamLoad(
- backend,
- catalogTable.getTablePath(),
- dorisSinkConfig,
- labelGenerator,
- new HttpUtil().getHttpClient());
- if (dorisSinkConfig.getEnable2PC()) {
- dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId +
1);
+
+ List<String> feNodes =
Arrays.asList(dorisSinkConfig.getFrontends().split(","));
+ Collections.shuffle(feNodes);
+ int feNodesNum = feNodes.size();
+
+ for (int i = 0; i < feNodesNum; i++) {
+ try {
+ this.dorisStreamLoad =
+ new DorisStreamLoad(
+ feNodes.get(i),
+ catalogTable.getTablePath(),
+ dorisSinkConfig,
+ labelGenerator,
+ new HttpUtil().getHttpClient());
+ if (dorisSinkConfig.getEnable2PC()) {
+ dorisStreamLoad.abortPreCommit(labelPrefix,
lastCheckpointId + 1);
+ }
+ break;
+ } catch (Exception e) {
+ if (i == feNodesNum - 1) {
+ throw new DorisConnectorException(
+ DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+ }
+ log.error(
+ "stream load error for feNode: {} with exception: {}",
+ feNodes.get(i),
+ e.getMessage());
}
- } catch (Exception e) {
- throw new
DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
}
+
startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
// when uploading data in streaming mode, we need to regularly detect
whether there are
// exceptions.
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java
deleted file mode 100644
index aa917d5766..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import org.apache.seatunnel.connectors.doris.rest.RestService;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class RestServiceTest {
-
- @Test
- void testRandomEndpoint() {
-
- List<String> list =
- Arrays.asList(
- "fe_host1:fe_http_port1",
- "fe_host2:fe_http_port2",
- "fe_host3:fe_http_port3",
- "fe_host4:fe_http_port4",
- "fe_host5:fe_http_port5");
-
- boolean hasDifferentAddress = false;
- for (int i = 0; i < 5; i++) {
- Set<String> addresses =
- list.stream()
- .map(address ->
RestService.randomEndpoint(String.join(",", list), log))
- .collect(Collectors.toSet());
- hasDifferentAddress = addresses.size() > 1;
- }
- Assertions.assertTrue(hasDifferentAddress);
- }
-}