This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 4559fb1e820200e67563ebb601b804af99fb782a
Author: casionone <[email protected]>
AuthorDate: Fri Jul 22 22:53:15 2022 +0800

    optimize presto engineconn
---
 linkis-engineconn-plugins/pom.xml                  |  1 +
 .../presto/PrestoEngineConnPlugin.scala            | 24 ++++----
 .../presto/conf/PrestoConfiguration.scala          |  2 +-
 .../presto/conf/PrestoEngineConf.scala             | 36 ++++++++++++
 .../presto/executer/PrestoEngineConnExecutor.scala | 47 +++++++++++-----
 .../presto/factory/PrestoEngineConnFactory.scala   |  4 +-
 .../engineplugin/presto/utils/SqlCodeParser.scala  | 65 ++++++++++++++++++++++
 7 files changed, 151 insertions(+), 28 deletions(-)

diff --git a/linkis-engineconn-plugins/pom.xml 
b/linkis-engineconn-plugins/pom.xml
index 9a810df1a..785210cc9 100644
--- a/linkis-engineconn-plugins/pom.xml
+++ b/linkis-engineconn-plugins/pom.xml
@@ -40,6 +40,7 @@
         <module>jdbc</module>
         <module>flink</module>
         <module>sqoop</module>
+        <module>presto</module>
     </modules>
 
 </project>
\ No newline at end of file
diff --git 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
index 063e75568..69cca75fc 100644
--- 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
+++ 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
@@ -18,33 +18,37 @@ package org.apache.linkis.engineplugin.presto
 
 import java.util
 
+import 
org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder
+import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory
 import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
 import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
 import 
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
 import 
org.apache.linkis.manager.engineplugin.common.resource.{EngineResourceFactory, 
GenericEngineResourceFactory}
 import org.apache.linkis.manager.label.entity.Label
-import 
org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder
-import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory
 
 class PrestoEngineConnPlugin extends EngineConnPlugin {
 
+  private val resourceLocker = new Object()
+
+  private val engineLaunchBuilderLocker = new Object()
+
+  private val engineFactoryLocker = new Object()
+
   private var engineResourceFactory: EngineResourceFactory = _
 
+  private var engineLaunchBuilder: EngineConnLaunchBuilder = _
+
   private var engineFactory: EngineConnFactory = _
 
   private val defaultLabels: util.List[Label[_]] = new 
util.ArrayList[Label[_]]()
 
-  private val resourceLocker = new Array[Byte](0)
-
-  private val engineFactoryLocker = new Array[Byte](0)
-
   override def init(params: util.Map[String, Any]): Unit = {
 
   }
 
   override def getEngineResourceFactory: EngineResourceFactory = {
-    if (null == engineResourceFactory) resourceLocker.synchronized {
-      if (null == engineResourceFactory) engineResourceFactory = new 
GenericEngineResourceFactory
+    if (null == engineResourceFactory) resourceLocker synchronized {
+      engineResourceFactory = new GenericEngineResourceFactory
     }
     engineResourceFactory
   }
@@ -54,8 +58,8 @@ class PrestoEngineConnPlugin extends EngineConnPlugin {
   }
 
   override def getEngineConnFactory: EngineConnFactory = {
-    if (null == engineFactory) engineFactoryLocker.synchronized {
-      if (null == engineFactory) engineFactory = new PrestoEngineConnFactory
+    if (null == engineFactory) engineFactoryLocker synchronized {
+      engineFactory = new PrestoEngineConnFactory
     }
     engineFactory
   }
diff --git 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
index bbdd5030f..86511a29b 100644
--- 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
+++ 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
@@ -28,7 +28,7 @@ object PrestoConfiguration {
   val ENTRANCE_PROTECTED_JOB_INSTANCE = 
CommonVars[Int]("wds.linkis.entrance.protected.job.instance", 0)
   val ENTRANCE_RESULTS_MAX_CACHE = 
CommonVars("wds.linkis.presto.resultSet.cache.max", new ByteType("512k"))
 
-  val PRESTO_HTTP_CONNECT_TIME_OUT = 
CommonVars[java.lang.Long]("wds.linkis.presto.http.connectTimeout", new 
lang.Long(60))
+  val PRESTO_HTTP_CONNECT_TIME_OUT = 
CommonVars[java.lang.Long]("wds.linkis.presto.http.connectTimeout", new 
lang.Long(60)) // unit in seconds
   val PRESTO_HTTP_READ_TIME_OUT = 
CommonVars[java.lang.Long]("wds.linkis.presto.http.readTimeout", new 
lang.Long(60))
 
 
diff --git 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala
 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala
new file mode 100644
index 000000000..a051ee706
--- /dev/null
+++ 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.conf
+
+import java.util
+
+import org.apache.linkis.common.conf.Configuration
+import 
org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfigWithGlobalConfig,
 ResponseQueryConfig}
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, 
UserCreatorLabel}
+import org.apache.linkis.protocol.CacheableProtocol
+import org.apache.linkis.rpc.RPCMapCache
+
+object PrestoEngineConf extends RPCMapCache[(UserCreatorLabel, 
EngineTypeLabel), String, 
String](Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue)
 {
+
+  override protected def createRequest(labelTuple: (UserCreatorLabel, 
EngineTypeLabel)): CacheableProtocol = {
+    RequestQueryEngineConfigWithGlobalConfig(labelTuple._1, labelTuple._2)
+  }
+  override protected def createMap(any: Any): util.Map[String, String] = any 
match {
+    case response: ResponseQueryConfig => response.getKeyAndValue
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
index 599e5a677..220e113b8 100644
--- 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
@@ -17,7 +17,6 @@
 package org.apache.linkis.engineplugin.presto.executer
 
 import java.net.URI
-import java.sql.SQLException
 import java.util
 import java.util._
 import java.util.concurrent.TimeUnit
@@ -27,6 +26,7 @@ import com.facebook.presto.spi.security.SelectedRole
 import com.google.common.cache.{Cache, CacheBuilder}
 import okhttp3.OkHttpClient
 import org.apache.commons.io.IOUtils
+import org.apache.commons.lang.exception.ExceptionUtils
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{OverloadUtils, Utils}
 import org.apache.linkis.engineconn.common.conf.{EngineConnConf, 
EngineConnConstant}
@@ -34,15 +34,17 @@ import 
org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
 import 
org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor,
 EngineExecutionContext}
 import org.apache.linkis.engineconn.core.EngineConnObject
 import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration._
+import org.apache.linkis.engineplugin.presto.conf.PrestoEngineConf
 import org.apache.linkis.engineplugin.presto.exception.{PrestoClientException, 
PrestoStateInvalidException}
+import org.apache.linkis.engineplugin.presto.utils.SqlCodeParser
 import org.apache.linkis.governance.common.paser.SQLCodeParser
 import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, 
LoadResource, NodeResource}
 import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
 import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, 
UserCreatorLabel}
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{ExecuteResponse, 
SuccessExecuteResponse}
+import org.apache.linkis.scheduler.executer.{ErrorExecuteResponse, 
ExecuteResponse, SuccessExecuteResponse}
 import org.apache.linkis.storage.domain.Column
 import org.apache.linkis.storage.resultset.ResultSetFactory
 import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
@@ -67,13 +69,19 @@ class PrestoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
 
   override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
     val user = getUserCreatorLabel(engineConnTask.getLables).getUser
-    clientSessionCache.put(engineConnTask.getTaskId, getClientSession(user, 
engineConnTask.getProperties))
+    val userCreatorLabel = 
engineConnTask.getLables.find(_.isInstanceOf[UserCreatorLabel]).get
+    val engineTypeLabel = 
engineConnTask.getLables.find(_.isInstanceOf[EngineTypeLabel]).get
+    var configMap: util.Map[String, String] = null
+    if (userCreatorLabel != null && engineTypeLabel != null) {
+      configMap = 
PrestoEngineConf.getCacheMap((userCreatorLabel.asInstanceOf[UserCreatorLabel], 
engineTypeLabel.asInstanceOf[EngineTypeLabel]))
+    }
+    clientSessionCache.put(engineConnTask.getTaskId, getClientSession(user, 
engineConnTask.getProperties, configMap))
     super.execute(engineConnTask)
   }
 
   override def executeLine(engineExecutorContext: EngineExecutionContext, 
code: String): ExecuteResponse = {
-    val realCode = code.trim
-    info(s"presto client begins to run psql code:\n $realCode")
+    val realCode = SqlCodeParser.parse(code.trim)
+    logger.info(s"presto client begins to run psql code:\n $realCode")
 
     val taskId = engineExecutorContext.getJobId.get
 
@@ -86,16 +94,20 @@ class PrestoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
       queryOutput(taskId, engineExecutorContext, statement)
     }
 
-    verifyServerError(taskId, engineExecutorContext, statement)
-
-    // update session
-    clientSessionCache.put(taskId, updateSession(clientSession, statement))
+    val errorResponse = verifyServerError(taskId, engineExecutorContext, 
statement)
+    if (errorResponse == null) {
+      // update session
+      clientSessionCache.put(taskId, updateSession(clientSession, statement))
+      SuccessExecuteResponse()
+    } else {
+      errorResponse
+    }
 
-    SuccessExecuteResponse()
   }
 
   override def executeCompletely(engineExecutorContext: 
EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = 
null
 
+  // todo
   override def progress(taskID: String): Float = 0.0f
 
   override def getProgressInfo(taskID: String): Array[JobProgressInfo] = 
Array.empty[JobProgressInfo]
@@ -133,8 +145,10 @@ class PrestoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
 
   override def getConcurrentLimit: Int = ENGINE_CONCURRENT_LIMIT.getValue
 
-  private def getClientSession(user: String, taskParams : util.Map[String, 
Object]): ClientSession = {
+  private def getClientSession(user: String, taskParams: util.Map[String, 
Object], cacheMap: util.Map[String, String]): ClientSession = {
     val configMap = new util.HashMap[String, String]()
+    // 运行时指定的参数优先级大于管理台配置优先级
+    if (!CollectionUtils.isEmpty(cacheMap)) configMap.putAll(cacheMap)
     taskParams.asScala.foreach {
       case (key: String, value: Object) if value != null => configMap.put(key, 
String.valueOf(value))
       case _ =>
@@ -159,6 +173,7 @@ class PrestoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
     val preparedStatements: util.Map[String, String] = Collections.emptyMap()
     val roles: java.util.Map[String, SelectedRole] = Collections.emptyMap()
     val extraCredentials: util.Map[String, String] = Collections.emptyMap()
+
     val clientRequestTimeout: io.airlift.units.Duration = new 
io.airlift.units.Duration(0, TimeUnit.MILLISECONDS)
 
     new ClientSession(httpUri, user, source, traceToken, clientTags, 
clientInfo, catalog, schema, timeZonId, locale,
@@ -217,7 +232,7 @@ class PrestoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
   }
 
   // check presto error
-  private def verifyServerError(taskId: String, engineExecutorContext: 
EngineExecutionContext, statement: StatementClient): Unit = {
+  private def verifyServerError(taskId: String, engineExecutorContext: 
EngineExecutionContext, statement: StatementClient): ErrorExecuteResponse = {
     engineExecutorContext.pushProgress(progress(taskId), 
getProgressInfo(taskId))
     if (statement.isFinished) {
       val info: QueryStatusInfo = statement.finalStatusInfo()
@@ -228,10 +243,12 @@ class PrestoEngineConnExecutor(override val 
outputPrintLimit: Int, val id: Int)
         if (error.getFailureInfo != null) {
           cause = error.getFailureInfo.toException
         }
-        throw new SQLException(message, error.getSqlState, error.getErrorCode, 
cause)
-      }
+        
engineExecutorContext.appendStdout(LogUtils.generateERROR(ExceptionUtils.getFullStackTrace(cause)))
+        ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause)
+      } else null
     } else if (statement.isClientAborted) {
       warn(s"Presto statement is killed.")
+      null
     } else if (statement.isClientError) {
       throw PrestoClientException("Presto client error.")
     } else {
diff --git 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
index 268f2e721..90347db31 100644
--- 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
@@ -20,11 +20,11 @@ import 
org.apache.linkis.engineconn.common.creation.EngineCreationContext
 import org.apache.linkis.engineconn.common.engineconn.EngineConn
 import 
org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
 import org.apache.linkis.engineconn.executor.entity.LabelExecutor
+import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration
+import org.apache.linkis.engineplugin.presto.executer.PrestoEngineConnExecutor
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
 import org.apache.linkis.manager.label.entity.engine.RunType.RunType
 import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
-import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration
-import org.apache.linkis.engineplugin.presto.executer.PrestoEngineConnExecutor
 
 class PrestoEngineConnFactory extends 
ComputationSingleExecutorEngineConnFactory {
 
diff --git 
a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/SqlCodeParser.scala
 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/SqlCodeParser.scala
new file mode 100644
index 000000000..6b16a72e0
--- /dev/null
+++ 
b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/SqlCodeParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.utils
+
+import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.mutable.ArrayBuffer
+
+
+object SqlCodeParser {
+  val separator = ";"
+  val subCol = "SELECT \"SUBCOL\" AS \"COL\" FROM ( SELECT 1 AS \"SUBCOL\" ) 
\"SUBQUERY\" GROUP BY \"COL\""
+
+  def parse(code: String): String = {
+    val codeBuffer = new ArrayBuffer[String]()
+
+    def appendStatement(sqlStatement: String): Unit = {
+      codeBuffer.append(sqlStatement)
+    }
+
+    if (StringUtils.contains(code, separator)) {
+      StringUtils.split(code, ";").foreach {
+        case s if StringUtils.isBlank(s) =>
+        case s => appendStatement(s.replaceAll("`", "\""));
+      }
+    } else {
+      code match {
+        case s if StringUtils.isBlank(s) =>
+        case s =>
+          val pattern = """`[a-zA-Z_0-9 ]+`""".r.unanchored
+          var tmpS = s
+          pattern.findAllIn(s).foreach(a => {
+            val s1 = a.replaceAll("\\s*", "")
+            tmpS = tmpS.replace(a, s1)
+          })
+          appendStatement(tmpS.replaceAll("`", "\""));
+      }
+    }
+
+    if(codeBuffer.size == 1) {
+      var code = codeBuffer(0)
+      code = code.trim.replaceAll("\n", " ").replaceAll("\\s+", " ")
+      if(code.contains(subCol)) {
+        codeBuffer(0) = "SELECT 1"
+      }
+    }
+
+    codeBuffer.toArray.head
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to