This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2e92c987e [Pom]update version to 2.3.3-SNAPSHOT (#5043)
2e92c987e is described below
commit 2e92c987e4d4a9e7bf2da520ad73c375d23e3570
Author: Eric <[email protected]>
AuthorDate: Wed Jul 19 18:56:57 2023 +0800
[Pom]update version to 2.3.3-SNAPSHOT (#5043)
* update version to 2.3.3-SNAPSHOT
* update dependency version in known dependencies file
* Add logs to help locate the job restore error that occurs after a master active switch
---
pom.xml | 2 +-
.../engine/server/checkpoint/CheckpointManager.java | 4 ++++
.../engine/server/dag/physical/PhysicalVertex.java | 2 +-
.../seatunnel/engine/server/dag/physical/SubPlan.java | 15 ++++++++++++---
tools/dependencies/known-dependencies.txt | 4 ++--
5 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/pom.xml b/pom.xml
index 51b03a26d..7dce624be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<properties>
<!--todo The classification is too confusing, reclassify by type-->
- <revision>2.3.2-SNAPSHOT</revision>
+ <revision>2.3.3-SNAPSHOT</revision>
<seatunnel.config.shade.version>2.1.1</seatunnel.config.shade.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 9f9649f03..f34ae2f6a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -47,6 +47,7 @@ import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -167,6 +168,9 @@ public class CheckpointManager {
}
public void reportedPipelineRunning(int pipelineId, boolean
alreadyStarted) {
+ log.info(
+ "reported pipeline running stack: "
+ +
Arrays.toString(Thread.currentThread().getStackTrace()));
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 656664135..3c840a269 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -394,7 +394,7 @@ public class PhysicalVertex {
public boolean updateTaskState(
@NonNull ExecutionState current, @NonNull ExecutionState
targetState) {
synchronized (this) {
- LOGGER.fine(
+ LOGGER.info(
String.format(
"Try to update the task %s state from %s to %s",
taskFullName, current, targetState));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index bc9e3e2aa..83dd4e9d0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -53,7 +53,7 @@ public class SubPlan {
private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
/** The max num pipeline can restore. */
- public static final int PIPELINE_MAX_RESTORE_NUM = 2; // TODO should set
by config
+ public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO should set
by config
private final List<PhysicalVertex> physicalVertexList;
@@ -332,6 +332,9 @@ public class SubPlan {
exception ->
ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
this.currPipelineStatus = endState;
+ LOGGER.info(
+ String.format(
+ "%s turn to end state %s.", pipelineFullName,
currPipelineStatus));
}
}
@@ -511,11 +514,17 @@ public class SubPlan {
LOGGER.severe(message);
throw new IllegalStateException(message);
}
-
+ LOGGER.info(
+ String.format(
+ "Reset pipeline %s state to %s",
+ getPipelineFullName(),
PipelineStatus.CREATED));
updateStateTimestamps(PipelineStatus.CREATED);
runningJobStateIMap.set(pipelineLocation,
PipelineStatus.CREATED);
this.currPipelineStatus = PipelineStatus.CREATED;
- ;
+ LOGGER.info(
+ String.format(
+ "Reset pipeline %s state to %s complete",
+ getPipelineFullName(),
PipelineStatus.CREATED));
return null;
},
new RetryUtils.RetryMaterial(
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 3a1e736b6..70bbd1c0d 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -22,8 +22,8 @@ protostuff-collectionschema-1.8.0.jar
protostuff-core-1.8.0.jar
protostuff-runtime-1.8.0.jar
scala-library-2.11.12.jar
-seatunnel-jackson-2.3.2-SNAPSHOT-optional.jar
-seatunnel-guava-2.3.2-SNAPSHOT-optional.jar
+seatunnel-jackson-2.3.3-SNAPSHOT-optional.jar
+seatunnel-guava-2.3.3-SNAPSHOT-optional.jar
slf4j-api-1.7.25.jar
jsqlparser-4.5.jar
animal-sniffer-annotations-1.17.jar