This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b64a6903df [Fix][Zeta] Fix resource isolation not working on multi
node (#7471)
b64a6903df is described below
commit b64a6903dff5c910493e03f32bc818e7a1b694d3
Author: chenqianwen <[email protected]>
AuthorDate: Wed Aug 28 09:34:52 2024 +0800
[Fix][Zeta] Fix resource isolation not working on multi node (#7471)
* [Bug] [resource-isolation tag_filter] separated-cluster-deployment, When
starting the worker, specify group=platform, and when submitting the job,
specify tag_filter, which is also group=platform, but cannot be matched.
* [Bug] [resource-isolation tag_filter] separated-cluster-deployment, When
starting the worker, specify group=platform, and when submitting the job,
specify tag_filter, which is also group=platform, but cannot be matched.
* [Bug] [resource-isolation tag_filter] separated-cluster-deployment, When
starting the worker, specify group=platform, and when submitting the job,
specify tag_filter, which is also group=platform, but cannot be matched.
* [Fix][Zeta] Fix resource isolation not working on multi node test case
* [Fix][Zeta] Fix resource isolation not working on multi node test case
* [Fix][Zeta] Fix resource isolation not working on multi node test case
* [Fix][Zeta] Fix resource isolation not working on multi node test case
* Update
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java
---------
Co-authored-by: yanggld <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
---
.../resourceIsolation/WorkerTagClusterTest.java | 161 +++++++++++++++++++++
.../resourcemanager/worker/WorkerProfile.java | 2 +
2 files changed, 163 insertions(+)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java
new file mode 100644
index 0000000000..63736a90ae
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.e2e.resourceIsolation;
+
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.e2e.TestUtils;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class WorkerTagClusterTest {
+
+ HazelcastInstanceImpl masterNode1 = null;
+ HazelcastInstanceImpl workerNode1 = null;
+ String testClusterName = "WorkerTagClusterTest";
+
+ @BeforeEach
+ public void before() {
+ SeaTunnelConfig masterNode1Config =
getSeaTunnelConfig(testClusterName);
+ SeaTunnelConfig workerNode1Config =
getSeaTunnelConfig(testClusterName);
+ masterNode1 =
SeaTunnelServerStarter.createMasterHazelcastInstance(masterNode1Config);
+ workerNode1 =
SeaTunnelServerStarter.createWorkerHazelcastInstance(workerNode1Config);
+ }
+
+ @AfterEach
+ void afterClass() {
+ if (masterNode1 != null) {
+ masterNode1.shutdown();
+ }
+ if (workerNode1 != null) {
+ workerNode1.shutdown();
+ }
+ }
+
+ @Test
+ public void testTagMatch() throws Exception {
+ Map<String, String> tag = new HashMap<>();
+ tag.put("group", "platform");
+ tag.put("team", "team1");
+ testTagFilter(tag, 1);
+ }
+
+ @Test
+ public void testTagMatch2() throws Exception {
+ testTagFilter(null, 1);
+ }
+
+ @Test
+ public void testTagNotMatch() throws Exception {
+ Map<String, String> tag = new HashMap<>();
+ tag.put("group", "platform");
+ tag.put("team", "team1111111");
+ testTagFilter(tag, 0);
+ }
+
+ @Test
+ public void testTagNotMatch2() throws Exception {
+ testTagFilter(new HashMap<>(), 1);
+ }
+
+ public void testTagFilter(Map<String, String> tagFilter, int
expectedWorkerCount)
+ throws Exception {
+ // waiting all node added to cluster
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ Thread.sleep(2000);
+ // check master and worker node
+ Assertions.assertEquals(
+ 2,
masterNode1.getCluster().getMembers().size());
+ NodeEngineImpl nodeEngine =
masterNode1.node.nodeEngine;
+ SeaTunnelServer server =
+
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+ ResourceManager resourceManager =
+
server.getCoordinatorService().getResourceManager();
+ // if tag matched, then worker count is 1
else 0
+ int workerCount =
resourceManager.workerCount(tagFilter);
+ Assertions.assertEquals(expectedWorkerCount,
workerCount);
+ }
+ });
+ }
+
+ private static SeaTunnelConfig getSeaTunnelConfig(String testClusterName) {
+ Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
+
hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+ return seaTunnelConfig;
+ }
+
+ protected static String getHazelcastConfig() {
+ return "hazelcast:\n"
+ + " cluster-name: seatunnel\n"
+ + " network:\n"
+ + " rest-api:\n"
+ + " enabled: true\n"
+ + " endpoint-groups:\n"
+ + " CLUSTER_WRITE:\n"
+ + " enabled: true\n"
+ + " join:\n"
+ + " tcp-ip:\n"
+ + " enabled: true\n"
+ + " member-list:\n"
+ + " - localhost\n"
+ + " port:\n"
+ + " auto-increment: true\n"
+ + " port-count: 100\n"
+ + " port: 5801\n"
+ + "\n"
+ + " properties:\n"
+ + " hazelcast.invocation.max.retry.count: 200\n"
+ + " hazelcast.tcp.join.port.try.count: 30\n"
+ + " hazelcast.invocation.retry.pause.millis: 2000\n"
+ + "
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ + " hazelcast.logging.type: log4j2\n"
+ + " hazelcast.operation.generic.thread.count: 200\n"
+ + " member-attributes:\n"
+ + " group:\n"
+ + " type: string\n"
+ + " value: platform\n"
+ + " team:\n"
+ + " type: string\n"
+ + " value: team1";
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 0d0f8c8054..f357a690da 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -85,6 +85,7 @@ public class WorkerProfile implements
IdentifiedDataSerializable {
out.writeObject(unassignedSlot);
}
out.writeBoolean(dynamicSlot);
+ out.writeObject(attributes);
}
@Override
@@ -103,5 +104,6 @@ public class WorkerProfile implements
IdentifiedDataSerializable {
unassignedSlots[i] = in.readObject();
}
dynamicSlot = in.readBoolean();
+ attributes = in.readObject();
}
}