This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch dev-1.2.0 in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 4559fb1e820200e67563ebb601b804af99fb782a Author: casionone <[email protected]> AuthorDate: Fri Jul 22 22:53:15 2022 +0800 optimize presto engineconn --- linkis-engineconn-plugins/pom.xml | 1 + .../presto/PrestoEngineConnPlugin.scala | 24 ++++---- .../presto/conf/PrestoConfiguration.scala | 2 +- .../presto/conf/PrestoEngineConf.scala | 36 ++++++++++++ .../presto/executer/PrestoEngineConnExecutor.scala | 47 +++++++++++----- .../presto/factory/PrestoEngineConnFactory.scala | 4 +- .../engineplugin/presto/utils/SqlCodeParser.scala | 65 ++++++++++++++++++++++ 7 files changed, 151 insertions(+), 28 deletions(-) diff --git a/linkis-engineconn-plugins/pom.xml b/linkis-engineconn-plugins/pom.xml index 9a810df1a..785210cc9 100644 --- a/linkis-engineconn-plugins/pom.xml +++ b/linkis-engineconn-plugins/pom.xml @@ -40,6 +40,7 @@ <module>jdbc</module> <module>flink</module> <module>sqoop</module> + <module>presto</module> </modules> </project> \ No newline at end of file diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala index 063e75568..69cca75fc 100644 --- a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala +++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala @@ -18,33 +18,37 @@ package org.apache.linkis.engineplugin.presto import java.util +import org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder +import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder import org.apache.linkis.manager.engineplugin.common.resource.{EngineResourceFactory, GenericEngineResourceFactory} import org.apache.linkis.manager.label.entity.Label -import org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder -import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory class PrestoEngineConnPlugin extends EngineConnPlugin { + private val resourceLocker = new Object() + + private val engineLaunchBuilderLocker = new Object() + + private val engineFactoryLocker = new Object() + private var engineResourceFactory: EngineResourceFactory = _ + private var engineLaunchBuilder: EngineConnLaunchBuilder = _ + private var engineFactory: EngineConnFactory = _ private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]() - private val resourceLocker = new Array[Byte](0) - - private val engineFactoryLocker = new Array[Byte](0) - override def init(params: util.Map[String, Any]): Unit = { } override def getEngineResourceFactory: EngineResourceFactory = { - if (null == engineResourceFactory) resourceLocker.synchronized { - if (null == engineResourceFactory) engineResourceFactory = new GenericEngineResourceFactory + if (null == engineResourceFactory) resourceLocker synchronized { + engineResourceFactory = new GenericEngineResourceFactory } engineResourceFactory } @@ -54,8 +58,8 @@ class PrestoEngineConnPlugin extends EngineConnPlugin { } override def getEngineConnFactory: EngineConnFactory = { - if (null == engineFactory) engineFactoryLocker.synchronized { - if (null == engineFactory) engineFactory = new PrestoEngineConnFactory + if (null == engineFactory) engineFactoryLocker synchronized { + engineFactory = new PrestoEngineConnFactory } engineFactory } diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala index bbdd5030f..86511a29b 100644 --- a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala +++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala @@ -28,7 +28,7 @@ object PrestoConfiguration { val ENTRANCE_PROTECTED_JOB_INSTANCE = CommonVars[Int]("wds.linkis.entrance.protected.job.instance", 0) val ENTRANCE_RESULTS_MAX_CACHE = CommonVars("wds.linkis.presto.resultSet.cache.max", new ByteType("512k")) - val PRESTO_HTTP_CONNECT_TIME_OUT = CommonVars[java.lang.Long]("wds.linkis.presto.http.connectTimeout", new lang.Long(60)) + val PRESTO_HTTP_CONNECT_TIME_OUT = CommonVars[java.lang.Long]("wds.linkis.presto.http.connectTimeout", new lang.Long(60)) // unit in seconds val PRESTO_HTTP_READ_TIME_OUT = CommonVars[java.lang.Long]("wds.linkis.presto.http.readTimeout", new lang.Long(60)) diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala new file mode 100644 index 000000000..a051ee706 --- /dev/null +++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoEngineConf.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.engineplugin.presto.conf + +import java.util + +import org.apache.linkis.common.conf.Configuration +import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig} +import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel} +import org.apache.linkis.protocol.CacheableProtocol +import org.apache.linkis.rpc.RPCMapCache + +object PrestoEngineConf extends RPCMapCache[(UserCreatorLabel, EngineTypeLabel), String, String](Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue) { + + override protected def createRequest(labelTuple: (UserCreatorLabel, EngineTypeLabel)): CacheableProtocol = { + RequestQueryEngineConfigWithGlobalConfig(labelTuple._1, labelTuple._2) + } + override protected def createMap(any: Any): util.Map[String, String] = any match { + case response: ResponseQueryConfig => response.getKeyAndValue + } + +} diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala index 599e5a677..220e113b8 100644 --- a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala @@ -17,7 +17,6 @@ package org.apache.linkis.engineplugin.presto.executer import java.net.URI -import java.sql.SQLException import java.util import java.util._ import java.util.concurrent.TimeUnit @@ -27,6 +26,7 @@ import com.facebook.presto.spi.security.SelectedRole import com.google.common.cache.{Cache, CacheBuilder} import okhttp3.OkHttpClient import org.apache.commons.io.IOUtils +import org.apache.commons.lang.exception.ExceptionUtils import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{OverloadUtils, Utils} import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant} @@ -34,15 +34,17 @@ import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask import org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor, EngineExecutionContext} import org.apache.linkis.engineconn.core.EngineConnObject import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration._ +import org.apache.linkis.engineplugin.presto.conf.PrestoEngineConf import org.apache.linkis.engineplugin.presto.exception.{PrestoClientException, PrestoStateInvalidException} +import org.apache.linkis.engineplugin.presto.utils.SqlCodeParser import org.apache.linkis.governance.common.paser.SQLCodeParser import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, LoadResource, NodeResource} import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf import org.apache.linkis.manager.label.entity.Label -import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel +import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel, UserCreatorLabel} import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.rpc.Sender -import org.apache.linkis.scheduler.executer.{ExecuteResponse, SuccessExecuteResponse} +import org.apache.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse} import org.apache.linkis.storage.domain.Column import org.apache.linkis.storage.resultset.ResultSetFactory import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord} @@ -67,13 +69,19 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) override def execute(engineConnTask: EngineConnTask): ExecuteResponse = { val user = getUserCreatorLabel(engineConnTask.getLables).getUser - clientSessionCache.put(engineConnTask.getTaskId, getClientSession(user, engineConnTask.getProperties)) + val userCreatorLabel = engineConnTask.getLables.find(_.isInstanceOf[UserCreatorLabel]).get + val engineTypeLabel = engineConnTask.getLables.find(_.isInstanceOf[EngineTypeLabel]).get + var configMap: util.Map[String, String] = null + if (userCreatorLabel != null && engineTypeLabel != null) { + configMap = PrestoEngineConf.getCacheMap((userCreatorLabel.asInstanceOf[UserCreatorLabel], engineTypeLabel.asInstanceOf[EngineTypeLabel])) + } + clientSessionCache.put(engineConnTask.getTaskId, getClientSession(user, engineConnTask.getProperties, configMap)) super.execute(engineConnTask) } override def executeLine(engineExecutorContext: EngineExecutionContext, code: String): ExecuteResponse = { - val realCode = code.trim - info(s"presto client begins to run psql code:\n $realCode") + val realCode = SqlCodeParser.parse(code.trim) + logger.info(s"presto client begins to run psql code:\n $realCode") val taskId = engineExecutorContext.getJobId.get @@ -86,16 +94,20 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) queryOutput(taskId, engineExecutorContext, statement) } - verifyServerError(taskId, engineExecutorContext, statement) - - // update session - clientSessionCache.put(taskId, updateSession(clientSession, statement)) + val errorResponse = verifyServerError(taskId, engineExecutorContext, statement) + if (errorResponse == null) { + // update session + clientSessionCache.put(taskId, updateSession(clientSession, statement)) + SuccessExecuteResponse() + } else { + errorResponse + } - SuccessExecuteResponse() } override def executeCompletely(engineExecutorContext: EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = null + // todo override def progress(taskID: String): Float = 0.0f override def getProgressInfo(taskID: String): Array[JobProgressInfo] = Array.empty[JobProgressInfo] @@ -133,8 +145,10 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) override def getConcurrentLimit: Int = ENGINE_CONCURRENT_LIMIT.getValue - private def getClientSession(user: String, taskParams : util.Map[String, Object]): ClientSession = { + private def getClientSession(user: String, taskParams: util.Map[String, Object], cacheMap: util.Map[String, String]): ClientSession = { val configMap = new util.HashMap[String, String]() + // 运行时指定的参数优先级大于管理台配置优先级 + if (!CollectionUtils.isEmpty(cacheMap)) configMap.putAll(cacheMap) taskParams.asScala.foreach { case (key: String, value: Object) if value != null => configMap.put(key, String.valueOf(value)) case _ => @@ -159,6 +173,7 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) val preparedStatements: util.Map[String, String] = Collections.emptyMap() val roles: java.util.Map[String, SelectedRole] = Collections.emptyMap() val extraCredentials: util.Map[String, String] = Collections.emptyMap() + val clientRequestTimeout: io.airlift.units.Duration = new io.airlift.units.Duration(0, TimeUnit.MILLISECONDS) new ClientSession(httpUri, user, source, traceToken, clientTags, clientInfo, catalog, schema, timeZonId, locale, @@ -217,7 +232,7 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) } // check presto error - private def verifyServerError(taskId: String, engineExecutorContext: EngineExecutionContext, statement: StatementClient): Unit = { + private def verifyServerError(taskId: String, engineExecutorContext: EngineExecutionContext, statement: StatementClient): ErrorExecuteResponse = { engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)) if (statement.isFinished) { val info: QueryStatusInfo = statement.finalStatusInfo() @@ -228,10 +243,12 @@ class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) if (error.getFailureInfo != null) { cause = error.getFailureInfo.toException } - throw new SQLException(message, error.getSqlState, error.getErrorCode, cause) - } + engineExecutorContext.appendStdout(LogUtils.generateERROR(ExceptionUtils.getFullStackTrace(cause))) + ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause) + } else null } else if (statement.isClientAborted) { warn(s"Presto statement is killed.") + null } else if (statement.isClientError) { throw PrestoClientException("Presto client error.") } else { diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala index 268f2e721..90347db31 100644 --- a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala +++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala @@ -20,11 +20,11 @@ import org.apache.linkis.engineconn.common.creation.EngineCreationContext import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory import org.apache.linkis.engineconn.executor.entity.LabelExecutor +import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration +import org.apache.linkis.engineplugin.presto.executer.PrestoEngineConnExecutor import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType import org.apache.linkis.manager.label.entity.engine.RunType.RunType import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType} -import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration -import org.apache.linkis.engineplugin.presto.executer.PrestoEngineConnExecutor class PrestoEngineConnFactory extends ComputationSingleExecutorEngineConnFactory { diff --git a/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/SqlCodeParser.scala b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/SqlCodeParser.scala new file mode 100644 index 000000000..6b16a72e0 --- /dev/null +++ b/linkis-engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/utils/SqlCodeParser.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.engineplugin.presto.utils + +import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration +import org.apache.commons.lang3.StringUtils + +import scala.collection.mutable.ArrayBuffer + + +object SqlCodeParser { + val separator = ";" + val subCol = "SELECT \"SUBCOL\" AS \"COL\" FROM ( SELECT 1 AS \"SUBCOL\" ) \"SUBQUERY\" GROUP BY \"COL\"" + + def parse(code: String): String = { + val codeBuffer = new ArrayBuffer[String]() + + def appendStatement(sqlStatement: String): Unit = { + codeBuffer.append(sqlStatement) + } + + if (StringUtils.contains(code, separator)) { + StringUtils.split(code, ";").foreach { + case s if StringUtils.isBlank(s) => + case s => appendStatement(s.replaceAll("`", "\"")); + } + } else { + code match { + case s if StringUtils.isBlank(s) => + case s => + val pattern = """`[a-zA-Z_0-9 ]+`""".r.unanchored + var tmpS = s + pattern.findAllIn(s).foreach(a => { + val s1 = a.replaceAll("\\s*", "") + tmpS = tmpS.replace(a, s1) + }) + appendStatement(tmpS.replaceAll("`", "\"")); + } + } + + if(codeBuffer.size == 1) { + var code = codeBuffer(0) + code = code.trim.replaceAll("\n", " ").replaceAll("\\s+", " ") + if(code.contains(subCol)) { + codeBuffer(0) = "SELECT 1" + } + } + + codeBuffer.toArray.head + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
