This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch eventmesh-function
in repository https://gitbox.apache.org/repos/asf/eventmesh.git

commit 8b361778c34ad7e7df95daa55f1741700176d401
Author: mike_xwm <[email protected]>
AuthorDate: Mon Jul 1 09:38:27 2024 +0800

    [ISSUE #4979] Canal Connector supports bidirectional data synchronization 
(#5011)
    
    * [ISSUE #4979]Canal Connector supports bidirectional data synchronization
    
    * add bash files for admin & runtime-v2
    
    * fix ack offset read & persist
    
    * fix checkStyle error
---
 build.gradle                                       |   7 +-
 eventmesh-admin-server/bin/start-admin.sh          | 201 +++++++++++++++++++++
 .../server/web/db/entity/EventMeshJobDetail.java   |   3 +-
 .../position/EventMeshPositionBizService.java      |   6 +-
 .../service/position/IFetchPositionHandler.java    |   4 +-
 .../position/impl/MysqlPositionHandler.java        |  16 +-
 .../src/main/resources/application.yaml            |   4 +-
 .../connector/rdb/canal/CanalSinkConfig.java       |  15 +-
 .../connector/rdb/canal/CanalSourceConfig.java     |  28 +--
 .../common/remote/response/FetchJobResponse.java   |   3 +-
 .../remote/response/FetchPositionResponse.java     |   6 +-
 .../eventmesh-connector-canal/build.gradle         |   1 -
 .../connector/canal/dialect/MysqlDialect.java      |  22 +--
 .../interceptor/SqlBuilderLoadInterceptor.java     |  13 +-
 .../connector/canal/source/EntryParser.java        | 110 +++++------
 .../source/connector/CanalSourceConnector.java     |  67 ++++---
 .../canal/template/AbstractSqlTemplate.java        |  10 +-
 .../connector/canal/template/MysqlSqlTemplate.java |   2 +-
 .../api/connector/SourceConnectorContext.java      |   6 +
 .../offsetmgmt/admin/AdminOffsetService.java       |  23 ++-
 .../eventmesh-registry-nacos/gradle.properties     |  18 +-
 eventmesh-runtime-v2/bin/start-v2.sh               | 200 ++++++++++++++++++++
 eventmesh-runtime-v2/bin/stop-v2.sh                |  88 +++++++++
 .../runtime/connector/ConnectorRuntime.java        |   5 +
 24 files changed, 688 insertions(+), 170 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5aafc4c93..df86a4956 100644
--- a/build.gradle
+++ b/build.gradle
@@ -162,9 +162,13 @@ tasks.register('dist') {
             ["eventmesh-common",
              "eventmesh-meta:eventmesh-meta-api",
              "eventmesh-metrics-plugin:eventmesh-metrics-api",
+             "eventmesh-openconnect:eventmesh-openconnect-java",
+             
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
              "eventmesh-protocol-plugin:eventmesh-protocol-api",
+             "eventmesh-registry:eventmesh-registry-api",
              "eventmesh-retry:eventmesh-retry-api",
              "eventmesh-runtime",
+             "eventmesh-runtime-v2",
              "eventmesh-security-plugin:eventmesh-security-api",
              "eventmesh-spi",
              "eventmesh-starter",
@@ -660,6 +664,7 @@ subprojects {
 
     dependencyManagement {
         dependencies {
+
             dependency "org.apache.commons:commons-lang3:3.6"
             dependency "org.apache.commons:commons-collections4:4.4"
             dependency "org.apache.commons:commons-text:1.9"
@@ -709,7 +714,7 @@ subprojects {
             dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0"
             dependency "com.jayway.jsonpath:json-path:2.9.0"
 
-            dependency 
"org.springframework.boot:spring-boot-starter-web:2.7.18"
+            dependency "org.springframework.boot:spring-boot-starter-web:2.5.4"
             dependency "io.openmessaging:registry-server:0.0.1"
 
             dependency "org.junit.jupiter:junit-jupiter:5.6.0"
diff --git a/eventmesh-admin-server/bin/start-admin.sh 
b/eventmesh-admin-server/bin/start-admin.sh
new file mode 100644
index 000000000..93c364439
--- /dev/null
+++ b/eventmesh-admin-server/bin/start-admin.sh
@@ -0,0 +1,201 @@
+#!/bin/bash
+#
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#===========================================================================================
+# Java Environment Setting
+#===========================================================================================
+set -e
+# Server configuration may be inconsistent, add these configurations to avoid 
garbled code problems
+export LANG=en_US.UTF-8
+export LC_CTYPE=en_US.UTF-8
+export LC_ALL=en_US.UTF-8
+
+TMP_JAVA_HOME="/customize/your/java/home/here"
+
+# Detect operating system.
+OS=$(uname)
+
+function is_java8_or_11 {
+        local _java="$1"
+        [[ -x "$_java" ]] || return 1
+        [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" 
-version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 
'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] 
|| return 2
+        return 0
+}
+
+function extract_java_version {
+    local _java="$1"
+    local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' 
| awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print 
"11"; else print "unknown"}')
+    echo "$version"
+}
+
+# 0(not running),  1(is running)
+#function is_proxyRunning {
+#        local _pid="$1"
+#        local pid=`ps ax | grep -i 
'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep 
| awk '{print $1}'|grep $_pid`
+#        if [ -z "$pid" ] ; then
+#            return 0
+#        else
+#            return 1
+#        fi
+#}
+
+function get_pid {
+       local ppid=""
+       if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
+               ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
+               # If the process does not exist, it indicates that the previous 
process terminated abnormally.
+    if [ ! -d /proc/$ppid ]; then
+      # Remove the residual file.
+      rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
+      echo -e "ERROR\t EventMesh process had already terminated unexpectedly 
before, please check log output."
+      ppid=""
+    fi
+       else
+               if [[ $OS =~ Msys ]]; then
+                       # There is a Bug on Msys that may not be able to kill 
the identified process
+                       ppid=`jps -v | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v 
grep | awk -F ' ' {'print $1'}`
+               elif [[ $OS =~ Darwin ]]; then
+                       # Known problem: grep Java may not be able to 
accurately identify Java processes
+                       ppid=$(/bin/ps -o user,pid,command | grep "java" | grep 
-i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" 
|awk -F ' ' {'print $2'})
+               else
+                 if [ $DOCKER ]; then
+                   # No need to exclude root user in Docker containers.
+                   ppid=$(ps -C java -o user,pid,command --cols 99999 | grep 
-w $EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print 
$2'})
+                 else
+        # It is required to identify the process as accurately as possible on 
Linux.
+        ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w 
$EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk 
-F ' ' {'print $2'})
+      fi
+               fi
+       fi
+       echo "$ppid";
+}
+
+#===========================================================================================
+# Locate Java Executable
+#===========================================================================================
+
+if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
+        JAVA="$TMP_JAVA_HOME/bin/java"
+        JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
+elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
+        JAVA="$JAVA_HOME/bin/java"
+        JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
+elif is_java8_or_11 "$(which java)"; then
+        JAVA="$(which java)"
+        JAVA_VERSION=$(extract_java_version "$(which java)")
+else
+        echo -e "ERROR\t Java 8 or 11 not found, operation abort."
+        exit 9;
+fi
+
+echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"
+
+EVENTMESH_ADMIN_HOME=$(cd "$(dirname "$0")/.." && pwd)
+export EVENTMESH_ADMIN_HOME
+
+EVENTMESH_ADMIN_LOG_HOME="${EVENTMESH_ADMIN_HOME}/logs"
+export EVENTMESH_ADMIN_LOG_HOME
+
+echo -e "EVENTMESH_ADMIN_HOME : 
${EVENTMESH_ADMIN_HOME}\nEVENTMESH_ADMIN_LOG_HOME : ${EVENTMESH_ADMIN_LOG_HOME}"
+
+function make_logs_dir {
+        if [ ! -e "${EVENTMESH_ADMIN_LOG_HOME}" ]; then mkdir -p 
"${EVENTMESH_ADMIN_LOG_HOME}"; fi
+}
+
+error_exit ()
+{
+    echo -e "ERROR\t $1 !!"
+    exit 1
+}
+
+export JAVA_HOME
+
+#===========================================================================================
+# JVM Configuration
+#===========================================================================================
+#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server 
-Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
+#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M 
-Xmn256m -XX:SurvivorRatio=4"
+#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M 
-Xmn128m -XX:SurvivorRatio=4"
+#fi
+
+GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
+
+#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m 
-XX:SurvivorRatio=4"
+JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep 
APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m 
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
+JAVA_OPT="${JAVA_OPT} -verbose:gc"
+if [[ "$JAVA_VERSION" == "8" ]]; then
+    # Set JAVA_OPT for Java 8
+    JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
+    JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
-XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
+elif [[ "$JAVA_VERSION" == "11" ]]; then
+    # Set JAVA_OPT for Java 11
+    XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
+    JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
+    JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} 
-Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
+fi
+JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=${EVENTMESH_ADMIN_LOG_HOME} 
-XX:ErrorFile=${EVENTMESH_ADMIN_LOG_HOME}/hs_err_%p.log"
+JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
+JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
+JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
+JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
+JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
+JAVA_OPT="${JAVA_OPT} 
-Dlog4j.configurationFile=${EVENTMESH_ADMIN_HOME}/conf/log4j2.xml"
+JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_ADMIN_LOG_HOME}"
+JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_ADMIN_HOME}/conf"
+JAVA_OPT="${JAVA_OPT} -DconfigurationPath=${EVENTMESH_ADMIN_HOME}/conf"
+JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
+JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
+JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
+
+#if [ -f "pid.file" ]; then
+#        pid=`cat pid.file`
+#        if ! is_proxyRunning "$pid"; then
+#            echo "proxy is running already"
+#            exit 9;
+#        else
+#          echo "err pid$pid, rm pid.file"
+#            rm pid.file
+#        fi
+#fi
+
+pid=$(get_pid)
+if [[ $pid == "ERROR"* ]]; then
+  echo -e "${pid}"
+  exit 9
+fi
+if [ -n "$pid" ]; then
+       echo -e "ERROR\t The server is already running (pid=$pid), there is no 
need to execute start.sh again."
+       exit 9
+fi
+
+make_logs_dir
+
+echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> 
${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+
+EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
+if [ $DOCKER ]; then
+       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+else
+       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
+echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
+fi
+exit 0
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
index b4a836e8b..849a90a88 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.common.remote.JobState;
 import org.apache.eventmesh.common.remote.job.JobTransportType;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 
+import java.util.List;
 import java.util.Map;
 
 import lombok.Data;
@@ -42,7 +43,7 @@ public class EventMeshJobDetail {
 
     private String sinkConnectorDesc;
 
-    private RecordPosition position;
+    private List<RecordPosition> position;
 
     private JobState state;
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
index 7d6febdf4..d3b6ff555 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
@@ -25,6 +25,8 @@ import 
org.apache.eventmesh.common.remote.offset.RecordPosition;
 import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
 import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
 
+import java.util.List;
+
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
     PositionHandlerFactory factory;
 
     // called isValidateReportRequest before call this
-    public RecordPosition getPosition(FetchPositionRequest request, Metadata 
metadata) {
+    public List<RecordPosition> getPosition(FetchPositionRequest request, 
Metadata metadata) {
         if (request == null) {
             return null;
         }
@@ -68,7 +70,7 @@ public class EventMeshPositionBizService {
         return handler.handler(request, metadata);
     }
 
-    public RecordPosition getPositionByJobID(Integer jobID, DataSourceType 
type) {
+    public List<RecordPosition> getPositionByJobID(Integer jobID, 
DataSourceType type) {
         if (jobID == null || type == null) {
             return null;
         }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
index 9a4c324dc..2c039062f 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
@@ -21,10 +21,12 @@ import 
org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
 
+import java.util.List;
+
 /**
  * IFetchPositionHandler
  */
 public interface IFetchPositionHandler {
 
-    RecordPosition handler(FetchPositionRequest request, Metadata metadata);
+    List<RecordPosition> handler(FetchPositionRequest request, Metadata 
metadata);
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
index 623864fa6..525fe02c0 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
@@ -31,6 +31,7 @@ import 
org.apache.eventmesh.common.remote.request.FetchPositionRequest;
 import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
 import org.apache.eventmesh.common.utils.JsonUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -142,20 +143,21 @@ public class MysqlPositionHandler extends PositionHandler 
{
     }
 
     @Override
-    public RecordPosition handler(FetchPositionRequest request, Metadata 
metadata) {
-        EventMeshMysqlPosition position = 
positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
+    public List<RecordPosition> handler(FetchPositionRequest request, Metadata 
metadata) {
+        List<EventMeshMysqlPosition> positionList = 
positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
             request.getJobID()));
-        RecordPosition recordPosition = null;
-        if (position != null) {
+        List<RecordPosition> recordPositionList = new ArrayList<>();
+        for (EventMeshMysqlPosition position : positionList) {
+            RecordPosition recordPosition = new RecordPosition();
             CanalRecordPartition partition = new CanalRecordPartition();
             partition.setTimeStamp(position.getTimestamp());
             partition.setJournalName(position.getJournalName());
+            recordPosition.setRecordPartition(partition);
             CanalRecordOffset offset = new CanalRecordOffset();
             offset.setOffset(position.getPosition());
-            recordPosition = new RecordPosition();
-            recordPosition.setRecordPartition(partition);
             recordPosition.setRecordOffset(offset);
+            recordPositionList.add(recordPosition);
         }
-        return recordPosition;
+        return recordPositionList;
     }
 }
diff --git a/eventmesh-admin-server/src/main/resources/application.yaml 
b/eventmesh-admin-server/src/main/resources/application.yaml
index 420a22fe6..54795057c 100644
--- a/eventmesh-admin-server/src/main/resources/application.yaml
+++ b/eventmesh-admin-server/src/main/resources/application.yaml
@@ -18,8 +18,8 @@
 spring:
   datasource:
     url: 
jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
-    username: root
-    password: default
+    username: //db_username
+    password: //db_password
     driver-class-name: com.mysql.cj.jdbc.Driver
 mybatis-plus:
   mapper-locations: classpath:mapper/*.xml
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index f7a697625..85484b2ce 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -27,15 +27,20 @@ import lombok.EqualsAndHashCode;
 @EqualsAndHashCode(callSuper = true)
 public class CanalSinkConfig extends SinkConfig {
 
-    private Integer batchSize = 50;                          // batchSize
+    // batchSize
+    private Integer batchSize = 50;
 
-    private Boolean useBatch = true;                        // enable batch
+    // enable batch
+    private Boolean useBatch = true;
 
-    private Integer poolSize = 5;                           // sink thread 
size for single channel
+    // sink thread size for single channel
+    private Integer poolSize = 5;
 
-    private SyncMode syncMode;                              // sync mode: 
field/row
+    // sync mode: field/row
+    private SyncMode syncMode;
 
-    private Boolean skipException = false;                  // skip sink 
process exception
+    // skip sink process exception
+    private Boolean skipException = false;
 
     public SinkConnectorConfig sinkConnectorConfig;
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index e5edc5a78..d75ceb6b5 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -49,32 +49,32 @@ public class CanalSourceConfig extends SourceConfig {
 
     private Long batchTimeout = -1L;
 
+    private String tableFilter;
+
+    private String fieldFilter;
+
     private List<RecordPosition> recordPositions;
 
     // ================================= channel parameter
     // ================================
 
-    private Boolean enableRemedy = false;                                      
       // enable remedy
+    // enable remedy
+    private Boolean enableRemedy = false;
 
-    private SyncMode syncMode;                                                 
// sync mode: field/row
+    // sync mode: field/row
+    private SyncMode syncMode;
 
-    private SyncConsistency syncConsistency;                                   
       // sync consistency
+    // sync consistency
+    private SyncConsistency syncConsistency;
 
     // ================================= system parameter
     // ================================
 
-    private String systemSchema;                                             
// Default is retl
-
-    private String systemMarkTable;                                          
// Bidirectional synchronization mark table
-
-    private String systemMarkTableColumn;                                    
// Column name of the bidirectional synchronization mark
-
-    private String systemMarkTableInfo;
-    // nfo information of the bidirectional synchronization mark, similar to 
BI_SYNC
-
-    private String systemBufferTable;                                        
// sync buffer table
+    // Column name of the bidirectional synchronization mark
+    private String needSyncMarkTableColumnName = "needSync";
 
-    private String systemDualTable;                                          
// sync heartbeat table
+    // Column value of the bidirectional synchronization mark
+    private String needSyncMarkTableColumnValue = "needSync";
 
     private SourceConnectorConfig sourceConnectorConfig;
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
index 137e49bdc..a51cb32b9 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
@@ -22,6 +22,7 @@ import org.apache.eventmesh.common.remote.exception.ErrorCode;
 import org.apache.eventmesh.common.remote.job.JobTransportType;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 
+import java.util.List;
 import java.util.Map;
 
 import lombok.Data;
@@ -45,7 +46,7 @@ public class FetchJobResponse extends BaseRemoteResponse {
 
     private String sinkConnectorDesc;
 
-    private RecordPosition position;
+    private List<RecordPosition> position;
 
     private JobState state;
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
index e9a7a3828..613623d65 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
@@ -20,6 +20,8 @@ package org.apache.eventmesh.common.remote.response;
 import org.apache.eventmesh.common.remote.exception.ErrorCode;
 import org.apache.eventmesh.common.remote.offset.RecordPosition;
 
+import java.util.List;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
@@ -27,7 +29,7 @@ import lombok.EqualsAndHashCode;
 @EqualsAndHashCode(callSuper = true)
 public class FetchPositionResponse extends BaseRemoteResponse {
 
-    private RecordPosition recordPosition;
+    private List<RecordPosition> recordPosition;
 
     public static FetchPositionResponse successResponse() {
         FetchPositionResponse response = new FetchPositionResponse();
@@ -36,7 +38,7 @@ public class FetchPositionResponse extends BaseRemoteResponse 
{
         return response;
     }
 
-    public static FetchPositionResponse successResponse(RecordPosition 
recordPosition) {
+    public static FetchPositionResponse successResponse(List<RecordPosition> 
recordPosition) {
         FetchPositionResponse response = successResponse();
         response.setRecordPosition(recordPosition);
         return response;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle 
b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
index 0d914b7ae..640cb5ce4 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
@@ -26,7 +26,6 @@ dependencies {
     implementation project(":eventmesh-common")
     implementation canal
     implementation "com.alibaba:druid:1.2.6"
-//    implementation "org.apache.ddlutils:ddlutils:1.0"
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
     testImplementation "org.mockito:mockito-core"
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
index 32bb79b54..acd491ba6 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
@@ -19,29 +19,17 @@ package org.apache.eventmesh.connector.canal.dialect;
 
 import org.apache.eventmesh.connector.canal.template.MysqlSqlTemplate;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.support.lob.LobHandler;
 
 
 public class MysqlDialect extends AbstractDbDialect {
 
-    private Map<List<String>, String> shardColumns;
-
     public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
         super(jdbcTemplate, lobHandler);
         sqlTemplate = new MysqlSqlTemplate();
     }
 
-    public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, 
String name, String databaseVersion,
-                        int majorVersion, int minorVersion) {
-        super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion);
-        sqlTemplate = new MysqlSqlTemplate();
-    }
-
     public boolean isCharSpacePadded() {
         return false;
     }
@@ -66,16 +54,8 @@ public class MysqlDialect extends AbstractDbDialect {
         return false;
     }
 
-    public String getShardColumns(String schema, String table) {
-        if (isDRDS()) {
-            return shardColumns.get(Arrays.asList(schema, table));
-        } else {
-            return null;
-        }
-    }
-
     public String getDefaultCatalog() {
-        return (String) jdbcTemplate.queryForObject("select database()", 
String.class);
+        return jdbcTemplate.queryForObject("select database()", String.class);
     }
 
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
index ab0776c17..24d6b42f8 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
@@ -28,11 +28,16 @@ import java.util.List;
 
 import org.springframework.util.CollectionUtils;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /**
  * compute latest sql
  */
 public class SqlBuilderLoadInterceptor {
 
+    @Getter
+    @Setter
     private DbDialect dbDialect;
 
     public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord 
record) {
@@ -128,12 +133,4 @@ public class SqlBuilderLoadInterceptor {
         }
         return result;
     }
-
-    public DbDialect getDbDialect() {
-        return dbDialect;
-    }
-
-    public void setDbDialect(DbDialect dbDialect) {
-        this.dbDialect = dbDialect;
-    }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 3031a15df..32c55ec42 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -23,9 +23,10 @@ import 
org.apache.eventmesh.connector.canal.model.EventColumn;
 import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable;
 import org.apache.eventmesh.connector.canal.model.EventType;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,32 +47,26 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class EntryParser {
 
-    public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, 
List<Entry> datas) {
+    public Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig 
sourceConfig, List<Entry> datas) {
         List<CanalConnectRecord> recordList = new ArrayList<>();
         List<Entry> transactionDataBuffer = new ArrayList<>();
+        // need check weather the entry is loopback
+        boolean needSync;
+        Map<Long, List<CanalConnectRecord>> recordMap = new HashMap<>();
         try {
             for (Entry entry : datas) {
                 switch (entry.getEntryType()) {
-                    case TRANSACTIONBEGIN:
-                        break;
                     case ROWDATA:
-                        transactionDataBuffer.add(entry);
+                        RowChange rowChange = 
RowChange.parseFrom(entry.getStoreValue());
+                        needSync = checkNeedSync(sourceConfig, 
rowChange.getRowDatas(0));
+                        if (needSync) {
+                            transactionDataBuffer.add(entry);
+                        }
                         break;
                     case TRANSACTIONEND:
-                        for (Entry bufferEntry : transactionDataBuffer) {
-                            List<CanalConnectRecord> recordParsedList = 
internParse(sourceConfig, bufferEntry);
-                            if (CollectionUtils.isEmpty(recordParsedList)) {
-                                continue;
-                            }
-                            long totalSize = 
bufferEntry.getHeader().getEventLength();
-                            long eachSize = totalSize / 
recordParsedList.size();
-                            for (CanalConnectRecord record : recordParsedList) 
{
-                                if (record == null) {
-                                    continue;
-                                }
-                                record.setSize(eachSize);
-                                recordList.add(record);
-                            }
+                        parseRecordListWithEntryBuffer(sourceConfig, 
recordList, transactionDataBuffer);
+                        if (!recordList.isEmpty()) {
+                            
recordMap.put(entry.getHeader().getLogfileOffset(), recordList);
                         }
                         transactionDataBuffer.clear();
                         break;
@@ -79,27 +74,46 @@ public class EntryParser {
                         break;
                 }
             }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return recordMap;
+    }
 
-            for (Entry bufferEntry : transactionDataBuffer) {
-                List<CanalConnectRecord> recordParsedList = 
internParse(sourceConfig, bufferEntry);
-                if (CollectionUtils.isEmpty(recordParsedList)) {
+    private void parseRecordListWithEntryBuffer(CanalSourceConfig 
sourceConfig, List<CanalConnectRecord> recordList,
+        List<Entry> transactionDataBuffer) {
+        for (Entry bufferEntry : transactionDataBuffer) {
+            List<CanalConnectRecord> recordParsedList = 
internParse(sourceConfig, bufferEntry);
+            if (CollectionUtils.isEmpty(recordParsedList)) {
+                continue;
+            }
+            long totalSize = bufferEntry.getHeader().getEventLength();
+            long eachSize = totalSize / recordParsedList.size();
+            for (CanalConnectRecord record : recordParsedList) {
+                if (record == null) {
                     continue;
                 }
+                record.setSize(eachSize);
+                recordList.add(record);
+            }
+        }
+    }
 
-                long totalSize = bufferEntry.getHeader().getEventLength();
-                long eachSize = totalSize / recordParsedList.size();
-                for (CanalConnectRecord record : recordParsedList) {
-                    if (record == null) {
-                        continue;
-                    }
-                    record.setSize(eachSize);
-                    recordList.add(record);
-                }
+    private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData 
rowData) {
+        Column markedColumn = 
getColumnIgnoreCase(rowData.getAfterColumnsList(), 
sourceConfig.getNeedSyncMarkTableColumnName());
+        if (markedColumn != null) {
+            return StringUtils.equalsIgnoreCase(markedColumn.getValue(), 
sourceConfig.getNeedSyncMarkTableColumnValue());
+        }
+        return false;
+    }
+
+    private Column getColumnIgnoreCase(List<Column> columns, String columName) 
{
+        for (Column column : columns) {
+            if (column.getName().equalsIgnoreCase(columName)) {
+                return column;
             }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
-        return recordList;
+        return null;
     }
 
     private List<CanalConnectRecord> internParse(CanalSourceConfig 
sourceConfig, Entry entry) {
@@ -127,20 +141,9 @@ public class EntryParser {
             return null;
         }
 
-        if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemSchema(), 
schemaName)) {
-            // do noting
-            if (eventType.isDdl()) {
-                return null;
-            }
-
-            if 
(StringUtils.equalsIgnoreCase(sourceConfig.getSystemDualTable(), tableName)) {
-                return null;
-            }
-        } else {
-            if (eventType.isDdl()) {
-                log.warn("unsupported ddl event type: {}", eventType);
-                return null;
-            }
+        if (eventType.isDdl()) {
+            log.warn("unsupported ddl event type: {}", eventType);
+            return null;
         }
 
         List<CanalConnectRecord> recordList = new ArrayList<>();
@@ -164,13 +167,12 @@ public class EntryParser {
 
         List<Column> beforeColumns = rowData.getBeforeColumnsList();
         List<Column> afterColumns = rowData.getAfterColumnsList();
-        String tableName = canalConnectRecord.getSchemaName() + "." + 
canalConnectRecord.getTableName();
 
         boolean isRowMode = canalSourceConfig.getSyncMode().isRow();
 
-        Map<String, EventColumn> keyColumns = new LinkedHashMap<String, 
EventColumn>();
-        Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<String, 
EventColumn>();
-        Map<String, EventColumn> notKeyColumns = new LinkedHashMap<String, 
EventColumn>();
+        Map<String, EventColumn> keyColumns = new LinkedHashMap<>();
+        Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<>();
+        Map<String, EventColumn> notKeyColumns = new LinkedHashMap<>();
 
         if (eventType.isInsert()) {
             for (Column column : afterColumns) {
@@ -195,7 +197,7 @@ public class EntryParser {
                     keyColumns.put(column.getName(), copyEventColumn(column, 
true));
                 } else {
                     if (isRowMode && entry.getHeader().getSourceType() == 
CanalEntry.Type.ORACLE) {
-                        notKeyColumns.put(column.getName(), 
copyEventColumn(column, isRowMode));
+                        notKeyColumns.put(column.getName(), 
copyEventColumn(column, true));
                     }
                 }
             }
@@ -233,7 +235,7 @@ public class EntryParser {
             }
             canalConnectRecord.setColumns(columns);
         } else {
-            throw new RuntimeException("this row data has no pks , entry: " + 
entry.toString() + " and rowData: "
+            throw new RuntimeException("this row data has no pks , entry: " + 
entry + " and rowData: "
                 + rowData);
         }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index c179124ce..577142e00 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -31,16 +31,18 @@ import 
org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
 import org.apache.eventmesh.openconnect.api.source.Source;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 
-
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
 import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
@@ -74,7 +76,9 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
 
     private ClientIdentity clientIdentity;
 
-    private String filter = null;
+    private String tableFilter = null;
+
+    private String fieldFilter = null;
 
     private volatile boolean running = false;
 
@@ -95,6 +99,16 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
     public void init(ConnectorContext connectorContext) throws Exception {
         SourceConnectorContext sourceConnectorContext = 
(SourceConnectorContext) connectorContext;
         this.sourceConfig = (CanalSourceConfig) 
sourceConnectorContext.getSourceConfig();
+        if (sourceConnectorContext.getRecordPositionList() != null) {
+            
this.sourceConfig.setRecordPositions(sourceConnectorContext.getRecordPositionList());
+        }
+
+        if (StringUtils.isNotEmpty(sourceConfig.getTableFilter())) {
+            tableFilter = sourceConfig.getTableFilter();
+        }
+        if (StringUtils.isNotEmpty(sourceConfig.getFieldFilter())) {
+            fieldFilter = sourceConfig.getFieldFilter();
+        }
 
         canalServer = CanalServerWithEmbedded.instance();
 
@@ -103,7 +117,7 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
             public CanalInstance generate(String destination) {
                 Canal canal = buildCanal(sourceConfig);
 
-                CanalInstanceWithManager instance = new 
CanalInstanceWithManager(canal, filter) {
+                CanalInstanceWithManager instance = new 
CanalInstanceWithManager(canal, tableFilter) {
 
                     protected CanalHAController initHaController() {
                         return super.initHaController();
@@ -118,6 +132,9 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
                             ((MysqlEventParser) 
eventParser).setSupportBinlogImages("FULL");
                             MysqlEventParser mysqlEventParser = 
(MysqlEventParser) eventParser;
                             mysqlEventParser.setParallel(false);
+                            if (StringUtils.isNotEmpty(fieldFilter)) {
+                                mysqlEventParser.setFieldFilter(fieldFilter);
+                            }
 
                             CanalHAController haController = 
mysqlEventParser.getHaController();
                             if (!haController.isStart()) {
@@ -204,7 +221,7 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
         canalServer.start();
 
         canalServer.start(sourceConfig.getDestination());
-        this.clientIdentity = new 
ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), 
filter);
+        this.clientIdentity = new 
ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(), 
tableFilter);
         canalServer.subscribe(clientIdentity);
 
         running = true;
@@ -274,23 +291,31 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
         EntryParser entryParser = new EntryParser();
 
         List<ConnectRecord> result = new ArrayList<>();
-
-        List<CanalConnectRecord> connectRecordList = 
entryParser.parse(sourceConfig, entries);
-
-        if (connectRecordList != null && !connectRecordList.isEmpty()) {
-            CanalConnectRecord lastRecord = 
connectRecordList.get(connectRecordList.size() - 1);
-
-            CanalRecordPartition canalRecordPartition = new 
CanalRecordPartition();
-            canalRecordPartition.setJournalName(lastRecord.getJournalName());
-            canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
-
-            CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
-            canalRecordOffset.setOffset(lastRecord.getBinLogOffset());
-
-            ConnectRecord connectRecord = new 
ConnectRecord(canalRecordPartition, canalRecordOffset, 
System.currentTimeMillis());
-            connectRecord.addExtension("messageId", 
String.valueOf(message.getId()));
-            connectRecord.setData(connectRecordList);
-            result.add(connectRecord);
+        // key: Xid offset
+        Map<Long, List<CanalConnectRecord>> connectorRecordMap = 
entryParser.parse(sourceConfig, entries);
+
+        if (!connectorRecordMap.isEmpty()) {
+            Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet = 
connectorRecordMap.entrySet();
+            for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) {
+                // Xid offset
+                Long binLogOffset = entry.getKey();
+                List<CanalConnectRecord> connectRecordList = entry.getValue();
+                CanalConnectRecord lastRecord = 
entry.getValue().get(connectRecordList.size() - 1);
+                CanalRecordPartition canalRecordPartition = new 
CanalRecordPartition();
+                
canalRecordPartition.setJournalName(lastRecord.getJournalName());
+                canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
+
+                CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
+                canalRecordOffset.setOffset(binLogOffset);
+
+                ConnectRecord connectRecord = new 
ConnectRecord(canalRecordPartition, canalRecordOffset, 
System.currentTimeMillis());
+                connectRecord.addExtension("messageId", 
String.valueOf(message.getId()));
+                connectRecord.setData(connectRecordList);
+                result.add(connectRecord);
+            }
+        } else {
+            // for the message has been filtered need ack message
+            canalServer.ack(clientIdentity, message.getId());
         }
 
         return result;
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
index 10c647c8f..ceb509ef7 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
@@ -32,7 +32,7 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
         }
 
         sql.append(" from ").append(getFullName(schemaName, 
tableName)).append(" where ( ");
-        appendColumnEquals(sql, pkNames, "and");
+        appendColumnEquals(sql, pkNames);
         sql.append(" ) ");
         return sql.toString().intern();
     }
@@ -41,7 +41,7 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
         StringBuilder sql = new StringBuilder("update " + 
getFullName(schemaName, tableName) + " set ");
         appendExcludeSingleShardColumnEquals(sql, columnNames, ",", updatePks, 
shardColumn);
         sql.append(" where (");
-        appendColumnEquals(sql, pkNames, "and");
+        appendColumnEquals(sql, pkNames);
         sql.append(")");
         return sql.toString().intern();
     }
@@ -65,7 +65,7 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
 
     public String getDeleteSql(String schemaName, String tableName, String[] 
pkNames) {
         StringBuilder sql = new StringBuilder("delete from " + 
getFullName(schemaName, tableName) + " where ");
-        appendColumnEquals(sql, pkNames, "and");
+        appendColumnEquals(sql, pkNames);
         return sql.toString().intern();
     }
 
@@ -91,12 +91,12 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
         }
     }
 
-    protected void appendColumnEquals(StringBuilder sql, String[] columns, 
String separator) {
+    protected void appendColumnEquals(StringBuilder sql, String[] columns) {
         int size = columns.length;
         for (int i = 0; i < size; i++) {
             sql.append(" ").append(appendEscape(columns[i])).append(" = 
").append("? ");
             if (i != size - 1) {
-                sql.append(separator);
+                sql.append("and");
             }
         }
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
index a169ed20f..37b45c746 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
@@ -47,7 +47,7 @@ public class MysqlSqlTemplate extends AbstractSqlTemplate {
 
         size = columnNames.length;
         for (int i = 0; i < size; i++) {
-            if (!includePks && shardColumn != null && 
columnNames[i].equals(shardColumn)) {
+            if (!includePks && columnNames[i].equals(shardColumn)) {
                 continue;
             }
 
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
index 76800d9c2..55c88ce55 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
@@ -18,8 +18,11 @@
 package org.apache.eventmesh.openconnect.api.connector;
 
 import org.apache.eventmesh.common.config.connector.SourceConfig;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;
 
+import java.util.List;
+
 import lombok.Data;
 
 /**
@@ -32,4 +35,7 @@ public class SourceConnectorContext implements 
ConnectorContext {
 
     public SourceConfig sourceConfig;
 
+    // initial record position
+    public List<RecordPosition> recordPositionList;
+
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index c011a1520..16a6fca3d 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -69,6 +69,8 @@ public class AdminOffsetService implements 
OffsetManagementService {
 
     public KeyValueStore<RecordPartition, RecordOffset> positionStore;
 
+    public KeyValueStore<String, RecordPosition> positionStore2;
+
     private String jobId;
 
     private JobState jobState;
@@ -106,6 +108,7 @@ public class AdminOffsetService implements 
OffsetManagementService {
         ReportPositionRequest reportPositionRequest = new 
ReportPositionRequest();
         reportPositionRequest.setJobID(jobId);
         reportPositionRequest.setState(jobState);
+        reportPositionRequest.setDataSourceType(dataSourceType);
         reportPositionRequest.setAddress(IPUtils.getLocalAddress());
 
         reportPositionRequest.setRecordPositionList(recordToSyncList);
@@ -119,6 +122,10 @@ public class AdminOffsetService implements 
OffsetManagementService {
                 .build())
             .build();
         requestObserver.onNext(payload);
+
+        for (Map.Entry<RecordPartition, RecordOffset> entry : 
recordMap.entrySet()) {
+            positionStore.remove(entry.getKey());
+        }
     }
 
     @Override
@@ -157,8 +164,9 @@ public class AdminOffsetService implements 
OffsetManagementService {
                     
JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), 
FetchPositionResponse.class);
                 assert fetchPositionResponse != null;
                 if (fetchPositionResponse.isSuccess()) {
-                    
positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(),
-                        
fetchPositionResponse.getRecordPosition().getRecordOffset());
+                    for (RecordPosition recordPosition : 
fetchPositionResponse.getRecordPosition()) {
+                        positionStore.put(recordPosition.getRecordPartition(), 
recordPosition.getRecordOffset());
+                    }
                 }
             }
         }
@@ -175,9 +183,9 @@ public class AdminOffsetService implements 
OffsetManagementService {
             fetchPositionRequest.setJobID(jobId);
             fetchPositionRequest.setAddress(IPUtils.getLocalAddress());
             fetchPositionRequest.setDataSourceType(dataSourceType);
-            RecordPosition recordPosition = new RecordPosition();
-            recordPosition.setRecordPartition(partition);
-            fetchPositionRequest.setRecordPosition(recordPosition);
+            RecordPosition fetchRecordPosition = new RecordPosition();
+            fetchRecordPosition.setRecordPartition(partition);
+            fetchPositionRequest.setRecordPosition(fetchRecordPosition);
 
             Metadata metadata = Metadata.newBuilder()
                 .setType(FetchPositionRequest.class.getSimpleName())
@@ -195,8 +203,9 @@ public class AdminOffsetService implements 
OffsetManagementService {
                     
JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), 
FetchPositionResponse.class);
                 assert fetchPositionResponse != null;
                 if (fetchPositionResponse.isSuccess()) {
-                    
positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(),
-                        
fetchPositionResponse.getRecordPosition().getRecordOffset());
+                    for (RecordPosition recordPosition : 
fetchPositionResponse.getRecordPosition()) {
+                        positionStore.put(recordPosition.getRecordPartition(), 
recordPosition.getRecordOffset());
+                    }
                 }
             }
         }
diff --git a/eventmesh-admin-server/src/main/resources/application.yaml 
b/eventmesh-registry/eventmesh-registry-nacos/gradle.properties
similarity index 60%
copy from eventmesh-admin-server/src/main/resources/application.yaml
copy to eventmesh-registry/eventmesh-registry-nacos/gradle.properties
index 420a22fe6..cf067e20b 100644
--- a/eventmesh-admin-server/src/main/resources/application.yaml
+++ b/eventmesh-registry/eventmesh-registry-nacos/gradle.properties
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -15,18 +14,5 @@
 # limitations under the License.
 #
 
-spring:
-  datasource:
-    url: 
jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
-    username: root
-    password: default
-    driver-class-name: com.mysql.cj.jdbc.Driver
-mybatis-plus:
-  mapper-locations: classpath:mapper/*.xml
-  configuration:
-    map-underscore-to-camel-case: false
-    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-event-mesh:
-  admin-server:
-    service-name: DEFAULT_GROUP@@em_adm_server
-    port: 8081
\ No newline at end of file
+pluginType=registryCenter
+pluginName=nacos
\ No newline at end of file
diff --git a/eventmesh-runtime-v2/bin/start-v2.sh 
b/eventmesh-runtime-v2/bin/start-v2.sh
new file mode 100644
index 000000000..fc67c29d3
--- /dev/null
+++ b/eventmesh-runtime-v2/bin/start-v2.sh
@@ -0,0 +1,200 @@
+#!/bin/bash
+#
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#===========================================================================================
+# Java Environment Setting
+#===========================================================================================
+set -e
+# Server configuration may be inconsistent, add these configurations to avoid 
garbled code problems
+export LANG=en_US.UTF-8
+export LC_CTYPE=en_US.UTF-8
+export LC_ALL=en_US.UTF-8
+
+TMP_JAVA_HOME="/customize/your/java/home/here"
+
+# Detect operating system.
+OS=$(uname)
+
+function is_java8_or_11 {
+        local _java="$1"
+        [[ -x "$_java" ]] || return 1
+        [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" 
-version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 
'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] 
|| return 2
+        return 0
+}
+
+function extract_java_version {
+    local _java="$1"
+    local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' 
| awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print 
"11"; else print "unknown"}')
+    echo "$version"
+}
+
+# 0(not running),  1(is running)
+#function is_proxyRunning {
+#        local _pid="$1"
+#        local pid=`ps ax | grep -i 
'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep 
| awk '{print $1}'|grep $_pid`
+#        if [ -z "$pid" ] ; then
+#            return 0
+#        else
+#            return 1
+#        fi
+#}
+
+function get_pid {
+       local ppid=""
+       if [ -f ${EVENTMESH_HOME}/bin/pid.file ]; then
+               ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file)
+               # If the process does not exist, it indicates that the previous 
process terminated abnormally.
+    if [ ! -d /proc/$ppid ]; then
+      # Remove the residual file.
+      rm ${EVENTMESH_HOME}/bin/pid.file
+      echo -e "ERROR\t EventMesh process had already terminated unexpectedly 
before, please check log output."
+      ppid=""
+    fi
+       else
+               if [[ $OS =~ Msys ]]; then
+                       # There is a Bug on Msys that may not be able to kill 
the identified process
+                       ppid=`jps -v | grep -i 
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep java | grep 
-v grep | awk -F ' ' {'print $1'}`
+               elif [[ $OS =~ Darwin ]]; then
+                       # Known problem: grep Java may not be able to 
accurately identify Java processes
+                       ppid=$(/bin/ps -o user,pid,command | grep "java" | grep 
-i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev 
"^root" |awk -F ' ' {'print $2'})
+               else
+                 if [ $DOCKER ]; then
+                   # No need to exclude root user in Docker containers.
+                   ppid=$(ps -C java -o user,pid,command --cols 99999 | grep 
-w $EVENTMESH_HOME | grep -i 
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | awk -F ' ' {'print 
$2'})
+                 else
+        # It is required to identify the process as accurately as possible on 
Linux.
+        ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w 
$EVENTMESH_HOME | grep -i 
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root" | 
awk -F ' ' {'print $2'})
+      fi
+               fi
+       fi
+       echo "$ppid";
+}
+
+#===========================================================================================
+# Locate Java Executable
+#===========================================================================================
+
+if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
+        JAVA="$TMP_JAVA_HOME/bin/java"
+        JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
+elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
+        JAVA="$JAVA_HOME/bin/java"
+        JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
+elif is_java8_or_11 "$(which java)"; then
+        JAVA="$(which java)"
+        JAVA_VERSION=$(extract_java_version "$(which java)")
+else
+        echo -e "ERROR\t Java 8 or 11 not found, operation abort."
+        exit 9;
+fi
+
+echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"
+
+EVENTMESH_HOME=$(cd "$(dirname "$0")/.." && pwd)
+export EVENTMESH_HOME
+
+EVENTMESH_LOG_HOME="${EVENTMESH_HOME}/logs"
+export EVENTMESH_LOG_HOME
+
+echo -e "EVENTMESH_HOME : ${EVENTMESH_HOME}\nEVENTMESH_LOG_HOME : 
${EVENTMESH_LOG_HOME}"
+
+function make_logs_dir {
+        if [ ! -e "${EVENTMESH_LOG_HOME}" ]; then mkdir -p 
"${EVENTMESH_LOG_HOME}"; fi
+}
+
+error_exit ()
+{
+    echo -e "ERROR\t $1 !!"
+    exit 1
+}
+
+export JAVA_HOME
+
+#===========================================================================================
+# JVM Configuration
+#===========================================================================================
+#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server 
-Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
+#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M 
-Xmn256m -XX:SurvivorRatio=4"
+#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M 
-Xmn128m -XX:SurvivorRatio=4"
+#fi
+
+GC_LOG_FILE="${EVENTMESH_LOG_HOME}/eventmesh_gc_%p.log"
+
+#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m 
-XX:SurvivorRatio=4"
+JAVA_OPT=`cat ${EVENTMESH_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: 
| awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m 
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
+JAVA_OPT="${JAVA_OPT} -verbose:gc"
+if [[ "$JAVA_VERSION" == "8" ]]; then
+    # Set JAVA_OPT for Java 8
+    JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
+    JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
-XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
+elif [[ "$JAVA_VERSION" == "11" ]]; then
+    # Set JAVA_OPT for Java 11
+    XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
+    JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
+    JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} 
-Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
+fi
+JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=${EVENTMESH_LOG_HOME} 
-XX:ErrorFile=${EVENTMESH_LOG_HOME}/hs_err_%p.log"
+JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
+JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
+JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
+JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
+JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
+JAVA_OPT="${JAVA_OPT} 
-Dlog4j.configurationFile=${EVENTMESH_HOME}/conf/log4j2.xml"
+JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_LOG_HOME}"
+JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_HOME}/conf"
+JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
+JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
+JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_HOME}/plugin"
+
+#if [ -f "pid.file" ]; then
+#        pid=`cat pid.file`
+#        if ! is_proxyRunning "$pid"; then
+#            echo "proxy is running already"
+#            exit 9;
+#        else
+#          echo "err pid$pid, rm pid.file"
+#            rm pid.file
+#        fi
+#fi
+
+pid=$(get_pid)
+if [[ $pid == "ERROR"* ]]; then
+  echo -e "${pid}"
+  exit 9
+fi
+if [ -n "$pid" ]; then
+       echo -e "ERROR\t The server is already running (pid=$pid), there is no 
need to execute start.sh again."
+       exit 9
+fi
+
+make_logs_dir
+
+echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> 
${EVENTMESH_LOG_HOME}/eventmesh.out
+
+EVENTMESH_MAIN=org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter
+if [ $DOCKER ]; then
+       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_HOME}/conf:${EVENTMESH_HOME}/apps/*:${EVENTMESH_HOME}/lib/* 
$EVENTMESH_MAIN >> ${EVENTMESH_LOG_HOME}/eventmesh.out
+else
+       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_HOME}/conf:${EVENTMESH_HOME}/apps/*:${EVENTMESH_HOME}/lib/* 
$EVENTMESH_MAIN >> ${EVENTMESH_LOG_HOME}/eventmesh.out 2>&1 &
+echo $!>${EVENTMESH_HOME}/bin/pid.file
+fi
+exit 0
diff --git a/eventmesh-runtime-v2/bin/stop-v2.sh 
b/eventmesh-runtime-v2/bin/stop-v2.sh
new file mode 100644
index 000000000..177ae1e12
--- /dev/null
+++ b/eventmesh-runtime-v2/bin/stop-v2.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+#
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Detect operating system
+OS=$(uname)
+
+EVENTMESH_HOME=`cd $(dirname $0)/.. && pwd`
+
+export EVENTMESH_HOME
+
+function get_pid {
+       local ppid=""
+       if [ -f ${EVENTMESH_HOME}/bin/pid.file ]; then
+               ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file)
+    # If the process does not exist, it indicates that the previous process 
terminated abnormally.
+    if [ ! -d /proc/$ppid ]; then
+      # Remove the residual file and return an error status.
+      rm ${EVENTMESH_HOME}/bin/pid.file
+      echo -e "ERROR\t EventMesh process had already terminated unexpectedly 
before, please check log output."
+      ppid=""
+    fi
+       else
+               if [[ $OS =~ Msys ]]; then
+                       # There is a Bug on Msys that may not be able to kill 
the identified process
+                       ppid=`jps -v | grep -i 
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep java | grep 
-v grep | awk -F ' ' {'print $1'}`
+               elif [[ $OS =~ Darwin ]]; then
+                       # Known problem: grep Java may not be able to 
accurately identify Java processes
+                       ppid=$(/bin/ps -o user,pid,command | grep "java" | grep 
-i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev 
"^root" |awk -F ' ' {'print $2'})
+               else
+                       # It is required to identify the process as accurately 
as possible on Linux
+                       ppid=$(ps -C java -o user,pid,command --cols 99999 | 
grep -w $EVENTMESH_HOME | grep -i 
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root" 
|awk -F ' ' {'print $2'})
+               fi
+       fi
+       echo "$ppid";
+}
+
+pid=$(get_pid)
+if [[ $pid == "ERROR"* ]]; then
+  echo -e "${pid}"
+  exit 9
+fi
+if [ -z "$pid" ];then
+       echo -e "ERROR\t No EventMesh server running."
+       exit 9
+fi
+
+kill ${pid}
+echo "Send shutdown request to EventMesh(${pid}) OK"
+
+[[ $OS =~ Msys ]] && PS_PARAM=" -W "
+stop_timeout=60
+for no in $(seq 1 $stop_timeout); do
+       if ps $PS_PARAM -p "$pid" 2>&1 > /dev/null; then
+               if [ $no -lt $stop_timeout ]; then
+                       echo "[$no] server shutting down ..."
+                       sleep 1
+                       continue
+               fi
+
+               echo "shutdown server timeout, kill process: $pid"
+               kill -9 $pid; sleep 1; break;
+               echo "`date +'%Y-%m-%-d %H:%M:%S'` , pid : [$pid] , error 
message : abnormal shutdown which can not be closed within 60s" > 
../logs/shutdown.error
+       else
+               echo "shutdown server ok!"; break;
+       fi
+done
+
+if [ -f "pid.file" ]; then
+    rm pid.file
+fi
+
+
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 2f16834b4..65676903d 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -53,6 +53,8 @@ import org.apache.eventmesh.runtime.Runtime;
 import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -201,6 +203,9 @@ public class ConnectorRuntime implements Runtime {
         SourceConnectorContext sourceConnectorContext = new 
SourceConnectorContext();
         sourceConnectorContext.setSourceConfig(sourceConfig);
         sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
+        if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
+            
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
+        }
 
         // spi load offsetMgmtService
         this.offsetManagement = new RecordOffsetManagement();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to