This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 59e526def Doris ec supports dataSources (#5060)
59e526def is described below

commit 59e526def1247f87cf70e48fb3d0175bf4c40d32
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Dec 29 10:56:36 2023 +0800

    Doris ec supports dataSources (#5060)
    
    * Doris ec supports dataSources
    
    * Optimized code
---
 .../doris/conf/DorisConfiguration.java             |   6 ++
 .../engineplugin/doris/constant/DorisConstant.java |  14 +++
 .../doris/executor/DorisDatasourceParser.java      | 117 ++++++++++++++++++++
 .../doris/executor/DorisEngineConnExecutor.java    | 120 +++++++++++++++++----
 4 files changed, 234 insertions(+), 23 deletions(-)

diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java
index 24b80f734..4d691a9c3 100644
--- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java
+++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java
@@ -72,6 +72,12 @@ public class DorisConfiguration {
   public static final CommonVars<String> DORIS_PASSWORD =
       CommonVars.apply("linkis.ec.doris.password", "");
 
+  public static final CommonVars<String> DORIS_DATASOURCE =
+      CommonVars.apply("linkis.ec.doris.datasource", "");
+
+  public static final CommonVars<String> DORIS_DATASOURCE_SYSTEM_QUERY_PARAM =
+      CommonVars.apply("linkis.ec.doris.datasource.systemQueryParam", "");
+
   public static final CommonVars<Boolean> DORIS_RECONNECT_ENABLED =
       CommonVars.apply("linkis.ec.doris.2pc.enabled", false, "two phase commit 
Whether to enable");
 
diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java
index 771d51a67..6eb556c88 100644
--- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java
+++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java
@@ -56,4 +56,18 @@ public class DorisConstant {
   public static final String TXN_OPERATION = "txn_operation";
 
   public static final Integer HTTP_SUCCEED = 200;
+
+  public static final String DS_JDBC_HOST = "host";
+
+  public static final String DS_JDBC_PORT = "port";
+
+  public static final String DS_JDBC_DB_NAME = "databaseName";
+
+  public static final String DS_JDBC_DB_INSTANCE = "instance";
+
+  public static final String DS_JDBC_USERNAME = "username";
+
+  public static final String DS_JDBC_PASSWORD = "password";
+
+  public static final String DS_JDBC_PASSWORD_HIDE_VALUE = "******";
 }
diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java
new file mode 100644
index 000000000..b9a0986d2
--- /dev/null
+++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.doris.executor;
+
+import org.apache.linkis.common.utils.JsonUtils;
+import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
+import 
org.apache.linkis.datasource.client.request.GetInfoPublishedByDataSourceNameAction;
+import org.apache.linkis.datasourcemanager.common.domain.DataSource;
+import org.apache.linkis.engineplugin.doris.constant.DorisConstant;
+
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DorisDatasourceParser {
+  private static final Logger logger = 
LoggerFactory.getLogger(DorisDatasourceParser.class);
+
+  public static Map<String, String> queryDatasourceInfoByName(
+      String datasourceName, String username, String system) {
+    logger.info(
+        "Starting query ["
+            + system
+            + ", "
+            + username
+            + ", "
+            + datasourceName
+            + "] datasource info ......");
+    LinkisDataSourceRemoteClient dataSourceClient = new 
LinkisDataSourceRemoteClient();
+    DataSource dataSource =
+        dataSourceClient
+            .getInfoPublishedByDataSourceName(
+                GetInfoPublishedByDataSourceNameAction.builder()
+                    .setSystem(system)
+                    .setDataSourceName(datasourceName)
+                    .setUser(username)
+                    .build())
+            .getDataSource();
+
+    return queryDatasourceParam(datasourceName, dataSource);
+  }
+
+  private static Map<String, String> queryDatasourceParam(
+      String datasourceName, DataSource dataSource) {
+    Map<String, String> paramMap = new HashMap<>();
+
+    if (dataSource == null) {
+      logger.warn("Doris dataSource is null: {}", datasourceName);
+      return paramMap;
+    }
+
+    if (dataSource.isExpire()) {
+      logger.warn("Doris dataSource of datasource name: {} is expired", 
datasourceName);
+      return paramMap;
+    }
+
+    Map<String, Object> connectParams = dataSource.getConnectParams();
+    if (MapUtils.isEmpty(connectParams)) {
+      logger.warn("Doris dataSource connectParams is empty: {}", 
datasourceName);
+      return paramMap;
+    }
+
+    paramMap.put(
+        DorisConstant.DS_JDBC_HOST,
+        String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_HOST, 
"")));
+    paramMap.put(
+        DorisConstant.DS_JDBC_PORT,
+        String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_PORT, 
"")));
+    paramMap.put(
+        DorisConstant.DS_JDBC_USERNAME,
+        
String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_USERNAME, "")));
+    paramMap.put(
+        DorisConstant.DS_JDBC_PASSWORD,
+        
String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_PASSWORD, "")));
+    paramMap.put(
+        DorisConstant.DS_JDBC_DB_NAME,
+        String.valueOf(
+            connectParams.getOrDefault(
+                DorisConstant.DS_JDBC_DB_NAME,
+                connectParams.getOrDefault(DorisConstant.DS_JDBC_DB_INSTANCE, 
""))));
+
+    try {
+      HashMap<String, String> printMap = new HashMap<>();
+      printMap.putAll(paramMap);
+
+      // To hide the password and prevent leaks
+      if (printMap.containsKey(DorisConstant.DS_JDBC_PASSWORD)) {
+        printMap.put(DorisConstant.DS_JDBC_PASSWORD, 
DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE);
+      }
+      String printMapString = JsonUtils.jackson().writeValueAsString(printMap);
+      logger.info("Load dataSource param: {}", printMapString);
+    } catch (JsonProcessingException e) {
+
+    }
+
+    return paramMap;
+  }
+}
diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java
index 8d6a49456..dd0a88ddd 100644
--- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java
+++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java
@@ -29,6 +29,7 @@ import org.apache.linkis.engineconn.computation.executor.execute.EngineExecution
 import org.apache.linkis.engineconn.core.EngineConnObject;
 import org.apache.linkis.engineplugin.doris.conf.DorisConfiguration;
 import org.apache.linkis.engineplugin.doris.conf.DorisEngineConf;
+import org.apache.linkis.engineplugin.doris.constant.DorisConstant;
 import org.apache.linkis.engineplugin.doris.errorcode.DorisErrorCodeSummary;
 import org.apache.linkis.engineplugin.doris.exception.DorisException;
 import org.apache.linkis.engineplugin.doris.exception.DorisParameterException;
@@ -109,12 +110,17 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
 
   private String dorisHost;
   private String dorisDatabase;
+
+  private String datasourceDatabase;
   private String dorisTable;
   private String dorisUsername;
   private String dorisPassword;
 
   private String dorisStreamLoadFilePath;
   private Integer dorisHttpPort;
+
+  private Integer dorisJdbcPort;
+
   private CloseableHttpClient client;
 
   public DorisEngineConnExecutor(int outputPrintLimit, int id) {
@@ -157,8 +163,6 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
           .forEach(entry -> configMap.put(entry.getKey(), 
String.valueOf(entry.getValue())));
     }
 
-    checkParameter();
-
     this.client =
         HttpClients.custom()
             .setRedirectStrategy(
@@ -174,6 +178,8 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
 
   @Override
   public ExecuteResponse executeLine(EngineExecutionContext 
engineExecutorContext, String code) {
+    checkParameter(engineExecutorContext);
+
     String realCode;
     if (StringUtils.isBlank(code)) {
       throw new DorisException(
@@ -292,7 +298,6 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
     String dorisColumns = DorisConfiguration.DORIS_COLUMNS.getValue(configMap);
 
     if (StringUtils.isBlank(dorisColumns)) {
-      Integer dorisJdbcPort = 
DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap);
       List<String> dorisCloumns =
           DorisUtils.getDorisCloumns(
               dorisHost, dorisJdbcPort, dorisUsername, dorisPassword, 
dorisDatabase, dorisTable);
@@ -385,24 +390,90 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
     return false;
   }
 
-  private void checkParameter() {
-    String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap);
-    String dorisUsername = 
DorisConfiguration.DORIS_USER_NAME.getValue(configMap);
-    Integer dorisHttpPort = 
DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap);
+  private void checkParameter(EngineExecutionContext engineExecutorContext) {
+    this.dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap);
+    this.dorisUsername = 
DorisConfiguration.DORIS_USER_NAME.getValue(configMap);
+    this.dorisHttpPort = 
DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap);
+    this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap);
+    this.dorisJdbcPort = 
DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap);
 
-    if (StringUtils.isBlank(dorisHost)
-        || StringUtils.isBlank(dorisUsername)
-        || dorisHttpPort == null) {
+    String dorisDatasourceName = 
DorisConfiguration.DORIS_DATASOURCE.getValue(configMap);
+
+    // Data source parameters fail to be obtained, and task running cannot be 
affected
+    // The datasource  param overwrites the DorisConfiguration param
+    try {
+      if (StringUtils.isNotBlank(dorisDatasourceName)) {
+        String dorisSystemQueryParam =
+            
DorisConfiguration.DORIS_DATASOURCE_SYSTEM_QUERY_PARAM.getValue(configMap);
+        String execSqlUser = getExecSqlUser(engineExecutorContext);
+
+        Map<String, String> dataSourceParamMap =
+            DorisDatasourceParser.queryDatasourceInfoByName(
+                dorisDatasourceName, execSqlUser, dorisSystemQueryParam);
+
+        if (MapUtils.isNotEmpty(dataSourceParamMap)) {
+          if (dataSourceParamMap.containsKey(DS_JDBC_HOST)
+              && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_HOST))) 
{
+            this.dorisHost = dataSourceParamMap.get(DS_JDBC_HOST);
+          }
+
+          if (dataSourceParamMap.containsKey(DS_JDBC_USERNAME)
+              && 
StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_USERNAME))) {
+            this.dorisUsername = dataSourceParamMap.get(DS_JDBC_USERNAME);
+          }
+
+          if (dataSourceParamMap.containsKey(DS_JDBC_PASSWORD)) {
+            this.dorisPassword = dataSourceParamMap.get(DS_JDBC_PASSWORD);
+          }
+
+          if (dataSourceParamMap.containsKey(DS_JDBC_PORT)
+              && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_PORT))) 
{
+            this.dorisJdbcPort = 
Integer.valueOf(dataSourceParamMap.get(DS_JDBC_PORT));
+          }
+
+          if (dataSourceParamMap.containsKey(DS_JDBC_DB_NAME)
+              && 
StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_DB_NAME))) {
+            this.datasourceDatabase = dataSourceParamMap.get(DS_JDBC_DB_NAME);
+          }
+        } else {
+          logger.warn(
+              "Doris dataSource {} param is null, Skip get doris dataSource 
parameters",
+              dorisDatasourceName);
+        }
+      }
+    } catch (Exception e) {
+      logger.error("get doris dataSource {} param failed", 
dorisDatasourceName, e);
+    }
+
+    if (StringUtils.isBlank(this.dorisHost)
+        || StringUtils.isBlank(this.dorisUsername)
+        || this.dorisHttpPort == null) {
       logger.error("Doris check param failed.");
       throw new DorisParameterException(
           DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorCode(),
           DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorDesc());
     }
 
-    this.dorisHost = dorisHost;
-    this.dorisUsername = dorisUsername;
-    this.dorisHttpPort = dorisHttpPort;
-    this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap);
+    logger.info(
+        "Doris parameter dorisHost: {}, dorisUsername: {}, dorisPassword: {}, 
dorisHttpPort: {}, dorisJdbcPort: {}.",
+        this.dorisHost,
+        this.dorisUsername,
+        DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE,
+        this.dorisHttpPort,
+        this.dorisJdbcPort);
+  }
+
+  private String getExecSqlUser(EngineExecutionContext engineExecutionContext) 
{
+    UserCreatorLabel userCreatorLabel =
+        (UserCreatorLabel)
+            Arrays.stream(engineExecutionContext.getLabels())
+                .filter(label -> label instanceof UserCreatorLabel)
+                .findFirst()
+                .orElse(null);
+    if (userCreatorLabel != null) {
+      return userCreatorLabel.getUser();
+    }
+    return null;
   }
 
   private void checkRequiredParameter(String code) {
@@ -417,14 +488,20 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
           
DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorDesc());
     }
 
-    String dorisStreamLoadFilePath =
+    this.dorisStreamLoadFilePath =
         
codeMap.getOrDefault(DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), "");
-    String dorisDatabase = 
codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), "");
-    String dorisTable = 
codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), "");
+    this.dorisTable = 
codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), "");
+    this.dorisDatabase = 
codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), "");
+
+    String dorisDatasourceName = 
DorisConfiguration.DORIS_DATASOURCE.getValue(configMap);
+    // The datasource  param overwrites the DorisConfiguration  param
+    if (StringUtils.isNotBlank(dorisDatasourceName) && 
StringUtils.isNotBlank(datasourceDatabase)) {
+      this.dorisDatabase = datasourceDatabase;
+    }
 
-    if (StringUtils.isBlank(dorisStreamLoadFilePath)
-        || StringUtils.isBlank(dorisDatabase)
-        || StringUtils.isBlank(dorisTable)) {
+    if (StringUtils.isBlank(this.dorisStreamLoadFilePath)
+        || StringUtils.isBlank(this.dorisDatabase)
+        || StringUtils.isBlank(this.dorisTable)) {
       logger.error(
           "Check whether `{}`, `{}`, and `{}` are included in code json",
           DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(),
@@ -435,9 +512,6 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
           
DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorDesc());
     }
 
-    this.dorisStreamLoadFilePath = dorisStreamLoadFilePath;
-    this.dorisDatabase = dorisDatabase;
-    this.dorisTable = dorisTable;
     logger.info(
         "Doris parameter dorisStreamLoadFilePath: {}, dorisDatabase: {}, 
dorisTable: {}.",
         this.dorisStreamLoadFilePath,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to