This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a6e5850494 [Improve][Test] Add test for ResourceManager to keep task 
will be deployed in different node (#5518)
a6e5850494 is described below

commit a6e58504945cac5d2e47f63ed97ebfa8b55367ce
Author: Jia Fan <[email protected]>
AuthorDate: Sat Oct 7 20:58:25 2023 +0800

    [Improve][Test] Add test for ResourceManager to keep task will be deployed 
in different node (#5518)
    
    * [Improve][Test] Add test for ResourceManager to keep task will be 
deployed in different node
---
 .../resourcemanager/AbstractResourceManager.java   |  8 +-
 .../resourcemanager/ResourceRequestHandler.java    |  3 +-
 .../resourcemanager/worker/WorkerProfile.java      |  2 +
 .../resourcemanager/FakeResourceManager.java       | 93 ++++++++++++++++++++++
 .../ResourceManagerFunctionTest.java               | 56 +++++++++++++
 5 files changed, 155 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index cf4f4e6e82..8b7e0b1864 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -29,10 +29,8 @@ import 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Member;
 import com.hazelcast.internal.services.MembershipServiceEvent;
-import com.hazelcast.spi.impl.InternalCompletableFuture;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.Operation;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
@@ -75,7 +73,7 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
                         .map(Member::getAddress)
                         .collect(Collectors.toList());
         log.info("initWorker live nodes: " + aliveWorker);
-        List<InternalCompletableFuture<Void>> futures =
+        List<CompletableFuture<Void>> futures =
                 aliveWorker.stream()
                         .map(
                                 worker ->
@@ -86,7 +84,7 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
                                                                     worker, 
(WorkerProfile) p);
                                                         }))
                         .collect(Collectors.toList());
-        futures.forEach(InternalCompletableFuture::join);
+        futures.forEach(CompletableFuture::join);
         log.info("registerWorker: " + registerWorker);
     }
 
@@ -155,7 +153,7 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
         isRunning = false;
     }
 
-    protected <E> InvocationFuture<E> sendToMember(Operation operation, 
Address address) {
+    protected <E> CompletableFuture<E> sendToMember(Operation operation, 
Address address) {
         return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, 
address);
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index be5993997c..680aa1c07c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -27,7 +27,6 @@ import 
org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -139,7 +138,7 @@ public class ResourceRequestHandler {
 
     private CompletableFuture<SlotAndWorkerProfile> 
singleResourceRequestToMember(
             int i, ResourceProfile r, WorkerProfile workerProfile) {
-        InvocationFuture<SlotAndWorkerProfile> future =
+        CompletableFuture<SlotAndWorkerProfile> future =
                 resourceManager.sendToMember(
                         new RequestSlotOperation(jobId, r), 
workerProfile.getAddress());
         return future.whenComplete(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index e8a73d3383..836b25201e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -25,6 +25,7 @@ import com.hazelcast.cluster.Address;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.IOException;
@@ -33,6 +34,7 @@ import java.io.IOException;
  * Used to describe the status of the current Worker, including address and 
resource assign status
  */
 @Data
+@AllArgsConstructor
 public class WorkerProfile implements IdentifiedDataSerializable {
 
     private Address address;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
new file mode 100644
index 0000000000..9c8595e167
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
+
+/** Used to test ResourceManager, override init method to register more 
workers. */
+public class FakeResourceManager extends AbstractResourceManager {
+    public FakeResourceManager(NodeEngine nodeEngine) {
+        super(nodeEngine);
+        init();
+    }
+
+    @Override
+    public void init() {
+        try {
+            Address address1 = new Address("localhost", 5801);
+            WorkerProfile workerProfile1 =
+                    new WorkerProfile(
+                            address1,
+                            new ResourceProfile(),
+                            new ResourceProfile(),
+                            new SlotProfile[] {},
+                            new SlotProfile[] {});
+            this.registerWorker.put(address1, workerProfile1);
+
+            Address address2 = new Address("localhost", 5802);
+            WorkerProfile workerProfile2 =
+                    new WorkerProfile(
+                            address2,
+                            new ResourceProfile(),
+                            new ResourceProfile(),
+                            new SlotProfile[] {},
+                            new SlotProfile[] {});
+            this.registerWorker.put(address2, workerProfile2);
+            Address address3 = new Address("localhost", 5803);
+            WorkerProfile workerProfile3 =
+                    new WorkerProfile(
+                            address3,
+                            new ResourceProfile(),
+                            new ResourceProfile(),
+                            new SlotProfile[] {},
+                            new SlotProfile[] {});
+            this.registerWorker.put(address3, workerProfile3);
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected <E> CompletableFuture<E> sendToMember(Operation operation, 
Address address) {
+        if (operation instanceof RequestSlotOperation) {
+            return (CompletableFuture<E>)
+                    CompletableFuture.completedFuture(
+                            new SlotAndWorkerProfile(
+                                    new WorkerProfile(
+                                            address,
+                                            new ResourceProfile(),
+                                            new ResourceProfile(),
+                                            new SlotProfile[] {},
+                                            new SlotProfile[] {}),
+                                    new SlotProfile(address, 1, new 
ResourceProfile(), "")));
+        } else {
+            return super.sendToMember(operation, address);
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java
new file mode 100644
index 0000000000..acb4237f07
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class ResourceManagerFunctionTest
+        extends AbstractSeaTunnelServerTest<ResourceManagerFunctionTest> {
+
+    @Test
+    public void testApplyResourceWithRandomResult()
+            throws ExecutionException, InterruptedException {
+        FakeResourceManager resourceManager = new 
FakeResourceManager(nodeEngine);
+
+        List<ResourceProfile> resourceProfiles = new ArrayList<>();
+        resourceProfiles.add(new ResourceProfile());
+        resourceProfiles.add(new ResourceProfile());
+        resourceProfiles.add(new ResourceProfile());
+        resourceProfiles.add(new ResourceProfile());
+        resourceProfiles.add(new ResourceProfile());
+        List<SlotProfile> slotProfiles = resourceManager.applyResources(1L, 
resourceProfiles).get();
+        Assertions.assertEquals(slotProfiles.size(), 5);
+
+        Set<Address> addresses =
+                
slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet());
+        Assertions.assertTrue(addresses.size() > 1);
+    }
+}

Reply via email to