This is an automated email from the ASF dual-hosted git repository.

alexkun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 7704ed1b7079b80549e3282884deac505b9fb791
Author: peacewong <[email protected]>
AuthorDate: Wed Nov 29 20:54:59 2023 +0800

    Optimize JDBC connection support cache by task id
---
 .../engineplugin/jdbc/ConnectionManager.java       | 18 +++--
 .../jdbc/executor/JDBCEngineConnExecutor.scala     | 93 +++++++++++-----------
 2 files changed, 59 insertions(+), 52 deletions(-)

diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index b9cd47945..a49613f8d 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -23,16 +23,19 @@ import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConsta
 import 
org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException;
 import org.apache.linkis.manager.engineplugin.jdbc.utils.JdbcParamUtils;
 
-import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import javax.sql.DataSource;
 
+import java.io.Closeable;
 import java.security.PrivilegedExceptionAction;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.text.MessageFormat;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,7 +45,8 @@ import com.alibaba.druid.pool.DruidDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.*;
+import static 
org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.USERNAME;
+import static org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType.of;
 import static 
org.apache.linkis.manager.engineplugin.jdbc.errorcode.JDBCErrorCodeSummary.*;
 
 public class ConnectionManager {
@@ -103,8 +107,10 @@ public class ConnectionManager {
     }
     for (DataSource dataSource : this.dataSourceFactories.values()) {
       try {
-        ((BasicDataSource) dataSource).close();
-      } catch (SQLException e) {
+        if (dataSource instanceof Closeable) {
+          ((Closeable) dataSource).close();
+        }
+      } catch (Exception e) {
         LOG.error("Error while closing datasource...", e);
       }
     }
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
index 336d1197f..8a2d64fa7 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executor/JDBCEngineConnExecutor.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.manager.engineplugin.jdbc.executor
 
 import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.utils.{OverloadUtils, Utils}
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
 import org.apache.linkis.engineconn.computation.executor.execute.{
   ConcurrentComputationExecutor,
   EngineExecutionContext
@@ -78,6 +79,8 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
   private val progressMonitors: util.Map[String, ProgressMonitor[_]] =
     new ConcurrentHashMap[String, ProgressMonitor[_]]()
 
+  private val connectionCache: util.Map[String, Connection] = new 
util.HashMap[String, Connection]()
+
   override def init(): Unit = {
     logger.info("jdbc executor start init.")
     setCodeParser(new SQLCodeParser)
@@ -87,49 +90,59 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
     }
   }
 
-  override def executeLine(
-      engineExecutorContext: EngineExecutionContext,
-      code: String
-  ): ExecuteResponse = {
-    val realCode = code.trim()
-    val taskId = engineExecutorContext.getJobId.get
+  override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
+    val executeResponse = super.execute(engineConnTask)
+    if (StringUtils.isNotBlank(engineConnTask.getTaskId)) {
+      val connection = connectionCache.remove(engineConnTask.getTaskId)
+      logger.info(s"remove task ${engineConnTask.getTaskId} connection")
+      Utils.tryAndWarn(connection.close())
+    }
+    executeResponse
+  }
 
-    var properties: util.Map[String, String] = Collections.emptyMap()
+  private def getConnection(engineExecutorContext: EngineExecutionContext): 
Connection = {
 
-    Utils.tryCatch({
-      properties = getJDBCRuntimeParams(engineExecutorContext)
-    }) { e: Throwable =>
-      logger.error(s"try to build JDBC runtime params error! $e")
-      return ErrorExecuteResponse(e.getMessage, e)
+    val taskId = engineExecutorContext.getJobId.orNull
+    if (StringUtils.isNotBlank(taskId) && connectionCache.containsKey(taskId)) 
{
+      logger.info(
+        s"Task ${taskId}  paragraph 
${engineExecutorContext.getCurrentParagraph} from cache get connection"
+      )
+      return connectionCache.get(taskId)
     }
-
+    val properties: util.Map[String, String] = 
getJDBCRuntimeParams(engineExecutorContext)
     logger.info(s"The jdbc properties is: $properties")
     val dataSourceName = 
properties.get(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS)
     val dataSourceMaxVersionId =
       
properties.get(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID)
     logger.info(
-      s"The data source name is [$dataSourceName], and the jdbc client begins 
to run jdbc code:\n ${realCode.trim}"
+      s"The data source name is [$dataSourceName], and the jdbc client begins 
to run task ${taskId}"
     )
-    var connection: Connection = null
-    var statement: Statement = null
-    var resultSet: ResultSet = null
     logger.info(s"The data source properties is $properties")
-    Utils.tryCatch({
-      /* url + user as the cache key */
-      val jdbcUrl: String = properties.get(JDBCEngineConnConstant.JDBC_URL)
-      val execUser: String = 
properties.get(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER)
-      val proxyUser: String = 
properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY)
-      var dataSourceIdentifier = s"$jdbcUrl-$execUser-$proxyUser"
-      /* If datasource is used, use datasource name as the cache key */
-      if (StringUtils.isNotBlank(dataSourceName)) {
-        dataSourceIdentifier = s"$dataSourceName-$dataSourceMaxVersionId"
-      }
-      connection = connectionManager.getConnection(dataSourceIdentifier, 
properties)
-      logger.info("The jdbc connection has created successfully!")
-    }) { e: Throwable =>
-      logger.error(s"created data source connection error! $e")
-      return ErrorExecuteResponse("created data source connection error!", e)
+    /* url + user as the cache key */
+    val jdbcUrl: String = properties.get(JDBCEngineConnConstant.JDBC_URL)
+    val execUser: String = 
properties.get(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER)
+    val proxyUser: String = 
properties.get(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY)
+    var dataSourceIdentifier = s"$jdbcUrl-$execUser-$proxyUser"
+    /* If datasource is used, use datasource name as the cache key */
+    if (StringUtils.isNotBlank(dataSourceName)) {
+      dataSourceIdentifier = s"$dataSourceName-$dataSourceMaxVersionId"
+    }
+    val connection = connectionManager.getConnection(dataSourceIdentifier, 
properties)
+    if (StringUtils.isNotBlank(taskId)) {
+      connectionCache.put(taskId, connection)
     }
+    connection
+  }
+
+  override def executeLine(
+      engineExecutorContext: EngineExecutionContext,
+      code: String
+  ): ExecuteResponse = {
+
+    val taskId = engineExecutorContext.getJobId.get
+    val connection: Connection = getConnection(engineExecutorContext)
+    var statement: Statement = null
+    var resultSet: ResultSet = null
 
     try {
       statement = connection.createStatement()
@@ -167,14 +180,10 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
         }
       } finally {
         if (resultSet != null) {
-          Utils.tryCatch({ resultSet.close() }) { case e: SQLException =>
-            logger.warn(e.getMessage)
-          }
+          Utils.tryAndWarn(resultSet.close())
         }
         if (statement != null) {
-          Utils.tryCatch({ statement.close() }) { case e: SQLException =>
-            logger.warn(e.getMessage)
-          }
+          Utils.tryAndWarn(statement.close())
         }
       }
     } catch {
@@ -182,14 +191,6 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
         logger.error(s"Cannot run $code", e)
         return ErrorExecuteResponse(e.getMessage, e)
     } finally {
-      if (connection != null) {
-        try {
-          if (!connection.getAutoCommit) connection.commit()
-          connection.close()
-        } catch {
-          case e: SQLException => logger.warn("close connection error.", e)
-        }
-      }
       connectionManager.removeStatement(taskId)
     }
     SuccessExecuteResponse()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to