This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a6e5850494 [Improve][Test] Add test for ResourceManager to keep task
will be deployed in different node (#5518)
a6e5850494 is described below
commit a6e58504945cac5d2e47f63ed97ebfa8b55367ce
Author: Jia Fan <[email protected]>
AuthorDate: Sat Oct 7 20:58:25 2023 +0800
[Improve][Test] Add test for ResourceManager to keep task will be deployed
in different node (#5518)
* [Improve][Test] Add test for ResourceManager to keep task will be
deployed in different node
---
.../resourcemanager/AbstractResourceManager.java | 8 +-
.../resourcemanager/ResourceRequestHandler.java | 3 +-
.../resourcemanager/worker/WorkerProfile.java | 2 +
.../resourcemanager/FakeResourceManager.java | 93 ++++++++++++++++++++++
.../ResourceManagerFunctionTest.java | 56 +++++++++++++
5 files changed, 155 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index cf4f4e6e82..8b7e0b1864 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -29,10 +29,8 @@ import
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.internal.services.MembershipServiceEvent;
-import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
@@ -75,7 +73,7 @@ public abstract class AbstractResourceManager implements
ResourceManager {
.map(Member::getAddress)
.collect(Collectors.toList());
log.info("initWorker live nodes: " + aliveWorker);
- List<InternalCompletableFuture<Void>> futures =
+ List<CompletableFuture<Void>> futures =
aliveWorker.stream()
.map(
worker ->
@@ -86,7 +84,7 @@ public abstract class AbstractResourceManager implements
ResourceManager {
worker,
(WorkerProfile) p);
}))
.collect(Collectors.toList());
- futures.forEach(InternalCompletableFuture::join);
+ futures.forEach(CompletableFuture::join);
log.info("registerWorker: " + registerWorker);
}
@@ -155,7 +153,7 @@ public abstract class AbstractResourceManager implements
ResourceManager {
isRunning = false;
}
- protected <E> InvocationFuture<E> sendToMember(Operation operation,
Address address) {
+ protected <E> CompletableFuture<E> sendToMember(Operation operation,
Address address) {
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation,
address);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index be5993997c..680aa1c07c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -27,7 +27,6 @@ import
org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
-import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.ArrayList;
import java.util.Arrays;
@@ -139,7 +138,7 @@ public class ResourceRequestHandler {
private CompletableFuture<SlotAndWorkerProfile>
singleResourceRequestToMember(
int i, ResourceProfile r, WorkerProfile workerProfile) {
- InvocationFuture<SlotAndWorkerProfile> future =
+ CompletableFuture<SlotAndWorkerProfile> future =
resourceManager.sendToMember(
new RequestSlotOperation(jobId, r),
workerProfile.getAddress());
return future.whenComplete(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index e8a73d3383..836b25201e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -25,6 +25,7 @@ import com.hazelcast.cluster.Address;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.IOException;
@@ -33,6 +34,7 @@ import java.io.IOException;
* Used to describe the status of the current Worker, including address and
resource assign status
*/
@Data
+@AllArgsConstructor
public class WorkerProfile implements IdentifiedDataSerializable {
private Address address;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
new file mode 100644
index 0000000000..9c8595e167
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
+
+/** Used to test ResourceManager, override init method to register more
workers. */
+public class FakeResourceManager extends AbstractResourceManager {
+ public FakeResourceManager(NodeEngine nodeEngine) {
+ super(nodeEngine);
+ init();
+ }
+
+ @Override
+ public void init() {
+ try {
+ Address address1 = new Address("localhost", 5801);
+ WorkerProfile workerProfile1 =
+ new WorkerProfile(
+ address1,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ new SlotProfile[] {},
+ new SlotProfile[] {});
+ this.registerWorker.put(address1, workerProfile1);
+
+ Address address2 = new Address("localhost", 5802);
+ WorkerProfile workerProfile2 =
+ new WorkerProfile(
+ address2,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ new SlotProfile[] {},
+ new SlotProfile[] {});
+ this.registerWorker.put(address2, workerProfile2);
+ Address address3 = new Address("localhost", 5803);
+ WorkerProfile workerProfile3 =
+ new WorkerProfile(
+ address3,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ new SlotProfile[] {},
+ new SlotProfile[] {});
+ this.registerWorker.put(address3, workerProfile3);
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected <E> CompletableFuture<E> sendToMember(Operation operation,
Address address) {
+ if (operation instanceof RequestSlotOperation) {
+ return (CompletableFuture<E>)
+ CompletableFuture.completedFuture(
+ new SlotAndWorkerProfile(
+ new WorkerProfile(
+ address,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ new SlotProfile[] {},
+ new SlotProfile[] {}),
+ new SlotProfile(address, 1, new
ResourceProfile(), "")));
+ } else {
+ return super.sendToMember(operation, address);
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java
new file mode 100644
index 0000000000..acb4237f07
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerFunctionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.cluster.Address;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class ResourceManagerFunctionTest
+ extends AbstractSeaTunnelServerTest<ResourceManagerFunctionTest> {
+
+ @Test
+ public void testApplyResourceWithRandomResult()
+ throws ExecutionException, InterruptedException {
+ FakeResourceManager resourceManager = new
FakeResourceManager(nodeEngine);
+
+ List<ResourceProfile> resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ List<SlotProfile> slotProfiles = resourceManager.applyResources(1L,
resourceProfiles).get();
+ Assertions.assertEquals(slotProfiles.size(), 5);
+
+ Set<Address> addresses =
+
slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet());
+ Assertions.assertTrue(addresses.size() > 1);
+ }
+}