This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3af6b2f2c6 [Feature][Zeta] Add tag to node used to filter worker when
submit job (#7045)
3af6b2f2c6 is described below
commit 3af6b2f2c6be41364661bd6872bd99e139986877
Author: Jarvis <[email protected]>
AuthorDate: Wed Jun 26 20:39:14 2024 +0800
[Feature][Zeta] Add tag to node used to filter worker when submit job
(#7045)
---
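Note on the matching rule: the tag_filter check that this commit adds to
AbstractResourceManager.applyResources can be sketched in isolation. The block
below is a minimal illustration under stated assumptions, not the committed
code. The class TagFilterSketch and its helpers matches and filterWorkers are
hypothetical names, workers are modeled as plain maps keyed by a string address
instead of Hazelcast Address/WorkerProfile objects, and IllegalStateException
stands in for NoEnoughResourceException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of the SeaTunnel code base.
final class TagFilterSketch {

    // A worker matches when every filter key exists on it with an equal value.
    // An absent or empty filter matches every worker.
    static boolean matches(Map<String, String> workerAttr, Map<String, String> tagFilter) {
        if (tagFilter == null || tagFilter.isEmpty()) {
            return true;
        }
        if (workerAttr == null || workerAttr.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, String> entry : tagFilter.entrySet()) {
            String expected = entry.getValue();
            if (expected == null || !expected.equals(workerAttr.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Keeps only the matching workers; fails when nothing matches
    // (stand-in for throwing NoEnoughResourceException).
    static Map<String, Map<String, String>> filterWorkers(
            Map<String, Map<String, String>> workers, Map<String, String> tagFilter) {
        Map<String, Map<String, String>> matched = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> worker : workers.entrySet()) {
            if (matches(worker.getValue(), tagFilter)) {
                matched.put(worker.getKey(), worker.getValue());
            }
        }
        if (matched.isEmpty()) {
            throw new IllegalStateException("No matched worker with tag filter " + tagFilter);
        }
        return matched;
    }

    public static void main(String[] args) {
        Map<String, String> team1 = new HashMap<>();
        team1.put("group", "platform");
        team1.put("team", "team1");
        Map<String, String> team2 = new HashMap<>();
        team2.put("group", "platform");
        team2.put("team", "team2");

        Map<String, Map<String, String>> workers = new HashMap<>();
        workers.put("worker-a", team1);
        workers.put("worker-b", team2);

        Map<String, String> filter = new HashMap<>();
        filter.put("group", "platform");
        filter.put("team", "team1");

        // Only worker-a carries both requested tags.
        System.out.println(filterWorkers(workers, filter).keySet());
    }
}
```

The sketch calls equals on the filter value rather than on the worker value, so
a worker attribute that is missing counts as a mismatch and does not need a
separate containsKey check.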
README.md | 2 +-
docs/en/about.md | 2 +-
docs/en/faq.md | 6 +-
docs/en/seatunnel-engine/resource-isolation.md | 83 ++++++++++++++
docs/{en => }/images/architecture_diagram.png | Bin
docs/{en => }/images/azkaban.png | Bin
docs/{en => }/images/checkstyle.png | Bin
docs/{en => }/images/kafka.png | Bin
docs/images/resource-isolation.png | Bin 0 -> 69956 bytes
docs/{en => }/images/seatunnel-workflow.svg | 0
docs/{en => }/images/seatunnel_architecture.png | Bin
docs/{en => }/images/seatunnel_starter.png | Bin
docs/{en => }/images/workflow.png | Bin
docs/sidebars.js | 3 +-
docs/zh/about.md | 2 +-
docs/zh/faq.md | 6 +-
docs/zh/images/architecture_diagram.png | Bin 77929 -> 0 bytes
docs/zh/images/azkaban.png | Bin 732486 -> 0 bytes
docs/zh/images/checkstyle.png | Bin 479660 -> 0 bytes
docs/zh/images/kafka.png | Bin 32151 -> 0 bytes
docs/zh/images/seatunnel-workflow.svg | 4 -
docs/zh/images/seatunnel_architecture.png | Bin 778394 -> 0 bytes
docs/zh/images/seatunnel_starter.png | Bin 423840 -> 0 bytes
docs/zh/images/workflow.png | Bin 258921 -> 0 bytes
docs/zh/seatunnel-engine/resource-isolation.md | 83 ++++++++++++++
.../api/configuration/ReadonlyConfig.java | 6 +-
.../apache/seatunnel/api/env/EnvCommonOptions.java | 6 ++
seatunnel-core/README.md | 2 +-
seatunnel-core/seatunnel-core-starter/README.md | 2 +-
.../e2e/resourceIsolation/ResourceIsolationIT.java | 64 +++++++++++
.../src/test/resources/hazelcast.yaml | 7 ++
.../resource-isolation/fakesource_to_console.conf | 54 +++++-----
.../fakesource_to_console_tag_not_match.conf | 54 +++++-----
.../src/main/resources/hazelcast.yaml | 2 +-
.../core/parse/MultipleTableJobConfigParser.java | 7 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 11 +-
.../engine/server/dag/physical/ResourceUtils.java | 10 +-
.../engine/server/dag/physical/SubPlan.java | 5 +-
.../resourcemanager/AbstractResourceManager.java | 48 ++++++++-
.../server/resourcemanager/ResourceManager.java | 7 +-
.../resourcemanager/ResourceRequestHandler.java | 11 +-
.../resourcemanager/worker/WorkerProfile.java | 3 +
.../server/service/slot/DefaultSlotService.java | 1 +
.../engine/server/AbstractSeaTunnelServerTest.java | 56 +++++-----
.../resourcemanager/FakeResourceManager.java | 13 ++-
.../resourcemanager/FixSlotResourceTest.java | 6 +-
.../resourcemanager/ResourceManagerTest.java | 8 +-
.../server/resourcemanager/WorkerTagTest.java | 120 +++++++++++++++++++++
tools/documents/sync.sh | 2 +-
49 files changed, 567 insertions(+), 129 deletions(-)
diff --git a/README.md b/README.md
index 6e3d5d8e0f..5fa0d25501 100644
--- a/README.md
+++ b/README.md
@@ -61,7 +61,7 @@ SeaTunnel addresses common data integration challenges:
## SeaTunnel Workflow
-
+
Configure jobs, select execution engines, and parallelize data using Source
Connectors. Easily develop and extend connectors to meet your needs.
diff --git a/docs/en/about.md b/docs/en/about.md
index 38a5fe9545..5164dc081c 100644
--- a/docs/en/about.md
+++ b/docs/en/about.md
@@ -34,7 +34,7 @@ SeaTunnel focuses on data integration and data
synchronization, and is mainly de
## SeaTunnel work flowchart
-
+
The runtime process of SeaTunnel is shown in the figure above.
diff --git a/docs/en/faq.md b/docs/en/faq.md
index 7ff275f381..953cc2a956 100644
--- a/docs/en/faq.md
+++ b/docs/en/faq.md
@@ -65,9 +65,9 @@ Refer to:
[lightbend/config#456](https://github.com/lightbend/config/issues/456)
Of course! See the screenshot below:
-
+
-
+
## Does SeaTunnel have a case for configuring multiple sources, such as
configuring elasticsearch and hdfs in source at the same time?
@@ -184,7 +184,7 @@ The following conclusions can be drawn:
3. In general, both M and N are determined, and the conclusion can be drawn
from 2: The size of `spark.streaming.kafka.maxRatePerPartition` is positively
correlated with the size of `spark.executor.cores` *
`spark.executor.instances`, and it can be increased while increasing the
resource `maxRatePerPartition` to speed up consumption.
-
+
## How can I solve the Error `Exception in thread "main"
java.lang.NoSuchFieldError: INSTANCE`?
diff --git a/docs/en/seatunnel-engine/resource-isolation.md
b/docs/en/seatunnel-engine/resource-isolation.md
new file mode 100644
index 0000000000..f123e80982
--- /dev/null
+++ b/docs/en/seatunnel-engine/resource-isolation.md
@@ -0,0 +1,83 @@
+---
+
+sidebar_position: 9
+-------------------
+
+After version 2.3.6, SeaTunnel can add a `tag` to each worker node. When you submit a job, you can use `tag_filter` to select the nodes on which the job should run.
+
+# How to achieve this
+
+1. Update the config in `hazelcast.yaml`:
+
+```yaml
+hazelcast:
+ cluster-name: seatunnel
+ network:
+ rest-api:
+ enabled: true
+ endpoint-groups:
+ CLUSTER_WRITE:
+ enabled: true
+ DATA:
+ enabled: true
+ join:
+ tcp-ip:
+ enabled: true
+ member-list:
+ - localhost
+ port:
+ auto-increment: false
+ port: 5801
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 50
+ member-attributes:
+ group:
+ type: string
+ value: platform
+ team:
+ type: string
+ value: team1
+```
+
+In this config, the tags are specified with `member-attributes`; the node has the tags `group=platform` and `team=team1`.
+
+2. Add `tag_filter` to your job config:
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ tag_filter {
+ group = "platform"
+ team = "team1"
+ }
+}
+source {
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ }
+ }
+ }
+}
+transform {
+}
+sink {
+ console {
+ source_table_name="fake"
+ }
+}
+```
+
+**Notice:**
+- If `tag_filter` is not set in the job config, the job runs on nodes chosen at random from all active nodes.
+- When `tag_filter` contains multiple tags, a node matches only if every key exists on it and every value is equal. If no node matches, a `NoEnoughResourceException` is thrown.
+
+
+
diff --git a/docs/en/images/architecture_diagram.png
b/docs/images/architecture_diagram.png
similarity index 100%
rename from docs/en/images/architecture_diagram.png
rename to docs/images/architecture_diagram.png
diff --git a/docs/en/images/azkaban.png b/docs/images/azkaban.png
similarity index 100%
rename from docs/en/images/azkaban.png
rename to docs/images/azkaban.png
diff --git a/docs/en/images/checkstyle.png b/docs/images/checkstyle.png
similarity index 100%
rename from docs/en/images/checkstyle.png
rename to docs/images/checkstyle.png
diff --git a/docs/en/images/kafka.png b/docs/images/kafka.png
similarity index 100%
rename from docs/en/images/kafka.png
rename to docs/images/kafka.png
diff --git a/docs/images/resource-isolation.png
b/docs/images/resource-isolation.png
new file mode 100644
index 0000000000..3986cbfb59
Binary files /dev/null and b/docs/images/resource-isolation.png differ
diff --git a/docs/en/images/seatunnel-workflow.svg
b/docs/images/seatunnel-workflow.svg
similarity index 100%
rename from docs/en/images/seatunnel-workflow.svg
rename to docs/images/seatunnel-workflow.svg
diff --git a/docs/en/images/seatunnel_architecture.png
b/docs/images/seatunnel_architecture.png
similarity index 100%
rename from docs/en/images/seatunnel_architecture.png
rename to docs/images/seatunnel_architecture.png
diff --git a/docs/en/images/seatunnel_starter.png
b/docs/images/seatunnel_starter.png
similarity index 100%
rename from docs/en/images/seatunnel_starter.png
rename to docs/images/seatunnel_starter.png
diff --git a/docs/en/images/workflow.png b/docs/images/workflow.png
similarity index 100%
rename from docs/en/images/workflow.png
rename to docs/images/workflow.png
diff --git a/docs/sidebars.js b/docs/sidebars.js
index ad1093689f..f07b198557 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -178,7 +178,8 @@ const sidebars = {
"seatunnel-engine/checkpoint-storage",
"seatunnel-engine/rest-api",
"seatunnel-engine/tcp",
- "seatunnel-engine/engine-jar-storage-mode"
+ "seatunnel-engine/engine-jar-storage-mode",
+ "seatunnel-engine/resource-isolation",
]
},
{
diff --git a/docs/zh/about.md b/docs/zh/about.md
index 024c520852..ae789d4d7f 100644
--- a/docs/zh/about.md
+++ b/docs/zh/about.md
@@ -32,7 +32,7 @@ SeaTunnel专注于数据集成和数据同步,主要旨在解决数据集成
## SeaTunnel work flowchart
-
+
SeaTunnel的运行流程如上图所示。
diff --git a/docs/zh/faq.md b/docs/zh/faq.md
index 8c836a3612..5fdb06c280 100644
--- a/docs/zh/faq.md
+++ b/docs/zh/faq.md
@@ -65,9 +65,9 @@ your string 1
当然! 请参阅下面的屏幕截图:
-
+
-
+
## SeaTunnel是否有配置多个源的情况,例如同时在源中配置elasticsearch和hdfs?
@@ -185,7 +185,7 @@ sink {
3、一般来说,M和N都确定了,从2可以得出结论:`spark.streaming.kafka.maxRatePerPartition`的大小与`spark.executor.cores`
* `spark的大小正相关 .executor.instances`,可以在增加资源`maxRatePerPartition`的同时增加,以加快消耗。
-
+
## 如何解决错误 `Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE`?
diff --git a/docs/zh/images/architecture_diagram.png
b/docs/zh/images/architecture_diagram.png
deleted file mode 100644
index ce72254694..0000000000
Binary files a/docs/zh/images/architecture_diagram.png and /dev/null differ
diff --git a/docs/zh/images/azkaban.png b/docs/zh/images/azkaban.png
deleted file mode 100644
index 78780dce2d..0000000000
Binary files a/docs/zh/images/azkaban.png and /dev/null differ
diff --git a/docs/zh/images/checkstyle.png b/docs/zh/images/checkstyle.png
deleted file mode 100644
index 4cf8303e71..0000000000
Binary files a/docs/zh/images/checkstyle.png and /dev/null differ
diff --git a/docs/zh/images/kafka.png b/docs/zh/images/kafka.png
deleted file mode 100644
index 14b22ebcbe..0000000000
Binary files a/docs/zh/images/kafka.png and /dev/null differ
diff --git a/docs/zh/images/seatunnel-workflow.svg
b/docs/zh/images/seatunnel-workflow.svg
deleted file mode 100644
index 7280e4a4c4..0000000000
--- a/docs/zh/images/seatunnel-workflow.svg
+++ /dev/null
@@ -1,4 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Do not edit this file with editors other than diagrams.net -->
-<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
-<svg xmlns="http://www.w3.org/2000/svg"
xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="622px"
height="718px" viewBox="-0.5 -0.5 622 718" content="<mxfile
host="Electron" modified="2021-12-30T15:17:57.852Z"
agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
like Gecko) draw.io/15.4.0 Chrome/91.0.4472.164 Electron/13.5.0
Safari/537.36" etag="y11mgoacIhryQ4lqCp5C"
version="15.4.0" type="device& [...]
\ No newline at end of file
diff --git a/docs/zh/images/seatunnel_architecture.png
b/docs/zh/images/seatunnel_architecture.png
deleted file mode 100644
index c96cb272e5..0000000000
Binary files a/docs/zh/images/seatunnel_architecture.png and /dev/null differ
diff --git a/docs/zh/images/seatunnel_starter.png
b/docs/zh/images/seatunnel_starter.png
deleted file mode 100644
index 4d9700899a..0000000000
Binary files a/docs/zh/images/seatunnel_starter.png and /dev/null differ
diff --git a/docs/zh/images/workflow.png b/docs/zh/images/workflow.png
deleted file mode 100644
index 9ce48b8bfc..0000000000
Binary files a/docs/zh/images/workflow.png and /dev/null differ
diff --git a/docs/zh/seatunnel-engine/resource-isolation.md
b/docs/zh/seatunnel-engine/resource-isolation.md
new file mode 100644
index 0000000000..ea09d6a892
--- /dev/null
+++ b/docs/zh/seatunnel-engine/resource-isolation.md
@@ -0,0 +1,83 @@
+---
+
+sidebar_position: 9
+-------------------
+
+在2.3.6版本之后, SeaTunnel支持对每个实例添加`tag`,
然后在提交任务时可以在配置文件中使用`tag_filter`来选择任务将要运行的节点.
+
+# 如何实现该功能
+
+1. 更新`hazelcast.yaml`文件
+
+```yaml
+hazelcast:
+ cluster-name: seatunnel
+ network:
+ rest-api:
+ enabled: true
+ endpoint-groups:
+ CLUSTER_WRITE:
+ enabled: true
+ DATA:
+ enabled: true
+ join:
+ tcp-ip:
+ enabled: true
+ member-list:
+ - localhost
+ port:
+ auto-increment: false
+ port: 5801
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 50
+ member-attributes:
+ group:
+ type: string
+ value: platform
+ team:
+ type: string
+ value: team1
+```
+
+在这个配置中, 我们通过`member-attributes`设置了`group=platform, team=team1`这样两个`tag`
+
+2. 在任务的配置中添加`tag_filter`来选择你需要运行该任务的节点
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ tag_filter {
+ group = "platform"
+ team = "team1"
+ }
+}
+source {
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ }
+ }
+ }
+}
+transform {
+}
+sink {
+ console {
+ source_table_name="fake"
+ }
+}
+```
+
+**注意:**
+- 当在任务的配置中, 没有添加`tag_filter`时, 会从所有节点中随机选择节点来运行任务.
+- 当`tag_filter`中存在多个过滤条件时, 会根据key存在以及value相等的全部匹配的节点, 当没有找到匹配的节点时, 会抛出
`NoEnoughResourceException`异常.
+
+
+
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index 81d64500dc..0ac179ae76 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -69,8 +69,8 @@ public class ReadonlyConfig implements Serializable {
/**
* Transform to Config todo: This method should be removed after we remove
Config
*
- * @deprecated Please use ReadonlyConfig directly
* @return Config
+ * @deprecated Please use ReadonlyConfig directly
*/
@Deprecated
public Config toConfig() {
@@ -96,6 +96,10 @@ public class ReadonlyConfig implements Serializable {
}
}
+ public Map<String, Object> getSourceMap() {
+ return confData;
+ }
+
public <T> Optional<T> getOptional(Option<T> option) {
if (option == null) {
throw new NullPointerException("Option not be null.");
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index cabf0856dc..1054b91e97 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -94,4 +94,10 @@ public interface EnvCommonOptions {
.mapType()
.noDefaultValue()
.withDescription("custom parameters for run engine");
+
+ Option<Map<String, String>> NODE_TAG_FILTER =
+ Options.key("tag_filter")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Define the worker where the job runs by
tag");
}
diff --git a/seatunnel-core/README.md b/seatunnel-core/README.md
index 5f666dcebc..ec9d027a65 100644
--- a/seatunnel-core/README.md
+++ b/seatunnel-core/README.md
@@ -1,7 +1,7 @@
# Introduction
This module is the seatunnel job entrypoint. SeaTunnel jobs are started by the
below process.
-
+
- seatunnel-core-flink: The flink job starter.
- seatunnel-core-flink-sql: The flink sql job starter.
diff --git a/seatunnel-core/seatunnel-core-starter/README.md
b/seatunnel-core/seatunnel-core-starter/README.md
index 835a5b1992..7420c941b9 100644
--- a/seatunnel-core/seatunnel-core-starter/README.md
+++ b/seatunnel-core/seatunnel-core-starter/README.md
@@ -2,7 +2,7 @@
This module is the base start module for SeaTunnel new connector API.
-
+
# SeaTunnel Job Execute Process
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java
new file mode 100644
index 0000000000..266d4ff018
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/ResourceIsolationIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e.resourceIsolation;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class ResourceIsolationIT extends TestSuiteBase {
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "only work on Zeta")
+ public void testTagMatch(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/resource-isolation/fakesource_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "only work on Zeta")
+ public void testTagNotMatch(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(
+
"/resource-isolation/fakesource_to_console_tag_not_match.conf");
+ Assertions.assertNotEquals(0, execResult.getExitCode());
+ Assertions.assertTrue(
+ StringUtils.isNotBlank(execResult.getStderr())
+ && execResult
+ .getStderr()
+ .contains(
+
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException"));
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
index c28e9c94d5..7bd3fdf8f2 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/hazelcast.yaml
@@ -41,3 +41,10 @@ hazelcast:
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
hazelcast.logging.type: log4j2
hazelcast.operation.generic.thread.count: 200
+ member-attributes:
+ group:
+ type: string
+ value: platform
+ team:
+ type: string
+ value: team1
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf
similarity index 59%
copy from
seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf
index 64376c9a44..c960427899 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console.conf
@@ -14,28 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
-hazelcast:
- cluster-name: seatunnel
- network:
- rest-api:
- enabled: true
- endpoint-groups:
- CLUSTER_WRITE:
- enabled: true
- DATA:
- enabled: true
- join:
- tcp-ip:
- enabled: true
- member-list:
- - localhost
- port:
- auto-increment: true
- port-count: 100
- port: 5801
- properties:
- hazelcast.invocation.max.retry.count: 20
- hazelcast.tcp.join.port.try.count: 30
- hazelcast.logging.type: log4j2
- hazelcast.operation.generic.thread.count: 50
+env {
+ job.mode = "BATCH"
+ tag_filter {
+ group = "platform"
+ team = "team1"
+ }
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ schema {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ console {
+ }
+
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf
similarity index 59%
copy from
seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf
index 64376c9a44..9952fde5d4 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/resource-isolation/fakesource_to_console_tag_not_match.conf
@@ -14,28 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
-hazelcast:
- cluster-name: seatunnel
- network:
- rest-api:
- enabled: true
- endpoint-groups:
- CLUSTER_WRITE:
- enabled: true
- DATA:
- enabled: true
- join:
- tcp-ip:
- enabled: true
- member-list:
- - localhost
- port:
- auto-increment: true
- port-count: 100
- port: 5801
- properties:
- hazelcast.invocation.max.retry.count: 20
- hazelcast.tcp.join.port.try.count: 30
- hazelcast.logging.type: log4j2
- hazelcast.operation.generic.thread.count: 50
+env {
+ job.mode = "BATCH"
+ tag_filter {
+ group = "error_tag"
+ team = "error_tag"
+ }
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ schema {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ console {
+ }
+
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
index 64376c9a44..0b48069c3e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -38,4 +38,4 @@ hazelcast:
hazelcast.invocation.max.retry.count: 20
hazelcast.tcp.join.port.try.count: 30
hazelcast.logging.type: log4j2
- hazelcast.operation.generic.thread.count: 50
+ hazelcast.operation.generic.thread.count: 50
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index c1ff66c0d3..172bbbff5f 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -276,12 +276,7 @@ public class MultipleTableJobConfigParser {
||
jobConfig.getName().equals(EnvCommonOptions.JOB_NAME.defaultValue())) {
jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
}
- envOptions
- .toMap()
- .forEach(
- (k, v) -> {
- jobConfig.getEnvOptions().put(k, v);
- });
+ jobConfig.getEnvOptions().putAll(envOptions.getSourceMap());
}
private static <T extends Factory> boolean isFallback(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 62f83b53b9..2a18984a95 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -150,7 +151,12 @@ public class PhysicalPlanGenerator {
}
public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
-
+ Map<String, String> tagFilter =
+ (Map<String, String>)
+ jobImmutableInformation
+ .getJobConfig()
+ .getEnvOptions()
+ .get(EnvCommonOptions.NODE_TAG_FILTER.key());
// TODO Determine which tasks do not need to be restored according to
state
CopyOnWriteArrayList<PassiveCompletableFuture<PipelineStatus>>
waitForCompleteBySubPlanList = new CopyOnWriteArrayList<>();
@@ -205,7 +211,8 @@ public class PhysicalPlanGenerator {
jobImmutableInformation,
executorService,
runningJobStateIMap,
- runningJobStateTimestampsIMap);
+ runningJobStateTimestampsIMap,
+ tagFilter);
});
PhysicalPlan physicalPlan =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
index c6bbc48b89..cab3e8fa99 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
@@ -42,14 +42,16 @@ public class ResourceUtils {
coordinator ->
futures.put(
coordinator.getTaskGroupLocation(),
- applyResourceForTask(resourceManager,
coordinator)));
+ applyResourceForTask(
+ resourceManager, coordinator,
subPlan.getTags())));
subPlan.getPhysicalVertexList()
.forEach(
task ->
futures.put(
task.getTaskGroupLocation(),
- applyResourceForTask(resourceManager,
task)));
+ applyResourceForTask(
+ resourceManager, task,
subPlan.getTags())));
futures.forEach(
(key, value) -> {
@@ -68,9 +70,9 @@ public class ResourceUtils {
}
public static CompletableFuture<SlotProfile> applyResourceForTask(
- ResourceManager resourceManager, PhysicalVertex task) {
+ ResourceManager resourceManager, PhysicalVertex task, Map<String,
String> tags) {
// TODO custom resource size
return resourceManager.applyResource(
- task.getTaskGroupLocation().getJobId(), new ResourceProfile());
+ task.getTaskGroupLocation().getJobId(), new ResourceProfile(),
tags);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index f845294bbb..54f795db92 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -69,6 +69,7 @@ public class SubPlan {
private final String pipelineFullName;
private final IMap<Object, Object> runningJobStateIMap;
+ private final Map<String, String> tags;
/**
* Timestamps (in milliseconds) as returned by {@code
System.currentTimeMillis()} when the
@@ -114,7 +115,8 @@ public class SubPlan {
@NonNull JobImmutableInformation jobImmutableInformation,
@NonNull ExecutorService executorService,
@NonNull IMap runningJobStateIMap,
- @NonNull IMap runningJobStateTimestampsIMap) {
+ @NonNull IMap runningJobStateTimestampsIMap,
+ Map<String, String> tags) {
this.pipelineId = pipelineId;
this.pipelineLocation =
new PipelineLocation(jobImmutableInformation.getJobId(),
pipelineId);
@@ -158,6 +160,7 @@ public class SubPlan {
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.executorService = executorService;
+ this.tags = tags;
}
public synchronized PassiveCompletableFuture<PipelineExecutionState>
initStateFuture() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 9d9b9d9a76..e7d709750e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -89,10 +90,11 @@ public abstract class AbstractResourceManager implements
ResourceManager {
}
@Override
- public CompletableFuture<SlotProfile> applyResource(long jobId,
ResourceProfile resourceProfile)
+ public CompletableFuture<SlotProfile> applyResource(
+ long jobId, ResourceProfile resourceProfile, Map<String, String>
tagFilter)
throws NoEnoughResourceException {
CompletableFuture<SlotProfile> completableFuture = new
CompletableFuture<>();
- applyResources(jobId, Collections.singletonList(resourceProfile))
+ applyResources(jobId, Collections.singletonList(resourceProfile),
tagFilter)
.whenComplete(
(profile, error) -> {
if (error != null) {
@@ -129,9 +131,44 @@ public abstract class AbstractResourceManager implements
ResourceManager {
@Override
public CompletableFuture<List<SlotProfile>> applyResources(
- long jobId, List<ResourceProfile> resourceProfile) throws
NoEnoughResourceException {
+ long jobId, List<ResourceProfile> resourceProfile, Map<String,
String> tagFilter)
+ throws NoEnoughResourceException {
waitingWorkerRegister();
- return new ResourceRequestHandler(jobId, resourceProfile,
registerWorker, this).request();
+ ConcurrentMap<Address, WorkerProfile> matchedWorker;
+ if (tagFilter == null || tagFilter.isEmpty()) {
+ matchedWorker = registerWorker;
+ } else {
+ matchedWorker =
+ registerWorker.entrySet().stream()
+ .filter(
+ e -> {
+ Map<String, String> workerAttr =
+ e.getValue().getAttributes();
+ if (workerAttr == null || workerAttr.isEmpty()) {
+ return false;
+ }
+ boolean match = true;
+ for (Map.Entry<String, String> entry :
+ tagFilter.entrySet()) {
+ if (!workerAttr.containsKey(entry.getKey())
+ || !workerAttr
+ .get(entry.getKey())
+ .equals(entry.getValue())) {
+ return false;
+ }
+ }
+ return match;
+ })
+ .collect(
+ Collectors.toConcurrentMap(
+ Map.Entry::getKey, Map.Entry::getValue));
+ }
+ if (matchedWorker.isEmpty()) {
+ log.error("No matched worker with tag filter {}.", tagFilter);
+ throw new NoEnoughResourceException();
+ }
+ return new ResourceRequestHandler(jobId, resourceProfile, matchedWorker, this)
+ .request(tagFilter);
}
protected boolean supportDynamicWorker() {
@@ -143,7 +180,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
*
* @param resourceProfiles the worker should have resource profile list
*/
- protected void findNewWorker(List<ResourceProfile> resourceProfiles) {
+ protected void findNewWorker(
+ List<ResourceProfile> resourceProfiles, Map<String, String> tagFilter) {
throw new UnsupportedOperationException(
"Unsupported operation to find new worker in " + this.getClass().getName());
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 8a04b21e4b..8e222b0682 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -24,16 +24,19 @@ import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import com.hazelcast.internal.services.MembershipServiceEvent;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
public interface ResourceManager {
void init();
- CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile)
+ CompletableFuture<SlotProfile> applyResource(
+ long jobId, ResourceProfile resourceProfile, Map<String, String> tagFilter)
throws NoEnoughResourceException;
CompletableFuture<List<SlotProfile>> applyResources(
- long jobId, List<ResourceProfile> resourceProfile) throws NoEnoughResourceException;
+ long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter)
+ throws NoEnoughResourceException;
CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> profiles);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 680aa1c07c..0af3738a4a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -72,7 +73,7 @@ public class ResourceRequestHandler {
this.resourceManager = resourceManager;
}
- public CompletableFuture<List<SlotProfile>> request() {
+ public CompletableFuture<List<SlotProfile>> request(Map<String, String> tags) {
List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new ArrayList<>();
for (int i = 0; i < resourceProfile.size(); i++) {
ResourceProfile r = resourceProfile.get(i);
@@ -96,7 +97,7 @@ public class ResourceRequestHandler {
if (resultSlotProfiles.size() < resourceProfile.size()) {
// meaning have some slot not request success
if (resourceManager.supportDynamicWorker()) {
- applyByDynamicWorker();
+ applyByDynamicWorker(tags);
} else {
completeRequestWithException(
new NoEnoughResourceException(
@@ -188,7 +189,7 @@ public class ResourceRequestHandler {
* third-party resource management to create a new worker, and then complete the resource
* application
*/
- private void applyByDynamicWorker() {
+ private void applyByDynamicWorker(Map<String, String> tags) {
List<ResourceProfile> needApplyResource = new ArrayList<>();
List<Integer> needApplyIndex = new ArrayList<>();
for (int i = 0; i < resultSlotProfiles.size(); i++) {
@@ -197,9 +198,9 @@ public class ResourceRequestHandler {
needApplyIndex.add(i);
}
}
- resourceManager.findNewWorker(needApplyResource);
+ resourceManager.findNewWorker(needApplyResource, tags);
resourceManager
- .applyResources(jobId, needApplyResource)
+ .applyResources(jobId, needApplyResource, tags)
.whenComplete(
withTryCatch(
LOGGER,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 836b25201e..291df1f1f8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -29,6 +29,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.IOException;
+import java.util.Map;
/**
* Used to describe the status of the current Worker, including address and resource assign status
@@ -47,6 +48,8 @@ public class WorkerProfile implements IdentifiedDataSerializable {
private SlotProfile[] unassignedSlots;
+ private Map<String, String> attributes;
+
public WorkerProfile(Address address) {
this.address = address;
this.unassignedResource = new ResourceProfile();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index a01ddfd79b..250a6f2eb4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -258,6 +258,7 @@ public class DefaultSlotService implements SlotService {
workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0]));
workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0]));
workerProfile.setUnassignedResource(unassignedResource.get());
+ workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
return workerProfile;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 234fd20c8b..434e76132d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -52,33 +52,7 @@ public abstract class AbstractSeaTunnelServerTest<T extends AbstractSeaTunnelSer
@BeforeAll
public void before() {
String name = ((T) this).getClass().getName();
- String yaml =
- "hazelcast:\n"
- + " cluster-name: seatunnel\n"
- + " network:\n"
- + " rest-api:\n"
- + " enabled: true\n"
- + " endpoint-groups:\n"
- + " CLUSTER_WRITE:\n"
- + " enabled: true\n"
- + " join:\n"
- + " tcp-ip:\n"
- + " enabled: true\n"
- + " member-list:\n"
- + " - localhost\n"
- + " port:\n"
- + " auto-increment: true\n"
- + " port-count: 100\n"
- + " port: 5801\n"
- + "\n"
- + " properties:\n"
- + " hazelcast.invocation.max.retry.count: 200\n"
- + " hazelcast.tcp.join.port.try.count: 30\n"
- + " hazelcast.invocation.retry.pause.millis: 2000\n"
- + " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
- + " hazelcast.logging.type: log4j2\n"
- + " hazelcast.operation.generic.thread.count: 200\n";
- Config hazelcastConfig = Config.loadFromString(yaml);
+ Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
hazelcastConfig.setClusterName(
TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + name));
SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
@@ -89,6 +63,34 @@ public abstract class AbstractSeaTunnelServerTest<T extends AbstractSeaTunnelSer
LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
}
+ protected String getHazelcastConfig() {
+ return "hazelcast:\n"
+ + " cluster-name: seatunnel\n"
+ + " network:\n"
+ + " rest-api:\n"
+ + " enabled: true\n"
+ + " endpoint-groups:\n"
+ + " CLUSTER_WRITE:\n"
+ + " enabled: true\n"
+ + " join:\n"
+ + " tcp-ip:\n"
+ + " enabled: true\n"
+ + " member-list:\n"
+ + " - localhost\n"
+ + " port:\n"
+ + " auto-increment: true\n"
+ + " port-count: 100\n"
+ + " port: 5801\n"
+ + "\n"
+ + " properties:\n"
+ + " hazelcast.invocation.max.retry.count: 200\n"
+ + " hazelcast.tcp.join.port.try.count: 30\n"
+ + " hazelcast.invocation.retry.pause.millis: 2000\n"
+ + " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ + " hazelcast.logging.type: log4j2\n"
+ + " hazelcast.operation.generic.thread.count: 200\n";
+ }
+
public SeaTunnelConfig loadSeaTunnelConfig() {
return ConfigProvider.locateAndGetSeaTunnelConfig();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
index 9c8595e167..e8ba7ed94a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -28,6 +28,7 @@ import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
/** Used to test ResourceManager, override init method to register more workers. */
@@ -47,7 +48,8 @@ public class FakeResourceManager extends AbstractResourceManager {
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
- new SlotProfile[] {});
+ new SlotProfile[] {},
+ Collections.emptyMap());
this.registerWorker.put(address1, workerProfile1);
Address address2 = new Address("localhost", 5802);
@@ -57,7 +59,8 @@ public class FakeResourceManager extends AbstractResourceManager {
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
- new SlotProfile[] {});
+ new SlotProfile[] {},
+ Collections.emptyMap());
this.registerWorker.put(address2, workerProfile2);
Address address3 = new Address("localhost", 5803);
WorkerProfile workerProfile3 =
@@ -66,7 +69,8 @@ public class FakeResourceManager extends AbstractResourceManager {
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
- new SlotProfile[] {});
+ new SlotProfile[] {},
+ Collections.emptyMap());
this.registerWorker.put(address3, workerProfile3);
} catch (UnknownHostException e) {
throw new RuntimeException(e);
@@ -84,7 +88,8 @@ public class FakeResourceManager extends AbstractResourceManager {
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
- new SlotProfile[] {}),
+ new SlotProfile[] {},
+ Collections.emptyMap()),
new SlotProfile(address, 1, new ResourceProfile(), "")));
} else {
return super.sendToMember(operation, address);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
index cf67ec8de0..cbba82dda8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
@@ -56,7 +56,7 @@ public class FixSlotResourceTest extends AbstractSeaTunnelServerTest<FixSlotReso
List<SlotProfile> slotProfiles =
server.getCoordinatorService()
.getResourceManager()
- .applyResources(jobId, resourceProfiles)
+ .applyResources(jobId, resourceProfiles, null)
.get();
Assertions.assertEquals(slotProfiles.size(), 3);
server.getCoordinatorService().getResourceManager().releaseResources(jobId, slotProfiles);
@@ -73,7 +73,7 @@ public class FixSlotResourceTest extends AbstractSeaTunnelServerTest<FixSlotReso
try {
server.getCoordinatorService()
.getResourceManager()
- .applyResources(jobId, resourceProfiles)
+ .applyResources(jobId, resourceProfiles, null)
.get();
} catch (ExecutionException e) {
Assertions.assertTrue(e.getMessage().contains("NoEnoughResourceException"));
@@ -93,7 +93,7 @@ public class FixSlotResourceTest extends AbstractSeaTunnelServerTest<FixSlotReso
List<SlotProfile> slotProfiles =
server.getCoordinatorService()
.getResourceManager()
- .applyResources(jobId, resourceProfiles)
+ .applyResources(jobId, resourceProfiles, null)
.get();
Assertions.assertEquals(slotProfiles.size(), 3);
server.getCoordinatorService().getResourceManager().releaseResources(jobId, slotProfiles);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
index 9244283889..2589e6530c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -55,7 +55,7 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan
resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(200)));
resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(300)));
List<SlotProfile> slotProfiles =
- resourceManager.applyResources(jobId, resourceProfiles).get();
+ resourceManager.applyResources(jobId, resourceProfiles, null).get();
Assertions.assertEquals(
resourceProfiles.get(0).getHeapMemory().getBytes(),
@@ -78,7 +78,8 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan
resourceManager
.applyResource(
jobId,
- new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE)))
+ new ResourceProfile(CPU.of(0), Memory.of(Long.MAX_VALUE)),
+ null)
.get());
}
@@ -93,7 +94,8 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan
resourceProfiles.add(new ResourceProfile());
resourceProfiles.add(new ResourceProfile());
resourceProfiles.add(new ResourceProfile());
- List<SlotProfile> slotProfiles = resourceManager.applyResources(1L, resourceProfiles).get();
+ List<SlotProfile> slotProfiles =
+ resourceManager.applyResources(1L, resourceProfiles, null).get();
Assertions.assertEquals(slotProfiles.size(), 5);
boolean hasDifferentWorker = false;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java
new file mode 100644
index 0000000000..3eb27f2451
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/WorkerTagTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class WorkerTagTest extends AbstractSeaTunnelServerTest<WorkerTagTest> {
+
+ private ResourceManager resourceManager;
+
+ private final long jobId = 5;
+
+ @BeforeAll
+ public void before() {
+ super.before();
+ resourceManager = server.getCoordinatorService().getResourceManager();
+ server.getSlotService();
+ }
+
+ @Override
+ protected String getHazelcastConfig() {
+ // The case where no node attribute is set is covered by ResourceManagerTest and
+ // FixSlotResourceTest.
+ return "hazelcast:\n"
+ + " cluster-name: seatunnel\n"
+ + " network:\n"
+ + " rest-api:\n"
+ + " enabled: true\n"
+ + " endpoint-groups:\n"
+ + " CLUSTER_WRITE:\n"
+ + " enabled: true\n"
+ + " join:\n"
+ + " tcp-ip:\n"
+ + " enabled: true\n"
+ + " member-list:\n"
+ + " - localhost\n"
+ + " port:\n"
+ + " auto-increment: true\n"
+ + " port-count: 100\n"
+ + " port: 5801\n"
+ + "\n"
+ + " properties:\n"
+ + " hazelcast.invocation.max.retry.count: 200\n"
+ + " hazelcast.tcp.join.port.try.count: 30\n"
+ + " hazelcast.invocation.retry.pause.millis: 2000\n"
+ + " hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ + " hazelcast.logging.type: log4j2\n"
+ + " hazelcast.operation.generic.thread.count: 200\n"
+ + " member-attributes:\n"
+ + " group:\n"
+ + " type: string\n"
+ + " value: platform\n"
+ + " team:\n"
+ + " type: string\n"
+ + " value: team1";
+ }
+
+ @Test
+ public void testTagMatch() {
+ Map<String, String> tag = new HashMap<>();
+ tag.put("group", "platform");
+ tag.put("team", "team1");
+ Assertions.assertDoesNotThrow(() -> testApplyResourceByTag(tag));
+ }
+
+ @Test
+ public void testNullTag() {
+ Assertions.assertDoesNotThrow(() -> testApplyResourceByTag(null));
+ }
+
+ @Test
+ public void testTagNotMatch() {
+ Map<String, String> tag = new HashMap<>();
+ tag.put("group", "platform");
+ tag.put("team", "team2");
+ Assertions.assertThrows(NoEnoughResourceException.class, () -> testApplyResourceByTag(tag));
+ }
+
+ private void testApplyResourceByTag(Map<String, String> tag)
+ throws ExecutionException, InterruptedException {
+ List<ResourceProfile> resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile(CPU.of(0), Memory.of(100)));
+ List<SlotProfile> slotProfiles =
+ resourceManager.applyResources(jobId, resourceProfiles, tag).get();
+
+ Assertions.assertEquals(
+ resourceProfiles.get(0).getHeapMemory().getBytes(),
+ slotProfiles.get(0).getResourceProfile().getHeapMemory().getBytes());
+
+ resourceManager.releaseResources(jobId, slotProfiles).get();
+ }
+}
diff --git a/tools/documents/sync.sh b/tools/documents/sync.sh
index f8549e6023..cd67231a81 100644
--- a/tools/documents/sync.sh
+++ b/tools/documents/sync.sh
@@ -20,7 +20,7 @@
set -euo pipefail
PR_DIR=$1
-PR_IMG_DIR="${PR_DIR}/docs/en/images"
+PR_IMG_DIR="${PR_DIR}/docs/images"
PR_DOC_DIR="${PR_DIR}/docs/en"
PR_SIDEBAR_PATH="${PR_DIR}/docs/sidebars.js"