This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b64a6903df [Fix][Zeta] Fix resource isolation not working on multi node (#7471)
b64a6903df is described below

commit b64a6903dff5c910493e03f32bc818e7a1b694d3
Author: chenqianwen <[email protected]>
AuthorDate: Wed Aug 28 09:34:52 2024 +0800

    [Fix][Zeta] Fix resource isolation not working on multi node (#7471)
    
    * [Bug] [resource-isolation tag_filter] In a separated-cluster deployment, when the worker is started with group=platform and the job is submitted with a tag_filter that is also group=platform, the worker cannot be matched.
    
    * [Bug] [resource-isolation tag_filter] In a separated-cluster deployment, when the worker is started with group=platform and the job is submitted with a tag_filter that is also group=platform, the worker cannot be matched.
    
    * [Bug] [resource-isolation tag_filter] In a separated-cluster deployment, when the worker is started with group=platform and the job is submitted with a tag_filter that is also group=platform, the worker cannot be matched.
    
    * [Fix][Zeta] Fix resource isolation not working on multi node test case
    
    * [Fix][Zeta] Fix resource isolation not working on multi node test case
    
    * [Fix][Zeta] Fix resource isolation not working on multi node test case
    
    * [Fix][Zeta] Fix resource isolation not working on multi node test case
    
    * Update seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java
    
    ---------
    
    Co-authored-by: yanggld <[email protected]>
    Co-authored-by: Jia Fan <[email protected]>
---
 .../resourceIsolation/WorkerTagClusterTest.java    | 161 +++++++++++++++++++++
 .../resourcemanager/worker/WorkerProfile.java      |   2 +
 2 files changed, 163 insertions(+)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java
new file mode 100644
index 0000000000..63736a90ae
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/resourceIsolation/WorkerTagClusterTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.e2e.resourceIsolation;
+
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.e2e.TestUtils;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class WorkerTagClusterTest {
+
+    HazelcastInstanceImpl masterNode1 = null;
+    HazelcastInstanceImpl workerNode1 = null;
+    String testClusterName = "WorkerTagClusterTest";
+
+    @BeforeEach
+    public void before() {
+        SeaTunnelConfig masterNode1Config = 
getSeaTunnelConfig(testClusterName);
+        SeaTunnelConfig workerNode1Config = 
getSeaTunnelConfig(testClusterName);
+        masterNode1 = 
SeaTunnelServerStarter.createMasterHazelcastInstance(masterNode1Config);
+        workerNode1 = 
SeaTunnelServerStarter.createWorkerHazelcastInstance(workerNode1Config);
+    }
+
+    @AfterEach
+    void afterClass() {
+        if (masterNode1 != null) {
+            masterNode1.shutdown();
+        }
+        if (workerNode1 != null) {
+            workerNode1.shutdown();
+        }
+    }
+
+    @Test
+    public void testTagMatch() throws Exception {
+        Map<String, String> tag = new HashMap<>();
+        tag.put("group", "platform");
+        tag.put("team", "team1");
+        testTagFilter(tag, 1);
+    }
+
+    @Test
+    public void testTagMatch2() throws Exception {
+        testTagFilter(null, 1);
+    }
+
+    @Test
+    public void testTagNotMatch() throws Exception {
+        Map<String, String> tag = new HashMap<>();
+        tag.put("group", "platform");
+        tag.put("team", "team1111111");
+        testTagFilter(tag, 0);
+    }
+
+    @Test
+    public void testTagNotMatch2() throws Exception {
+        testTagFilter(new HashMap<>(), 1);
+    }
+
+    public void testTagFilter(Map<String, String> tagFilter, int 
expectedWorkerCount)
+            throws Exception {
+        // waiting all node added to cluster
+        Awaitility.await()
+                .atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        new ThrowingRunnable() {
+                            @Override
+                            public void run() throws Throwable {
+                                Thread.sleep(2000);
+                                // check master and worker node
+                                Assertions.assertEquals(
+                                        2, 
masterNode1.getCluster().getMembers().size());
+                                NodeEngineImpl nodeEngine = 
masterNode1.node.nodeEngine;
+                                SeaTunnelServer server =
+                                        
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+                                ResourceManager resourceManager =
+                                        
server.getCoordinatorService().getResourceManager();
+                                // if tag matched, then worker count is 1  
else 0
+                                int workerCount = 
resourceManager.workerCount(tagFilter);
+                                Assertions.assertEquals(expectedWorkerCount, 
workerCount);
+                            }
+                        });
+    }
+
+    private static SeaTunnelConfig getSeaTunnelConfig(String testClusterName) {
+        Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
+        
hazelcastConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+        return seaTunnelConfig;
+    }
+
+    protected static String getHazelcastConfig() {
+        return "hazelcast:\n"
+                + "  cluster-name: seatunnel\n"
+                + "  network:\n"
+                + "    rest-api:\n"
+                + "      enabled: true\n"
+                + "      endpoint-groups:\n"
+                + "        CLUSTER_WRITE:\n"
+                + "          enabled: true\n"
+                + "    join:\n"
+                + "      tcp-ip:\n"
+                + "        enabled: true\n"
+                + "        member-list:\n"
+                + "          - localhost\n"
+                + "    port:\n"
+                + "      auto-increment: true\n"
+                + "      port-count: 100\n"
+                + "      port: 5801\n"
+                + "\n"
+                + "  properties:\n"
+                + "    hazelcast.invocation.max.retry.count: 200\n"
+                + "    hazelcast.tcp.join.port.try.count: 30\n"
+                + "    hazelcast.invocation.retry.pause.millis: 2000\n"
+                + "    
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+                + "    hazelcast.logging.type: log4j2\n"
+                + "    hazelcast.operation.generic.thread.count: 200\n"
+                + "  member-attributes:\n"
+                + "    group:\n"
+                + "      type: string\n"
+                + "      value: platform\n"
+                + "    team:\n"
+                + "      type: string\n"
+                + "      value: team1";
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 0d0f8c8054..f357a690da 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -85,6 +85,7 @@ public class WorkerProfile implements IdentifiedDataSerializable {
             out.writeObject(unassignedSlot);
         }
         out.writeBoolean(dynamicSlot);
+        out.writeObject(attributes);
     }
 
     @Override
@@ -103,5 +104,6 @@ public class WorkerProfile implements IdentifiedDataSerializable {
             unassignedSlots[i] = in.readObject();
         }
         dynamicSlot = in.readBoolean();
+        attributes = in.readObject();
     }
 }

Reply via email to