This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch eventmesh-function
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/eventmesh-function by this
push:
new ae621d490 [ISSUE #4979] Canal Connector supports bidirectional data
synchronization (#5011)
ae621d490 is described below
commit ae621d49082cbe7506c72ad1f553efbf3bc96006
Author: mike_xwm <[email protected]>
AuthorDate: Mon Jul 1 09:38:27 2024 +0800
[ISSUE #4979] Canal Connector supports bidirectional data synchronization
(#5011)
* [ISSUE #4979]Canal Connector supports bidirectional data synchronization
* add bash files for admin & runtime-v2
* fix ack offset read & persist
* fix checkStyle error
---
build.gradle | 7 +-
eventmesh-admin-server/bin/start-admin.sh | 201 +++++++++++++++++++++
.../server/web/db/entity/EventMeshJobDetail.java | 3 +-
.../position/EventMeshPositionBizService.java | 6 +-
.../service/position/IFetchPositionHandler.java | 4 +-
.../position/impl/MysqlPositionHandler.java | 16 +-
.../src/main/resources/application.yaml | 4 +-
.../connector/rdb/canal/CanalSinkConfig.java | 15 +-
.../connector/rdb/canal/CanalSourceConfig.java | 28 +--
.../common/remote/response/FetchJobResponse.java | 3 +-
.../remote/response/FetchPositionResponse.java | 6 +-
.../eventmesh-connector-canal/build.gradle | 1 -
.../connector/canal/dialect/MysqlDialect.java | 22 +--
.../interceptor/SqlBuilderLoadInterceptor.java | 13 +-
.../connector/canal/source/EntryParser.java | 110 +++++------
.../source/connector/CanalSourceConnector.java | 67 ++++---
.../canal/template/AbstractSqlTemplate.java | 10 +-
.../connector/canal/template/MysqlSqlTemplate.java | 2 +-
.../api/connector/SourceConnectorContext.java | 6 +
.../offsetmgmt/admin/AdminOffsetService.java | 23 ++-
.../eventmesh-registry-nacos/gradle.properties | 18 +-
eventmesh-runtime-v2/bin/start-v2.sh | 200 ++++++++++++++++++++
eventmesh-runtime-v2/bin/stop-v2.sh | 88 +++++++++
.../runtime/connector/ConnectorRuntime.java | 5 +
24 files changed, 688 insertions(+), 170 deletions(-)
diff --git a/build.gradle b/build.gradle
index 5aafc4c93..df86a4956 100644
--- a/build.gradle
+++ b/build.gradle
@@ -162,9 +162,13 @@ tasks.register('dist') {
["eventmesh-common",
"eventmesh-meta:eventmesh-meta-api",
"eventmesh-metrics-plugin:eventmesh-metrics-api",
+ "eventmesh-openconnect:eventmesh-openconnect-java",
+
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
"eventmesh-protocol-plugin:eventmesh-protocol-api",
+ "eventmesh-registry:eventmesh-registry-api",
"eventmesh-retry:eventmesh-retry-api",
"eventmesh-runtime",
+ "eventmesh-runtime-v2",
"eventmesh-security-plugin:eventmesh-security-api",
"eventmesh-spi",
"eventmesh-starter",
@@ -660,6 +664,7 @@ subprojects {
dependencyManagement {
dependencies {
+
dependency "org.apache.commons:commons-lang3:3.6"
dependency "org.apache.commons:commons-collections4:4.4"
dependency "org.apache.commons:commons-text:1.9"
@@ -709,7 +714,7 @@ subprojects {
dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0"
dependency "com.jayway.jsonpath:json-path:2.9.0"
- dependency
"org.springframework.boot:spring-boot-starter-web:2.7.18"
+ dependency "org.springframework.boot:spring-boot-starter-web:2.5.4"
dependency "io.openmessaging:registry-server:0.0.1"
dependency "org.junit.jupiter:junit-jupiter:5.6.0"
diff --git a/eventmesh-admin-server/bin/start-admin.sh
b/eventmesh-admin-server/bin/start-admin.sh
new file mode 100644
index 000000000..93c364439
--- /dev/null
+++ b/eventmesh-admin-server/bin/start-admin.sh
@@ -0,0 +1,201 @@
+#!/bin/bash
+#
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#===========================================================================================
+# Java Environment Setting
+#===========================================================================================
+set -e
+# Server locale configuration may be inconsistent; set these variables to avoid
garbled characters
+export LANG=en_US.UTF-8
+export LC_CTYPE=en_US.UTF-8
+export LC_ALL=en_US.UTF-8
+
+TMP_JAVA_HOME="/customize/your/java/home/here"
+
+# Detect operating system.
+OS=$(uname)
+
+function is_java8_or_11 {
+ local _java="$1"
+ [[ -x "$_java" ]] || return 1
+ [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java"
-version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~
'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]]
|| return 2
+ return 0
+}
+
+function extract_java_version {
+ local _java="$1"
+ local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}'
| awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print
"11"; else print "unknown"}')
+ echo "$version"
+}
+
+# 0(not running), 1(is running)
+#function is_proxyRunning {
+# local _pid="$1"
+# local pid=`ps ax | grep -i
'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep
| awk '{print $1}'|grep $_pid`
+# if [ -z "$pid" ] ; then
+# return 0
+# else
+# return 1
+# fi
+#}
+
+function get_pid {
+ local ppid=""
+ if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
+ ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
+ # If the process does not exist, it indicates that the previous
process terminated abnormally.
+ if [ ! -d /proc/$ppid ]; then
+ # Remove the residual file.
+ rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
+ echo -e "ERROR\t EventMesh process had already terminated unexpectedly
before, please check log output."
+ ppid=""
+ fi
+ else
+ if [[ $OS =~ Msys ]]; then
+ # There is a Bug on Msys that may not be able to kill
the identified process
+ ppid=`jps -v | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v
grep | awk -F ' ' {'print $1'}`
+ elif [[ $OS =~ Darwin ]]; then
+ # Known problem: grep Java may not be able to
accurately identify Java processes
+ ppid=$(/bin/ps -o user,pid,command | grep "java" | grep
-i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root"
|awk -F ' ' {'print $2'})
+ else
+ if [ $DOCKER ]; then
+ # No need to exclude root user in Docker containers.
+ ppid=$(ps -C java -o user,pid,command --cols 99999 | grep
-w $EVENTMESH_ADMIN_HOME | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print
$2'})
+ else
+ # It is required to identify the process as accurately as possible on
Linux.
+ ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w
$EVENTMESH_ADMIN_HOME | grep -i
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk
-F ' ' {'print $2'})
+ fi
+ fi
+ fi
+ echo "$ppid";
+}
+
+#===========================================================================================
+# Locate Java Executable
+#===========================================================================================
+
+if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
+ JAVA="$TMP_JAVA_HOME/bin/java"
+ JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
+elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
+ JAVA="$JAVA_HOME/bin/java"
+ JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
+elif is_java8_or_11 "$(which java)"; then
+ JAVA="$(which java)"
+ JAVA_VERSION=$(extract_java_version "$(which java)")
+else
+ echo -e "ERROR\t Java 8 or 11 not found, operation abort."
+ exit 9;
+fi
+
+echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"
+
+EVENTMESH_ADMIN_HOME=$(cd "$(dirname "$0")/.." && pwd)
+export EVENTMESH_ADMIN_HOME
+
+EVENTMESH_ADMIN_LOG_HOME="${EVENTMESH_ADMIN_HOME}/logs"
+export EVENTMESH_ADMIN_LOG_HOME
+
+echo -e "EVENTMESH_ADMIN_HOME :
${EVENTMESH_ADMIN_HOME}\nEVENTMESH_ADMIN_LOG_HOME : ${EVENTMESH_ADMIN_LOG_HOME}"
+
+function make_logs_dir {
+ if [ ! -e "${EVENTMESH_ADMIN_LOG_HOME}" ]; then mkdir -p
"${EVENTMESH_ADMIN_LOG_HOME}"; fi
+}
+
+error_exit ()
+{
+ echo -e "ERROR\t $1 !!"
+ exit 1
+}
+
+export JAVA_HOME
+
+#===========================================================================================
+# JVM Configuration
+#===========================================================================================
+#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server
-Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
+#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M
-Xmn256m -XX:SurvivorRatio=4"
+#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M
-Xmn128m -XX:SurvivorRatio=4"
+#fi
+
+GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
+
+#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m
-XX:SurvivorRatio=4"
+JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep
APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
+JAVA_OPT="${JAVA_OPT} -verbose:gc"
+if [[ "$JAVA_VERSION" == "8" ]]; then
+ # Set JAVA_OPT for Java 8
+ JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
+ JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps
-XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
+elif [[ "$JAVA_VERSION" == "11" ]]; then
+ # Set JAVA_OPT for Java 11
+ XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
+ JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
+ JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM}
-Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
+fi
+JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=${EVENTMESH_ADMIN_LOG_HOME}
-XX:ErrorFile=${EVENTMESH_ADMIN_LOG_HOME}/hs_err_%p.log"
+JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
+JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
+JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
+JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
+JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
+JAVA_OPT="${JAVA_OPT}
-Dlog4j.configurationFile=${EVENTMESH_ADMIN_HOME}/conf/log4j2.xml"
+JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_ADMIN_LOG_HOME}"
+JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_ADMIN_HOME}/conf"
+JAVA_OPT="${JAVA_OPT} -DconfigurationPath=${EVENTMESH_ADMIN_HOME}/conf"
+JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
+JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
+JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
+
+#if [ -f "pid.file" ]; then
+# pid=`cat pid.file`
+# if ! is_proxyRunning "$pid"; then
+# echo "proxy is running already"
+# exit 9;
+# else
+# echo "err pid$pid, rm pid.file"
+# rm pid.file
+# fi
+#fi
+
+pid=$(get_pid)
+if [[ $pid == "ERROR"* ]]; then
+ echo -e "${pid}"
+ exit 9
+fi
+if [ -n "$pid" ]; then
+ echo -e "ERROR\t The server is already running (pid=$pid), there is no
need to execute start.sh again."
+ exit 9
+fi
+
+make_logs_dir
+
+echo "Using Java version: $JAVA_VERSION, path: $JAVA" >>
${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+
+EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
+if [ $DOCKER ]; then
+ $JAVA $JAVA_OPT -classpath
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
$EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+else
+ $JAVA $JAVA_OPT -classpath
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
$EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
+echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
+fi
+exit 0
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
index b4a836e8b..849a90a88 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
@@ -21,6 +21,7 @@ import org.apache.eventmesh.common.remote.JobState;
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import java.util.List;
import java.util.Map;
import lombok.Data;
@@ -42,7 +43,7 @@ public class EventMeshJobDetail {
private String sinkConnectorDesc;
- private RecordPosition position;
+ private List<RecordPosition> position;
private JobState state;
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
index 7d6febdf4..d3b6ff555 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
@@ -25,6 +25,8 @@ import
org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+import java.util.List;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
PositionHandlerFactory factory;
// called isValidateReportRequest before call this
- public RecordPosition getPosition(FetchPositionRequest request, Metadata
metadata) {
+ public List<RecordPosition> getPosition(FetchPositionRequest request,
Metadata metadata) {
if (request == null) {
return null;
}
@@ -68,7 +70,7 @@ public class EventMeshPositionBizService {
return handler.handler(request, metadata);
}
- public RecordPosition getPositionByJobID(Integer jobID, DataSourceType
type) {
+ public List<RecordPosition> getPositionByJobID(Integer jobID,
DataSourceType type) {
if (jobID == null || type == null) {
return null;
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
index 9a4c324dc..2c039062f 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
@@ -21,10 +21,12 @@ import
org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import java.util.List;
+
/**
* IFetchPositionHandler
*/
public interface IFetchPositionHandler {
- RecordPosition handler(FetchPositionRequest request, Metadata metadata);
+ List<RecordPosition> handler(FetchPositionRequest request, Metadata
metadata);
}
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
index 623864fa6..525fe02c0 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
@@ -31,6 +31,7 @@ import
org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.utils.JsonUtils;
+import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
@@ -142,20 +143,21 @@ public class MysqlPositionHandler extends PositionHandler
{
}
@Override
- public RecordPosition handler(FetchPositionRequest request, Metadata
metadata) {
- EventMeshMysqlPosition position =
positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
+ public List<RecordPosition> handler(FetchPositionRequest request, Metadata
metadata) {
+ List<EventMeshMysqlPosition> positionList =
positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
request.getJobID()));
- RecordPosition recordPosition = null;
- if (position != null) {
+ List<RecordPosition> recordPositionList = new ArrayList<>();
+ for (EventMeshMysqlPosition position : positionList) {
+ RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
+ recordPosition.setRecordPartition(partition);
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
- recordPosition = new RecordPosition();
- recordPosition.setRecordPartition(partition);
recordPosition.setRecordOffset(offset);
+ recordPositionList.add(recordPosition);
}
- return recordPosition;
+ return recordPositionList;
}
}
diff --git a/eventmesh-admin-server/src/main/resources/application.yaml
b/eventmesh-admin-server/src/main/resources/application.yaml
index ce396a09f..54795057c 100644
--- a/eventmesh-admin-server/src/main/resources/application.yaml
+++ b/eventmesh-admin-server/src/main/resources/application.yaml
@@ -18,8 +18,8 @@
spring:
datasource:
url:
jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
- username: root
- password: mike920830
+ username: //db_username
+ password: //db_password
driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index f7a697625..85484b2ce 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -27,15 +27,20 @@ import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
public class CanalSinkConfig extends SinkConfig {
- private Integer batchSize = 50; // batchSize
+ // batchSize
+ private Integer batchSize = 50;
- private Boolean useBatch = true; // enable batch
+ // enable batch
+ private Boolean useBatch = true;
- private Integer poolSize = 5; // sink thread
size for single channel
+ // sink thread size for single channel
+ private Integer poolSize = 5;
- private SyncMode syncMode; // sync mode:
field/row
+ // sync mode: field/row
+ private SyncMode syncMode;
- private Boolean skipException = false; // skip sink
process exception
+ // skip sink process exception
+ private Boolean skipException = false;
public SinkConnectorConfig sinkConnectorConfig;
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index e5edc5a78..d75ceb6b5 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -49,32 +49,32 @@ public class CanalSourceConfig extends SourceConfig {
private Long batchTimeout = -1L;
+ private String tableFilter;
+
+ private String fieldFilter;
+
private List<RecordPosition> recordPositions;
// ================================= channel parameter
// ================================
- private Boolean enableRemedy = false;
// enable remedy
+ // enable remedy
+ private Boolean enableRemedy = false;
- private SyncMode syncMode;
// sync mode: field/row
+ // sync mode: field/row
+ private SyncMode syncMode;
- private SyncConsistency syncConsistency;
// sync consistency
+ // sync consistency
+ private SyncConsistency syncConsistency;
// ================================= system parameter
// ================================
- private String systemSchema;
// Default is retl
-
- private String systemMarkTable;
// Bidirectional synchronization mark table
-
- private String systemMarkTableColumn;
// Column name of the bidirectional synchronization mark
-
- private String systemMarkTableInfo;
- // nfo information of the bidirectional synchronization mark, similar to
BI_SYNC
-
- private String systemBufferTable;
// sync buffer table
+ // Column name of the bidirectional synchronization mark
+ private String needSyncMarkTableColumnName = "needSync";
- private String systemDualTable;
// sync heartbeat table
+ // Column value of the bidirectional synchronization mark
+ private String needSyncMarkTableColumnValue = "needSync";
private SourceConnectorConfig sourceConnectorConfig;
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
index 137e49bdc..a51cb32b9 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java
@@ -22,6 +22,7 @@ import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import java.util.List;
import java.util.Map;
import lombok.Data;
@@ -45,7 +46,7 @@ public class FetchJobResponse extends BaseRemoteResponse {
private String sinkConnectorDesc;
- private RecordPosition position;
+ private List<RecordPosition> position;
private JobState state;
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
index e9a7a3828..613623d65 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchPositionResponse.java
@@ -20,6 +20,8 @@ package org.apache.eventmesh.common.remote.response;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import java.util.List;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -27,7 +29,7 @@ import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
public class FetchPositionResponse extends BaseRemoteResponse {
- private RecordPosition recordPosition;
+ private List<RecordPosition> recordPosition;
public static FetchPositionResponse successResponse() {
FetchPositionResponse response = new FetchPositionResponse();
@@ -36,7 +38,7 @@ public class FetchPositionResponse extends BaseRemoteResponse
{
return response;
}
- public static FetchPositionResponse successResponse(RecordPosition
recordPosition) {
+ public static FetchPositionResponse successResponse(List<RecordPosition>
recordPosition) {
FetchPositionResponse response = successResponse();
response.setRecordPosition(recordPosition);
return response;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle
b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
index 0d914b7ae..640cb5ce4 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
@@ -26,7 +26,6 @@ dependencies {
implementation project(":eventmesh-common")
implementation canal
implementation "com.alibaba:druid:1.2.6"
-// implementation "org.apache.ddlutils:ddlutils:1.0"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation "org.mockito:mockito-core"
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
index 32bb79b54..acd491ba6 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
@@ -19,29 +19,17 @@ package org.apache.eventmesh.connector.canal.dialect;
import org.apache.eventmesh.connector.canal.template.MysqlSqlTemplate;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.lob.LobHandler;
public class MysqlDialect extends AbstractDbDialect {
- private Map<List<String>, String> shardColumns;
-
public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
super(jdbcTemplate, lobHandler);
sqlTemplate = new MysqlSqlTemplate();
}
- public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler,
String name, String databaseVersion,
- int majorVersion, int minorVersion) {
- super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion);
- sqlTemplate = new MysqlSqlTemplate();
- }
-
public boolean isCharSpacePadded() {
return false;
}
@@ -66,16 +54,8 @@ public class MysqlDialect extends AbstractDbDialect {
return false;
}
- public String getShardColumns(String schema, String table) {
- if (isDRDS()) {
- return shardColumns.get(Arrays.asList(schema, table));
- } else {
- return null;
- }
- }
-
public String getDefaultCatalog() {
- return (String) jdbcTemplate.queryForObject("select database()",
String.class);
+ return jdbcTemplate.queryForObject("select database()", String.class);
}
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
index ab0776c17..24d6b42f8 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
@@ -28,11 +28,16 @@ import java.util.List;
import org.springframework.util.CollectionUtils;
+import lombok.Getter;
+import lombok.Setter;
+
/**
* compute latest sql
*/
public class SqlBuilderLoadInterceptor {
+ @Getter
+ @Setter
private DbDialect dbDialect;
public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord
record) {
@@ -128,12 +133,4 @@ public class SqlBuilderLoadInterceptor {
}
return result;
}
-
- public DbDialect getDbDialect() {
- return dbDialect;
- }
-
- public void setDbDialect(DbDialect dbDialect) {
- this.dbDialect = dbDialect;
- }
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 3031a15df..32c55ec42 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -23,9 +23,10 @@ import
org.apache.eventmesh.connector.canal.model.EventColumn;
import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable;
import org.apache.eventmesh.connector.canal.model.EventType;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -46,32 +47,26 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EntryParser {
- public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig,
List<Entry> datas) {
+ public Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig
sourceConfig, List<Entry> datas) {
List<CanalConnectRecord> recordList = new ArrayList<>();
List<Entry> transactionDataBuffer = new ArrayList<>();
+ // need to check whether the entry is a loopback
+ boolean needSync;
+ Map<Long, List<CanalConnectRecord>> recordMap = new HashMap<>();
try {
for (Entry entry : datas) {
switch (entry.getEntryType()) {
- case TRANSACTIONBEGIN:
- break;
case ROWDATA:
- transactionDataBuffer.add(entry);
+ RowChange rowChange =
RowChange.parseFrom(entry.getStoreValue());
+ needSync = checkNeedSync(sourceConfig,
rowChange.getRowDatas(0));
+ if (needSync) {
+ transactionDataBuffer.add(entry);
+ }
break;
case TRANSACTIONEND:
- for (Entry bufferEntry : transactionDataBuffer) {
- List<CanalConnectRecord> recordParsedList =
internParse(sourceConfig, bufferEntry);
- if (CollectionUtils.isEmpty(recordParsedList)) {
- continue;
- }
- long totalSize =
bufferEntry.getHeader().getEventLength();
- long eachSize = totalSize /
recordParsedList.size();
- for (CanalConnectRecord record : recordParsedList)
{
- if (record == null) {
- continue;
- }
- record.setSize(eachSize);
- recordList.add(record);
- }
+ parseRecordListWithEntryBuffer(sourceConfig,
recordList, transactionDataBuffer);
+ if (!recordList.isEmpty()) {
+
recordMap.put(entry.getHeader().getLogfileOffset(), recordList);
}
transactionDataBuffer.clear();
break;
@@ -79,27 +74,46 @@ public class EntryParser {
break;
}
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return recordMap;
+ }
- for (Entry bufferEntry : transactionDataBuffer) {
- List<CanalConnectRecord> recordParsedList =
internParse(sourceConfig, bufferEntry);
- if (CollectionUtils.isEmpty(recordParsedList)) {
+ private void parseRecordListWithEntryBuffer(CanalSourceConfig
sourceConfig, List<CanalConnectRecord> recordList,
+ List<Entry> transactionDataBuffer) {
+ for (Entry bufferEntry : transactionDataBuffer) {
+ List<CanalConnectRecord> recordParsedList =
internParse(sourceConfig, bufferEntry);
+ if (CollectionUtils.isEmpty(recordParsedList)) {
+ continue;
+ }
+ long totalSize = bufferEntry.getHeader().getEventLength();
+ long eachSize = totalSize / recordParsedList.size();
+ for (CanalConnectRecord record : recordParsedList) {
+ if (record == null) {
continue;
}
+ record.setSize(eachSize);
+ recordList.add(record);
+ }
+ }
+ }
- long totalSize = bufferEntry.getHeader().getEventLength();
- long eachSize = totalSize / recordParsedList.size();
- for (CanalConnectRecord record : recordParsedList) {
- if (record == null) {
- continue;
- }
- record.setSize(eachSize);
- recordList.add(record);
- }
+ private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData
rowData) {
+ Column markedColumn =
getColumnIgnoreCase(rowData.getAfterColumnsList(),
sourceConfig.getNeedSyncMarkTableColumnName());
+ if (markedColumn != null) {
+ return StringUtils.equalsIgnoreCase(markedColumn.getValue(),
sourceConfig.getNeedSyncMarkTableColumnValue());
+ }
+ return false;
+ }
+
+ private Column getColumnIgnoreCase(List<Column> columns, String columName)
{
+ for (Column column : columns) {
+ if (column.getName().equalsIgnoreCase(columName)) {
+ return column;
}
- } catch (Exception e) {
- throw new RuntimeException(e);
}
- return recordList;
+ return null;
}
private List<CanalConnectRecord> internParse(CanalSourceConfig
sourceConfig, Entry entry) {
@@ -127,20 +141,9 @@ public class EntryParser {
return null;
}
- if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemSchema(),
schemaName)) {
- // do noting
- if (eventType.isDdl()) {
- return null;
- }
-
- if
(StringUtils.equalsIgnoreCase(sourceConfig.getSystemDualTable(), tableName)) {
- return null;
- }
- } else {
- if (eventType.isDdl()) {
- log.warn("unsupported ddl event type: {}", eventType);
- return null;
- }
+ if (eventType.isDdl()) {
+ log.warn("unsupported ddl event type: {}", eventType);
+ return null;
}
List<CanalConnectRecord> recordList = new ArrayList<>();
@@ -164,13 +167,12 @@ public class EntryParser {
List<Column> beforeColumns = rowData.getBeforeColumnsList();
List<Column> afterColumns = rowData.getAfterColumnsList();
- String tableName = canalConnectRecord.getSchemaName() + "." +
canalConnectRecord.getTableName();
boolean isRowMode = canalSourceConfig.getSyncMode().isRow();
- Map<String, EventColumn> keyColumns = new LinkedHashMap<String,
EventColumn>();
- Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<String,
EventColumn>();
- Map<String, EventColumn> notKeyColumns = new LinkedHashMap<String,
EventColumn>();
+ Map<String, EventColumn> keyColumns = new LinkedHashMap<>();
+ Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<>();
+ Map<String, EventColumn> notKeyColumns = new LinkedHashMap<>();
if (eventType.isInsert()) {
for (Column column : afterColumns) {
@@ -195,7 +197,7 @@ public class EntryParser {
keyColumns.put(column.getName(), copyEventColumn(column,
true));
} else {
if (isRowMode && entry.getHeader().getSourceType() ==
CanalEntry.Type.ORACLE) {
- notKeyColumns.put(column.getName(),
copyEventColumn(column, isRowMode));
+ notKeyColumns.put(column.getName(),
copyEventColumn(column, true));
}
}
}
@@ -233,7 +235,7 @@ public class EntryParser {
}
canalConnectRecord.setColumns(columns);
} else {
- throw new RuntimeException("this row data has no pks , entry: " +
entry.toString() + " and rowData: "
+ throw new RuntimeException("this row data has no pks , entry: " +
entry + " and rowData: "
+ rowData);
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index c179124ce..577142e00 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -31,16 +31,18 @@ import
org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.commons.lang3.StringUtils;
+
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
-
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
@@ -74,7 +76,9 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
private ClientIdentity clientIdentity;
- private String filter = null;
+ private String tableFilter = null;
+
+ private String fieldFilter = null;
private volatile boolean running = false;
@@ -95,6 +99,16 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext =
(SourceConnectorContext) connectorContext;
this.sourceConfig = (CanalSourceConfig)
sourceConnectorContext.getSourceConfig();
+ if (sourceConnectorContext.getRecordPositionList() != null) {
+
this.sourceConfig.setRecordPositions(sourceConnectorContext.getRecordPositionList());
+ }
+
+ if (StringUtils.isNotEmpty(sourceConfig.getTableFilter())) {
+ tableFilter = sourceConfig.getTableFilter();
+ }
+ if (StringUtils.isNotEmpty(sourceConfig.getFieldFilter())) {
+ fieldFilter = sourceConfig.getFieldFilter();
+ }
canalServer = CanalServerWithEmbedded.instance();
@@ -103,7 +117,7 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
public CanalInstance generate(String destination) {
Canal canal = buildCanal(sourceConfig);
- CanalInstanceWithManager instance = new
CanalInstanceWithManager(canal, filter) {
+ CanalInstanceWithManager instance = new
CanalInstanceWithManager(canal, tableFilter) {
protected CanalHAController initHaController() {
return super.initHaController();
@@ -118,6 +132,9 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
((MysqlEventParser)
eventParser).setSupportBinlogImages("FULL");
MysqlEventParser mysqlEventParser =
(MysqlEventParser) eventParser;
mysqlEventParser.setParallel(false);
+ if (StringUtils.isNotEmpty(fieldFilter)) {
+ mysqlEventParser.setFieldFilter(fieldFilter);
+ }
CanalHAController haController =
mysqlEventParser.getHaController();
if (!haController.isStart()) {
@@ -204,7 +221,7 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
canalServer.start();
canalServer.start(sourceConfig.getDestination());
- this.clientIdentity = new
ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(),
filter);
+ this.clientIdentity = new
ClientIdentity(sourceConfig.getDestination(), sourceConfig.getClientId(),
tableFilter);
canalServer.subscribe(clientIdentity);
running = true;
@@ -274,23 +291,31 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
EntryParser entryParser = new EntryParser();
List<ConnectRecord> result = new ArrayList<>();
-
- List<CanalConnectRecord> connectRecordList =
entryParser.parse(sourceConfig, entries);
-
- if (connectRecordList != null && !connectRecordList.isEmpty()) {
- CanalConnectRecord lastRecord =
connectRecordList.get(connectRecordList.size() - 1);
-
- CanalRecordPartition canalRecordPartition = new
CanalRecordPartition();
- canalRecordPartition.setJournalName(lastRecord.getJournalName());
- canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
-
- CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
- canalRecordOffset.setOffset(lastRecord.getBinLogOffset());
-
- ConnectRecord connectRecord = new
ConnectRecord(canalRecordPartition, canalRecordOffset,
System.currentTimeMillis());
- connectRecord.addExtension("messageId",
String.valueOf(message.getId()));
- connectRecord.setData(connectRecordList);
- result.add(connectRecord);
+ // key: Xid offset
+ Map<Long, List<CanalConnectRecord>> connectorRecordMap =
entryParser.parse(sourceConfig, entries);
+
+ if (!connectorRecordMap.isEmpty()) {
+ Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet =
connectorRecordMap.entrySet();
+ for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) {
+ // Xid offset
+ Long binLogOffset = entry.getKey();
+ List<CanalConnectRecord> connectRecordList = entry.getValue();
+ CanalConnectRecord lastRecord =
entry.getValue().get(connectRecordList.size() - 1);
+ CanalRecordPartition canalRecordPartition = new
CanalRecordPartition();
+
canalRecordPartition.setJournalName(lastRecord.getJournalName());
+ canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
+
+ CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
+ canalRecordOffset.setOffset(binLogOffset);
+
+ ConnectRecord connectRecord = new
ConnectRecord(canalRecordPartition, canalRecordOffset,
System.currentTimeMillis());
+ connectRecord.addExtension("messageId",
String.valueOf(message.getId()));
+ connectRecord.setData(connectRecordList);
+ result.add(connectRecord);
+ }
+ } else {
+ // for the message has been filtered need ack message
+ canalServer.ack(clientIdentity, message.getId());
}
return result;
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
index 10c647c8f..ceb509ef7 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
@@ -32,7 +32,7 @@ public abstract class AbstractSqlTemplate implements
SqlTemplate {
}
sql.append(" from ").append(getFullName(schemaName,
tableName)).append(" where ( ");
- appendColumnEquals(sql, pkNames, "and");
+ appendColumnEquals(sql, pkNames);
sql.append(" ) ");
return sql.toString().intern();
}
@@ -41,7 +41,7 @@ public abstract class AbstractSqlTemplate implements
SqlTemplate {
StringBuilder sql = new StringBuilder("update " +
getFullName(schemaName, tableName) + " set ");
appendExcludeSingleShardColumnEquals(sql, columnNames, ",", updatePks,
shardColumn);
sql.append(" where (");
- appendColumnEquals(sql, pkNames, "and");
+ appendColumnEquals(sql, pkNames);
sql.append(")");
return sql.toString().intern();
}
@@ -65,7 +65,7 @@ public abstract class AbstractSqlTemplate implements
SqlTemplate {
public String getDeleteSql(String schemaName, String tableName, String[]
pkNames) {
StringBuilder sql = new StringBuilder("delete from " +
getFullName(schemaName, tableName) + " where ");
- appendColumnEquals(sql, pkNames, "and");
+ appendColumnEquals(sql, pkNames);
return sql.toString().intern();
}
@@ -91,12 +91,12 @@ public abstract class AbstractSqlTemplate implements
SqlTemplate {
}
}
- protected void appendColumnEquals(StringBuilder sql, String[] columns,
String separator) {
+ protected void appendColumnEquals(StringBuilder sql, String[] columns) {
int size = columns.length;
for (int i = 0; i < size; i++) {
sql.append(" ").append(appendEscape(columns[i])).append(" =
").append("? ");
if (i != size - 1) {
- sql.append(separator);
+ sql.append("and");
}
}
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
index a169ed20f..37b45c746 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
@@ -47,7 +47,7 @@ public class MysqlSqlTemplate extends AbstractSqlTemplate {
size = columnNames.length;
for (int i = 0; i < size; i++) {
- if (!includePks && shardColumn != null &&
columnNames[i].equals(shardColumn)) {
+ if (!includePks && columnNames[i].equals(shardColumn)) {
continue;
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
index 76800d9c2..55c88ce55 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SourceConnectorContext.java
@@ -18,8 +18,11 @@
package org.apache.eventmesh.openconnect.api.connector;
import org.apache.eventmesh.common.config.connector.SourceConfig;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;
+import java.util.List;
+
import lombok.Data;
/**
@@ -32,4 +35,7 @@ public class SourceConnectorContext implements
ConnectorContext {
public SourceConfig sourceConfig;
+ // initial record position
+ public List<RecordPosition> recordPositionList;
+
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index c011a1520..16a6fca3d 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -69,6 +69,8 @@ public class AdminOffsetService implements
OffsetManagementService {
public KeyValueStore<RecordPartition, RecordOffset> positionStore;
+ public KeyValueStore<String, RecordPosition> positionStore2;
+
private String jobId;
private JobState jobState;
@@ -106,6 +108,7 @@ public class AdminOffsetService implements
OffsetManagementService {
ReportPositionRequest reportPositionRequest = new
ReportPositionRequest();
reportPositionRequest.setJobID(jobId);
reportPositionRequest.setState(jobState);
+ reportPositionRequest.setDataSourceType(dataSourceType);
reportPositionRequest.setAddress(IPUtils.getLocalAddress());
reportPositionRequest.setRecordPositionList(recordToSyncList);
@@ -119,6 +122,10 @@ public class AdminOffsetService implements
OffsetManagementService {
.build())
.build();
requestObserver.onNext(payload);
+
+ for (Map.Entry<RecordPartition, RecordOffset> entry :
recordMap.entrySet()) {
+ positionStore.remove(entry.getKey());
+ }
}
@Override
@@ -157,8 +164,9 @@ public class AdminOffsetService implements
OffsetManagementService {
JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(),
FetchPositionResponse.class);
assert fetchPositionResponse != null;
if (fetchPositionResponse.isSuccess()) {
-
positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(),
-
fetchPositionResponse.getRecordPosition().getRecordOffset());
+ for (RecordPosition recordPosition :
fetchPositionResponse.getRecordPosition()) {
+ positionStore.put(recordPosition.getRecordPartition(),
recordPosition.getRecordOffset());
+ }
}
}
}
@@ -175,9 +183,9 @@ public class AdminOffsetService implements
OffsetManagementService {
fetchPositionRequest.setJobID(jobId);
fetchPositionRequest.setAddress(IPUtils.getLocalAddress());
fetchPositionRequest.setDataSourceType(dataSourceType);
- RecordPosition recordPosition = new RecordPosition();
- recordPosition.setRecordPartition(partition);
- fetchPositionRequest.setRecordPosition(recordPosition);
+ RecordPosition fetchRecordPosition = new RecordPosition();
+ fetchRecordPosition.setRecordPartition(partition);
+ fetchPositionRequest.setRecordPosition(fetchRecordPosition);
Metadata metadata = Metadata.newBuilder()
.setType(FetchPositionRequest.class.getSimpleName())
@@ -195,8 +203,9 @@ public class AdminOffsetService implements
OffsetManagementService {
JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(),
FetchPositionResponse.class);
assert fetchPositionResponse != null;
if (fetchPositionResponse.isSuccess()) {
-
positionStore.put(fetchPositionResponse.getRecordPosition().getRecordPartition(),
-
fetchPositionResponse.getRecordPosition().getRecordOffset());
+ for (RecordPosition recordPosition :
fetchPositionResponse.getRecordPosition()) {
+ positionStore.put(recordPosition.getRecordPartition(),
recordPosition.getRecordOffset());
+ }
}
}
}
diff --git a/eventmesh-admin-server/src/main/resources/application.yaml
b/eventmesh-registry/eventmesh-registry-nacos/gradle.properties
similarity index 60%
copy from eventmesh-admin-server/src/main/resources/application.yaml
copy to eventmesh-registry/eventmesh-registry-nacos/gradle.properties
index ce396a09f..cf067e20b 100644
--- a/eventmesh-admin-server/src/main/resources/application.yaml
+++ b/eventmesh-registry/eventmesh-registry-nacos/gradle.properties
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -15,18 +14,5 @@
# limitations under the License.
#
-spring:
- datasource:
- url:
jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
- username: root
- password: mike920830
- driver-class-name: com.mysql.cj.jdbc.Driver
-mybatis-plus:
- mapper-locations: classpath:mapper/*.xml
- configuration:
- map-underscore-to-camel-case: false
- log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-event-mesh:
- admin-server:
- service-name: DEFAULT_GROUP@@em_adm_server
- port: 8081
\ No newline at end of file
+pluginType=registryCenter
+pluginName=nacos
\ No newline at end of file
diff --git a/eventmesh-runtime-v2/bin/start-v2.sh
b/eventmesh-runtime-v2/bin/start-v2.sh
new file mode 100644
index 000000000..fc67c29d3
--- /dev/null
+++ b/eventmesh-runtime-v2/bin/start-v2.sh
@@ -0,0 +1,200 @@
+#!/bin/bash
+#
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#===========================================================================================
+# Java Environment Setting
+#===========================================================================================
+set -e
+# Server configuration may be inconsistent, add these configurations to avoid
garbled code problems
+export LANG=en_US.UTF-8
+export LC_CTYPE=en_US.UTF-8
+export LC_ALL=en_US.UTF-8
+
+TMP_JAVA_HOME="/customize/your/java/home/here"
+
+# Detect operating system.
+OS=$(uname)
+
+function is_java8_or_11 {
+ local _java="$1"
+ [[ -x "$_java" ]] || return 1
+ [[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java"
-version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~
'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]]
|| return 2
+ return 0
+}
+
+function extract_java_version {
+ local _java="$1"
+ local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}'
| awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print
"11"; else print "unknown"}')
+ echo "$version"
+}
+
+# 0(not running), 1(is running)
+#function is_proxyRunning {
+# local _pid="$1"
+# local pid=`ps ax | grep -i
'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep
| awk '{print $1}'|grep $_pid`
+# if [ -z "$pid" ] ; then
+# return 0
+# else
+# return 1
+# fi
+#}
+
+function get_pid {
+ local ppid=""
+ if [ -f ${EVENTMESH_HOME}/bin/pid.file ]; then
+ ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file)
+ # If the process does not exist, it indicates that the previous
process terminated abnormally.
+ if [ ! -d /proc/$ppid ]; then
+ # Remove the residual file.
+ rm ${EVENTMESH_HOME}/bin/pid.file
+ echo -e "ERROR\t EventMesh process had already terminated unexpectedly
before, please check log output."
+ ppid=""
+ fi
+ else
+ if [[ $OS =~ Msys ]]; then
+ # There is a Bug on Msys that may not be able to kill
the identified process
+ ppid=`jps -v | grep -i
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep java | grep
-v grep | awk -F ' ' {'print $1'}`
+ elif [[ $OS =~ Darwin ]]; then
+ # Known problem: grep Java may not be able to
accurately identify Java processes
+ ppid=$(/bin/ps -o user,pid,command | grep "java" | grep
-i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev
"^root" |awk -F ' ' {'print $2'})
+ else
+ if [ $DOCKER ]; then
+ # No need to exclude root user in Docker containers.
+ ppid=$(ps -C java -o user,pid,command --cols 99999 | grep
-w $EVENTMESH_HOME | grep -i
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | awk -F ' ' {'print
$2'})
+ else
+ # It is required to identify the process as accurately as possible on
Linux.
+ ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w
$EVENTMESH_HOME | grep -i
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root" |
awk -F ' ' {'print $2'})
+ fi
+ fi
+ fi
+ echo "$ppid";
+}
+
+#===========================================================================================
+# Locate Java Executable
+#===========================================================================================
+
+if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
+ JAVA="$TMP_JAVA_HOME/bin/java"
+ JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
+elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
+ JAVA="$JAVA_HOME/bin/java"
+ JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
+elif is_java8_or_11 "$(which java)"; then
+ JAVA="$(which java)"
+ JAVA_VERSION=$(extract_java_version "$(which java)")
+else
+ echo -e "ERROR\t Java 8 or 11 not found, operation abort."
+ exit 9;
+fi
+
+echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"
+
+EVENTMESH_HOME=$(cd "$(dirname "$0")/.." && pwd)
+export EVENTMESH_HOME
+
+EVENTMESH_LOG_HOME="${EVENTMESH_HOME}/logs"
+export EVENTMESH_LOG_HOME
+
+echo -e "EVENTMESH_HOME : ${EVENTMESH_HOME}\nEVENTMESH_LOG_HOME :
${EVENTMESH_LOG_HOME}"
+
+function make_logs_dir {
+ if [ ! -e "${EVENTMESH_LOG_HOME}" ]; then mkdir -p
"${EVENTMESH_LOG_HOME}"; fi
+}
+
+error_exit ()
+{
+ echo -e "ERROR\t $1 !!"
+ exit 1
+}
+
+export JAVA_HOME
+
+#===========================================================================================
+# JVM Configuration
+#===========================================================================================
+#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server
-Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
+#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M
-Xmn256m -XX:SurvivorRatio=4"
+#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M
-Xmn128m -XX:SurvivorRatio=4"
+#fi
+
+GC_LOG_FILE="${EVENTMESH_LOG_HOME}/eventmesh_gc_%p.log"
+
+#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m
-XX:SurvivorRatio=4"
+JAVA_OPT=`cat ${EVENTMESH_HOME}/conf/server.env | grep APP_START_JVM_OPTION:::
| awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
+JAVA_OPT="${JAVA_OPT} -verbose:gc"
+if [[ "$JAVA_VERSION" == "8" ]]; then
+ # Set JAVA_OPT for Java 8
+ JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
+ JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps
-XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
+elif [[ "$JAVA_VERSION" == "11" ]]; then
+ # Set JAVA_OPT for Java 11
+ XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
+ JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
+ JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM}
-Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
+fi
+JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=${EVENTMESH_LOG_HOME}
-XX:ErrorFile=${EVENTMESH_LOG_HOME}/hs_err_%p.log"
+JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
+JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
+JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
+JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
+JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
+JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
+JAVA_OPT="${JAVA_OPT}
-Dlog4j.configurationFile=${EVENTMESH_HOME}/conf/log4j2.xml"
+JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_LOG_HOME}"
+JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_HOME}/conf"
+JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
+JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
+JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_HOME}/plugin"
+
+#if [ -f "pid.file" ]; then
+# pid=`cat pid.file`
+# if ! is_proxyRunning "$pid"; then
+# echo "proxy is running already"
+# exit 9;
+# else
+# echo "err pid$pid, rm pid.file"
+# rm pid.file
+# fi
+#fi
+
+pid=$(get_pid)
+if [[ $pid == "ERROR"* ]]; then
+ echo -e "${pid}"
+ exit 9
+fi
+if [ -n "$pid" ]; then
+ echo -e "ERROR\t The server is already running (pid=$pid), there is no
need to execute start.sh again."
+ exit 9
+fi
+
+make_logs_dir
+
+echo "Using Java version: $JAVA_VERSION, path: $JAVA" >>
${EVENTMESH_LOG_HOME}/eventmesh.out
+
+EVENTMESH_MAIN=org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter
+if [ $DOCKER ]; then
+ $JAVA $JAVA_OPT -classpath
${EVENTMESH_HOME}/conf:${EVENTMESH_HOME}/apps/*:${EVENTMESH_HOME}/lib/*
$EVENTMESH_MAIN >> ${EVENTMESH_LOG_HOME}/eventmesh.out
+else
+ $JAVA $JAVA_OPT -classpath
${EVENTMESH_HOME}/conf:${EVENTMESH_HOME}/apps/*:${EVENTMESH_HOME}/lib/*
$EVENTMESH_MAIN >> ${EVENTMESH_LOG_HOME}/eventmesh.out 2>&1 &
+echo $!>${EVENTMESH_HOME}/bin/pid.file
+fi
+exit 0
diff --git a/eventmesh-runtime-v2/bin/stop-v2.sh
b/eventmesh-runtime-v2/bin/stop-v2.sh
new file mode 100644
index 000000000..177ae1e12
--- /dev/null
+++ b/eventmesh-runtime-v2/bin/stop-v2.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+#
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Detect operating system
+OS=$(uname)
+
+EVENTMESH_HOME=`cd $(dirname $0)/.. && pwd`
+
+export EVENTMESH_HOME
+
+function get_pid {
+ local ppid=""
+ if [ -f ${EVENTMESH_HOME}/bin/pid.file ]; then
+ ppid=$(cat ${EVENTMESH_HOME}/bin/pid.file)
+ # If the process does not exist, it indicates that the previous process
terminated abnormally.
+ if [ ! -d /proc/$ppid ]; then
+ # Remove the residual file and return an error status.
+ rm ${EVENTMESH_HOME}/bin/pid.file
+ echo -e "ERROR\t EventMesh process had already terminated unexpectedly
before, please check log output."
+ ppid=""
+ fi
+ else
+ if [[ $OS =~ Msys ]]; then
+ # There is a Bug on Msys that may not be able to kill
the identified process
+ ppid=`jps -v | grep -i
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep java | grep
-v grep | awk -F ' ' {'print $1'}`
+ elif [[ $OS =~ Darwin ]]; then
+ # Known problem: grep Java may not be able to
accurately identify Java processes
+ ppid=$(/bin/ps -o user,pid,command | grep "java" | grep
-i "org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev
"^root" |awk -F ' ' {'print $2'})
+ else
+ # It is required to identify the process as accurately
as possible on Linux
+ ppid=$(ps -C java -o user,pid,command --cols 99999 |
grep -w $EVENTMESH_HOME | grep -i
"org.apache.eventmesh.runtime.boot.RuntimeInstanceStarter" | grep -Ev "^root"
|awk -F ' ' {'print $2'})
+ fi
+ fi
+ echo "$ppid";
+}
+
+pid=$(get_pid)
+if [[ $pid == "ERROR"* ]]; then
+ echo -e "${pid}"
+ exit 9
+fi
+if [ -z "$pid" ];then
+ echo -e "ERROR\t No EventMesh server running."
+ exit 9
+fi
+
+kill ${pid}
+echo "Send shutdown request to EventMesh(${pid}) OK"
+
+[[ $OS =~ Msys ]] && PS_PARAM=" -W "
+stop_timeout=60
+for no in $(seq 1 $stop_timeout); do
+ if ps $PS_PARAM -p "$pid" 2>&1 > /dev/null; then
+ if [ $no -lt $stop_timeout ]; then
+ echo "[$no] server shutting down ..."
+ sleep 1
+ continue
+ fi
+
+ echo "shutdown server timeout, kill process: $pid"
+ kill -9 $pid; sleep 1; break;
+ echo "`date +'%Y-%m-%-d %H:%M:%S'` , pid : [$pid] , error
message : abnormal shutdown which can not be closed within 60s" >
../logs/shutdown.error
+ else
+ echo "shutdown server ok!"; break;
+ fi
+done
+
+if [ -f "pid.file" ]; then
+ rm pid.file
+fi
+
+
diff --git
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 2f16834b4..65676903d 100644
---
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -53,6 +53,8 @@ import org.apache.eventmesh.runtime.Runtime;
import org.apache.eventmesh.runtime.RuntimeInstanceConfig;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+import org.apache.commons.collections4.CollectionUtils;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -201,6 +203,9 @@ public class ConnectorRuntime implements Runtime {
SourceConnectorContext sourceConnectorContext = new
SourceConnectorContext();
sourceConnectorContext.setSourceConfig(sourceConfig);
sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
+ if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
+
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
+ }
// spi load offsetMgmtService
this.offsetManagement = new RecordOffsetManagement();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]