This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 59e526def Doris ec supports dataSources (#5060)
59e526def is described below
commit 59e526def1247f87cf70e48fb3d0175bf4c40d32
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Dec 29 10:56:36 2023 +0800
Doris ec supports dataSources (#5060)
* Doris ec supports dataSources
* Optimized code
---
.../doris/conf/DorisConfiguration.java | 6 ++
.../engineplugin/doris/constant/DorisConstant.java | 14 +++
.../doris/executor/DorisDatasourceParser.java | 117 ++++++++++++++++++++
.../doris/executor/DorisEngineConnExecutor.java | 120 +++++++++++++++++----
4 files changed, 234 insertions(+), 23 deletions(-)
diff --git
a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java
index 24b80f734..4d691a9c3 100644
---
a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java
+++
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java
@@ -72,6 +72,12 @@ public class DorisConfiguration {
public static final CommonVars<String> DORIS_PASSWORD =
CommonVars.apply("linkis.ec.doris.password", "");
+ public static final CommonVars<String> DORIS_DATASOURCE =
+ CommonVars.apply("linkis.ec.doris.datasource", "");
+
+ public static final CommonVars<String> DORIS_DATASOURCE_SYSTEM_QUERY_PARAM =
+ CommonVars.apply("linkis.ec.doris.datasource.systemQueryParam", "");
+
public static final CommonVars<Boolean> DORIS_RECONNECT_ENABLED =
CommonVars.apply("linkis.ec.doris.2pc.enabled", false, "two phase commit
Whether to enable");
diff --git
a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java
index 771d51a67..6eb556c88 100644
---
a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java
+++
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java
@@ -56,4 +56,18 @@ public class DorisConstant {
public static final String TXN_OPERATION = "txn_operation";
public static final Integer HTTP_SUCCEED = 200;
+
+ public static final String DS_JDBC_HOST = "host";
+
+ public static final String DS_JDBC_PORT = "port";
+
+ public static final String DS_JDBC_DB_NAME = "databaseName";
+
+ public static final String DS_JDBC_DB_INSTANCE = "instance";
+
+ public static final String DS_JDBC_USERNAME = "username";
+
+ public static final String DS_JDBC_PASSWORD = "password";
+
+ public static final String DS_JDBC_PASSWORD_HIDE_VALUE = "******";
}
diff --git
a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java
new file mode 100644
index 000000000..b9a0986d2
--- /dev/null
+++
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.doris.executor;
+
+import org.apache.linkis.common.utils.JsonUtils;
+import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
+import
org.apache.linkis.datasource.client.request.GetInfoPublishedByDataSourceNameAction;
+import org.apache.linkis.datasourcemanager.common.domain.DataSource;
+import org.apache.linkis.engineplugin.doris.constant.DorisConstant;
+
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Looks up a published Linkis data source by name and extracts the connection parameters
+ * (host, port, username, password, database name) consumed by the Doris engine.
+ */
+public class DorisDatasourceParser {
+  private static final Logger logger = LoggerFactory.getLogger(DorisDatasourceParser.class);
+
+  /**
+   * Fetches the published data source named {@code datasourceName} through a {@link
+   * LinkisDataSourceRemoteClient} and extracts its connection parameters.
+   *
+   * @param datasourceName name of the data source to look up
+   * @param username user passed to the data source query
+   * @param system system value passed to the data source query
+   * @return the extracted parameters keyed by the {@code DorisConstant.DS_JDBC_*} names; empty
+   *     when the data source is missing, expired, or has no connect params
+   */
+  public static Map<String, String> queryDatasourceInfoByName(
+      String datasourceName, String username, String system) {
+    logger.info(
+        "Starting query [{}, {}, {}] datasource info ......", system, username, datasourceName);
+    // NOTE(review): a new client is created on every call and never released here. If
+    // LinkisDataSourceRemoteClient exposes close(), call it in a finally block - confirm.
+    LinkisDataSourceRemoteClient dataSourceClient = new LinkisDataSourceRemoteClient();
+    DataSource dataSource =
+        dataSourceClient
+            .getInfoPublishedByDataSourceName(
+                GetInfoPublishedByDataSourceNameAction.builder()
+                    .setSystem(system)
+                    .setDataSourceName(datasourceName)
+                    .setUser(username)
+                    .build())
+            .getDataSource();
+
+    return queryDatasourceParam(datasourceName, dataSource);
+  }
+
+  /**
+   * Copies host, port, username, password and database name out of the data source's connect
+   * params. The database name falls back to the {@code instance} entry when {@code databaseName}
+   * is absent. Missing or {@code null} entries become empty strings.
+   *
+   * @param datasourceName name used only in log messages
+   * @param dataSource data source returned by the remote query, may be {@code null}
+   * @return the extracted parameters, or an empty map when nothing usable is available
+   */
+  private static Map<String, String> queryDatasourceParam(
+      String datasourceName, DataSource dataSource) {
+    Map<String, String> paramMap = new HashMap<>();
+
+    if (dataSource == null) {
+      logger.warn("Doris dataSource is null: {}", datasourceName);
+      return paramMap;
+    }
+
+    if (dataSource.isExpire()) {
+      logger.warn("Doris dataSource of datasource name: {} is expired", datasourceName);
+      return paramMap;
+    }
+
+    Map<String, Object> connectParams = dataSource.getConnectParams();
+    if (MapUtils.isEmpty(connectParams)) {
+      logger.warn("Doris dataSource connectParams is empty: {}", datasourceName);
+      return paramMap;
+    }
+
+    paramMap.put(
+        DorisConstant.DS_JDBC_HOST, stringParam(connectParams, DorisConstant.DS_JDBC_HOST));
+    paramMap.put(
+        DorisConstant.DS_JDBC_PORT, stringParam(connectParams, DorisConstant.DS_JDBC_PORT));
+    paramMap.put(
+        DorisConstant.DS_JDBC_USERNAME,
+        stringParam(connectParams, DorisConstant.DS_JDBC_USERNAME));
+    paramMap.put(
+        DorisConstant.DS_JDBC_PASSWORD,
+        stringParam(connectParams, DorisConstant.DS_JDBC_PASSWORD));
+
+    // Prefer "databaseName"; use "instance" only when no database name value is available.
+    Object database = connectParams.get(DorisConstant.DS_JDBC_DB_NAME);
+    if (database == null) {
+      database = connectParams.get(DorisConstant.DS_JDBC_DB_INSTANCE);
+    }
+    paramMap.put(DorisConstant.DS_JDBC_DB_NAME, database == null ? "" : String.valueOf(database));
+
+    // Log a copy with the password masked so the real value never reaches the log.
+    Map<String, String> printMap = new HashMap<>(paramMap);
+    printMap.put(DorisConstant.DS_JDBC_PASSWORD, DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE);
+    try {
+      String printMapString = JsonUtils.jackson().writeValueAsString(printMap);
+      logger.info("Load dataSource param: {}", printMapString);
+    } catch (JsonProcessingException e) {
+      // Serialization is needed for logging only; report it and still return the parameters.
+      logger.warn("Failed to serialize dataSource param of {} for logging", datasourceName, e);
+    }
+
+    return paramMap;
+  }
+
+  /**
+   * Returns the string form of {@code connectParams.get(key)}, or an empty string when the key is
+   * absent or mapped to {@code null} (so the literal text "null" is never produced).
+   */
+  private static String stringParam(Map<String, Object> connectParams, String key) {
+    Object value = connectParams.get(key);
+    return value == null ? "" : String.valueOf(value);
+  }
+}
diff --git
a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java
index 8d6a49456..dd0a88ddd 100644
---
a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java
+++
b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java
@@ -29,6 +29,7 @@ import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecution
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.doris.conf.DorisConfiguration;
import org.apache.linkis.engineplugin.doris.conf.DorisEngineConf;
+import org.apache.linkis.engineplugin.doris.constant.DorisConstant;
import org.apache.linkis.engineplugin.doris.errorcode.DorisErrorCodeSummary;
import org.apache.linkis.engineplugin.doris.exception.DorisException;
import org.apache.linkis.engineplugin.doris.exception.DorisParameterException;
@@ -109,12 +110,17 @@ public class DorisEngineConnExecutor extends
ConcurrentComputationExecutor {
private String dorisHost;
private String dorisDatabase;
+
+ private String datasourceDatabase;
private String dorisTable;
private String dorisUsername;
private String dorisPassword;
private String dorisStreamLoadFilePath;
private Integer dorisHttpPort;
+
+ private Integer dorisJdbcPort;
+
private CloseableHttpClient client;
public DorisEngineConnExecutor(int outputPrintLimit, int id) {
@@ -157,8 +163,6 @@ public class DorisEngineConnExecutor extends
ConcurrentComputationExecutor {
.forEach(entry -> configMap.put(entry.getKey(),
String.valueOf(entry.getValue())));
}
- checkParameter();
-
this.client =
HttpClients.custom()
.setRedirectStrategy(
@@ -174,6 +178,8 @@ public class DorisEngineConnExecutor extends
ConcurrentComputationExecutor {
@Override
public ExecuteResponse executeLine(EngineExecutionContext
engineExecutorContext, String code) {
+ checkParameter(engineExecutorContext);
+
String realCode;
if (StringUtils.isBlank(code)) {
throw new DorisException(
@@ -292,7 +298,6 @@ public class DorisEngineConnExecutor extends
ConcurrentComputationExecutor {
String dorisColumns = DorisConfiguration.DORIS_COLUMNS.getValue(configMap);
if (StringUtils.isBlank(dorisColumns)) {
- Integer dorisJdbcPort =
DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap);
List<String> dorisCloumns =
DorisUtils.getDorisCloumns(
dorisHost, dorisJdbcPort, dorisUsername, dorisPassword,
dorisDatabase, dorisTable);
@@ -385,24 +390,90 @@ public class DorisEngineConnExecutor extends
ConcurrentComputationExecutor {
return false;
}
- private void checkParameter() {
- String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap);
- String dorisUsername =
DorisConfiguration.DORIS_USER_NAME.getValue(configMap);
- Integer dorisHttpPort =
DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap);
+ /**
+  * Resolves the Doris connection settings for the current execution and validates them.
+  *
+  * <p>Host, username, HTTP port, password and JDBC port are first read from {@code configMap}.
+  * When a data source name is configured, its parameters are fetched through
+  * DorisDatasourceParser: non-blank host, username and port values, and the password whenever
+  * the key is present, replace the configured ones, and a non-blank database name is stored in
+  * {@code datasourceDatabase}. A failed lookup is logged and otherwise ignored.
+  *
+  * <p>NOTE(review): this method is called from executeLine and assigns instance fields of a
+  * ConcurrentComputationExecutor subclass - confirm that concurrent executions with different
+  * settings cannot interfere with each other.
+  *
+  * @param engineExecutorContext context whose labels supply the user for the data source query
+  * @throws DorisParameterException if the host or username is blank, or the HTTP port is null
+  */
+ private void checkParameter(EngineExecutionContext engineExecutorContext) {
+ this.dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap);
+ this.dorisUsername =
DorisConfiguration.DORIS_USER_NAME.getValue(configMap);
+ this.dorisHttpPort =
DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap);
+ this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap);
+ this.dorisJdbcPort =
DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap);
- if (StringUtils.isBlank(dorisHost)
- || StringUtils.isBlank(dorisUsername)
- || dorisHttpPort == null) {
+ String dorisDatasourceName =
DorisConfiguration.DORIS_DATASOURCE.getValue(configMap);
+
+ // Data source parameters fail to be obtained, and task running cannot be
affected
+ // The datasource param overwrites the DorisConfiguration param
+ try {
+ if (StringUtils.isNotBlank(dorisDatasourceName)) {
+ String dorisSystemQueryParam =
+
DorisConfiguration.DORIS_DATASOURCE_SYSTEM_QUERY_PARAM.getValue(configMap);
+ String execSqlUser = getExecSqlUser(engineExecutorContext);
+
+ Map<String, String> dataSourceParamMap =
+ DorisDatasourceParser.queryDatasourceInfoByName(
+ dorisDatasourceName, execSqlUser, dorisSystemQueryParam);
+
+ if (MapUtils.isNotEmpty(dataSourceParamMap)) {
+ if (dataSourceParamMap.containsKey(DS_JDBC_HOST)
+ && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_HOST)))
{
+ this.dorisHost = dataSourceParamMap.get(DS_JDBC_HOST);
+ }
+
+ if (dataSourceParamMap.containsKey(DS_JDBC_USERNAME)
+ &&
StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_USERNAME))) {
+ this.dorisUsername = dataSourceParamMap.get(DS_JDBC_USERNAME);
+ }
+
+ // Unlike the other fields, the password is taken whenever the key is present, even if
+ // its value is blank.
+ if (dataSourceParamMap.containsKey(DS_JDBC_PASSWORD)) {
+ this.dorisPassword = dataSourceParamMap.get(DS_JDBC_PASSWORD);
+ }
+
+ // Integer.valueOf throws NumberFormatException for a non-numeric port. The catch below
+ // only logs it, so the fields assigned above keep their data source values.
+ if (dataSourceParamMap.containsKey(DS_JDBC_PORT)
+ && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_PORT)))
{
+ this.dorisJdbcPort =
Integer.valueOf(dataSourceParamMap.get(DS_JDBC_PORT));
+ }
+
+ // NOTE(review): datasourceDatabase is only ever assigned here and is not cleared when a
+ // later lookup yields no database name - confirm a stale value cannot be reused.
+ if (dataSourceParamMap.containsKey(DS_JDBC_DB_NAME)
+ &&
StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_DB_NAME))) {
+ this.datasourceDatabase = dataSourceParamMap.get(DS_JDBC_DB_NAME);
+ }
+ } else {
+ logger.warn(
+ "Doris dataSource {} param is null, Skip get doris dataSource
parameters",
+ dorisDatasourceName);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("get doris dataSource {} param failed",
dorisDatasourceName, e);
+ }
+
+ // Validation runs on the final values, after any data source override.
+ if (StringUtils.isBlank(this.dorisHost)
+ || StringUtils.isBlank(this.dorisUsername)
+ || this.dorisHttpPort == null) {
logger.error("Doris check param failed.");
throw new DorisParameterException(
DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorCode(),
DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorDesc());
}
- this.dorisHost = dorisHost;
- this.dorisUsername = dorisUsername;
- this.dorisHttpPort = dorisHttpPort;
- this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap);
+ // The password placeholder is logged instead of the real password.
+ logger.info(
+ "Doris parameter dorisHost: {}, dorisUsername: {}, dorisPassword: {},
dorisHttpPort: {}, dorisJdbcPort: {}.",
+ this.dorisHost,
+ this.dorisUsername,
+ DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE,
+ this.dorisHttpPort,
+ this.dorisJdbcPort);
+ }
+
+  /**
+   * Returns the user of the first {@code UserCreatorLabel} found among the labels of the given
+   * execution context.
+   *
+   * @param engineExecutionContext context whose labels are scanned in order
+   * @return the user of the first matching label, or {@code null} when no label matches
+   */
+  private String getExecSqlUser(EngineExecutionContext engineExecutionContext) {
+    for (Object candidate : engineExecutionContext.getLabels()) {
+      // Only the first UserCreatorLabel counts; later ones are never inspected.
+      if (candidate instanceof UserCreatorLabel) {
+        return ((UserCreatorLabel) candidate).getUser();
+      }
+    }
+    return null;
   }
private void checkRequiredParameter(String code) {
@@ -417,14 +488,20 @@ public class DorisEngineConnExecutor extends
ConcurrentComputationExecutor {
DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorDesc());
}
- String dorisStreamLoadFilePath =
+ this.dorisStreamLoadFilePath =
codeMap.getOrDefault(DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), "");
- String dorisDatabase =
codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), "");
- String dorisTable =
codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), "");
+ this.dorisTable =
codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), "");
+ this.dorisDatabase =
codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), "");
+
+ String dorisDatasourceName =
DorisConfiguration.DORIS_DATASOURCE.getValue(configMap);
+ // The datasource param overwrites the DorisConfiguration param
+ if (StringUtils.isNotBlank(dorisDatasourceName) &&
StringUtils.isNotBlank(datasourceDatabase)) {
+ this.dorisDatabase = datasourceDatabase;
+ }
- if (StringUtils.isBlank(dorisStreamLoadFilePath)
- || StringUtils.isBlank(dorisDatabase)
- || StringUtils.isBlank(dorisTable)) {
+ if (StringUtils.isBlank(this.dorisStreamLoadFilePath)
+ || StringUtils.isBlank(this.dorisDatabase)
+ || StringUtils.isBlank(this.dorisTable)) {
logger.error(
"Check whether `{}`, `{}`, and `{}` are included in code json",
DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(),
@@ -435,9 +512,6 @@ public class DorisEngineConnExecutor extends
ConcurrentComputationExecutor {
DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorDesc());
}
- this.dorisStreamLoadFilePath = dorisStreamLoadFilePath;
- this.dorisDatabase = dorisDatabase;
- this.dorisTable = dorisTable;
logger.info(
"Doris parameter dorisStreamLoadFilePath: {}, dorisDatabase: {},
dorisTable: {}.",
this.dorisStreamLoadFilePath,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]