This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ce426fb0f6 [fix][zeta] fix can't release resource issue (#6763)
ce426fb0f6 is described below
commit ce426fb0f693d33a94fc54c966eb4f9c6913bdbe
Author: Jarvis <[email protected]>
AuthorDate: Sun Apr 28 21:39:30 2024 +0800
[fix][zeta] fix can't release resource issue (#6763)
---
.../seatunnel/engine/e2e/JobClientJobProxyIT.java | 2 +-
.../engine/server/dag/physical/ResourceUtils.java | 22 ++++--
.../engine/server/dag/physical/SubPlan.java | 6 +-
.../engine/server/AbstractSeaTunnelServerTest.java | 2 +-
.../server/checkpoint/CheckpointStorageTest.java | 4 +-
.../resourcemanager/FixSlotResourceTest.java | 87 ++++++++++++++++++++++
6 files changed, 107 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index 84d0e51608..0bdf18c68e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -63,6 +63,6 @@ public class JobClientJobProxyIT extends SeaTunnelContainer {
&& execResult
.getStderr()
.contains(
-
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException:
can't apply resource request"));
+
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException"));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
index 1bacda0326..c6bbc48b89 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/ResourceUtils.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import
org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -27,10 +28,11 @@ import lombok.NonNull;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
public class ResourceUtils {
- public static Map<TaskGroupLocation, SlotProfile> applyResourceForPipeline(
+ public static void applyResourceForPipeline(
@NonNull ResourceManager resourceManager, @NonNull SubPlan
subPlan) {
Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new
HashMap<>();
Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>();
@@ -49,12 +51,20 @@ public class ResourceUtils {
task.getTaskGroupLocation(),
applyResourceForTask(resourceManager,
task)));
- for (Map.Entry<TaskGroupLocation, CompletableFuture<SlotProfile>>
future :
- futures.entrySet()) {
- slotProfiles.put(
- future.getKey(), future.getValue() == null ? null :
future.getValue().join());
+ futures.forEach(
+ (key, value) -> {
+ try {
+ slotProfiles.put(key, value == null ? null :
value.join());
+ } catch (CompletionException e) {
+ // do nothing
+ }
+ });
+ // set it first, avoid can't get it when get resource not enough
exception and need release
+ // applied resource
+
subPlan.getJobMaster().setOwnedSlotProfiles(subPlan.getPipelineLocation(),
slotProfiles);
+ if (futures.size() != slotProfiles.size()) {
+ throw new NoEnoughResourceException();
}
- return slotProfiles;
}
public static CompletableFuture<SlotProfile> applyResourceForTask(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 571e0f7125..f845294bbb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -597,15 +597,11 @@ public class SubPlan {
break;
case SCHEDULED:
try {
- slotProfiles =
- ResourceUtils.applyResourceForPipeline(
- jobMaster.getResourceManager(), this);
+
ResourceUtils.applyResourceForPipeline(jobMaster.getResourceManager(), this);
log.debug(
"slotProfiles: {}, PipelineLocation: {}",
slotProfiles,
this.getPipelineLocation());
- // sead slot information to JobMaster
- jobMaster.setOwnedSlotProfiles(pipelineLocation,
slotProfiles);
updatePipelineState(PipelineStatus.DEPLOYING);
} catch (Exception e) {
makePipelineFailing(e);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 2710c2cbd7..234fd20c8b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -81,7 +81,7 @@ public abstract class AbstractSeaTunnelServerTest<T extends
AbstractSeaTunnelSer
Config hazelcastConfig = Config.loadFromString(yaml);
hazelcastConfig.setClusterName(
TestUtils.getClusterName("AbstractSeaTunnelServerTest_" +
name));
- SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
instance =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
nodeEngine = instance.node.nodeEngine;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 13d86d011a..4e72114e25 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -48,7 +48,7 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
public SeaTunnelConfig loadSeaTunnelConfig() {
SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig();
CheckpointConfig checkpointConfig =
seaTunnelConfig.getEngineConfig().getCheckpointConfig();
- // set a bigger interval in here and config file to avoid auto trigger
checkpoint affect
+ // set a big interval in here and config file to avoid auto trigger
checkpoint affect
// test result
checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE);
seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig);
@@ -61,7 +61,6 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
long jobId = System.currentTimeMillis();
CheckpointConfig checkpointConfig =
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
-
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
@@ -96,7 +95,6 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
long jobId = System.currentTimeMillis();
CheckpointConfig checkpointConfig =
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
-
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
new file mode 100644
index 0000000000..6c26f2398c
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class FixSlotResourceTest extends
AbstractSeaTunnelServerTest<FixSlotResourceTest> {
+
+ @Override
+ public SeaTunnelConfig loadSeaTunnelConfig() {
+ SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig();
+ SlotServiceConfig slotServiceConfig =
+ seaTunnelConfig.getEngineConfig().getSlotServiceConfig();
+ slotServiceConfig.setDynamicSlot(false);
+ slotServiceConfig.setSlotNum(3);
+
seaTunnelConfig.getEngineConfig().setSlotServiceConfig(slotServiceConfig);
+ return seaTunnelConfig;
+ }
+
+ @Test
+ public void testEnoughResource() throws ExecutionException,
InterruptedException {
+ long jobId = System.currentTimeMillis();
+ List<ResourceProfile> resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ List<SlotProfile> slotProfiles =
+ server.getCoordinatorService()
+ .getResourceManager()
+ .applyResources(jobId, resourceProfiles)
+ .get();
+ Assertions.assertEquals(slotProfiles.size(), 3);
+
server.getCoordinatorService().getResourceManager().releaseResources(jobId,
slotProfiles);
+ }
+
+ @Test
+ public void testNotEnoughResource() throws ExecutionException,
InterruptedException {
+ long jobId = System.currentTimeMillis();
+ List<ResourceProfile> resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ try {
+ server.getCoordinatorService()
+ .getResourceManager()
+ .applyResources(jobId, resourceProfiles)
+ .get();
+ } catch (ExecutionException e) {
+
Assertions.assertTrue(e.getMessage().contains("NoEnoughResourceException"));
+ }
+ resourceProfiles.remove(0);
+ List<SlotProfile> slotProfiles =
+ server.getCoordinatorService()
+ .getResourceManager()
+ .applyResources(jobId, resourceProfiles)
+ .get();
+ Assertions.assertEquals(slotProfiles.size(), 3);
+
server.getCoordinatorService().getResourceManager().releaseResources(jobId,
slotProfiles);
+ }
+}