Hisoka-X commented on code in PR #9688:
URL: https://github.com/apache/seatunnel/pull/9688#discussion_r2344352837
##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java:
##########
@@ -831,4 +847,87 @@ private ChangeStreamTableSourceCheckpoint
getSourceCheckpoint(
.collect(Collectors.toList());
return new ChangeStreamTableSourceCheckpoint(coordinatorState,
subtaskState);
}
+
+ private Config getMetalakeConfig(Config jobConfigTmp) {
+ Config update = jobConfigTmp;
+ String metalakeType = System.getenv("METALAKE_TYPE");
+ String metalakeUrl = System.getenv("METALAKE_URL");
+
+ MetalakeClient metalakeClient =
MetalakeClientFactory.create(metalakeType, metalakeUrl);
+
+ try {
+ ConfigList sourceList = jobConfigTmp.getList("source");
+ List<ConfigValue> newSourceList = new ArrayList<>(sourceList);
+
+ for (int i = 0; i < sourceList.size(); i++) {
+ ConfigObject sourceObj = (ConfigObject) sourceList.get(i);
+ if (sourceObj.containsKey("sourceId")) {
+ ConfigObject tmp = sourceObj;
+ String sourceId =
sourceObj.toConfig().getString("sourceId");
+ JsonNode metalakeJson =
metalakeClient.getMetaInfo(sourceId);
+ for (Map.Entry<String, ConfigValue> entry :
sourceObj.entrySet()) {
+ String subKey = entry.getKey();
+ ConfigValue value = entry.getValue();
+
+ if (value.valueType() == ConfigValueType.STRING) {
+ String strValue = (String) value.unwrapped();
+ if (strValue.startsWith("${") &&
strValue.endsWith("}")) {
+ String placeholder = strValue.substring(2,
strValue.length() - 1);
+
+ if (metalakeJson.has(placeholder)) {
+ String replaced =
metalakeJson.get(placeholder).asText();
+ tmp =
+ tmp.withValue(
+ subKey,
+
ConfigValueFactory.fromAnyRef(replaced));
+ }
Review Comment:
Please reuse `PlaceholderUtils`.
##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java:
##########
@@ -831,4 +847,87 @@ private ChangeStreamTableSourceCheckpoint
getSourceCheckpoint(
.collect(Collectors.toList());
return new ChangeStreamTableSourceCheckpoint(coordinatorState,
subtaskState);
}
+
+ private Config getMetalakeConfig(Config jobConfigTmp) {
Review Comment:
Could you refactor this? The same method appears three times.
##########
docs/zh/concept/metalake.md:
##########
@@ -0,0 +1,69 @@
+# METALAKE
+
+由于seatunnel在执行任务时,需要将数据库用户名与密码等隐私信息明文写在脚本中,可能会导致信息泄露;并且维护较为困难,数据源信息发生变更时可能需要手动更改。
+
+因此引入了metalake,将数据源的信息存储于Apache
Gravitino等metalake中,任务脚本采用`sourId`和占位符的方法来代替原本的用户名和密码等信息,运行时seatunnel-engine通过http请求从metalake获取信息,根据占位符进行替换。
Review Comment:
```suggestion
因此引入了metalake,将数据源的信息存储于Apache
Gravitino等metalake中,任务脚本采用`sourceId`和占位符的方法来代替原本的用户名和密码等信息,运行时seatunnel-engine通过http请求从metalake获取信息,根据占位符进行替换。
```
##########
docs/zh/concept/metalake.md:
##########
@@ -0,0 +1,69 @@
+# METALAKE
+
+由于seatunnel在执行任务时,需要将数据库用户名与密码等隐私信息明文写在脚本中,可能会导致信息泄露;并且维护较为困难,数据源信息发生变更时可能需要手动更改。
Review Comment:
```suggestion
由于SeaTunnel在执行任务时,需要将数据库用户名与密码等隐私信息明文写在脚本中,可能会导致信息泄露;并且维护较为困难,数据源信息发生变更时可能需要手动更改。
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import com.github.dockerjava.api.DockerClient;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.awaitility.Awaitility.given;
+
+public class MetalakeIT extends SeaTunnelContainer {
Review Comment:
I don't think we need such a complex test case. Just verify that the job config
contains the right username/password after it has been parsed with metalake;
there is no need to actually run the job. We can also mock the environment
variables, as done in
https://github.com/apache/seatunnel/blob/dac8fda7f2dfa4fbc7bf4e7e567b7f52580e6f8e/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java#L758
##########
docs/zh/concept/metalake.md:
##########
@@ -0,0 +1,69 @@
+# METALAKE
+
+由于seatunnel在执行任务时,需要将数据库用户名与密码等隐私信息明文写在脚本中,可能会导致信息泄露;并且维护较为困难,数据源信息发生变更时可能需要手动更改。
+
+因此引入了metalake,将数据源的信息存储于Apache
Gravitino等metalake中,任务脚本采用`sourId`和占位符的方法来代替原本的用户名和密码等信息,运行时seatunnel-engine通过http请求从metalake获取信息,根据占位符进行替换。
+
+若要使用metalake,首先要修改**seatunnel-env.sh**中的环境变量:
+
+* `METALAKE_ENABLED`
+* `METALAKE_TYPE`
+* `METALAKE_URL`
+
+将`METALAKE_ENABLED`设为`true`,`METALAKE_TYPE`当前仅支持设为`gravitino`。
+
+对于Apache Gravitino,`METALAKE_URL`设为
+
+```
+http://host:port/api/metalakes/your_metalake_name/catalogs/
+```
+
+---
+
+## 使用示例:
+
+用户需要先在Gravitino中创建catalog,如
+
+```bash
+curl -L 'http://localhost:8090/api/metalakes/test_metalake/catalogs'
+-H 'Content-Type: application/json'
+-H 'Accept: application/vnd.gravitino.v1+json'
+-d '{
+ "name": "test_catalog",
+ "type": "relational",
+ "provider": "jdbc-mysql",
+ "comment": "for metalake test",
+ "properties": {
+ "jdbc-driver": "com.mysql.cj.jdbc.Driver",
+ "jdbc-url": "not used",
+ "jdbc-user": "root",
+ "jdbc-password": "Abc!@#135_seatunnel"
+ }
+}'
+```
+
+这样便在`test_metalake`中创建了一个`test_catalog`(`metalake`需要提前创建)
+
+于是`METALAKE_URL`可以设为
+
+```
+http://localhost:8090/api/metalakes/test_metalake/catalogs/
+```
+
+source可以写为
+
+```
+source {
+ Jdbc {
+ url =
"jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"
+ driver = "${jdbc-driver}"
+ connection_check_timeout_sec = 100
+ sourceId = "test_catalog"
Review Comment:
Why use `sourceId` instead of `source_name`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]