This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new 5be56c90c [LINKIS #1631] Add linkis-engineplugin-presto module (#2173)
5be56c90c is described below

commit 5be56c90c1a86507925ba15b4dd3d95fe50b8930
Author: Zhen Wang <[email protected]>
AuthorDate: Wed Jun 1 11:16:45 2022 +0800

    [LINKIS #1631] Add linkis-engineplugin-presto module (#2173)
---
 .../engineconn-plugins/presto/pom.xml              | 145 ++++++++++
 .../presto/src/main/assembly/distribution.xml      |  73 +++++
 .../main/resources/linkis-engineconn.properties    |  22 ++
 .../presto/src/main/resources/log4j2.xml           |  91 +++++++
 .../presto/PrestoEngineConnPlugin.scala            |  65 +++++
 .../PrestoProcessEngineConnLaunchBuilder.scala     |  29 ++
 .../presto/conf/PrestoConfiguration.scala          |  45 ++++
 .../presto/exception/PrestoException.scala         |  25 ++
 .../presto/executer/PrestoEngineConnExecutor.scala | 298 +++++++++++++++++++++
 .../presto/factory/PrestoEngineConnFactory.scala   |  39 +++
 10 files changed, 832 insertions(+)

diff --git a/linkis-engineconn-plugins/engineconn-plugins/presto/pom.xml 
b/linkis-engineconn-plugins/engineconn-plugins/presto/pom.xml
new file mode 100644
index 000000000..a677047f9
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/presto/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>linkis</artifactId>
+        <groupId>org.apache.linkis</groupId>
+        <version>1.1.1</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>linkis-engineplugin-presto</artifactId>
+
+    <properties>
+        <presto.version>0.234</presto.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.linkis</groupId>
+            <artifactId>linkis-engineconn-plugin-core</artifactId>
+            <version>${linkis.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.linkis</groupId>
+            <artifactId>linkis-computation-engineconn</artifactId>
+            <version>${linkis.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.linkis</groupId>
+            <artifactId>linkis-storage</artifactId>
+            <version>${linkis.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.linkis</groupId>
+            <artifactId>linkis-rpc</artifactId>
+            <version>${linkis.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.linkis</groupId>
+            <artifactId>linkis-common</artifactId>
+            <version>${linkis.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- presto -->
+        <dependency>
+            <groupId>com.facebook.presto</groupId>
+            <artifactId>presto-client</artifactId>
+            <version>${presto.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.facebook.presto</groupId>
+            <artifactId>presto-resource-group-managers</artifactId>
+            <version>${presto.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log-manager</artifactId>
+                    <groupId>com.facebook.airlift</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.3</version>
+                <inherited>false</inherited>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <descriptors>
+                                
<descriptor>src/main/assembly/distribution.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+                <configuration>
+                    <skipAssembly>false</skipAssembly>
+                    <finalName>out</finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                    <descriptors>
+                        
<descriptor>src/main/assembly/distribution.xml</descriptor>
+                    </descriptors>
+                </configuration>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/main/resources</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                    <include>**/*.xml</include>
+                    <include>**/*.yml</include>
+                </includes>
+            </resource>
+        </resources>
+    </build>
+
+</project>
\ No newline at end of file
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/assembly/distribution.xml
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/assembly/distribution.xml
new file mode 100644
index 000000000..59c43f4d5
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/assembly/distribution.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly
+        
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/2.3";
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+        
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/2.3
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
+    <id>linkis-engineplugin-presto</id>
+    <formats>
+        <format>dir</format>
+        <format>zip</format>
+    </formats>
+    <includeBaseDirectory>true</includeBaseDirectory>
+    <baseDirectory>presto</baseDirectory>
+
+    <dependencySets>
+        <dependencySet>
+            <!-- Enable access to all projects in the current multimodule 
build! <useAllReactorProjects>true</useAllReactorProjects> -->
+            <!-- Now, select which projects to include in this module-set. -->
+            <outputDirectory>/dist/v${presto.version}/lib</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <unpack>false</unpack>
+            <useStrictFiltering>false</useStrictFiltering>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+
+        </dependencySet>
+    </dependencySets>
+
+    <fileSets>
+
+        <fileSet>
+            <directory>${basedir}/src/main/resources</directory>
+            <includes>
+                <include>linkis-engineconn.properties</include>
+                <include>log4j2.xml</include>
+            </includes>
+            <fileMode>0777</fileMode>
+            <outputDirectory>/dist/v${presto.version}/conf</outputDirectory>
+            <lineEnding>unix</lineEnding>
+        </fileSet>
+
+        <fileSet>
+            <directory>${basedir}/target</directory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>*doc.jar</exclude>
+            </excludes>
+            <fileMode>0777</fileMode>
+            <outputDirectory>/plugin/${presto.version}</outputDirectory>
+        </fileSet>
+
+    </fileSets>
+
+</assembly>
+
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/linkis-engineconn.properties
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/linkis-engineconn.properties
new file mode 100644
index 000000000..d2497cda3
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/linkis-engineconn.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+wds.linkis.server.version=v1
+#wds.linkis.engineconn.debug.enable=true
+#wds.linkis.keytab.enable=true
+wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.presto.PrestoEngineConnPlugin
+
+wds.linkis.engineconn.support.parallelism=true
+
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/log4j2.xml
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..020b94567
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/resources/log4j2.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~ 
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~ 
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+  
+<configuration status="error" monitorInterval="30">
+    <appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] 
%logger{36} %L %M - %msg%xEx%n"/>
+        </Console>
+
+        <Send name="Send" >
+            <Filters>
+                <ThresholdFilter level="WARN" onMatch="ACCEPT" 
onMismatch="DENY" />
+            </Filters>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] 
%logger{36} %L %M - %msg%xEx%n"/>
+        </Send>
+
+        <Send name="SendPackage" >
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] 
%logger{36} %L %M - %msg%xEx%n"/>
+        </Send>
+
+        <Console name="stderr" target="SYSTEM_ERR">
+            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" 
/>
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M 
- %msg%xEx%n"/>
+        </Console>
+    </appenders>
+
+    <loggers>
+      <root level="INFO">
+            <appender-ref ref="stderr"/>
+            <appender-ref ref="Console"/>
+            <appender-ref ref="Send"/>
+        </root>
+        <logger name="org.apache.hadoop.hive.ql.exec.StatsTask" level="info" 
additivity="true">
+            <appender-ref ref="SendPackage"/>
+        </logger>
+        <logger 
name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter" 
level="error" additivity="true">
+            <appender-ref ref="stderr"/>
+        </logger>
+        <logger name="com.netflix.discovery" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.yarn" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.springframework" level="warn" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.linkis.server.security" level="warn" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.hive.ql.exec.mr.ExecDriver" 
level="fatal" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.hdfs.KeyProviderCache" level="fatal" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.spark_project.jetty" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.eclipse.jetty" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.springframework" level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.reflections.Reflections" level="ERROR" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+
+        <logger name="org.apache.hadoop.ipc.Client" level="ERROR" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+
+   </loggers>
+</configuration>
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
new file mode 100644
index 000000000..063e75568
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/PrestoEngineConnPlugin.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto
+
+import java.util
+
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
+import 
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
+import 
org.apache.linkis.manager.engineplugin.common.resource.{EngineResourceFactory, 
GenericEngineResourceFactory}
+import org.apache.linkis.manager.label.entity.Label
+import 
org.apache.linkis.engineplugin.presto.builder.PrestoProcessEngineConnLaunchBuilder
+import org.apache.linkis.engineplugin.presto.factory.PrestoEngineConnFactory
+
+class PrestoEngineConnPlugin extends EngineConnPlugin {
+
+  private var engineResourceFactory: EngineResourceFactory = _
+
+  private var engineFactory: EngineConnFactory = _
+
+  private val defaultLabels: util.List[Label[_]] = new 
util.ArrayList[Label[_]]()
+
+  private val resourceLocker = new Array[Byte](0)
+
+  private val engineFactoryLocker = new Array[Byte](0)
+
+  override def init(params: util.Map[String, Any]): Unit = {
+
+  }
+
+  override def getEngineResourceFactory: EngineResourceFactory = {
+    if (null == engineResourceFactory) resourceLocker.synchronized {
+      if (null == engineResourceFactory) engineResourceFactory = new 
GenericEngineResourceFactory
+    }
+    engineResourceFactory
+  }
+
+  override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
+    new PrestoProcessEngineConnLaunchBuilder
+  }
+
+  override def getEngineConnFactory: EngineConnFactory = {
+    if (null == engineFactory) engineFactoryLocker.synchronized {
+      if (null == engineFactory) engineFactory = new PrestoEngineConnFactory
+    }
+    engineFactory
+  }
+
+  override def getDefaultLabels: util.List[Label[_]] = defaultLabels
+
+}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
new file mode 100644
index 000000000..f9593508b
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/builder/PrestoProcessEngineConnLaunchBuilder.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.builder
+
+import 
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.storage.utils.StorageConfiguration
+
+class PrestoProcessEngineConnLaunchBuilder extends 
JavaProcessEngineConnLaunchBuilder {
+
+  override def getEngineStartUser(label: UserCreatorLabel): String = {
+    StorageConfiguration.HDFS_ROOT_USER.getValue
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
new file mode 100644
index 000000000..bbdd5030f
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/conf/PrestoConfiguration.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.conf
+
+import java.lang
+
+import org.apache.linkis.common.conf.{ByteType, CommonVars}
+
+object PrestoConfiguration {
+
+  val ENGINE_CONCURRENT_LIMIT = 
CommonVars[Int]("wds.linkis.engineconn.concurrent.limit", 100)
+
+  val ENTRANCE_MAX_JOB_INSTANCE = 
CommonVars[Int]("wds.linkis.entrance.max.job.instance", 100)
+  val ENTRANCE_PROTECTED_JOB_INSTANCE = 
CommonVars[Int]("wds.linkis.entrance.protected.job.instance", 0)
+  val ENTRANCE_RESULTS_MAX_CACHE = 
CommonVars("wds.linkis.presto.resultSet.cache.max", new ByteType("512k"))
+
+  val PRESTO_HTTP_CONNECT_TIME_OUT = 
CommonVars[java.lang.Long]("wds.linkis.presto.http.connectTimeout", new 
lang.Long(60))
+  val PRESTO_HTTP_READ_TIME_OUT = 
CommonVars[java.lang.Long]("wds.linkis.presto.http.readTimeout", new 
lang.Long(60))
+
+
+  val ENGINE_DEFAULT_LIMIT = CommonVars("wds.linkis.presto.default.limit", 
5000)
+  val PRESTO_URL = CommonVars("wds.linkis.presto.url", "http://127.0.0.1:8080";)
+  val PRESTO_RESOURCE_CONFIG_PATH = 
CommonVars("wds.linkis.presto.resource.config", "");
+  val PRESTO_USER_NAME = CommonVars("wds.linkis.presto.username", "default")
+  val PRESTO_PASSWORD = CommonVars("wds.linkis.presto.password", "")
+  val PRESTO_CATALOG = CommonVars("wds.linkis.presto.catalog", "system")
+  val PRESTO_SCHEMA = CommonVars("wds.linkis.presto.schema", "")
+  val PRESTO_SOURCE = CommonVars("wds.linkis.presto.source", "global")
+  val PRESTO_REQUEST_MEMORY = 
CommonVars("presto.session.query_max_total_memory", "8GB")
+
+}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
new file mode 100644
index 000000000..30f964f1b
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/exception/PrestoException.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.exception
+
+import org.apache.linkis.common.exception.ErrorException
+
+case class PrestoStateInvalidException(message: String) extends 
ErrorException(60011, message: String)
+
+case class PrestoClientException(message: String) extends 
ErrorException(60012, message: String)
+
+case class PrestoSourceGroupException(message: String) extends 
ErrorException(60013, message: String)
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
new file mode 100644
index 000000000..599e5a677
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/executer/PrestoEngineConnExecutor.scala
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.executer
+
+import java.net.URI
+import java.sql.SQLException
+import java.util
+import java.util._
+import java.util.concurrent.TimeUnit
+
+import com.facebook.presto.client._
+import com.facebook.presto.spi.security.SelectedRole
+import com.google.common.cache.{Cache, CacheBuilder}
+import okhttp3.OkHttpClient
+import org.apache.commons.io.IOUtils
+import org.apache.linkis.common.log.LogUtils
+import org.apache.linkis.common.utils.{OverloadUtils, Utils}
+import org.apache.linkis.engineconn.common.conf.{EngineConnConf, 
EngineConnConstant}
+import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
+import 
org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor,
 EngineExecutionContext}
+import org.apache.linkis.engineconn.core.EngineConnObject
+import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration._
+import org.apache.linkis.engineplugin.presto.exception.{PrestoClientException, 
PrestoStateInvalidException}
+import org.apache.linkis.governance.common.paser.SQLCodeParser
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, 
LoadResource, NodeResource}
+import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.rpc.Sender
+import org.apache.linkis.scheduler.executer.{ExecuteResponse, 
SuccessExecuteResponse}
+import org.apache.linkis.storage.domain.Column
+import org.apache.linkis.storage.resultset.ResultSetFactory
+import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
+import org.springframework.util.CollectionUtils
+
+import scala.collection.JavaConverters._
+
+class PrestoEngineConnExecutor(override val outputPrintLimit: Int, val id: 
Int) extends ConcurrentComputationExecutor(outputPrintLimit) {
+
+  private var okHttpClient: OkHttpClient = 
PrestoEngineConnExecutor.OK_HTTP_CLIENT
+
+  private val executorLabels: util.List[Label[_]] = new 
util.ArrayList[Label[_]](2)
+
+  private val clientSessionCache: Cache[String, ClientSession] = 
CacheBuilder.newBuilder()
+    .expireAfterAccess(EngineConnConf.ENGINE_TASK_EXPIRE_TIME.getValue, 
TimeUnit.MILLISECONDS)
+    .maximumSize(EngineConnConstant.MAX_TASK_NUM).build()
+
+  override def init: Unit = {
+    setCodeParser(new SQLCodeParser)
+    super.init
+  }
+
+  override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
+    val user = getUserCreatorLabel(engineConnTask.getLables).getUser
+    clientSessionCache.put(engineConnTask.getTaskId, getClientSession(user, 
engineConnTask.getProperties))
+    super.execute(engineConnTask)
+  }
+
+  override def executeLine(engineExecutorContext: EngineExecutionContext, 
code: String): ExecuteResponse = {
+    val realCode = code.trim
+    info(s"presto client begins to run psql code:\n $realCode")
+
+    val taskId = engineExecutorContext.getJobId.get
+
+    val clientSession = clientSessionCache.getIfPresent(taskId)
+    val statement = StatementClientFactory.newStatementClient(okHttpClient, 
clientSession, realCode)
+
+    initialStatusUpdates(taskId, engineExecutorContext, statement)
+
+    if (statement.isRunning || (statement.isFinished && 
statement.finalStatusInfo().getError == null)) {
+      queryOutput(taskId, engineExecutorContext, statement)
+    }
+
+    verifyServerError(taskId, engineExecutorContext, statement)
+
+    // update session
+    clientSessionCache.put(taskId, updateSession(clientSession, statement))
+
+    SuccessExecuteResponse()
+  }
+
+  override def executeCompletely(engineExecutorContext: 
EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = 
null
+
+  override def progress(taskID: String): Float = 0.0f
+
+  override def getProgressInfo(taskID: String): Array[JobProgressInfo] = 
Array.empty[JobProgressInfo]
+
+  override def getExecutorLabels(): util.List[Label[_]] = executorLabels
+
+  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
+    if (!CollectionUtils.isEmpty(labels)) {
+      executorLabels.clear()
+      executorLabels.addAll(labels)
+    }
+  }
+
+  override def supportCallBackLogs(): Boolean = false
+
+  override def requestExpectedResource(expectedResource: NodeResource): 
NodeResource = {
+    null
+  }
+
+  override def getCurrentNodeResource(): NodeResource = {
+    val properties = EngineConnObject.getEngineCreationContext.getOptions
+    if 
(properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
+      val settingClientMemory = 
properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
+      if (!settingClientMemory.toLowerCase().endsWith("g")) {
+        properties.put(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key, 
settingClientMemory + "g")
+      }
+    }
+    val resource = new CommonNodeResource
+    val usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory, 1)
+    resource.setUsedResource(usedResource)
+    resource
+  }
+
+  override def getId(): String = Sender.getThisServiceInstance.getInstance + 
s"_$id"
+
+  override def getConcurrentLimit: Int = ENGINE_CONCURRENT_LIMIT.getValue
+
+  private def getClientSession(user: String, taskParams : util.Map[String, 
Object]): ClientSession = {
+    val configMap = new util.HashMap[String, String]()
+    taskParams.asScala.foreach {
+      case (key: String, value: Object) if value != null => configMap.put(key, 
String.valueOf(value))
+      case _ =>
+    }
+    val httpUri: URI = URI.create(PRESTO_URL.getValue(configMap))
+    val source: String = PRESTO_SOURCE.getValue(configMap)
+    val catalog: String = PRESTO_CATALOG.getValue(configMap)
+    val schema: String = PRESTO_SCHEMA.getValue(configMap)
+
+    val properties: util.Map[String, String] = configMap.asScala
+      .filter(tuple => tuple._1.startsWith("presto.session."))
+      .map(tuple => (tuple._1.substring("presto.session.".length), tuple._2))
+      .asJava
+
+    val clientInfo: String = "Linkis"
+    val transactionId: String = null
+    val traceToken: util.Optional[String] = Optional.empty()
+    val clientTags: util.Set[String] = Collections.emptySet()
+    val timeZonId = TimeZone.getDefault.getID
+    val locale: Locale = Locale.getDefault
+    val resourceEstimates: util.Map[String, String] = Collections.emptyMap()
+    val preparedStatements: util.Map[String, String] = Collections.emptyMap()
+    val roles: java.util.Map[String, SelectedRole] = Collections.emptyMap()
+    val extraCredentials: util.Map[String, String] = Collections.emptyMap()
+    val clientRequestTimeout: io.airlift.units.Duration = new 
io.airlift.units.Duration(0, TimeUnit.MILLISECONDS)
+
+    new ClientSession(httpUri, user, source, traceToken, clientTags, 
clientInfo, catalog, schema, timeZonId, locale,
+      resourceEstimates, properties, preparedStatements, roles, 
extraCredentials, transactionId, clientRequestTimeout)
+  }
+
+  private def getUserCreatorLabel(labels: Array[Label[_]]): UserCreatorLabel = 
{
+    labels
+      .find(l => l.isInstanceOf[UserCreatorLabel])
+      .get
+      .asInstanceOf[UserCreatorLabel]
+  }
+
+  private def initialStatusUpdates(taskId: String, engineExecutorContext: 
EngineExecutionContext, statement: StatementClient): Unit = {
+    while (statement.isRunning
+      && (statement.currentData().getData == null || 
statement.currentStatusInfo().getUpdateType != null)) {
+      engineExecutorContext.pushProgress(progress(taskId), 
getProgressInfo(taskId))
+      statement.advance()
+    }
+  }
+
+  private def queryOutput(taskId: String, engineExecutorContext: 
EngineExecutionContext, statement: StatementClient): Unit = {
+    var columnCount = 0
+    var rows = 0
+    val resultSetWriter = 
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+    Utils.tryFinally({
+      var results: QueryStatusInfo = null
+      if (statement.isRunning) {
+        results = statement.currentStatusInfo()
+      } else {
+        results = statement.finalStatusInfo()
+      }
+      if (results.getColumns == null) {
+        throw new RuntimeException("presto columns is null.")
+      }
+      val columns = results.getColumns.asScala
+        .map(column => Column(column.getName, column.getType, 
"")).toArray[Column]
+      columnCount = columns.length
+      resultSetWriter.addMetaData(new TableMetaData(columns))
+      while (statement.isRunning) {
+        val data = statement.currentData().getData
+        if (data != null) for (row <- data.asScala) {
+          val rowArray = row.asScala.map(r => String.valueOf(r))
+          resultSetWriter.addRecord(new TableRecord(rowArray.toArray))
+          rows += 1
+        }
+        engineExecutorContext.pushProgress(progress(taskId), 
getProgressInfo(taskId))
+        statement.advance()
+      }
+    })(IOUtils.closeQuietly(resultSetWriter))
+
+    info(s"Fetched $columnCount col(s) : $rows row(s) in presto")
+    engineExecutorContext.appendStdout(LogUtils.generateInfo(s"Fetched 
$columnCount col(s) : $rows row(s) in presto"));
+    engineExecutorContext.sendResultSet(resultSetWriter)
+    IOUtils.closeQuietly(resultSetWriter)
+  }
+
+  // check presto error
+  private def verifyServerError(taskId: String, engineExecutorContext: 
EngineExecutionContext, statement: StatementClient): Unit = {
+    engineExecutorContext.pushProgress(progress(taskId), 
getProgressInfo(taskId))
+    if (statement.isFinished) {
+      val info: QueryStatusInfo = statement.finalStatusInfo()
+      if (info.getError != null) {
+        val error = Objects.requireNonNull(info.getError);
+        val message: String = s"Presto execute failed (#${info.getId}): 
${error.getMessage}"
+        var cause: Throwable = null
+        if (error.getFailureInfo != null) {
+          cause = error.getFailureInfo.toException
+        }
+        throw new SQLException(message, error.getSqlState, error.getErrorCode, 
cause)
+      }
+    } else if (statement.isClientAborted) {
+      warn(s"Presto statement is killed.")
+    } else if (statement.isClientError) {
+      throw PrestoClientException("Presto client error.")
+    } else {
+      throw PrestoStateInvalidException("Presto status error. Statement is not 
finished.")
+    }
+  }
+
+  private def updateSession(clientSession: ClientSession, statement: 
StatementClient): ClientSession = {
+    var newSession = clientSession
+    // update catalog and schema if present
+    if (statement.getSetCatalog.isPresent || statement.getSetSchema.isPresent) 
{
+      newSession = ClientSession.builder(newSession)
+        .withCatalog(statement.getSetCatalog.orElse(newSession.getCatalog))
+        .withSchema(statement.getSetSchema.orElse(newSession.getSchema))
+        .build
+    }
+
+    // update transaction ID if necessary
+    if (statement.isClearTransactionId) newSession = 
ClientSession.stripTransactionId(newSession)
+
+    var builder: ClientSession.Builder = ClientSession.builder(newSession)
+
+    if (statement.getStartedTransactionId != null) builder = 
builder.withTransactionId(statement.getStartedTransactionId)
+
+    // update session properties if present
+    if (!statement.getSetSessionProperties.isEmpty || 
!statement.getResetSessionProperties.isEmpty) {
+      val sessionProperties: util.Map[String, String] = new 
util.HashMap[String, String](newSession.getProperties)
+      sessionProperties.putAll(statement.getSetSessionProperties)
+      sessionProperties.keySet.removeAll(statement.getResetSessionProperties)
+      builder = builder.withProperties(sessionProperties)
+    }
+
+    // update session roles
+    if (!statement.getSetRoles.isEmpty) {
+      val roles: util.Map[String, SelectedRole] = new util.HashMap[String, 
SelectedRole](newSession.getRoles)
+      roles.putAll(statement.getSetRoles)
+      builder = builder.withRoles(roles)
+    }
+
+    // update prepared statements if present
+    if (!statement.getAddedPreparedStatements.isEmpty || 
!statement.getDeallocatedPreparedStatements.isEmpty) {
+      val preparedStatements: util.Map[String, String] = new 
util.HashMap[String, String](newSession.getPreparedStatements)
+      preparedStatements.putAll(statement.getAddedPreparedStatements)
+      
preparedStatements.keySet.removeAll(statement.getDeallocatedPreparedStatements)
+      builder = builder.withPreparedStatements(preparedStatements)
+    }
+
+    newSession
+  }
+
+  override def killAll(): Unit = {
+
+  }
+
+}
+
+object PrestoEngineConnExecutor {
+
+  private val OK_HTTP_CLIENT: OkHttpClient = new 
OkHttpClient.Builder().socketFactory(new SocketChannelSocketFactory)
+    .connectTimeout(PRESTO_HTTP_CONNECT_TIME_OUT.getValue, TimeUnit.SECONDS)
+    .readTimeout(PRESTO_HTTP_READ_TIME_OUT.getValue, TimeUnit.SECONDS)
+    .build()
+
+}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
new file mode 100644
index 000000000..268f2e721
--- /dev/null
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/presto/src/main/scala/org/apache/linkis/engineplugin/presto/factory/PrestoEngineConnFactory.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.engineplugin.presto.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import 
org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory
+import org.apache.linkis.engineconn.executor.entity.LabelExecutor
+import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
+import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration
+import org.apache.linkis.engineplugin.presto.executer.PrestoEngineConnExecutor
+
+class PrestoEngineConnFactory extends 
ComputationSingleExecutorEngineConnFactory {
+
+  override def newExecutor(id: Int, engineCreationContext: 
EngineCreationContext, engineConn: EngineConn): LabelExecutor = {
+    new 
PrestoEngineConnExecutor(PrestoConfiguration.ENGINE_DEFAULT_LIMIT.getValue, id)
+  }
+
+  override protected def getEngineConnType: EngineType = EngineType.PRESTO
+
+  override protected def getRunType: RunType = RunType.PRESTO_SQL
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to