This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new dcbfa9647 Add READNE.md and executor tests for trino (#3483)
dcbfa9647 is described below
commit dcbfa964719198e1160c422fb3231d0034d3cb99
Author: Knypys <[email protected]>
AuthorDate: Tue Oct 18 10:15:14 2022 +0800
Add READNE.md and executor tests for trino (#3483)
---
docs/configuration/trino.md | 22 +++
docs/info-1.2.1.md | 35 +++--
docs/trino-usage.md | 150 +++++++++++++++++++++
.../trino/conf/TrinoConfiguration.scala | 40 +++---
.../trino/executor/TrinoEngineConnExecutor.scala | 50 +++----
.../executer/TestTrinoEngineConnExecutor.scala | 142 +++++++++++++++++++
6 files changed, 377 insertions(+), 62 deletions(-)
diff --git a/docs/configuration/trino.md b/docs/configuration/trino.md
new file mode 100644
index 000000000..21a33d5d4
--- /dev/null
+++ b/docs/configuration/trino.md
@@ -0,0 +1,22 @@
+## trino 配置
+
+| 模块名(服务名) | 参数名 | 默认值 |描述
|是否引用|
+| ------------ | ---------------------------------------|
----------------------|---------------------------------------- | ----- |
+| trino | linkis.trino.default.limit | 5000
| 查询的结果集返回条数限制 |
+| trino | linkis.trino.http.connectTimeout.seconds | 60
| 连接Trino服务器的超时时间(秒) |
+| trino | linkis.trino.http.readTimeout.seconds | 60
| 等待Trino服务器返回数据的超时时间(秒) |
+| trino | linkis.trino.resultSet.cache.max | 512k
| Trino结果集缓冲区大小 |
+| trino | linkis.trino.url |
http://127.0.0.1:8080 | Trino服务器URL |
+| trino | linkis.trino.user | null
| 用于连接Trino查询服务的用户名 |
+| trino | linkis.trino.password | null
| 用于连接Trino查询服务的密码 |
+| trino | linkis.trino.password.cmd | null
| 用于连接Trino查询服务的密码回调命令 |
+| trino | linkis.trino.catalog | system
| 连接Trino查询时使用的catalog |
+| trino | linkis.trino.schema |
| 连接Trino查询服务的默认schema |
+| trino | linkis.trino.ssl.insecured | true
| 是否忽略服务器的SSL证书 |
+| trino | linkis.engineconn.concurrent.limit | 100
| 引擎最大并发 |
+| trino | linkis.trino.ssl.keystore | null
| Trino服务器SSL keystore路径 |
+| trino | linkis.trino.ssl.keystore.type | null
| Trino服务器SSL keystore类型 |
+| trino | linkis.trino.ssl.keystore.password | null
| Trino服务器SSL keystore密码 |
+| trino | linkis.trino.ssl.truststore | null
| Trino服务器SSL truststore路径 |
+| trino | linkis.trino.ssl.truststore.type | null
| Trino服务器SSL truststore类型 |
+| trino | linkis.trino.ssl.truststore.password | null
| Trino服务器SSL truststore密码 |
diff --git a/docs/info-1.2.1.md b/docs/info-1.2.1.md
index 4553a1bbc..3bf27564f 100644
--- a/docs/info-1.2.1.md
+++ b/docs/info-1.2.1.md
@@ -2,24 +2,23 @@
| 模块名(服务名)| 类型 | 参数名 | 默认值
| 描述 |
| ----------- | ----- |
-------------------------------------------------------- | ---------------- |
------------------------------------------------------- |
-|ps-metadataquery | 新增 | wds.linkis.server.mdq.mysql.relationship |
oracle,kingbase,postgresql,sqlserver,db2,greenplum,dm,mysql |
在mysql元数据服务兼容oracle,kingbase,postgresql,sqlserver,db2,greenplum,dm,驱动外部引入 |
-|cg-engineplugin | 新增 | wds.linkis.trino.default.limit | 5000 |
Trino查询的结果集返回条数限制 |
-|cg-engineplugin | 新增 | wds.linkis.trino.http.connectTimeout | 60 |
连接Trino服务器的超时时间 |
-|cg-engineplugin | 新增 | wds.linkis.trino.http.readTimeout | 60 |
等待Trino服务器返回数据的超时时间 |
-|cg-engineplugin | 新增 | wds.linkis.trino.resultSet.cache.max | 512k |
Trino结果集缓冲区大小 |
-|cg-engineplugin | 新增 | wds.linkis.trino.url | <http://127.0.0.1:8080> |
Trino服务器URL |
-|cg-engineplugin | 新增 | wds.linkis.trino.user | null | 用于连接Trino查询服务的用户名
|
-|cg-engineplugin | 新增 | wds.linkis.trino.password | null |
用于连接Trino查询服务的密码 |
-|cg-engineplugin | 新增 | wds.linkis.trino.passwordCmd | null |
用于连接Trino查询服务的密码回调命令 |
-|cg-engineplugin | 新增 | wds.linkis.trino.catalog | system |
连接Trino查询时使用的catalog |
-|cg-engineplugin | 新增 | wds.linkis.trino.schema | |
连接Trino查询服务的默认schema |
-|cg-engineplugin | 新增 | wds.linkis.trino.ssl.insecured | false |
是否忽略服务器的SSL证书 |
-|cg-engineplugin | 新增 | wds.linkis.trino.ssl.keystore | null |
keystore路径 |
-|cg-engineplugin | 新增 | wds.linkis.trino.ssl.keystore.type | null |
keystore类型 |
-|cg-engineplugin | 新增 | wds.linkis.trino.ssl.keystore.password | null |
keystore密码 |
-|cg-engineplugin | 新增 | wds.linkis.trino.ssl.truststore | null |
truststore路径 |
-|cg-engineplugin | 新增 | wds.linkis.trino.ssl.truststore.type | null |
truststore类型 |
-|cg-engineplugin | 新增 | wds.linkis.trino.ssl.truststore.password | null
| truststore密码 |
+|cg-engineplugin | 新增 | linkis.trino.default.limit | 5000 |
Trino查询的结果集返回条数限制 |
+|cg-engineplugin | 新增 | linkis.trino.http.connectTimeout.seconds | 60 |
连接Trino服务器的超时时间(秒) |
+|cg-engineplugin | 新增 | linkis.trino.http.readTimeout.seconds | 60 |
等待Trino服务器返回数据的超时时间(秒) |
+|cg-engineplugin | 新增 | linkis.trino.resultSet.cache.max | 512k |
Trino结果集缓冲区大小 |
+|cg-engineplugin | 新增 | linkis.trino.url | http://127.0.0.1:8080 |
Trino服务器URL |
+|cg-engineplugin | 新增 | linkis.trino.user | null | 用于连接Trino查询服务的用户名 |
+|cg-engineplugin | 新增 | linkis.trino.password | null | 用于连接Trino查询服务的密码 |
+|cg-engineplugin | 新增 | linkis.trino.password.cmd | null |
用于连接Trino查询服务的密码回调命令 |
+|cg-engineplugin | 新增 | linkis.trino.catalog | system |
连接Trino查询时使用的catalog |
+|cg-engineplugin | 新增 | linkis.trino.schema | | 连接Trino查询服务的默认schema |
+|cg-engineplugin | 新增 | linkis.trino.ssl.insecured | true |
是否忽略服务器的SSL证书 |
+|cg-engineplugin | 新增 | linkis.trino.ssl.keystore | null | keystore路径 |
+|cg-engineplugin | 新增 | linkis.trino.ssl.keystore.type | null |
keystore类型 |
+|cg-engineplugin | 新增 | linkis.trino.ssl.keystore.password | null |
keystore密码 |
+|cg-engineplugin | 新增 | linkis.trino.ssl.truststore | null |
truststore路径 |
+|cg-engineplugin | 新增 | linkis.trino.ssl.truststore.type | null |
truststore类型 |
+|cg-engineplugin | 新增 | linkis.trino.ssl.truststore.password | null |
truststore密码 |
## 特性说明
diff --git a/docs/trino-usage.md b/docs/trino-usage.md
new file mode 100644
index 000000000..cfd199f8d
--- /dev/null
+++ b/docs/trino-usage.md
@@ -0,0 +1,150 @@
+---
+title: Trino 引擎
+sidebar_position: 12
+---
+
+本文主要介绍在 Linkis1.X 中,Trino 引擎的配置、部署和使用。
+
+## 1. 环境准备
+
+如果您希望在您的服务器上使用 Trino 引擎,您需要准备 Trino 服务并提供连接信息,如 Trino 集群的连接地址、用户名和密码等
+
+## 2. 部署和配置
+
+### 2.1 版本的选择和编译
+注意: 编译 Trino 引擎之前需要进行 Linkis 项目全量编译
+发布的安装部署包中默认不包含此引擎插件,
+你可以按[Linkis引擎安装指引](https://linkis.apache.org/zh-CN/blog/2022/04/15/how-to-download-engineconn-plugin)部署安装
,或者按以下流程,手动编译部署
+
+单独编译 Trino 引擎
+
+```
+cd ${linkis_code_dir}/linkis-engineconn-plugins/trino/
+mvn clean install
+```
+
+### 2.2 物料的部署和加载
+
+将 2.1 步编译出来的引擎包,位于
+```bash
+${linkis_code_dir}/linkis-engineconn-plugins/trino/target/out/trino
+```
+上传到服务器的引擎目录下
+```bash
+${LINKIS_HOME}/lib/linkis-engineplugins
+```
+并重启linkis-engineplugin(或者通过引擎接口进行刷新)
+```bash
+cd ${LINKIS_HOME}/sbin
+sh linkis-daemon.sh restart cg-engineplugin
+```
+### 2.3 引擎的标签
+
+Linkis1.X 是通过标签来进行引擎管理的,所以需要在数据库中插入对应的标签数据,插入的方式如下文所示。
+
+管理台的配置是按照引擎标签来进行管理的,如果新增的引擎,有配置参数需要配置的话,需要修改对应的表的元数据
+
+```
+linkis_ps_configuration_config_key: 插入引擎的配置参数的key和默认values
+linkis_cg_manager_label:插入引擎label如:trino-371
+linkis_ps_configuration_category: 插入引擎的目录关联关系
+linkis_ps_configuration_config_value: 插入引擎需要展示的配置
+linkis_ps_configuration_key_engine_relation:配置项和引擎的关联关系
+```
+
+```sql
+-- set variable
+SET @ENGINE_LABEL="trino-371";
+SET @ENGINE_IDE=CONCAT('*-IDE,',@ENGINE_LABEL);
+SET @ENGINE_ALL=CONCAT('*-*,',@ENGINE_LABEL);
+SET @ENGINE_NAME="trino";
+
+-- add trino engine to IDE
+insert into `linkis_cg_manager_label` (`label_key`, `label_value`,
`label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES
('combined_userCreator_engineType', @ENGINE_ALL, 'OPTIONAL', 2, now(), now());
+insert into `linkis_cg_manager_label` (`label_key`, `label_value`,
`label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES
('combined_userCreator_engineType', @ENGINE_IDE, 'OPTIONAL', 2, now(), now());
+select @label_id := id from `linkis_cg_manager_label` where label_value =
@ENGINE_IDE;
+insert into `linkis_ps_configuration_category` (`label_id`, `level`) VALUES
(@label_id, 2);
+
+-- insert configuration key
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.default.limit', '查询的结果集返回条数限制', '结果集条数限制', '5000', 'None', '',
@ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.http.connectTimeout.seconds', '连接Trino服务器的超时时间', '连接超时时间(秒)', '60',
'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.http.readTimeout.seconds', '等待Trino服务器返回数据的超时时间', '传输超时时间(秒)', '60',
'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.resultSet.cache.max', 'Trino结果集缓冲区大小', '结果集缓冲区', '512k', 'None',
'', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.trino.url',
'Trino服务器URL', 'Trino服务器URL', 'http://127.0.0.1:8080', 'None', '',
@ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.trino.user',
'用于连接Trino查询服务的用户名', '用户名', 'null', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.password', '用于连接Trino查询服务的密码', '密码', 'null', 'None', '',
@ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.password.cmd', '用于连接Trino查询服务的密码回调命令', '密码回调命令', 'null', 'None',
'', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.catalog', '连接Trino查询时使用的catalog', 'Catalog', 'system', 'None',
'', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES ('linkis.trino.schema',
'连接Trino查询服务的默认schema', 'Schema', '', 'None', '', @ENGINE_NAME, 0, 0, 1,
'数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.ssl.insecured', '是否忽略服务器的SSL证书', '验证SSL证书', 'false', 'None', '',
@ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.engineconn.concurrent.limit', '引擎最大并发', '引擎最大并发', '100', 'None', '',
@ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.ssl.keystore', 'Trino服务器SSL keystore路径', 'keystore路径', 'null',
'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.ssl.keystore.type', 'Trino服务器SSL keystore类型', 'keystore类型',
'null', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.ssl.keystore.password', 'Trino服务器SSL keystore密码', 'keystore密码',
'null', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.ssl.truststore', 'Trino服务器SSL truststore路径', 'truststore路径',
'null', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.ssl.truststore.type', 'Trino服务器SSL truststore类型',
'truststore类型', 'null', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`,
`name`, `default_value`, `validate_type`, `validate_range`, `engine_conn_type`,
`is_hidden`, `is_advanced`, `level`, `treeName`) VALUES
('linkis.trino.ssl.truststore.password', 'Trino服务器SSL truststore密码',
'truststore密码', 'null', 'None', '', @ENGINE_NAME, 0, 0, 1, '数据源配置');
+
+
+-- trino engine -*
+insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`,
`engine_type_label_id`)
+(select config.id as config_key_id, label.id AS engine_type_label_id FROM
`linkis_ps_configuration_config_key` config
+INNER JOIN `linkis_cg_manager_label` label ON config.engine_conn_type =
@ENGINE_NAME and label_value = @ENGINE_ALL);
+
+-- trino engine default configuration
+insert into `linkis_ps_configuration_config_value` (`config_key_id`,
`config_value`, `config_label_id`)
+(select relation.config_key_id AS config_key_id, '' AS config_value,
relation.engine_type_label_id AS config_label_id FROM
`linkis_ps_configuration_key_engine_relation` relation
+INNER JOIN `linkis_cg_manager_label` label ON relation.engine_type_label_id =
label.id AND label.label_value = @ENGINE_ALL);
+
+```
+
+### 2.4 Trino 引擎相关配置
+
+| 配置 | 默认值 |是否必须 | 说明
|
+| ---------------------------------------|
----------------------|--------|---------------------------------------- |
+| linkis.trino.default.limit | 5000 | 是 |
查询的结果集返回条数限制 |
+| linkis.trino.http.connectTimeout.seconds | 60 | 是 |
连接Trino服务器的超时时间(秒) |
+| linkis.trino.http.readTimeout.seconds | 60 | 是 |
等待Trino服务器返回数据的超时时间(秒) |
+| linkis.trino.resultSet.cache.max | 512k | 是 |
Trino结果集缓冲区大小 |
+| linkis.trino.url | http://127.0.0.1:8080 | 是 |
Trino服务器URL |
+| linkis.trino.user | null | 否 |
用于连接Trino查询服务的用户名 |
+| linkis.trino.password | null | 否 |
用于连接Trino查询服务的密码 |
+| linkis.trino.password.cmd | null | 否 |
用于连接Trino查询服务的密码回调命令 |
+| linkis.trino.catalog | system | 否 |
连接Trino查询时使用的catalog |
+| linkis.trino.schema | | 否 |
连接Trino查询服务的默认schema |
+| linkis.trino.ssl.insecured | true | 是 |
是否忽略服务器的SSL证书 |
+| linkis.engineconn.concurrent.limit | 100 | 否 |
引擎最大并发 |
+| linkis.trino.ssl.keystore | null | 否 |
Trino服务器SSL keystore路径 |
+| linkis.trino.ssl.keystore.type | null | 否 |
Trino服务器SSL keystore类型 |
+| linkis.trino.ssl.keystore.password | null | 否 |
Trino服务器SSL keystore密码 |
+| linkis.trino.ssl.truststore | null | 否 |
Trino服务器SSL truststore路径 |
+| linkis.trino.ssl.truststore.type | null | 否 |
Trino服务器SSL truststore类型 |
+| linkis.trino.ssl.truststore.password | null | 否 |
Trino服务器SSL truststore密码 |
+
+## 3. Trino 引擎的使用
+### 3.1 准备操作
+您需要配置Trino的连接信息,包括连接地址信息或用户名密码(如果启用)等信息。
+
+
+
+图3-1 Trino配置信息
+
+您也可以在提交任务接口的 params.configuration.runtime 中修改以下参数
+```shell
+linkis.trino.url
+linkis.trino.user
+linkis.trino.password
+```
+
+### 3.2 通过Linkis-cli进行任务提交
+**使用示例**
+
+Linkis 1.0后提供了cli的方式提交任务,我们只需要指定对应的EngineConn标签类型即可,Trino的使用如下:
+
+```shell
+ sh ./bin/linkis-cli -submitUser trino -engineType trino-371 -code 'select *
from default.test limit 10' -runtimeMap linkis.trino.url=http://127.0.0.1:8080
+```
+
+## 4. Trino引擎的用户设置
+
+Trino的用户设置主要是设置Trino的连接信息,但是建议用户将此密码等信息进行加密管理。
\ No newline at end of file
diff --git
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
index 42ac6e4cb..e5c6f5186 100644
---
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
+++
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/conf/TrinoConfiguration.scala
@@ -17,48 +17,48 @@
package org.apache.linkis.engineplugin.trino.conf
-import org.apache.linkis.common.conf.{ByteType, CommonVars}
+import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.storage.utils.StorageConfiguration
import java.lang
object TrinoConfiguration {
- val ENGINE_CONCURRENT_LIMIT =
CommonVars[Int]("wds.linkis.engineconn.concurrent.limit", 100)
+ val ENGINE_CONCURRENT_LIMIT =
CommonVars[Int]("linkis.engineconn.concurrent.limit", 100)
- val DEFAULT_LIMIT = CommonVars[Int]("wds.linkis.trino.default.limit", 5000)
+ val DEFAULT_LIMIT = CommonVars[Int]("linkis.trino.default.limit", 5000)
val TRINO_HTTP_CONNECT_TIME_OUT =
- CommonVars[java.lang.Long]("wds.linkis.trino.http.connectTimeout.seconds",
new lang.Long(60))
+ CommonVars[java.lang.Long]("linkis.trino.http.connectTimeout.seconds", new
lang.Long(60))
val TRINO_HTTP_READ_TIME_OUT =
- CommonVars[java.lang.Long]("wds.linkis.trino.http.readTimeout.seconds",
new lang.Long(60))
+ CommonVars[java.lang.Long]("linkis.trino.http.readTimeout.seconds", new
lang.Long(60))
- val TRINO_URL = CommonVars[String]("wds.linkis.trino.url",
"http://127.0.0.1:8080")
+ val TRINO_URL = CommonVars[String]("linkis.trino.url",
"http://127.0.0.1:8080")
- val TRINO_PASSWORD = CommonVars[String]("wds.linkis.trino.password", null)
- val TRINO_PASSWORD_CMD = CommonVars[String]("wds.linkis.trino.password.cmd",
null)
- val TRINO_CATALOG = CommonVars[String]("wds.linkis.trino.catalog", "system")
- val TRINO_SCHEMA = CommonVars[String]("wds.linkis.trino.schema", "")
- val TRINO_SOURCE = CommonVars[String]("wds.linkis.trino.source", "global")
+ val TRINO_PASSWORD = CommonVars[String]("linkis.trino.password", null)
+ val TRINO_PASSWORD_CMD = CommonVars[String]("linkis.trino.password.cmd",
null)
+ val TRINO_CATALOG = CommonVars[String]("linkis.trino.catalog", "system")
+ val TRINO_SCHEMA = CommonVars[String]("linkis.trino.schema", "")
+ val TRINO_SOURCE = CommonVars[String]("linkis.trino.source", "global")
- val TRINO_SSL_INSECURED =
CommonVars[Boolean]("wds.linkis.trino.ssl.insecured", true)
- val TRINO_SSL_KEYSTORE = CommonVars[String]("wds.linkis.trino.ssl.keystore",
null)
- val TRINO_SSL_KEYSTORE_TYPE =
CommonVars[String]("wds.linkis.trino.ssl.keystore.type", null)
+ val TRINO_SSL_INSECURED = CommonVars[Boolean]("linkis.trino.ssl.insecured",
true)
+ val TRINO_SSL_KEYSTORE = CommonVars[String]("linkis.trino.ssl.keystore",
null)
+ val TRINO_SSL_KEYSTORE_TYPE =
CommonVars[String]("linkis.trino.ssl.keystore.type", null)
val TRINO_SSL_KEYSTORE_PASSWORD =
- CommonVars[String]("wds.linkis.trino.ssl.keystore.password", null)
+ CommonVars[String]("linkis.trino.ssl.keystore.password", null)
- val TRINO_SSL_TRUSTSTORE =
CommonVars[String]("wds.linkis.trino.ssl.truststore", null)
- val TRINO_SSL_TRUSTSTORE_TYPE =
CommonVars[String]("wds.linkis.trino.ssl.truststore.type", null)
+ val TRINO_SSL_TRUSTSTORE = CommonVars[String]("linkis.trino.ssl.truststore",
null)
+ val TRINO_SSL_TRUSTSTORE_TYPE =
CommonVars[String]("linkis.trino.ssl.truststore.type", null)
val TRINO_SSL_TRUSTSTORE_PASSWORD =
- CommonVars[String]("wds.linkis.trino.ssl.truststore.password", null)
+ CommonVars[String]("linkis.trino.ssl.truststore.password", null)
- val TRINO_FORBID_GRANT =
CommonVars[Boolean]("wds.linkis.trino.forbid.grant", true)
+ val TRINO_FORBID_GRANT = CommonVars[Boolean]("linkis.trino.forbid.grant",
true)
val TRINO_FORBID_MODIFY_SCHEMA =
- CommonVars[Boolean]("wds.linkis.trino.forbid.modifySchema", true)
+ CommonVars[Boolean]("linkis.trino.forbid.modifySchema", true)
val TRINO_USER_ISOLATION_MODE =
CommonVars[Boolean]("linkis.trino.user.isolation.mode", false)
diff --git
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
index f9ecd2f0d..a1089ec62 100644
---
a/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/trino/src/main/scala/org/apache/linkis/engineplugin/trino/executor/TrinoEngineConnExecutor.scala
@@ -20,7 +20,6 @@ package org.apache.linkis.engineplugin.trino.executor
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
import org.apache.linkis.engineconn.common.conf.{EngineConnConf,
EngineConnConstant}
-import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
import org.apache.linkis.engineconn.computation.executor.execute.{
ConcurrentComputationExecutor,
EngineExecutionContext
@@ -70,7 +69,7 @@ import javax.security.auth.callback.PasswordCallback
import java.net.URI
import java.util
import java.util._
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.{Callable, ConcurrentHashMap, TimeUnit}
import java.util.function.Supplier
import scala.collection.JavaConverters._
@@ -144,26 +143,6 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
super.init
}
- override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
- val user = getCurrentUser(engineConnTask.getLables)
- val userCreatorLabel =
engineConnTask.getLables.find(_.isInstanceOf[UserCreatorLabel]).get
- val engineTypeLabel =
engineConnTask.getLables.find(_.isInstanceOf[EngineTypeLabel]).get
- var configMap: util.Map[String, String] = null
- if (userCreatorLabel != null && engineTypeLabel != null) {
- configMap = TrinoEngineConfig.getCacheMap(
- (
- userCreatorLabel.asInstanceOf[UserCreatorLabel],
- engineTypeLabel.asInstanceOf[EngineTypeLabel]
- )
- )
- }
- clientSessionCache.put(
- engineConnTask.getTaskId,
- getClientSession(user, engineConnTask.getProperties, configMap)
- )
- super.execute(engineConnTask)
- }
-
override def executeLine(
engineExecutorContext: EngineExecutionContext,
code: String
@@ -180,13 +159,36 @@ class TrinoEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
TrinoCode.checkCode(realCode)
logger.info(s"trino client begins to run psql code:\n $realCode")
+ val currentUser = getCurrentUser(engineExecutorContext.getLabels)
val trinoUser = Optional
.ofNullable(TRINO_DEFAULT_USER.getValue)
.orElseGet(new Supplier[String] {
- override def get(): String =
getCurrentUser(engineExecutorContext.getLabels)
+ override def get(): String = currentUser
})
val taskId = engineExecutorContext.getJobId.get
- val clientSession = clientSessionCache.getIfPresent(taskId)
+ val clientSession = clientSessionCache.get(
+ taskId,
+ new Callable[ClientSession] {
+ override def call(): ClientSession = {
+ val userCreatorLabel =
+
engineExecutorContext.getLabels.find(_.isInstanceOf[UserCreatorLabel]).get
+ val engineTypeLabel =
+
engineExecutorContext.getLabels.find(_.isInstanceOf[EngineTypeLabel]).get
+ var configMap: util.Map[String, String] = null
+ if (userCreatorLabel != null && engineTypeLabel != null) {
+ configMap = Utils.tryAndError(
+ TrinoEngineConfig.getCacheMap(
+ (
+ userCreatorLabel.asInstanceOf[UserCreatorLabel],
+ engineTypeLabel.asInstanceOf[EngineTypeLabel]
+ )
+ )
+ )
+ }
+ getClientSession(currentUser, engineExecutorContext.getProperties,
configMap)
+ }
+ }
+ )
val statement = StatementClientFactory.newStatementClient(
okHttpClientCache.computeIfAbsent(trinoUser, buildOkHttpClient),
clientSession,
diff --git
a/linkis-engineconn-plugins/trino/src/test/scala/org/apache/linkis/engineplugin/trino/executer/TestTrinoEngineConnExecutor.scala
b/linkis-engineconn-plugins/trino/src/test/scala/org/apache/linkis/engineplugin/trino/executer/TestTrinoEngineConnExecutor.scala
new file mode 100644
index 000000000..d8d2d66be
--- /dev/null
+++
b/linkis-engineconn-plugins/trino/src/test/scala/org/apache/linkis/engineplugin/trino/executer/TestTrinoEngineConnExecutor.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.trino.executer
+
+import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.engineconn.common.creation.{
+ DefaultEngineCreationContext,
+ EngineCreationContext
+}
+import
org.apache.linkis.engineconn.computation.executor.entity.CommonEngineConnTask
+import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import
org.apache.linkis.engineconn.computation.executor.utlis.ComputationEngineConstant
+import org.apache.linkis.engineplugin.trino.executor.TrinoEngineConnExecutor
+import org.apache.linkis.engineplugin.trino.factory.TrinoEngineConnFactory
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
+import org.apache.linkis.governance.common.utils.EngineConnArgumentsParser
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
+import org.apache.linkis.manager.label.builder.factory.{
+ LabelBuilderFactory,
+ LabelBuilderFactoryContext
+}
+import org.apache.linkis.manager.label.entity.Label
+
+import java.util
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestTrinoEngineConnExecutor {
+
+ private val engineCreationContext: EngineCreationContext = new
DefaultEngineCreationContext
+
+ private val labelBuilderFactory: LabelBuilderFactory =
+ LabelBuilderFactoryContext.getLabelBuilderFactory
+
+// @Test
+ def testExecuteLine: Unit = {
+ val engineconnConf = "--engineconn-conf"
+ val springConf = "--spring-conf"
+ val array = Array(
+ engineconnConf,
+ "wds.linkis.rm.instance=10",
+ engineconnConf,
+ "label.userCreator=root-IDE",
+ engineconnConf,
+ "ticketId=037ab855-0c41-4323-970d-7f75e71883b6",
+ engineconnConf,
+ "label.engineType=trino",
+ engineconnConf,
+ "linkis.trino.url=https://trino.dev.com/hive/hivetest",
+ engineconnConf,
+ "linkis.trino.ssl.insecured=true",
+ engineconnConf,
+ "linkis.trino.default.start.user=root",
+ engineconnConf,
+ "linkis.trino.password=123456",
+ springConf,
+ "eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/",
+ springConf,
+ "logging.config=classpath:log4j2.xml",
+ springConf,
+ "spring.profiles.active=engineconn",
+ springConf,
+ "server.port=35655",
+ springConf,
+ "spring.application.name=linkis-cg-engineconn"
+ )
+ this.init(array)
+ val cmd = "SHOW SCHEMAS"
+ val taskId = "1"
+ val task = new CommonEngineConnTask(taskId, false)
+ val properties = new util.HashMap[String, Object]
+ task.setProperties(properties)
+ task.data(ComputationEngineConstant.LOCK_TYPE_NAME, "lock")
+ task.setStatus(ExecutionNodeStatus.Scheduled)
+ val engineFactory: TrinoEngineConnFactory = new TrinoEngineConnFactory
+ val engine = engineFactory.createEngineConn(engineCreationContext)
+
+ val jdbcExecutor: TrinoEngineConnExecutor = engineFactory
+ .newExecutor(1, engineCreationContext, engine)
+ .asInstanceOf[TrinoEngineConnExecutor]
+ val engineExecutionContext = new EngineExecutionContext(jdbcExecutor,
Utils.getJvmUser)
+ engineExecutionContext.setJobId(taskId)
+ val anyArray = engineCreationContext.getLabels().toArray()
+ engineExecutionContext.setLabels(anyArray.map(_.asInstanceOf[Label[_]]))
+ val testPath = this.getClass.getClassLoader.getResource("").getPath
+ engineExecutionContext.setStorePath(testPath)
+ engineCreationContext.getOptions.foreach({ case (key, value) =>
+ engineExecutionContext.addProperty(key, value)
+ })
+ Assertions.assertNotNull(jdbcExecutor.getProgressInfo(taskId))
+ val response = jdbcExecutor.executeLine(engineExecutionContext, cmd)
+ Assertions.assertNotNull(response)
+ }
+
+ private def init(args: Array[String]): Unit = {
+ val arguments =
EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToObj(args)
+ val engineConf = arguments.getEngineConnConfMap
+ this.engineCreationContext.setUser(engineConf.getOrElse("user",
Utils.getJvmUser))
+ this.engineCreationContext.setTicketId(engineConf.getOrElse("ticketId",
""))
+ val host = CommonVars(Environment.ECM_HOST.toString, "127.0.0.1").getValue
+ val port = CommonVars(Environment.ECM_PORT.toString, "80").getValue
+ this.engineCreationContext.setEMInstance(
+
ServiceInstance(GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue,
s"$host:$port")
+ )
+ val labels = new ArrayBuffer[Label[_]]
+ val labelArgs =
engineConf.filter(_._1.startsWith(EngineConnArgumentsParser.LABEL_PREFIX))
+ if (labelArgs.nonEmpty) {
+ labelArgs.foreach { case (key, value) =>
+ labels += labelBuilderFactory
+
.createLabel[Label[_]](key.replace(EngineConnArgumentsParser.LABEL_PREFIX, ""),
value)
+ }
+ engineCreationContext.setLabels(labels.toList)
+ }
+ val jMap = new java.util.HashMap[String, String](engineConf.size)
+ jMap.putAll(engineConf)
+ this.engineCreationContext.setOptions(jMap)
+ this.engineCreationContext.setArgs(args)
+ sys.props.putAll(jMap)
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]