This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new 5be56c90c [LINKIS #1631] Add linkis-engineplugin-presto module (#2173)
5be56c90c is described below
commit 5be56c90c1a86507925ba15b4dd3d95fe50b8930
Author: Zhen Wang <[email protected]>
AuthorDate: Wed Jun 1 11:16:45 2022 +0800
[LINKIS #1631] Add linkis-engineplugin-presto module (#2173)
---
.../engineconn-plugins/presto/pom.xml | 145 ++++++++++
.../presto/src/main/assembly/distribution.xml | 73 +++++
.../main/resources/linkis-engineconn.properties | 22 ++
.../presto/src/main/resources/log4j2.xml | 91 +++++++
.../presto/PrestoEngineConnPlugin.scala | 65 +++++
.../PrestoProcessEngineConnLaunchBuilder.scala | 29 ++
.../presto/conf/PrestoConfiguration.scala | 45 ++++
.../presto/exception/PrestoException.scala | 25 ++
.../presto/executer/PrestoEngineConnExecutor.scala | 298 +++++++++++++++++++++
.../presto/factory/PrestoEngineConnFactory.scala | 39 +++
10 files changed, 832 insertions(+)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/presto/pom.xml
b/linkis-engineconn-plugins/engineconn-plugins/presto/pom.xml
new file mode 100644
index 000000000..a677047f9
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/presto/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>linkis</artifactId>
+ <groupId>org.apache.linkis</groupId>
+ <version>1.1.1</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>linkis-engineplugin-presto</artifactId>
+
+ <properties>
+ <presto.version>0.234</presto.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-engineconn-plugin-core</artifactId>
+ <version>${linkis.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-computation-engineconn</artifactId>
+ <version>${linkis.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-storage</artifactId>
+ <version>${linkis.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-rpc</artifactId>
+ <version>${linkis.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-common</artifactId>
+ <version>${linkis.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- presto -->
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-client</artifactId>
+ <version>${presto.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-resource-group-managers</artifactId>
+ <version>${presto.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>log-manager</artifactId>
+ <groupId>com.facebook.airlift</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.3</version>
+ <inherited>false</inherited>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+
<descriptor>src/main/assembly/distribution.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ <configuration>
+ <skipAssembly>false</skipAssembly>
+ <finalName>out</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <attach>false</attach>
+ <descriptors>
+
<descriptor>src/main/assembly/distribution.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ <include>**/*.yml</include>
+ </includes>
+ </resource>
+ </resources>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/assembly/distribution.xml
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/assembly/distribution.xml
new file mode 100644
index 000000000..59c43f4d5
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/assembly/distribution.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<assembly
+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/2.3"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/2.3
http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>linkis-engineplugin-presto</id>
+ <formats>
+ <format>dir</format>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <baseDirectory>presto</baseDirectory>
+
+ <dependencySets>
+ <dependencySet>
+ <!-- Enable access to all projects in the current multimodule
build! <useAllReactorProjects>true</useAllReactorProjects> -->
+ <!-- Now, select which projects to include in this module-set. -->
+ <outputDirectory>/dist/v${presto.version}/lib</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <useTransitiveDependencies>true</useTransitiveDependencies>
+ <unpack>false</unpack>
+ <useStrictFiltering>false</useStrictFiltering>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+
+ </dependencySet>
+ </dependencySets>
+
+ <fileSets>
+
+ <fileSet>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>linkis-engineconn.properties</include>
+ <include>log4j2.xml</include>
+ </includes>
+ <fileMode>0777</fileMode>
+ <outputDirectory>/dist/v${presto.version}/conf</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+
+ <fileSet>
+ <directory>${basedir}/target</directory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>*doc.jar</exclude>
+ </excludes>
+ <fileMode>0777</fileMode>
+ <outputDirectory>/plugin/${presto.version}</outputDirectory>
+ </fileSet>
+
+ </fileSets>
+
+</assembly>
+
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/linkis-engineconn.properties
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/linkis-engineconn.properties
new file mode 100644
index 000000000..d2497cda3
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/linkis-engineconn.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+wds.linkis.server.version=v1
+#wds.linkis.engineconn.debug.enable=true
+#wds.linkis.keytab.enable=true
+wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.presto.PrestoEngineConnPlugin
+
+wds.linkis.engineconn.support.parallelism=true
+
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/log4j2.xml
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..020b94567
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/log4j2.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration status="error" monitorInterval="30">
+ <appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t]
%logger{36} %L %M - %msg%xEx%n"/>
+ </Console>
+
+ <Send name="Send" >
+ <Filters>
+ <ThresholdFilter level="WARN" onMatch="ACCEPT"
onMismatch="DENY" />
+ </Filters>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t]
%logger{36} %L %M - %msg%xEx%n"/>
+ </Send>
+
+ <Send name="SendPackage" >
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t]
%logger{36} %L %M - %msg%xEx%n"/>
+ </Send>
+
+ <Console name="stderr" target="SYSTEM_ERR">
+ <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"
/>
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M
- %msg%xEx%n"/>
+ </Console>
+ </appenders>
+
+ <loggers>
+ <root level="INFO">
+ <appender-ref ref="stderr"/>
+ <appender-ref ref="Console"/>
+ <appender-ref ref="Send"/>
+ </root>
+ <logger name="org.apache.hadoop.hive.ql.exec.StatsTask" level="info"
additivity="true">
+ <appender-ref ref="SendPackage"/>
+ </logger>
+ <logger
name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter"
level="error" additivity="true">
+ <appender-ref ref="stderr"/>
+ </logger>
+ <logger name="com.netflix.discovery" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.yarn" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.springframework" level="warn" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.linkis.server.security" level="warn"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.hive.ql.exec.mr.ExecDriver"
level="fatal" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.apache.hadoop.hdfs.KeyProviderCache" level="fatal"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.spark_project.jetty" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.eclipse.jetty" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.springframework" level="ERROR" additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+ <logger name="org.reflections.Reflections" level="ERROR"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+
+ <logger name="org.apache.hadoop.ipc.Client" level="ERROR"
additivity="true">
+ <appender-ref ref="Send"/>
+ </logger>
+
+ </loggers>
+</configuration>
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
new file mode 100644
index 000000000..063e75568
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto
+
+import java.util
+
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
+import
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
+import
org.apache.linkis.manager.engineplugin.common.resource.{EngineResourceFactory,
GenericEngineResourceFactory}
+import org.apache.linkis.manager.label.entity.Label
+import
org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder
+import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory
+
+/**
+ * Engine-conn plugin entry point for the Presto engine.
+ *
+ * Hands out the resource factory, the launch builder and the engine-conn factory.
+ * Each of the two factories is created once, on first access, and then reused;
+ * a new launch builder is created on every call.
+ */
+class PrestoEngineConnPlugin extends EngineConnPlugin {
+
+  // `lazy val` is used instead of hand-written double-checked locking on a plain `var`:
+  // reading a non-volatile field outside the lock is not guaranteed by the Java memory
+  // model to observe a fully constructed instance, whereas `lazy val` initialisation is
+  // thread-safe and runs at most once.
+  private lazy val engineResourceFactory: EngineResourceFactory = new GenericEngineResourceFactory
+
+  private lazy val engineFactory: EngineConnFactory = new PrestoEngineConnFactory
+
+  // Starts out empty; the same list instance is returned to every caller.
+  private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
+
+  /** No plugin-level initialisation is performed; `params` is not read. */
+  override def init(params: util.Map[String, Any]): Unit = {}
+
+  /** Returns the shared [[GenericEngineResourceFactory]], creating it on first use. */
+  override def getEngineResourceFactory: EngineResourceFactory = engineResourceFactory
+
+  /** Returns a new [[PrestoProcessEngineConnLaunchBuilder]] on each invocation. */
+  override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder =
+    new PrestoProcessEngineConnLaunchBuilder
+
+  /** Returns the shared [[PrestoEngineConnFactory]], creating it on first use. */
+  override def getEngineConnFactory: EngineConnFactory = engineFactory
+
+  /** Returns the plugin's default label list. */
+  override def getDefaultLabels: util.List[Label[_]] = defaultLabels
+
+}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
new file mode 100644
index 000000000..f9593508b
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.builder
+
+import
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.storage.utils.StorageConfiguration
+
+/**
+ * Java-process launch builder used for the Presto engine conn.
+ */
+class PrestoProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
+
+  /**
+   * Returns the configured HDFS root user as the engine start user.
+   * The supplied label is not consulted.
+   */
+  override def getEngineStartUser(label: UserCreatorLabel): String =
+    StorageConfiguration.HDFS_ROOT_USER.getValue
+
+}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
new file mode 100644
index 000000000..bbdd5030f
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.conf
+
+import java.lang
+
+import org.apache.linkis.common.conf.{ByteType, CommonVars}
+
+/**
+ * Configuration entries (`CommonVars`) used by the Presto engine plugin.
+ * Each entry pairs a configuration key with its default value.
+ */
+object PrestoConfiguration {
+
+  // Concurrency limits.
+  val ENGINE_CONCURRENT_LIMIT = CommonVars[Int]("wds.linkis.engineconn.concurrent.limit", 100)
+
+  val ENTRANCE_MAX_JOB_INSTANCE = CommonVars[Int]("wds.linkis.entrance.max.job.instance", 100)
+  val ENTRANCE_PROTECTED_JOB_INSTANCE = CommonVars[Int]("wds.linkis.entrance.protected.job.instance", 0)
+  val ENTRANCE_RESULTS_MAX_CACHE = CommonVars("wds.linkis.presto.resultSet.cache.max", new ByteType("512k"))
+
+  // HTTP timeouts. NOTE(review): the time unit is chosen where these values are consumed,
+  // which is outside this object - confirm before changing the defaults.
+  // `Long.valueOf` replaces the deprecated `new java.lang.Long(...)` boxing constructor.
+  val PRESTO_HTTP_CONNECT_TIME_OUT =
+    CommonVars[java.lang.Long]("wds.linkis.presto.http.connectTimeout", lang.Long.valueOf(60L))
+  val PRESTO_HTTP_READ_TIME_OUT =
+    CommonVars[java.lang.Long]("wds.linkis.presto.http.readTimeout", lang.Long.valueOf(60L))
+
+
+  // Presto connection and session defaults.
+  val ENGINE_DEFAULT_LIMIT = CommonVars("wds.linkis.presto.default.limit", 5000)
+  val PRESTO_URL = CommonVars("wds.linkis.presto.url", "http://127.0.0.1:8080")
+  val PRESTO_RESOURCE_CONFIG_PATH = CommonVars("wds.linkis.presto.resource.config", "")
+  val PRESTO_USER_NAME = CommonVars("wds.linkis.presto.username", "default")
+  val PRESTO_PASSWORD = CommonVars("wds.linkis.presto.password", "")
+  val PRESTO_CATALOG = CommonVars("wds.linkis.presto.catalog", "system")
+  val PRESTO_SCHEMA = CommonVars("wds.linkis.presto.schema", "")
+  val PRESTO_SOURCE = CommonVars("wds.linkis.presto.source", "global")
+  // Keyed under the "presto.session." prefix rather than "wds.linkis.presto.".
+  val PRESTO_REQUEST_MEMORY = CommonVars("presto.session.query_max_total_memory", "8GB")
+
+}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
new file mode 100644
index 000000000..30f964f1b
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.exception
+
+import org.apache.linkis.common.exception.ErrorException
+
+/** Linkis `ErrorException` with error code 60011. */
+case class PrestoStateInvalidException(message: String)
+  extends ErrorException(60011, message)
+
+/** Linkis `ErrorException` with error code 60012. */
+case class PrestoClientException(message: String)
+  extends ErrorException(60012, message)
+
+/** Linkis `ErrorException` with error code 60013. */
+case class PrestoSourceGroupException(message: String)
+  extends ErrorException(60013, message)
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
new file mode 100644
index 000000000..599e5a677
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.executer
+
+import java.net.URI
+import java.sql.SQLException
+import java.util
+import java.util._
+import java.util.concurrent.TimeUnit
+
+import com.facebook.presto.client._
+import com.facebook.presto.spi.security.SelectedRole
+import com.google.common.cache.{Cache, CacheBuilder}
+import okhttp3.OkHttpClient
+import org.apache.commons.io.IOUtils
+import org.apache.linkis.common.log.LogUtils
+import org.apache.linkis.common.utils.{OverloadUtils, Utils}
+import org.apache.linkis.engineconn.common.conf.{EngineConnConf,
EngineConnConstant}
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
+import
org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor,
EngineExecutionContext}
+import org.apache.linkis.engineconn.core.EngineConnObject
+import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration._
+import org.apache.linkis.engineplugin.presto.exception.{PrestoClientException,
PrestoStateInvalidException}
+import org.apache.linkis.governance.common.paser.SQLCodeParser
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource,
LoadResource, NodeResource}
+import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.rpc.Sender
+import org.apache.linkis.scheduler.executer.{ExecuteResponse,
SuccessExecuteResponse}
+import org.apache.linkis.storage.domain.Column
+import org.apache.linkis.storage.resultset.ResultSetFactory
+import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
+import org.springframework.util.CollectionUtils
+
+import scala.collection.JavaConverters._
+
+class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id:
Int) extends ConcurrentComputationExecutor(outputPrintLimit) {
+
+  // HTTP client handed to every Presto statement; taken from the companion object.
+  private var okHttpClient: OkHttpClient = PrestoEngineConnExecutor.OK_HTTP_CLIENT
+
+  // Labels attached to this executor; replaced through setExecutorLabels.
+  private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
+
+  // Client sessions keyed by task id. Entries expire after ENGINE_TASK_EXPIRE_TIME
+  // milliseconds without access, and the cache holds at most MAX_TASK_NUM entries.
+  private val clientSessionCache: Cache[String, ClientSession] =
+    CacheBuilder
+      .newBuilder()
+      .expireAfterAccess(EngineConnConf.ENGINE_TASK_EXPIRE_TIME.getValue, TimeUnit.MILLISECONDS)
+      .maximumSize(EngineConnConstant.MAX_TASK_NUM)
+      .build()
+
+  /** Installs a [[SQLCodeParser]] as the code parser, then runs the inherited initialisation. */
+  override def init: Unit = {
+    val sqlParser = new SQLCodeParser
+    setCodeParser(sqlParser)
+    super.init
+  }
+
+  /**
+   * Builds a client session for the task's user from the task properties, stores it in the
+   * session cache under the task id, and then delegates to the inherited `execute`.
+   */
+  override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
+    val submitter = getUserCreatorLabel(engineConnTask.getLables).getUser
+    val taskId = engineConnTask.getTaskId
+    val session = getClientSession(submitter, engineConnTask.getProperties)
+    clientSessionCache.put(taskId, session)
+    super.execute(engineConnTask)
+  }
+
+  /**
+   * Runs one statement against Presto using the client session cached for the current job.
+   *
+   * The statement is advanced past its initial status updates, its rows are written to a
+   * result set when it is running or finished without an error, the final status is checked
+   * by `verifyServerError`, and the cached session is replaced with the one returned by
+   * `updateSession`.
+   *
+   * The statement client is always closed on the way out, so a failure while streaming
+   * results does not leave the statement client open.
+   *
+   * @param engineExecutorContext execution context; its job id must be defined
+   *                              (`getJobId.get` is called on it)
+   * @param code                  the statement text; it is trimmed before submission
+   * @return `SuccessExecuteResponse()` when no step throws
+   */
+  override def executeLine(engineExecutorContext: EngineExecutionContext, code: String): ExecuteResponse = {
+    val realCode = code.trim
+    info(s"presto client begins to run psql code:\n $realCode")
+
+    val taskId = engineExecutorContext.getJobId.get
+
+    val clientSession = clientSessionCache.getIfPresent(taskId)
+    val statement = StatementClientFactory.newStatementClient(okHttpClient, clientSession, realCode)
+
+    Utils.tryFinally {
+      initialStatusUpdates(taskId, engineExecutorContext, statement)
+
+      if (statement.isRunning || (statement.isFinished && statement.finalStatusInfo().getError == null)) {
+        queryOutput(taskId, engineExecutorContext, statement)
+      }
+
+      verifyServerError(taskId, engineExecutorContext, statement)
+
+      // update session
+      clientSessionCache.put(taskId, updateSession(clientSession, statement))
+    }(statement.close())
+
+    SuccessExecuteResponse()
+  }
+
+  /** Not implemented for this executor: always returns `null`. */
+  override def executeCompletely(
+      engineExecutorContext: EngineExecutionContext,
+      code: String,
+      completedLine: String): ExecuteResponse = null
+
+  /** Progress is not tracked: always reports zero, whatever the task id. */
+  override def progress(taskID: String): Float = 0f
+
+  /** No per-job progress details are produced: always returns an empty array. */
+  override def getProgressInfo(taskID: String): Array[JobProgressInfo] =
+    Array.empty[JobProgressInfo]
+
+  /** Returns the executor's label list itself (not a copy). */
+  override def getExecutorLabels(): util.List[Label[_]] = executorLabels
+
+  /**
+   * Replaces the executor's labels with the given ones.
+   * A null or empty list leaves the current labels untouched.
+   */
+  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
+    val hasLabels = !CollectionUtils.isEmpty(labels)
+    if (hasLabels) {
+      executorLabels.clear()
+      executorLabels.addAll(labels)
+    }
+  }
+
+  /** Always `false`. */
+  override def supportCallBackLogs(): Boolean = false
+
+  /** Not implemented for this executor: always returns `null`; the argument is not read. */
+  override def requestExpectedResource(expectedResource: NodeResource): NodeResource = null
+
+  /**
+   * Reports the resource currently used by this engine conn.
+   *
+   * Side effect: when the engine creation options contain the Java engine request-memory
+   * key and its value does not end in "g" (case-insensitive), "g" is appended to the stored
+   * value.
+   *
+   * @return a `CommonNodeResource` whose used resource is a `LoadResource` built from the
+   *         process max memory and the constant 1
+   */
+  override def getCurrentNodeResource(): NodeResource = {
+    val options = EngineConnObject.getEngineCreationContext.getOptions
+    val memoryKey = EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key
+    if (options.containsKey(memoryKey)) {
+      val requestedMemory = options.get(memoryKey)
+      val hasUnitSuffix = requestedMemory.toLowerCase().endsWith("g")
+      if (!hasUnitSuffix) {
+        options.put(memoryKey, requestedMemory + "g")
+      }
+    }
+    val nodeResource = new CommonNodeResource
+    nodeResource.setUsedResource(new LoadResource(OverloadUtils.getProcessMaxMemory, 1))
+    nodeResource
+  }
+
+  /** Executor id: this service instance's `getInstance` value, an underscore, then `id`. */
+  override def getId(): String = s"${Sender.getThisServiceInstance.getInstance}_$id"
+
+  /** Concurrency limit, read from `ENGINE_CONCURRENT_LIMIT` on every call. */
+  override def getConcurrentLimit: Int = ENGINE_CONCURRENT_LIMIT.getValue
+
+  /**
+   * Builds a Presto `ClientSession` for `user` from the task parameters.
+   *
+   * Task parameters with a null value are dropped and the rest are stringified. Server URL,
+   * source, catalog and schema are resolved through the corresponding configuration entries
+   * against that map. Every parameter whose key starts with "presto.session." becomes a
+   * session property, with the prefix removed. All other session attributes use fixed values:
+   * the JVM default time zone and locale, empty collections, no transaction id, and a
+   * zero-millisecond client request timeout.
+   *
+   * @param user       user name placed on the session
+   * @param taskParams per-task parameters
+   * @return the assembled client session
+   */
+  private def getClientSession(user: String, taskParams : util.Map[String, Object]): ClientSession = {
+    // Stringify the task parameters, skipping entries without a value.
+    val settings = new util.HashMap[String, String]()
+    taskParams.asScala
+      .collect { case (name: String, raw: Object) if raw != null => name -> String.valueOf(raw) }
+      .foreach { case (name, text) => settings.put(name, text) }
+
+    val serverUri: URI = URI.create(PRESTO_URL.getValue(settings))
+    val source: String = PRESTO_SOURCE.getValue(settings)
+    val catalog: String = PRESTO_CATALOG.getValue(settings)
+    val schema: String = PRESTO_SCHEMA.getValue(settings)
+
+    // Session properties are the "presto.session.*" settings with the prefix removed.
+    val sessionPrefix = "presto.session."
+    val sessionProperties: util.Map[String, String] = settings.asScala
+      .collect {
+        case (name, text) if name.startsWith(sessionPrefix) =>
+          (name.substring(sessionPrefix.length), text)
+      }
+      .asJava
+
+    // Fixed values for everything the task parameters do not control.
+    val clientInfo: String = "Linkis"
+    val transactionId: String = null
+    val traceToken: util.Optional[String] = Optional.empty()
+    val clientTags: util.Set[String] = Collections.emptySet()
+    val timeZoneId = TimeZone.getDefault.getID
+    val locale: Locale = Locale.getDefault
+    val resourceEstimates: util.Map[String, String] = Collections.emptyMap()
+    val preparedStatements: util.Map[String, String] = Collections.emptyMap()
+    val roles: java.util.Map[String, SelectedRole] = Collections.emptyMap()
+    val extraCredentials: util.Map[String, String] = Collections.emptyMap()
+    val clientRequestTimeout: io.airlift.units.Duration =
+      new io.airlift.units.Duration(0, TimeUnit.MILLISECONDS)
+
+    new ClientSession(
+      serverUri, user, source, traceToken, clientTags, clientInfo, catalog, schema,
+      timeZoneId, locale, resourceEstimates, sessionProperties, preparedStatements, roles,
+      extraCredentials, transactionId, clientRequestTimeout)
+  }
+
+  /**
+   * Returns the first `UserCreatorLabel` among `labels`.
+   * Throws `NoSuchElementException` when none is present.
+   */
+  private def getUserCreatorLabel(labels: Array[Label[_]]): UserCreatorLabel =
+    labels.collectFirst { case creatorLabel: UserCreatorLabel => creatorLabel }.get
+
+  /**
+   * Advances the statement, pushing progress before each step, for as long as it is running
+   * and either has no current data yet or its current status reports an update type.
+   */
+  private def initialStatusUpdates(
+      taskId: String,
+      engineExecutorContext: EngineExecutionContext,
+      statement: StatementClient): Unit = {
+    def keepAdvancing: Boolean =
+      statement.isRunning &&
+        (statement.currentData().getData == null || statement.currentStatusInfo().getUpdateType != null)
+
+    while (keepAdvancing) {
+      engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
+      statement.advance()
+    }
+  }
+
+  /**
+   * Streams the statement's rows into a table result set and sends it through the
+   * execution context.
+   *
+   * Column metadata is taken from the current status while the statement is running and
+   * from the final status otherwise. Each cell is written as its string form. Progress is
+   * pushed after every page, and the writer is closed once the rows have been consumed,
+   * whether or not that succeeded.
+   *
+   * @param taskId                id used to look up the progress values that are pushed
+   * @param engineExecutorContext context that creates the writer and receives the output
+   * @param statement             Presto statement whose data is read
+   * @throws RuntimeException if the status carries no column information
+   */
+  private def queryOutput(
+      taskId: String,
+      engineExecutorContext: EngineExecutionContext,
+      statement: StatementClient
+  ): Unit = {
+    var columnCount = 0
+    var rows = 0
+    val resultSetWriter =
+      engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+    Utils.tryFinally {
+      val results: QueryStatusInfo =
+        if (statement.isRunning) statement.currentStatusInfo()
+        else statement.finalStatusInfo()
+      if (results.getColumns == null) {
+        throw new RuntimeException("presto columns is null.")
+      }
+      val columns = results.getColumns.asScala
+        .map(column => Column(column.getName, column.getType, ""))
+        .toArray[Column]
+      columnCount = columns.length
+      resultSetWriter.addMetaData(new TableMetaData(columns))
+      while (statement.isRunning) {
+        val page = statement.currentData().getData
+        if (page != null) {
+          page.asScala.foreach { row =>
+            val cells = row.asScala.map(cell => String.valueOf(cell))
+            resultSetWriter.addRecord(new TableRecord(cells.toArray))
+            rows += 1
+          }
+        }
+        engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
+        statement.advance()
+      }
+    }(IOUtils.closeQuietly(resultSetWriter))
+
+    val summary = s"Fetched $columnCount col(s) : $rows row(s) in presto"
+    info(summary)
+    engineExecutorContext.appendStdout(LogUtils.generateInfo(summary))
+    engineExecutorContext.sendResultSet(resultSetWriter)
+    IOUtils.closeQuietly(resultSetWriter)
+  }
+
+ // check presto error
+  /**
+   * Pushes a progress update and then checks how the statement ended.
+   *
+   * A client-aborted statement is only logged as a warning.
+   *
+   * @param taskId                id used to look up the progress values that are pushed
+   * @param engineExecutorContext context that receives the progress update
+   * @param statement             Presto statement to inspect
+   * @throws SQLException if the statement finished and its final status carries an error
+   * @throws PrestoClientException if the statement is in the client-error state
+   * @throws PrestoStateInvalidException if the statement is in none of the handled states
+   */
+  private def verifyServerError(
+      taskId: String,
+      engineExecutorContext: EngineExecutionContext,
+      statement: StatementClient
+  ): Unit = {
+    engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId))
+    if (statement.isFinished) {
+      val finalStatus: QueryStatusInfo = statement.finalStatusInfo()
+      val error = finalStatus.getError
+      if (error != null) {
+        val message: String =
+          s"Presto execute failed (#${finalStatus.getId}): ${error.getMessage}"
+        // The failure info is optional; without it the exception has no cause.
+        val cause: Throwable =
+          if (error.getFailureInfo != null) error.getFailureInfo.toException else null
+        throw new SQLException(message, error.getSqlState, error.getErrorCode, cause)
+      }
+    } else if (statement.isClientAborted) {
+      warn("Presto statement is killed.")
+    } else if (statement.isClientError) {
+      throw PrestoClientException("Presto client error.")
+    } else {
+      throw PrestoStateInvalidException("Presto status error. Statement is not finished.")
+    }
+  }
+
+  /**
+   * Derives the session for the next statement from the changes reported by `statement`.
+   *
+   * Catalog/schema switches and a cleared transaction id are applied to the session first.
+   * A started transaction id, set/reset session properties, set roles and added/deallocated
+   * prepared statements are then collected on a builder seeded from that session.
+   *
+   * @param clientSession session the statement was executed with
+   * @param statement     statement whose reported session changes are applied
+   * @return a session carrying all of the reported changes
+   */
+  private def updateSession(
+      clientSession: ClientSession,
+      statement: StatementClient
+  ): ClientSession = {
+    var newSession = clientSession
+    // update catalog and schema if present
+    if (statement.getSetCatalog.isPresent || statement.getSetSchema.isPresent) {
+      newSession = ClientSession.builder(newSession)
+        .withCatalog(statement.getSetCatalog.orElse(newSession.getCatalog))
+        .withSchema(statement.getSetSchema.orElse(newSession.getSchema))
+        .build
+    }
+
+    // update transaction ID if necessary
+    if (statement.isClearTransactionId) {
+      newSession = ClientSession.stripTransactionId(newSession)
+    }
+
+    var builder: ClientSession.Builder = ClientSession.builder(newSession)
+
+    if (statement.getStartedTransactionId != null) {
+      builder = builder.withTransactionId(statement.getStartedTransactionId)
+    }
+
+    // update session properties if present
+    if (!statement.getSetSessionProperties.isEmpty ||
+      !statement.getResetSessionProperties.isEmpty) {
+      val sessionProperties: util.Map[String, String] =
+        new util.HashMap[String, String](newSession.getProperties)
+      sessionProperties.putAll(statement.getSetSessionProperties)
+      sessionProperties.keySet.removeAll(statement.getResetSessionProperties)
+      builder = builder.withProperties(sessionProperties)
+    }
+
+    // update session roles
+    if (!statement.getSetRoles.isEmpty) {
+      val roles: util.Map[String, SelectedRole] =
+        new util.HashMap[String, SelectedRole](newSession.getRoles)
+      roles.putAll(statement.getSetRoles)
+      builder = builder.withRoles(roles)
+    }
+
+    // update prepared statements if present
+    if (!statement.getAddedPreparedStatements.isEmpty ||
+      !statement.getDeallocatedPreparedStatements.isEmpty) {
+      val preparedStatements: util.Map[String, String] =
+        new util.HashMap[String, String](newSession.getPreparedStatements)
+      preparedStatements.putAll(statement.getAddedPreparedStatements)
+      preparedStatements.keySet.removeAll(statement.getDeallocatedPreparedStatements)
+      builder = builder.withPreparedStatements(preparedStatements)
+    }
+
+    // Build from the builder: returning newSession here would drop the transaction id,
+    // session properties, roles and prepared statements collected above.
+    builder.build()
+  }
+
+  /** No-op: this executor performs no action when asked to kill all tasks. */
+  override def killAll(): Unit = ()
+
+}
+
+/** Companion object holding the HTTP client defined for the Presto executor. */
+object PrestoEngineConnExecutor {
+
+  /**
+   * OkHttp client built with a `SocketChannelSocketFactory` and the configured connect and
+   * read timeouts, both interpreted as seconds.
+   */
+  private val OK_HTTP_CLIENT: OkHttpClient = {
+    val withSockets = new OkHttpClient.Builder().socketFactory(new SocketChannelSocketFactory)
+    val withTimeouts = withSockets
+      .connectTimeout(PRESTO_HTTP_CONNECT_TIME_OUT.getValue, TimeUnit.SECONDS)
+      .readTimeout(PRESTO_HTTP_READ_TIME_OUT.getValue, TimeUnit.SECONDS)
+    withTimeouts.build()
+  }
+
+}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
new file mode 100644
index 000000000..268f2e721
--- /dev/null
+++
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import
org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
+import org.apache.linkis.engineconn.executor.entity.LabelExecutor
+import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
+import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration
+import org.apache.linkis.engineplugin.presto.executer.PrestoEngineConnExecutor
+
+/**
+ * Engine connection factory for Presto: reports the Presto engine and run types and
+ * creates [[PrestoEngineConnExecutor]] instances.
+ */
+class PrestoEngineConnFactory extends ComputationSingleExecutorEngineConnFactory {
+
+  override protected def getEngineConnType: EngineType = EngineType.PRESTO
+
+  override protected def getRunType: RunType = RunType.PRESTO_SQL
+
+  /**
+   * Creates a Presto executor configured with the `ENGINE_DEFAULT_LIMIT` setting.
+   *
+   * @param id                    executor id passed through to the executor
+   * @param engineCreationContext creation context (not used here)
+   * @param engineConn            engine connection (not used here)
+   */
+  override def newExecutor(
+      id: Int,
+      engineCreationContext: EngineCreationContext,
+      engineConn: EngineConn
+  ): LabelExecutor = {
+    val defaultLimit = PrestoConfiguration.ENGINE_DEFAULT_LIMIT.getValue
+    new PrestoEngineConnExecutor(defaultLimit, id)
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]