This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a66923a93 [Bug][Engine Server] fix checkpoint stuck error (#3213)
a66923a93 is described below
commit a66923a93a6b0d4fa6543991cc7ad990b31c3666
Author: Eric <[email protected]>
AuthorDate: Mon Oct 31 12:53:35 2022 +0800
[Bug][Engine Server] fix checkpoint stuck error (#3213)
* add debug logs to PipelineBaseScheduler
* [Engine] [Log] add apply resource log
* [Engine] [Resource] fix resource active check bug
* [Engine] [Test] Add operation retry count
---
.../src/test/resources/log4j.properties | 1 +
.../apache/seatunnel/engine/common/Constant.java | 2 +-
.../server/checkpoint/CheckpointCoordinator.java | 1 +
.../engine/server/execution/TaskGroupLocation.java | 9 +++++
.../resourcemanager/AbstractResourceManager.java | 15 ++++++--
.../server/scheduler/PipelineBaseScheduler.java | 43 ++++++++++++----------
6 files changed, 47 insertions(+), 24 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
index c4398d09b..8757e3b86 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j.properties
@@ -22,3 +22,4 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
%c{1}: %m%n
log4j.logger.org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator=DEBUG
+log4j.logger.org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler=DEBUG
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 37040a6ef..f7534884e 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -36,7 +36,7 @@ public class Constant {
public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML =
"seatunnel.yaml";
- public static final int OPERATION_RETRY_TIME = 5;
+ public static final int OPERATION_RETRY_TIME = 10;
public static final int OPERATION_RETRY_SLEEP = 2000;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index dd82f64c2..fc2ec8a15 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -381,6 +381,7 @@ public class CheckpointCoordinator {
);
// TODO: clear related future & scheduler task
pendingCheckpoints.clear();
+ pendingCounter.set(0);
scheduler.shutdownNow();
scheduler = Executors.newScheduledThreadPool(
1, runnable -> {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
index 250846af5..ef453cf1a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
@@ -52,4 +52,13 @@ public class TaskGroupLocation implements Serializable {
public int hashCode() {
return new HashCodeBuilder(17,
37).append(jobId).append(pipelineId).append(taskGroupId).toHashCode();
}
+
+ @Override
+ public String toString() {
+ return "TaskGroupLocation{" +
+ "jobId=" + jobId +
+ ", pipelineId=" + pipelineId +
+ ", taskGroupId=" + taskGroupId +
+ '}';
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index e13c02807..3ad2c5dc2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -170,9 +170,18 @@ public abstract class AbstractResourceManager implements
ResourceManager {
@Override
public boolean slotActiveCheck(SlotProfile profile) {
- return registerWorker.values().stream()
- .flatMap(workerProfile ->
Arrays.stream(workerProfile.getAssignedSlots()))
- .anyMatch(s -> s.getSlotID() == profile.getSlotID());
+ boolean active = false; // stays false when the profile's worker is not in registerWorker
+ if (registerWorker.containsKey(profile.getWorker())) {
+ active =
Arrays.stream(registerWorker.get(profile.getWorker()).getAssignedSlots())
+ .anyMatch(s -> s.getSlotID() == profile.getSlotID()); // one matching assigned slot is enough; allMatch failed for workers with several slots and passed for workers with none
+ }
+
+ if (!active) {
+ LOGGER.info("received slot active check failed, profile: " +
profile);
+ } else {
+ LOGGER.fine("received slot active check success, profile: " +
profile);
+ }
+ return active;
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index c0e95bf0c..7fdfe323f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -31,9 +31,8 @@ import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
@@ -43,8 +42,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+@Slf4j
public class PipelineBaseScheduler implements JobScheduler {
- private static final ILogger LOGGER =
Logger.getLogger(PipelineBaseScheduler.class);
private final PhysicalPlan physicalPlan;
private final long jobId;
@@ -91,6 +90,8 @@ public class PipelineBaseScheduler implements JobScheduler {
Map<TaskGroupLocation, SlotProfile> slotProfiles =
getOrApplyResourceForPipeline(pipeline,
jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation()));
+ log.debug("slotProfiles: {}", slotProfiles);
+
// To ensure release pipeline resource after new master node
active, we need store slotProfiles first and then deploy tasks.
jobMaster.setOwnedSlotProfiles(pipeline.getPipelineLocation(),
slotProfiles);
// deploy pipeline
@@ -125,14 +126,22 @@ public class PipelineBaseScheduler implements
JobScheduler {
private SlotProfile getOrApplyResourceForTask(@NonNull PhysicalVertex task,
Map<TaskGroupLocation,
SlotProfile> ownedSlotProfiles) {
+ SlotProfile oldProfile; // slot already recorded for this task group, or null when none is recorded
if (ownedSlotProfiles == null
|| ownedSlotProfiles.isEmpty()
- || ownedSlotProfiles.get(task.getTaskGroupLocation()) == null
- ||
!resourceManager.slotActiveCheck(ownedSlotProfiles.get(task.getTaskGroupLocation())))
{
- return applyResourceForTask(task).join();
+ || ownedSlotProfiles.get(task.getTaskGroupLocation()) == null) {
+ oldProfile = null;
+ } else {
+ oldProfile = ownedSlotProfiles.get(task.getTaskGroupLocation());
+ }
+ if (oldProfile == null ||
!resourceManager.slotActiveCheck(oldProfile)) {
+ SlotProfile newProfile = applyResourceForTask(task).join(); // join() waits for the apply result
+ log.info("use new profile: {} to replace not active profile: {} for task {}", newProfile, oldProfile, task);
+ return newProfile;
}
+ log.info("use active old profile: {} for task {}", oldProfile, task);
task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
- return ownedSlotProfiles.get(task.getTaskGroupLocation());
+ return oldProfile;
}
private Map<TaskGroupLocation, SlotProfile>
applyResourceForPipeline(@NonNull SubPlan subPlan) {
@@ -160,9 +169,7 @@ public class PipelineBaseScheduler implements JobScheduler {
return resourceManager.applyResource(jobId, new
ResourceProfile());
} else if
(ExecutionState.CANCELING.equals(task.getExecutionState()) ||
ExecutionState.CANCELED.equals(task.getExecutionState())) {
- LOGGER.info(
- String.format("%s be canceled, skip %s this task.",
task.getTaskFullName(),
- ExecutionState.SCHEDULED));
+ log.info("{} be canceled, skip {} this task.",
task.getTaskFullName(), ExecutionState.SCHEDULED);
return null;
} else {
makeTaskFailed(task.getTaskGroupLocation(),
@@ -184,8 +191,7 @@ public class PipelineBaseScheduler implements JobScheduler {
});
} else if (ExecutionState.CANCELING.equals(task.getExecutionState()) ||
ExecutionState.CANCELED.equals(task.getExecutionState())) {
- LOGGER.info(
- String.format("%s be canceled, skip %s this task.",
task.getTaskFullName(), ExecutionState.DEPLOYING));
+ log.info("{} be canceled, skip {} this task.",
task.getTaskFullName(), ExecutionState.DEPLOYING);
return null;
} else {
jobMaster.updateTaskExecutionState(
@@ -218,9 +224,7 @@ public class PipelineBaseScheduler implements JobScheduler {
deployCoordinatorFuture.toArray(new CompletableFuture[0]));
voidCompletableFuture.get();
if (!pipeline.updatePipelineState(PipelineStatus.DEPLOYING,
PipelineStatus.RUNNING)) {
- LOGGER.info(
- String.format("%s turn to state %s, skip the running
state.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState()));
+ log.info("{} turn to state {}, skip the running state.",
pipeline.getPipelineFullName(), pipeline.getPipelineState());
}
} catch (Exception e) {
makePipelineFailed(pipeline, e);
@@ -228,8 +232,8 @@ public class PipelineBaseScheduler implements JobScheduler {
} else if
(PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) ||
PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) {
// may be canceled
- LOGGER.info(String.format("%s turn to state %s, skip %s this
pipeline.", pipeline.getPipelineFullName(),
- pipeline.getPipelineState(), PipelineStatus.DEPLOYING));
+ log.info("{} turn to state {}, skip {} this pipeline.",
pipeline.getPipelineFullName(),
+ pipeline.getPipelineState(), PipelineStatus.DEPLOYING);
} else {
makePipelineFailed(pipeline, new JobException(
String.format("%s turn to a unexpected state: %s, stop
scheduler job", pipeline.getPipelineFullName(),
@@ -246,9 +250,8 @@ public class PipelineBaseScheduler implements JobScheduler {
if (PipelineStatus.CANCELING.equals(pipeline.getPipelineState()) ||
PipelineStatus.CANCELED.equals(pipeline.getPipelineState())) {
// may be canceled
- LOGGER.info(
- String.format("%s turn to state %s, skip %s this pipeline.",
pipeline.getPipelineFullName(),
- pipeline.getPipelineState(), targetState));
+ log.info("{} turn to state {}, skip {} this pipeline.",
pipeline.getPipelineFullName(),
+ pipeline.getPipelineState(), targetState);
} else {
throw new JobException(
String.format("%s turn to a unexpected state: %s, stop
scheduler job",