This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch storage-api
in repository https://gitbox.apache.org/repos/asf/eventmesh.git

commit c0e182a0998f6d1a3f1a7f7240b2d7ade5417d69
Author: githublaohu <[email protected]>
AuthorDate: Mon Apr 24 16:57:13 2023 +0800

    Resolve conflicts with storage-api branch
---
 .../eventmesh-connector-api/build.gradle           |   6 +
 .../eventmesh-connector-api/gradle.properties      |   4 +-
 .../api/connector/storage/CloudEventUtils.java     |  45 +++-
 ...va => ConnectorResourceServiceStorageImpl.java} |  19 +-
 .../eventmesh/api/connector/storage/Constant.java  |  21 +-
 .../api/connector/storage/StorageConfig.java       |   4 +-
 .../api/connector/storage/StorageConnector.java    |   4 +
 .../storage/StorageConnectorMetedata.java          |   2 +
 .../connector/storage/StorageConnectorProxy.java   |  21 +-
 .../connector/storage/StorageConnectorService.java | 253 +++++++++++----------
 .../api/connector/storage/StorageConsumer.java     | 102 ++++++---
 .../api/connector/storage/StorageProducer.java     |   9 +-
 .../api/connector/storage/data/PullRequest.java    |  61 ++---
 .../api/connector/storage/data/TopicInfo.java      |   2 +
 .../connector/storage/metadata/RouteHandler.java   |  23 +-
 .../storage/metadata/StorageMetaServcie.java       |  74 ++++--
 .../connector/storage/pull/PullCallbackImpl.java   |   6 +-
 .../connector/storage/pull/StoragePullService.java |  11 +-
 .../storage/reply/ReplyOperationService.java       | 206 +++++++++--------
 ...entmesh.api.connector.ConnectorResourceService} |   2 +-
 .../org.apache.eventmesh.api.consumer.Consumer}    |   2 +-
 .../org.apache.eventmesh.api.producer.Producer}    |   2 +-
 .../storage/StorageConnectorProxyTest.java         | 132 +++++++++++
 .../storage/pull/StoragePullServiceTest.java       |  94 ++++++++
 .../storage/reply/ReplyOperationServiceTest.java   | 151 ++++++++++++
 .../storage/jdbc/AbstractJDBCStorageConnector.java | 201 ++++++++--------
 .../jdbc/AbstractJDBCStorageConnectorMetadata.java |  29 ++-
 .../storage/jdbc/JDBCStorageConnector.java         | 202 ++++++++--------
 .../storage/jdbc/ResultSetTransformUtils.java      |  25 +-
 .../connector/storage/jdbc/SQL/BaseSQL.java        |   2 +
 .../storage/jdbc/SQL/BaseSQLOperation.java         |   2 +
 .../storage/jdbc/SQL/CloudEventSQLOperation.java   |   6 +-
 .../jdbc/SQL/SQLServiceInvocationHandler.java      |   5 +-
 ...ventmesh.api.connector.storage.StorageConnector |   2 +-
 .../src/main/resource/mysql-base.yaml              |   5 +-
 .../src/main/resource/mysql-cloudevent.yaml        |  20 +-
 .../storage/jdbc/JDBCStorageConnectorTest.java     | 143 +++++++++++-
 eventmesh-runtime/build.gradle                     |   1 +
 eventmesh-runtime/conf/eventmesh.properties        |  11 +-
 39 files changed, 1345 insertions(+), 565 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle 
b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
index 25c68d12e..be494c74c 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
@@ -23,10 +23,16 @@ dependencies {
     api "io.dropwizard.metrics:metrics-healthchecks"
     api "io.dropwizard.metrics:metrics-annotation"
     api "io.dropwizard.metrics:metrics-json"
+    
+    implementation 'io.cloudevents:cloudevents-json-jackson:2.4.0'
 
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
 
+       implementation "org.mockito:mockito-core"
+    implementation "org.powermock:powermock-module-junit4"
+    implementation "org.powermock:powermock-api-mockito2"
+       
     testCompileOnly 'org.projectlombok:lombok'
     testAnnotationProcessor 'org.projectlombok:lombok'
 
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties 
b/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties
index a9fd83fea..41f4a5718 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties
@@ -13,4 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
\ No newline at end of file
+#
+pluginType=connector
+pluginName=storage
\ No newline at end of file
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java
index d935c5ba9..ecf289fc9 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java
@@ -20,24 +20,31 @@ package org.apache.eventmesh.api.connector.storage;
 import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
 
 import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import io.cloudevents.CloudEvent;
+import io.cloudevents.core.format.EventFormat;
 import io.cloudevents.core.impl.BaseCloudEvent;
+import io.cloudevents.core.provider.EventFormatProvider;
 import io.cloudevents.core.v03.CloudEventV03;
 import io.cloudevents.core.v1.CloudEventV1;
 
 public class CloudEventUtils {
+       
+       public static EventFormat eventFormat = 
EventFormatProvider.getInstance().resolveFormat("application/cloudevents+json");
 
     private static Field CLOUD_EVENT_EXTENSIONS_FIELD;
 
     static {
         try {
-            CLOUD_EVENT_EXTENSIONS_FIELD = 
BaseCloudEvent.class.getField("extensions");
+            CLOUD_EVENT_EXTENSIONS_FIELD = 
BaseCloudEvent.class.getDeclaredField("extensions");
             CLOUD_EVENT_EXTENSIONS_FIELD.setAccessible(true);
         } catch (NoSuchFieldException | SecurityException e) {
-            e.printStackTrace();
+            throw new RuntimeException(e.getMessage() , e );
         }
     }
 
@@ -50,22 +57,48 @@ public class CloudEventUtils {
                 extensions.put(key, value);
                 return cloudEvent;
             } catch (Exception e) {
-                e.printStackTrace();
+                throw new RuntimeException(e);
             }
         }
         return null;
     }
+    
+       public static String getTableName(CloudEvent cloudEvent) {
+               return cloudEvent.getSubject();
+       }
+       
+       public static String checkConsumerGroupName(String topic) {
+               return topic.replace("-", "_");
+       }
+
+       public static List<Object> getParameterToCloudEvent(CloudEvent 
cloudEvent) {
+               List<Object> parameterList = new ArrayList<>();
+               String id = (String) cloudEvent.getExtension("cloudeventid");
+               parameterList.add(Objects.isNull(id)?"1":id);// id
+               parameterList.add(getTableName(cloudEvent));// topic
+               parameterList.add("");// cloud_event_storage_node_adress
+               parameterList.add("");// cloud_event_type
+               parameterList.add("");// cloud_event_producer_group_name
+               parameterList.add("");// cloud_event_source
+               parameterList.add("application/cloudevents+json");// 
cloud_event_content_type
+               // parameterList.add("");//cloud_event_tag
+               parameterList.add("");// cloud_event_extensions
+               String data = new 
String(CloudEventUtils.eventFormat.serialize(cloudEvent), 
Charset.forName("UTF-8"));
+               parameterList.add(data);// cloud_event_data
+               parameterList.add("{}");
+               return parameterList;
+       }
 
     public static String getNodeAdress(CloudEvent cloudEvent) {
-        return (String) cloudEvent.getExtension(Constant.NODE_ADDRESS);
+        return (String) 
cloudEvent.getExtension(Constant.STORAGE_CONFIG_ADDRESS);
     }
 
     public static String getTopic(CloudEvent cloudEvent) {
-        return null;
+        return cloudEvent.getSubject();
     }
 
     public static String getId(CloudEvent cloudEvent) {
-        return null;
+        return "";
     }
 
     public static CloudEvent createCloudEvent(CloudEventInfo cloudEventInfo) {
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorInfo.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/ConnectorResourceServiceStorageImpl.java
similarity index 71%
rename from 
eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorInfo.java
rename to 
eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/ConnectorResourceServiceStorageImpl.java
index 65a3fecd4..274cd0ad9 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorInfo.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/ConnectorResourceServiceStorageImpl.java
@@ -14,13 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.eventmesh.api.connector.storage;
 
-import lombok.Data;
+import org.apache.eventmesh.api.connector.ConnectorResourceService;
+
+public class ConnectorResourceServiceStorageImpl implements 
ConnectorResourceService{
+
+       @Override
+       public void init() throws Exception {
+               // TODO Auto-generated method stub
+               
+       }
 
-@Data
-public class StorageConnectorInfo {
+       @Override
+       public void release() throws Exception {
+               // TODO Auto-generated method stub
+               
+       }
 
-       private boolean distinguishTopic;
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java
index f08d62cf0..ceef0e5e9 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java
@@ -19,5 +19,24 @@ package org.apache.eventmesh.api.connector.storage;
 
 public class Constant {
 
-       public static final String NODE_ADDRESS = "nodeAddress";
+       public static final String STORAGE_CONFIG_ADDRESS = 
"eventMesh.connector.plugin.storage.nodeaddress";
+       
+       public static final String STORAGE_CONFIG_TYPE = 
"eventMesh.connector.plugin.storage.type";
+
+       public static final String STORAGE_CONFIG_JDBC_PARAMETER = 
"eventMesh.connector.plugin.storage.jdbc.parameter";
+
+       public static final String STORAGE_CONFIG_USER_NAME = 
"eventMesh.connector.plugin.storage.username";
+
+       public static final String STORAGE_CONFIG_PASSWORD = 
"eventMesh.connector.plugin.storage.password";
+       
+       public static final String STORAGE_CONFIG_JDBC_TYPE = 
"eventMesh.connector.plugin.storage.jdbc.dbType";
+       
+       public static final String STORAGE_CONFIG_JDBC_MAXACTIVE = 
"eventMesh.connector.plugin.storage.jdbc.maxActive";
+       
+       public static final String STORAGE_CONFIG_JDBC_MAXWAIT = 
"eventMesh.connector.plugin.storage.jdbc.maxWait";
+       
+       public static final String STORAGE_ID = "storageid";
+       
+       public static final String STORAGE_NODE_ADDRESS = "address";
+
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java
index 67eedb144..64c6fb88f 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java
@@ -22,7 +22,7 @@ import lombok.Data;
 @Data
 public class StorageConfig {
 
-    private long pullInterval;
+    private long pullInterval = 200;
 
-    private long pullThresholdForQueue;
+    private long pullThresholdForQueue = 5;
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java
index 6e0493ca4..65756630a 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java
@@ -61,6 +61,10 @@ public interface StorageConnector extends LifeCycle {
     public List<CloudEvent> pull(PullRequest pullRequest) throws Exception;
 
     void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);
+    
+    public default String getTopic(String topic) {
+       return topic;
+    }
 
     public default int deleteCloudEvent(CloudEvent cloudEvent) {
         return 0;
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java
index 4843c25fb..84bb4edcc 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java
@@ -35,5 +35,7 @@ public interface StorageConnectorMetedata {
     public int createTopic(TopicInfo topicInfo) throws Exception;
 
     public int createConsumerGroupInfo(ConsumerGroupInfo consumerGroupInfo) 
throws Exception;
+    
+    public List<TopicInfo> geTopicInfos(Set<String> topics,String key) throws 
Exception;
 
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java
index a0339c65e..211594faf 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java
@@ -24,11 +24,13 @@ import 
org.apache.eventmesh.api.connector.storage.data.PullRequest;
 import org.apache.eventmesh.api.connector.storage.data.TopicInfo;
 import org.apache.eventmesh.api.connector.storage.metadata.RouteHandler;
 import org.apache.eventmesh.api.connector.storage.metadata.StorageMetaServcie;
+import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation;
 import org.apache.eventmesh.api.connector.storage.reply.ReplyOperationService;
 import org.apache.eventmesh.api.connector.storage.reply.RequestReplyInfo;
 import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
 import org.apache.eventmesh.api.exception.OnExceptionContext;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -36,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import io.cloudevents.CloudEvent;
+import lombok.Setter;
 
 public class StorageConnectorProxy implements StorageConnector {
 
@@ -45,10 +48,13 @@ public class StorageConnectorProxy implements 
StorageConnector {
 
     private RouteHandler routeHandler = new RouteHandler();
 
+    @Setter
     private ReplyOperationService replyService;
 
+    @Setter
     private StorageMetaServcie storageMetaServcie;
 
+    @Setter
     private Executor executor;
 
     @Override
@@ -63,7 +69,12 @@ public class StorageConnectorProxy implements 
StorageConnector {
     public void init(Properties properties) throws Exception {
     }
 
+    public  Collection<StorageConnector> getStorageConnectorList(){
+       return this.storageConnectorByKeyMap.values();
+    }
+    
     public void setConnector(StorageConnector storageConnector, String key) {
+       routeHandler.addStorageConnector(storageConnector);
         storageConnectorMap.put(storageConnector, key);
         storageConnectorByKeyMap.put(key, storageConnector);
     }
@@ -84,6 +95,7 @@ public class StorageConnectorProxy implements 
StorageConnector {
             if (storageConnector instanceof StorageConnectorMetedata
                 && !storageMetaServcie.isTopic(storageConnector, 
CloudEventUtils.getTopic(cloudEvent))) {
                 TopicInfo topicInfo = new TopicInfo();
+                topicInfo.setTopicName(CloudEventUtils.getTopic(cloudEvent));
                 StorageConnectorMetedata storageConnectorMetedata = 
(StorageConnectorMetedata) storageConnector;
                 storageConnectorMetedata.createTopic(topicInfo);
             }
@@ -114,15 +126,18 @@ public class StorageConnectorProxy implements 
StorageConnector {
     public void doRequest(CloudEvent cloudEvent, RequestReplyCallback 
requestReplyCallback, long timeout) {
         try {
             StorageConnector storageConnector = routeHandler.select();
+            if(!(storageConnector instanceof ReplyOperation)) {
+               return;
+            }
             String key = storageConnectorMap.get(storageConnector);
-            CloudEventUtils.setValue(cloudEvent, "nodeAddress", key);
+            CloudEventUtils.setValue(cloudEvent, 
Constant.STORAGE_CONFIG_ADDRESS, key);
             storageConnector.request(cloudEvent, requestReplyCallback, 
timeout);
-            Long storageId = (Long) cloudEvent.getExtension("storageId");
+            Long storageId = Long.valueOf( 
cloudEvent.getExtension(Constant.STORAGE_ID).toString());
             RequestReplyInfo requestReplyInfo = new RequestReplyInfo();
             requestReplyInfo.setStorageId(storageId);
             requestReplyInfo.setTimeOut(System.currentTimeMillis() + timeout);
             requestReplyInfo.setRequestReplyCallback(requestReplyCallback);
-            replyService.setRequestReplyInfo(null, cloudEvent.getType(), 
storageId, requestReplyInfo);
+            replyService.setRequestReplyInfo((ReplyOperation)storageConnector, 
cloudEvent.getType(), storageId, requestReplyInfo);
         } catch (Exception e) {
             requestReplyCallback.onException(e);
         }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java
index 287c2b915..e4e51a215 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java
@@ -22,9 +22,9 @@ import 
org.apache.eventmesh.api.connector.storage.data.PullRequest;
 import org.apache.eventmesh.api.connector.storage.metadata.StorageMetaServcie;
 import org.apache.eventmesh.api.connector.storage.pull.StoragePullService;
 import org.apache.eventmesh.api.connector.storage.reply.ReplyOperationService;
+import org.apache.eventmesh.common.config.ConfigurationWrapper;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
-import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,122 +42,137 @@ import lombok.Getter;
 
 public class StorageConnectorService implements LifeCycle {
 
-    private static final StorageConnectorService instance = new 
StorageConnectorService();
-
-    private StoragePullService pullService = new StoragePullService();
-
-    private StorageMetaServcie storageMetaServcie = new StorageMetaServcie();
-
-    private ReplyOperationService replyService = new ReplyOperationService();
-
-    private Map<String, StorageConnector> storageConnectorMap = new 
HashMap<>();
-
-    private Executor executor;
-
-    private ScheduledExecutorService scheduledExecutor;
-
-    @Getter
-    private StorageConnector storageConnector = new StorageConnectorProxy();
-
-    public static StorageConnectorService getInstance() {
-        return instance;
-    }
-
-    private StorageConnectorService() {
-        this.executor = new 
ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10,
-            Runtime.getRuntime().availableProcessors() * 100, 1000 * 60 * 60, 
TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(), new ThreadFactory() {
-            AtomicInteger index = new AtomicInteger();
-
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "storage-connent-" + 
index.getAndIncrement());
-            }
-        });
-        this.scheduledExecutor = 
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 
10,
-            new ThreadFactory() {
-                AtomicInteger index = new AtomicInteger();
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "storage-connent-shceduled-" + 
index.getAndIncrement());
-                }
-            });
-        this.storageMetaServcie = new StorageMetaServcie();
-        this.storageMetaServcie.setScheduledExecutor(scheduledExecutor);
-        this.storageMetaServcie.setStoragePullService(pullService);
-        this.replyService.setExecutor(executor);
-        this.pullService.setExecutor(executor);
-        this.executor.execute(pullService);
-        this.scheduled();
-
-    }
-
-    public void scheduled() {
-        scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                storageMetaServcie.pullMeteData();
-            }
-        }, 5, 1000, TimeUnit.MILLISECONDS);
-        scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                replyService.execute();
-            }
-        }, 5, 5, TimeUnit.MILLISECONDS);
-    }
-
-    public StorageConnector createConsumerByStorageConnector(Properties 
properties) {
-        StorageConnector storageConnector = 
this.createConsumerByStorageConnector(properties);
-        if (storageConnector instanceof StorageConnectorMetedata) {
-            
this.storageMetaServcie.registerStorageConnector((StorageConnectorMetedata) 
storageConnector);
-        }
-
-        return storageConnector;
-    }
-
-    public StorageConnector createProducerByStorageConnector(Properties 
properties, List<PullRequest> pullRequests) {
-        StorageConnector storageConnector = 
this.createConsumerByStorageConnector(properties);
-        this.storageMetaServcie.registerPullRequest(pullRequests, 
storageConnector);
-        return storageConnector;
-    }
-
-    public StorageConnector createStorageConnector(Properties properties) 
throws Exception {
-        URL url = new URL(properties.getProperty(""));
-        String host = url.getHost() + ":" + url.getPort();
-        String[] hosts = host.split(",");
-        StorageConnectorProxy connectorProxy = new StorageConnectorProxy();
-        for (String address : hosts) {
-            StorageConnector storageConnector = 
EventMeshExtensionFactory.getExtension(StorageConnector.class,
-                url.getProtocol());
-            properties.setProperty("nodeAddress", address);
-            properties.setProperty("protocol", url.getProtocol());
-            storageConnector.init(properties);
-            String key = url.getProtocol() + "://" + address;
-            connectorProxy.setConnector(storageConnector, key);
-            storageConnectorMap.put(key, storageConnector);
-        }
-
-        return connectorProxy;
-    }
-
-    @Override
-    public boolean isStarted() {
-        return true;
-    }
-
-    @Override
-    public boolean isClosed() {
-        return false;
-    }
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public void shutdown() {
-        storageConnectorMap.values().forEach(value -> value.shutdown());
-    }
+       private static final StorageConnectorService instance = new 
StorageConnectorService();
+
+       private StoragePullService pullService = new StoragePullService();
+
+       private StorageMetaServcie storageMetaServcie = new 
StorageMetaServcie();
+
+       private ReplyOperationService replyService = new 
ReplyOperationService();
+
+       private Map<String, StorageConnector> storageConnectorMap = new 
HashMap<>();
+
+       @Getter
+       private Executor executor;
+
+       private ScheduledExecutorService scheduledExecutor;
+
+       @Getter
+       private StorageConnector storageConnector = new StorageConnectorProxy();
+
+       public static StorageConnectorService getInstance() {
+               return instance;
+       }
+
+       private StorageConnectorService() {
+               this.executor = new 
ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10,
+                               Runtime.getRuntime().availableProcessors() * 
300, 1000 * 60 * 60, TimeUnit.SECONDS,
+                               new LinkedBlockingQueue<>(), new 
ThreadFactory() {
+                                       AtomicInteger index = new 
AtomicInteger();
+
+                                       @Override
+                                       public Thread newThread(Runnable r) {
+                                               return new Thread(r, 
"storage-connent-" + index.getAndIncrement());
+                                       }
+                               });
+               this.scheduledExecutor = 
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 
10,
+                               new ThreadFactory() {
+                                       AtomicInteger index = new 
AtomicInteger();
+
+                                       @Override
+                                       public Thread newThread(Runnable r) {
+                                               return new Thread(r, 
"storage-connent-shceduled-" + index.getAndIncrement());
+                                       }
+                               });
+               this.storageMetaServcie = new StorageMetaServcie();
+               this.storageMetaServcie.setScheduledExecutor(scheduledExecutor);
+               this.storageMetaServcie.setStoragePullService(pullService);
+               this.storageMetaServcie.setExecutor(executor);
+               this.replyService.setExecutor(executor);
+               this.pullService.setExecutor(executor);
+               this.pullService.setScheduledExecutor(scheduledExecutor);
+               StorageConfig storageConfig = new StorageConfig();
+               this.pullService.setStorageConfig(storageConfig);
+               this.executor.execute(pullService);
+               this.scheduled();
+
+       }
+
+       public void scheduled() {
+               scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                               storageMetaServcie.pullMeteData();
+                       }
+               }, 5, 1000, TimeUnit.MILLISECONDS);
+               scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                               replyService.execute();
+                       }
+               }, 5, 5, TimeUnit.MILLISECONDS);
+       }
+
+       public StorageConnector createProducerByStorageConnector(Properties 
properties) {
+               StorageConnector storageConnector = 
this.createStorageConnector(properties);
+               
this.storageMetaServcie.registerStorageConnector(storageConnector);
+               return storageConnector;
+       }
+
+       public StorageConnector createConsumerByStorageConnector(Properties 
properties, List<PullRequest> pullRequests) {
+               try {
+                       StorageConnector storageConnector = 
this.createProducerByStorageConnector(properties);
+                       
this.storageMetaServcie.registerPullRequest(pullRequests, storageConnector);
+                       return storageConnector;
+               } catch (Exception e) {
+                       throw new RuntimeException(e.getMessage(), e);
+               }
+       }
+
+       public StorageConnector createStorageConnector(Properties properties) {
+               try {
+                       ConfigurationWrapper configurationWrapper = new 
ConfigurationWrapper(
+                                       System.getProperty("confPath", 
System.getenv("confPath")), "eventmesh.properties", false);
+                       configurationWrapper.getProperties().putAll(properties);
+                       properties = configurationWrapper.getProperties();
+                       String storageType = 
properties.getProperty(Constant.STORAGE_CONFIG_TYPE);
+                       String[] hosts = 
properties.getProperty(Constant.STORAGE_CONFIG_ADDRESS).split(";");
+                       StorageConnectorProxy connectorProxy = new 
StorageConnectorProxy();
+                       connectorProxy.setExecutor(executor);
+                       connectorProxy.setReplyService(replyService);
+                       
connectorProxy.setStorageMetaServcie(storageMetaServcie);
+                       for (String address : hosts) {
+                               StorageConnector storageConnector = 
EventMeshExtensionFactory.getExtension(StorageConnector.class,
+                                               storageType);
+                               
properties.setProperty(Constant.STORAGE_NODE_ADDRESS, address);
+                               storageConnector.init(properties);
+                               storageConnector.start();
+                               String key = storageType + "://" + address;
+                               connectorProxy.setConnector(storageConnector, 
key);
+                               storageConnectorMap.put(key, storageConnector);
+                       }
+                       return connectorProxy;
+               } catch (Exception e) {
+                       throw new RuntimeException(e.getMessage(), e);
+               }
+       }
+
+       @Override
+       public boolean isStarted() {
+               return true;
+       }
+
+       @Override
+       public boolean isClosed() {
+               return false;
+       }
+
+       @Override
+       public void start() {
+       }
+
+       @Override
+       public void shutdown() {
+               storageConnectorMap.values().forEach(value -> value.shutdown());
+       }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java
index 49c7fcde1..616a84ca1 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java
@@ -18,60 +18,94 @@ package org.apache.eventmesh.api.connector.storage;
 
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.api.EventListener;
+import org.apache.eventmesh.api.connector.storage.data.PullRequest;
+import org.apache.eventmesh.api.connector.storage.pull.PullCallbackImpl;
 import org.apache.eventmesh.api.consumer.Consumer;
+import org.apache.eventmesh.common.Constants;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
+import java.util.Set;
 
 import io.cloudevents.CloudEvent;
 
 public class StorageConsumer implements Consumer {
 
-    private StorageConnector storageOperation;
+       private StorageConnector storageOperation;
 
-    @Override
-    public boolean isStarted() {
-        return storageOperation.isStarted();
-    }
+       private EventListener listener;
 
-    @Override
-    public boolean isClosed() {
-        return storageOperation.isClosed();
-    }
+       private Properties keyValue;
 
-    @Override
-    public void start() {
-        storageOperation.start();
-    }
+       private Set<String> topicSet = new HashSet<>();
 
-    @Override
-    public void shutdown() {
-        storageOperation.shutdown();
-    }
+       @Override
+       public boolean isStarted() {
+               return storageOperation.isStarted();
+       }
 
-    @Override
-    public void init(Properties keyValue) throws Exception {
-        storageOperation.init(keyValue);
-    }
+       @Override
+       public boolean isClosed() {
+               return storageOperation.isClosed();
+       }
 
-    @Override
-    public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext 
context) {
-        storageOperation.updateOffset(cloudEvents, context);
-    }
+       @Override
+       public void start() {
+               if (Objects.isNull(listener)) {
 
-    @Override
-    public void subscribe(String topic) throws Exception {
+               }
 
-    }
+               if (topicSet.isEmpty()) {
 
-    @Override
-    public void unsubscribe(String topic) {
+               }
 
-    }
+               StorageConnectorService storageConnectorService = 
StorageConnectorService.getInstance();
+               List<PullRequest> pullRequestList = new 
ArrayList<PullRequest>(topicSet.size());
+               for (String topic : topicSet) {
+                       PullRequest pullRequest = new PullRequest();
+                       pullRequest.setTopicName(topic);
+                       
pullRequest.setConsumerGroupName(keyValue.getProperty(Constants.CONSUMER_GROUP));
+                       PullCallbackImpl pullCallback = new PullCallbackImpl();
+                       pullCallback.setEventListener(listener);
+                       
pullCallback.setExecutor(storageConnectorService.getExecutor());
+                       pullRequest.setPullCallback(pullCallback);
+                       pullRequestList.add(pullRequest);
+               }
+               storageOperation = 
storageConnectorService.createConsumerByStorageConnector(this.keyValue, 
pullRequestList);
 
-    @Override
-    public void registerEventListener(EventListener listener) {
+       }
 
-    }
+       @Override
+       public void shutdown() {
+               storageOperation.shutdown();
+       }
+
+       @Override
+       public void init(Properties keyValue) throws Exception {
+               this.keyValue = keyValue;
+       }
+
+       @Override
+       public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext 
context) {
+               storageOperation.updateOffset(cloudEvents, context);
+       }
+
+       @Override
+       public void subscribe(String topic) throws Exception {
+               topicSet.add(topic);
+       }
+
+       @Override
+       public void unsubscribe(String topic) {
+               topicSet.remove(topic);
+       }
+
+       @Override
+       public void registerEventListener(EventListener listener) {
+               this.listener = listener;
+       }
 
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java
index e43be785f..12ab9ea15 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java
@@ -27,6 +27,8 @@ import io.cloudevents.CloudEvent;
 public class StorageProducer implements Producer {
 
     private StorageConnector storageOperation;
+    
+    private Properties keyValue;
 
     @Override
     public boolean isStarted() {
@@ -40,7 +42,8 @@ public class StorageProducer implements Producer {
 
     @Override
     public void start() {
-        storageOperation.start();
+       StorageConnectorService storageConnectorService = 
StorageConnectorService.getInstance();
+       storageOperation = 
storageConnectorService.createProducerByStorageConnector(this.keyValue);
     }
 
     @Override
@@ -50,7 +53,7 @@ public class StorageProducer implements Producer {
 
     @Override
     public void init(Properties keyValue) throws Exception {
-        storageOperation.init(keyValue);
+        this.keyValue = keyValue;
 
     }
 
@@ -76,7 +79,7 @@ public class StorageProducer implements Producer {
 
     @Override
     public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) 
throws Exception {
-        return false;
+        return storageOperation.reply(cloudEvent, sendCallback);
     }
 
     @Override
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java
index 48e712e6a..9e10147c9 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.api.connector.storage.data;
 
+import org.apache.eventmesh.api.EventListener;
 import org.apache.eventmesh.api.connector.storage.StorageConnector;
 import org.apache.eventmesh.api.connector.storage.pull.PullCallback;
 
@@ -33,48 +34,48 @@ import lombok.Data;
 @Data
 public class PullRequest {
 
-    private static final AtomicLong INCREASING_ID = new AtomicLong();
+       private static final AtomicLong INCREASING_ID = new AtomicLong();
 
-    private long id = INCREASING_ID.incrementAndGet();
+       private long id = INCREASING_ID.incrementAndGet();
 
-    private String topicName;
+       private String topicName;
 
-    private String consumerGroupName;
+       private String consumerGroupName;
 
-    private String nextId;
+       private String nextId = "0";
 
-    private String processSign;
+       private String processSign;
 
-    private StorageConnector storageConnector;
+       private StorageConnector storageConnector;
 
-    private AtomicBoolean isEliminate = new AtomicBoolean(true);
+       private AtomicBoolean isEliminate = new AtomicBoolean(true);
 
-    private AtomicInteger stock = new AtomicInteger();
+       private AtomicInteger stock = new AtomicInteger();
 
-    private PullCallback pullCallback;
+       private PullCallback pullCallback;
 
-    private List<PullRequest> pullRequests;
+       private List<PullRequest> pullRequests;
 
-    private Map<String, PullRequest> topicAndPullRequests;
+       private Map<String, PullRequest> topicAndPullRequests;
 
-    public synchronized void setPullRequests(List<PullRequest> pullRequests) {
-        this.pullRequests = pullRequests;
-        this.topicAndPullRequests = null;
-    }
+       public synchronized void setPullRequests(List<PullRequest> 
pullRequests) {
+               this.pullRequests = pullRequests;
+               this.topicAndPullRequests = null;
+       }
 
-    public List<PullRequest> getPullRequests() {
-        List<PullRequest> pullRequests = this.pullRequests;
-        return pullRequests;
-    }
+       public List<PullRequest> getPullRequests() {
+               List<PullRequest> pullRequests = this.pullRequests;
+               return pullRequests;
+       }
 
-    public Map<String, PullRequest> getTopicAndPullRequests() {
-        if (Objects.isNull(this.topicAndPullRequests)) {
-            Map<String, PullRequest> map = new HashMap<>();
-            for (PullRequest pullRequest : pullRequests) {
-                map.put(pullRequest.getTopicName(), pullRequest);
-            }
-            this.topicAndPullRequests = map;
-        }
-        return this.topicAndPullRequests;
-    }
+       public Map<String, PullRequest> getTopicAndPullRequests() {
+               if (Objects.isNull(this.topicAndPullRequests)) {
+                       Map<String, PullRequest> map = new HashMap<>();
+                       for (PullRequest pullRequest : pullRequests) {
+                               map.put(pullRequest.getTopicName(), 
pullRequest);
+                       }
+                       this.topicAndPullRequests = map;
+               }
+               return this.topicAndPullRequests;
+       }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java
index 310af9f98..098c43118 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java
@@ -28,6 +28,8 @@ import lombok.Data;
 public class TopicInfo {
 
     private String topicName;
+    
+    private String dbTablesName;
 
     private int writeQueueNums;
 
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java
index c11c346ff..601eba94a 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java
@@ -19,20 +19,37 @@ package org.apache.eventmesh.api.connector.storage.metadata;
 
 import org.apache.eventmesh.api.connector.storage.StorageConnector;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import lombok.Setter;
 
 public class RouteHandler {
 
     @Setter
-    List<StorageConnector> storageConnector;
+    List<StorageConnector> storageConnector = new ArrayList<>();
 
+    private RouteSelect souteSelect = new PollRouteSelect();
 
-    private RouteSelect souteSelect;
-
+    public void addStorageConnector(StorageConnector storageConnector) {
+       this.storageConnector.add(storageConnector);
+    }
 
     public StorageConnector select() {
         return souteSelect.select(storageConnector);
     }
+    
+    
+    public class PollRouteSelect implements RouteSelect{
+
+       private AtomicLong index = new AtomicLong();
+       
+               @Override
+               public StorageConnector select(List<StorageConnector> 
storageConnector) {
+                       int value = 
(int)(index.getAndIncrement()%storageConnector.size());
+                       return storageConnector.get(value);
+               }
+       
+    }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java
index faaa2f648..2d6d26a01 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.api.connector.storage.metadata;
 
 import org.apache.eventmesh.api.connector.storage.StorageConnector;
 import org.apache.eventmesh.api.connector.storage.StorageConnectorMetedata;
+import org.apache.eventmesh.api.connector.storage.StorageConnectorProxy;
 import org.apache.eventmesh.api.connector.storage.data.ConsumerGroupInfo;
 import org.apache.eventmesh.api.connector.storage.data.Metadata;
 import org.apache.eventmesh.api.connector.storage.data.PullRequest;
@@ -31,10 +32,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class StorageMetaServcie {
 
     protected static final Logger messageLogger = 
LoggerFactory.getLogger("message");
 
-    private static final String PROCESS_SIGN = 
Long.toString(System.currentTimeMillis());
+    private static final String PROCESS_SIGN = UUID.randomUUID().toString();
 
     @Setter
     private ScheduledExecutorService scheduledExecutor;
@@ -59,23 +60,42 @@ public class StorageMetaServcie {
     private Map<StorageConnectorMetedata, Metadata> metaDataMap = new 
ConcurrentHashMap<>();
 
     public void init() {
-        scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+        /*scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
                 StorageMetaServcie.this.pullMeteData();
             }
-        }, 5, 1000, TimeUnit.MILLISECONDS);
+        }, 5, 1000, TimeUnit.MILLISECONDS);*/
     }
 
-    public void registerStorageConnector(StorageConnectorMetedata 
storageConnector) {
-        metaDataMap.put(storageConnector, new Metadata());
+    public void registerStorageConnector(Object storageConnector) {
+       if(storageConnector instanceof StorageConnectorProxy) {
+               StorageConnectorProxy storageConnectorProxy = 
(StorageConnectorProxy)storageConnector;
+               for(StorageConnector connector : 
storageConnectorProxy.getStorageConnectorList()) {
+                       if(connector instanceof StorageConnectorMetedata) {
+                               
metaDataMap.put((StorageConnectorMetedata)connector, new Metadata());
+                       }
+               }
+       }else {
+               if(storageConnector instanceof StorageConnectorMetedata) {
+                       
metaDataMap.put((StorageConnectorMetedata)storageConnector, new Metadata());
+                       }
+       }
+        
     }
 
     public void registerPullRequest(List<PullRequest> pullRequests, 
StorageConnector storageConnector) {
         executor.execute(new Runnable() {
             @Override
             public void run() {
-                StorageMetaServcie.this.doRegisterPullRequest(pullRequests, 
storageConnector);
+               if(storageConnector instanceof StorageConnectorProxy) {
+                       StorageConnectorProxy storageConnectorProxy = 
(StorageConnectorProxy)storageConnector;
+                       for(StorageConnector connector : 
storageConnectorProxy.getStorageConnectorList()) {
+                               
StorageMetaServcie.this.doRegisterPullRequest(pullRequests, connector);
+                       }
+               }else {
+                       
StorageMetaServcie.this.doRegisterPullRequest(pullRequests, storageConnector);
+               }
             }
 
         });
@@ -83,6 +103,11 @@ public class StorageMetaServcie {
 
     public void doRegisterPullRequest(List<PullRequest> pullRequests, 
StorageConnector storageConnector) {
         try {
+               if(Objects.isNull(pullRequests) || pullRequests.isEmpty()) {
+                       //TODO
+                       return;
+               }
+               
             StorageConnectorMetedata storageConnectorMetedata = null;
             if (storageConnector instanceof StorageConnectorMetedata) {
                 storageConnectorMetedata = (StorageConnectorMetedata) 
storageConnector;
@@ -92,29 +117,31 @@ public class StorageMetaServcie {
             Set<String> topicSet = new HashSet<>();
             Map<String, TopicInfo> topicInfoMap = new HashMap<>();
             if (Objects.nonNull(storageConnectorMetedata)) {
-                List<ConsumerGroupInfo> consumerGroupInfos = 
storageConnectorMetedata.getConsumerGroupInfo();
-                consumerGroupInfos.forEach(value -> 
consumerGroupInfoMap.put(value.getConsumerGroupName(), value));
+                //List<ConsumerGroupInfo> consumerGroupInfos = 
storageConnectorMetedata.getConsumerGroupInfo();
+                //consumerGroupInfos.forEach(value -> 
consumerGroupInfoMap.put(value.getConsumerGroupName(), value));
                 topicSet = storageConnectorMetedata.getTopic();
-                storageConnectorMetedata.geTopicInfos(pullRequests)
+                
storageConnectorMetedata.geTopicInfos(topicSet,pullRequests.get(0).getConsumerGroupName())
                     .forEach(value -> topicInfoMap.put(value.getTopicName(), 
value));
             }
             for (PullRequest pullRequest : pullRequests) {
                 if (Objects.nonNull(storageConnectorMetedata) && 
!topicSet.contains(pullRequest.getTopicName())) {
-                    try {
-                        if (!topicSet.contains(pullRequest.getTopicName())) {
+                        String  topic = pullRequest.getTopicName();
+                               if (!topicSet.contains(topic)) {
                             TopicInfo topicInfo = new TopicInfo();
+                            topicInfo.setTopicName(pullRequest.getTopicName());
                             storageConnectorMetedata.createTopic(topicInfo);
+                        }else {
+                                TopicInfo topicInfo = topicInfoMap.get(topic);
+                                if(Objects.nonNull(topicInfo)) {
+                                        
pullRequest.setNextId(Long.toString(topicInfo.getCurrentId()));
+                                }
                         }
-                        if 
(!consumerGroupInfoMap.containsKey(pullRequest.getConsumerGroupName())) {
+                        if 
(consumerGroupInfoMap.containsKey(pullRequest.getConsumerGroupName())) {
                             ConsumerGroupInfo consumerGroupInfo = new 
ConsumerGroupInfo();
-                            
storageConnectorMetedata.createConsumerGroupInfo(consumerGroupInfo);
+                            
consumerGroupInfo.setConsumerGroupName(pullRequest.getConsumerGroupName());
+                            
//storageConnectorMetedata.createConsumerGroupInfo(consumerGroupInfo);
                         }
-                        TopicInfo topicInfo = 
topicInfoMap.get(pullRequest.getTopicName());
-                        
pullRequest.setNextId(Long.toString(topicInfo.getCurrentId()));
-                    } catch (Exception e) {
-
-                    }
-
+                       
                 }
                 pullRequest.setProcessSign(PROCESS_SIGN);
                 pullRequest.setStorageConnector(storageConnector);
@@ -125,6 +152,7 @@ public class StorageMetaServcie {
         }
     }
 
+    
     public void pullMeteData() {
         for (StorageConnectorMetedata storageConnectorMetedata : 
metaDataMap.keySet()) {
             executor.execute(new Runnable() {
@@ -148,7 +176,11 @@ public class StorageMetaServcie {
 
     public boolean isTopic(StorageConnector storageConnector, String topic) {
         if (storageConnector instanceof StorageConnectorMetedata) {
-            return metaDataMap.get((StorageConnectorMetedata) 
storageConnector).getTopicSet().contains(topic);
+               Metadata metadata = metaDataMap.get((StorageConnectorMetedata) 
storageConnector);
+               if(Objects.isNull(metadata) || 
metadata.getTopicSet().isEmpty()) {
+                       return false;
+               }
+            return 
metadata.getTopicSet().contains(storageConnector.getTopic(topic));
         }
         return true;
 
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java
index 4d668c83d..ab0592284 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java
@@ -28,7 +28,9 @@ import java.util.concurrent.Executor;
 import io.cloudevents.CloudEvent;
 
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j(topic = "message")
 public class PullCallbackImpl implements PullCallback {
 
     @Setter
@@ -55,7 +57,7 @@ public class PullCallbackImpl implements PullCallback {
                             eventListener.consume(cloudEvent, context);
                             pullRequest.getStock().decrementAndGet();
                         } catch (Exception e) {
-                                                       e.printStackTrace();
+                                                       
log.error(e.getMessage(),e);
                         }
                     }
                 });
@@ -63,7 +65,7 @@ public class PullCallbackImpl implements PullCallback {
             }
 
         } catch (Exception e) {
-                       e.printStackTrace();
+               log.error(e.getMessage(),e);
         }
     }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java
index a3ada5f38..df906daee 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java
@@ -45,6 +45,7 @@ public class StoragePullService implements Runnable {
 
     private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new 
LinkedBlockingQueue<PullRequest>();
 
+    @Setter
     private ScheduledExecutorService scheduledExecutor;
 
     @Setter
@@ -54,7 +55,7 @@ public class StoragePullService implements Runnable {
     private Executor executor;
 
     private boolean isStopped() {
-        return true;
+        return false;
     }
 
     public void executePullRequestLater(final PullRequest pullRequest, final 
long timeDelay) {
@@ -86,7 +87,7 @@ public class StoragePullService implements Runnable {
         while (!this.isStopped()) {
             try {
                 final PullRequest pullRequest = this.pullRequestQueue.take();
-                if (pullRequest.getStock().get() < 
this.storageConfig.getPullThresholdForQueue()) {
+                if (pullRequest.getStock().get() > 
this.storageConfig.getPullThresholdForQueue()) {
                     this.executePullRequestLater(pullRequest, 
storageConfig.getPullInterval());
                     continue;
                 }
@@ -109,10 +110,10 @@ public class StoragePullService implements Runnable {
         try {
             List<CloudEvent> cloudEventList = 
pullRequest.getStorageConnector().pull(pullRequest);
             if (Objects.isNull(cloudEventList) || cloudEventList.isEmpty()) {
-                logger.info("");
+                logger.info("pull resquest get data is null , 
consumerGroupName is {} , topicName is 
{}",pullRequest.getConsumerGroupName(),pullRequest.getTopicName());
                 return;
             }
-            if (Objects.nonNull(pullRequest.getPullRequests())) {
+            if (Objects.isNull(pullRequest.getPullRequests())) {
                 this.setNextId(pullRequest, cloudEventList);
                 pullRequest.getPullCallback().onSuccess(pullRequest, 
cloudEventList);
 
@@ -135,7 +136,7 @@ public class StoragePullService implements Runnable {
                 }
             }
         } catch (Exception e) {
-            logger.error("pull", e);
+            logger.error(e.getMessage(), e);
         } finally {
             this.executePullRequestLater(pullRequest, 
storageConfig.getPullInterval());
         }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
index 1d93ed4f0..8c37e0c18 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.api.connector.storage.reply;
 
+import org.apache.eventmesh.api.connector.storage.CloudEventUtils;
 import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
 
 import java.util.ArrayList;
@@ -30,101 +31,120 @@ import java.util.concurrent.Executor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.cloudevents.CloudEvent;
 import lombok.Setter;
 
+/**
+ * 
+ * @author laohu
+ *
+ */
 public class ReplyOperationService {
 
-    protected static final Logger messageLogger = 
LoggerFactory.getLogger("message");
-
-    @Setter
-    private Executor executor;
-
-    protected Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> 
replyOperationMap = new ConcurrentHashMap<>();
-
-    public void setRequestReplyInfo(ReplyOperation replyOperation, String 
topic, Long id,
-                                    RequestReplyInfo requestReplyInfo) {
-        Map<String, Map<Long, RequestReplyInfo>> replyMap = 
replyOperationMap.get(replyOperation);
-        if (Objects.isNull(replyMap)) {
-            replyMap = replyOperationMap.computeIfAbsent(replyOperation, k -> 
new ConcurrentHashMap<>());
-        }
-        Map<Long, RequestReplyInfo> requestReplyInfoMap = replyMap.get(topic);
-        if (Objects.isNull(requestReplyInfoMap)) {
-            requestReplyInfoMap = replyMap.computeIfAbsent(topic, k -> new 
ConcurrentHashMap<>());
-        }
-        requestReplyInfoMap.put(id, requestReplyInfo);
-    }
-
-    public void reply(ReplyOperation replyOperation, Map<String, Map<Long, 
RequestReplyInfo>> replyMap) {
-        if (replyMap.isEmpty()) {
-            return;
-        }
-        long time = System.currentTimeMillis();
-        List<ReplyRequest> replyRequestList = new ArrayList<>();
-        for (Entry<String, Map<Long, RequestReplyInfo>> entry : 
replyMap.entrySet()) {
-            if (entry.getValue().isEmpty()) {
-                continue;
-            }
-            ReplyRequest replyRequest = new ReplyRequest();
-            List<Long> list = new ArrayList<>();
-            for (Entry<Long, RequestReplyInfo> entry2 : 
entry.getValue().entrySet()) {
-                if (entry2.getValue().getTimeOut() > time) {
-                    list.add(entry2.getKey());
-                } else {
-                    replyMap.remove(entry.getKey());
-                    messageLogger.warn("");
-                    RuntimeException runtimeException = new RuntimeException();
-                    
entry2.getValue().getRequestReplyCallback().onException(runtimeException);
-                }
-            }
-            replyRequest.setTopic(entry.getKey());
-            replyRequest.setIdList(list);
-            replyRequestList.add(replyRequest);
-        }
-        if (replyRequestList.isEmpty()) {
-            messageLogger.info("");
-            return;
-        }
-        try {
-            List<CloudEventInfo> cloudEventList = 
replyOperation.queryReplyCloudEvent(replyRequestList);
-            if (cloudEventList.isEmpty()) {
-                messageLogger.warn("");
-            }
-            for (CloudEventInfo cloudEventInfo : cloudEventList) {
-                RequestReplyInfo replyInfo = null;
-                try {
-                    cloudEventInfo.getCloudEventInfoId();
-
-                    replyInfo = 
replyMap.get(cloudEventInfo.getCloudEventTopic()).remove(Long.valueOf(cloudEventInfo.getCloudEventInfoId()));
-                    if (Objects.isNull(replyInfo)) {
-                        continue;
-                    }
-                    cloudEventInfo.getCloudEventReplyData();
-                    replyInfo.getRequestReplyCallback().onSuccess(null);
-                } catch (Exception e) {
-                    if (Objects.nonNull(replyInfo)) {
-                        replyInfo.getRequestReplyCallback().onException(e);
-                    }
-                    messageLogger.error(e.getMessage(), e);
-                }
-            }
-        } catch (Exception e) {
-            messageLogger.error(e.getMessage(), e);
-        }
-    }
-
-    public void execute() {
-        if (replyOperationMap.isEmpty()) {
-            return;
-        }
-        for (Entry<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> 
entry : replyOperationMap.entrySet()) {
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    reply(entry.getKey(), entry.getValue());
-                }
-            });
-
-        }
-
-    }
+       protected static final Logger messageLogger = 
LoggerFactory.getLogger("message");
+
+       @Setter
+       private Executor executor;
+
+       protected Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> 
replyOperationMap = new ConcurrentHashMap<>();
+
+       public void setRequestReplyInfo(ReplyOperation replyOperation, String 
topic, Long id,
+                       RequestReplyInfo requestReplyInfo) {
+               Map<String, Map<Long, RequestReplyInfo>> replyMap = 
replyOperationMap.get(replyOperation);
+               if (Objects.isNull(replyMap)) {
+                       replyMap = 
replyOperationMap.computeIfAbsent(replyOperation, k -> new 
ConcurrentHashMap<>());
+               }
+               Map<Long, RequestReplyInfo> requestReplyInfoMap = 
replyMap.get(topic);
+               if (Objects.isNull(requestReplyInfoMap)) {
+                       requestReplyInfoMap = replyMap.computeIfAbsent(topic, k 
-> new ConcurrentHashMap<>());
+               }
+               requestReplyInfoMap.put(id, requestReplyInfo);
+       }
+
+       public List<ReplyRequest> checkReply(Map<String, Map<Long, 
RequestReplyInfo>> replyMap) {
+               long time = System.currentTimeMillis();
+               List<ReplyRequest> replyRequestList = new ArrayList<>();
+               for (Entry<String, Map<Long, RequestReplyInfo>> entry : 
replyMap.entrySet()) {
+                       if (entry.getValue().isEmpty()) {
+                               continue;
+                       }
+                       ReplyRequest replyRequest = new ReplyRequest();
+                       List<Long> list = new ArrayList<>();
+                       for (Entry<Long, RequestReplyInfo> entry2 : 
entry.getValue().entrySet()) {
+                               if (entry2.getValue().getTimeOut() > time) {
+                                       list.add(entry2.getKey());
+                               } else {
+                                       
entry.getValue().remove(entry2.getKey());
+                                       messageLogger.warn("");
+                                       RuntimeException runtimeException = new 
RuntimeException();
+                                       
entry2.getValue().getRequestReplyCallback().onException(runtimeException);
+                               }
+                       }
+                       if (!list.isEmpty()) {
+                               replyRequest.setTopic(entry.getKey());
+                               replyRequest.setIdList(list);
+                               replyRequestList.add(replyRequest);
+                       }
+               }
+               return replyRequestList;
+       }
+
+       public void callback(List<CloudEventInfo> cloudEventList, Map<String, 
Map<Long, RequestReplyInfo>> replyMap) {
+               for (CloudEventInfo cloudEventInfo : cloudEventList) {
+                       RequestReplyInfo replyInfo = null;
+                       try {
+                               replyInfo = 
replyMap.get(cloudEventInfo.getCloudEventTopic())
+                                               
.remove(Long.valueOf(cloudEventInfo.getCloudEventInfoId()));
+                               if (Objects.isNull(replyInfo)) {
+                                       continue;
+                               }
+                               CloudEvent cloudEvent = 
CloudEventUtils.eventFormat
+                                               
.deserialize(cloudEventInfo.getCloudEventReplyData().getBytes("UTF-8"));
+                               
replyInfo.getRequestReplyCallback().onSuccess(cloudEvent);
+                       } catch (Exception e) {
+                               if (Objects.nonNull(replyInfo)) {
+                                       
replyInfo.getRequestReplyCallback().onException(e);
+                               }
+                               messageLogger.error(e.getMessage(), e);
+                       }
+               }
+       }
+
+       public void reply(ReplyOperation replyOperation, Map<String, Map<Long, 
RequestReplyInfo>> replyMap) {
+
+               List<ReplyRequest> replyRequestList = this.checkReply(replyMap);
+               if (replyRequestList.isEmpty()) {
+                       messageLogger.info("");
+                       return;
+               }
+               try {
+                       List<CloudEventInfo> cloudEventList = 
replyOperation.queryReplyCloudEvent(replyRequestList);
+                       if (cloudEventList.isEmpty()) {
+                               messageLogger.warn("");
+                               return;
+                       }
+                       this.callback(cloudEventList, replyMap);
+               } catch (Exception e) {
+                       messageLogger.error(e.getMessage(), e);
+               }
+       }
+
+       public void execute() {
+               if (replyOperationMap.isEmpty()) {
+                       return;
+               }
+               for (Entry<ReplyOperation, Map<String, Map<Long, 
RequestReplyInfo>>> entry : replyOperationMap.entrySet()) {
+                       if (entry.getValue().isEmpty()) {
+                               continue;
+                       }
+                       executor.execute(new Runnable() {
+                               @Override
+                               public void run() {
+                                       reply(entry.getKey(), entry.getValue());
+                               }
+                       });
+
+               }
+
+       }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.connector.ConnectorResourceService
similarity index 90%
copy from 
eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
copy to 
eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.connector.ConnectorResourceService
index 04d1a953b..ad95b7c4d 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.connector.ConnectorResourceService
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-jdbc=JDBCStorageConnector
\ No newline at end of file
+storage=org.apache.eventmesh.api.connector.storage.ConnectorResourceServiceStorageImpl
\ No newline at end of file
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer
similarity index 92%
copy from 
eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
copy to 
eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer
index 04d1a953b..a699a16a8 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-jdbc=JDBCStorageConnector
\ No newline at end of file
+storage=org.apache.eventmesh.api.connector.storage.StorageConsumer
\ No newline at end of file
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer
similarity index 92%
copy from 
eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
copy to 
eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer
index 04d1a953b..ab8d6dc66 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-jdbc=JDBCStorageConnector
\ No newline at end of file
+storage=org.apache.eventmesh.api.connector.storage.StorageProducer
\ No newline at end of file
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxyTest.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxyTest.java
new file mode 100644
index 000000000..c235d95d1
--- /dev/null
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxyTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eventmesh.api.connector.storage;
+
+import org.apache.eventmesh.api.RequestReplyCallback;
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.connector.storage.metadata.RouteHandler;
+import org.apache.eventmesh.api.connector.storage.metadata.StorageMetaServcie;
+import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation;
+import org.apache.eventmesh.api.connector.storage.reply.ReplyOperationService;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+public class StorageConnectorProxyTest {
+
+       private StorageConnectorProxy storageConnectorProxy = new 
StorageConnectorProxy();
+
+       private RouteHandler routeHandler = new RouteHandler();
+
+       private StorageConnector one = 
Mockito.mock(StorageConnectorMetedataAndConnector.class);
+
+       private StorageConnector two = 
Mockito.mock(StorageConnectorMetedataAndConnector.class);
+
+       private String key = "127.0.0.1";
+
+       private Executor executor;
+
+       private StorageMetaServcie storageMetaServcie = 
Mockito.mock(StorageMetaServcie.class);
+
+       private ReplyOperationService replyOperationService = 
Mockito.mock(ReplyOperationService.class);
+       
+       CloudEvent cloudEvent;
+
+       @Before
+       public void init() throws URISyntaxException {
+               this.executor = new 
ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10,
+                               Runtime.getRuntime().availableProcessors() * 
300, 1000 * 60 * 60, TimeUnit.SECONDS,
+                               new LinkedBlockingQueue<>(), new 
ThreadFactory() {
+                                       AtomicInteger index = new 
AtomicInteger();
+
+                                       @Override
+                                       public Thread newThread(Runnable r) {
+                                               return new Thread(r, 
"storage-connent-" + index.getAndIncrement());
+                                       }
+                               });
+               storageConnectorProxy.setExecutor(executor);
+               storageConnectorProxy.setStorageMetaServcie(storageMetaServcie);
+               storageConnectorProxy.setReplyService(replyOperationService);
+               cloudEvent = 
CloudEventBuilder.v03().withExtension("cloudeventid", 
"1").withId("1").withType("1231")
+               .withSource(new 
URI("")).withSubject("1").withExtension(Constant.STORAGE_ID, 
"1").withData("1".getBytes()).build();
+       }
+
+       @Test
+       public void test_routehandler() {
+               routeHandler.addStorageConnector(one);
+               routeHandler.addStorageConnector(two);
+               for (int i = 0; i < 100; i++) {
+                       StorageConnector select = routeHandler.select();
+                       StorageConnector check = i % 2 == 0 ? one : two;
+                       Assert.assertEquals(select, check);
+               }
+       }
+
+       @Test
+       public void tst_setConnector() {
+               storageConnectorProxy.setConnector(one, key);
+       }
+
+       @Test
+       public void test_publish() throws Exception {
+               storageConnectorProxy.setConnector(one, key);
+
+               Mockito.when(storageMetaServcie.isTopic(Mockito.any(), 
Mockito.anyString())).thenReturn(false).thenReturn(true).thenThrow(RuntimeException.class);
+               SendCallback sendCallback = Mockito.mock(SendCallback.class);
+               storageConnectorProxy.publish(cloudEvent, sendCallback);
+               storageConnectorProxy.publish(cloudEvent, sendCallback);
+               storageConnectorProxy.publish(cloudEvent, sendCallback);
+               Thread.sleep(10);
+               Mockito.verify(storageMetaServcie, 
Mockito.times(3)).isTopic(Mockito.any(), Mockito.any());
+               Mockito.verify(sendCallback, 
Mockito.atLeastOnce()).onException(Mockito.any());
+       }
+
+       @Test
+       public void test_request() throws Exception {
+               storageConnectorProxy.setConnector(one, key);
+               RequestReplyCallback requestReplyCallback = 
Mockito.mock(RequestReplyCallback.class);
+               storageConnectorProxy.request(cloudEvent, requestReplyCallback, 
1000);
+               Thread.sleep(10);
+       }
+       
+       @Test
+       public void test() throws UnsupportedEncodingException {
+               String ddd = URLEncoder.encode("EventMeshTest/consumerGroup-_");
+               System.out.println(ddd);
+       }
+       
+       public interface StorageConnectorMetedataAndConnector extends 
StorageConnectorMetedata,StorageConnector,ReplyOperation{
+               
+       }
+}
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullServiceTest.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullServiceTest.java
new file mode 100644
index 000000000..02428dd1f
--- /dev/null
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullServiceTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eventmesh.api.connector.storage.pull;
+
+import org.apache.eventmesh.api.connector.storage.StorageConfig;
+import org.apache.eventmesh.api.connector.storage.StorageConnector;
+import org.apache.eventmesh.api.connector.storage.data.PullRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import io.cloudevents.CloudEvent;
+
+public class StoragePullServiceTest {
+
+       private StorageConfig storageConfig;
+
+       private Executor executor;
+
+       private ScheduledExecutorService scheduledExecutor;
+
+       private StoragePullService storagePullService = new 
StoragePullService();
+
+       private PullRequest pullRequest = new PullRequest();
+
+       @Before
+       public void init() throws Exception {
+               this.executor = new 
ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10,
+                               Runtime.getRuntime().availableProcessors() * 
300, 1000 * 60 * 60, TimeUnit.SECONDS,
+                               new LinkedBlockingQueue<>(), new 
ThreadFactory() {
+                                       AtomicInteger index = new 
AtomicInteger();
+
+                                       @Override
+                                       public Thread newThread(Runnable r) {
+                                               return new Thread(r, 
"storage-connent-" + index.getAndIncrement());
+                                       }
+                               });
+               this.scheduledExecutor = 
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 
10,
+                               new ThreadFactory() {
+                                       AtomicInteger index = new 
AtomicInteger();
+
+                                       @Override
+                                       public Thread newThread(Runnable r) {
+                                               return new Thread(r, 
"storage-connent-shceduled-" + index.getAndIncrement());
+                                       }
+                               });
+               storageConfig = new StorageConfig();
+               storagePullService.setStorageConfig(storageConfig);
+               storagePullService.setExecutor(executor);
+               storagePullService.setScheduledExecutor(scheduledExecutor);
+
+               StorageConnector storageConnector = 
Mockito.mock(StorageConnector.class);
+               List<CloudEvent> cloudEventList = new ArrayList<>();
+               cloudEventList.add(Mockito.mock(CloudEvent.class));
+               
Mockito.when(storageConnector.pull(Mockito.any())).thenReturn(null).thenReturn(new
 ArrayList<>())
+                               .thenReturn(cloudEventList);
+               pullRequest.setStorageConnector(storageConnector);
+               pullRequest.setPullCallback(Mockito.mock(PullCallback.class));
+
+       }
+
+       @Test
+       public void test_run_thread() {
+               this.executor.execute(storagePullService);
+               
this.storagePullService.executePullRequestImmediately(pullRequest);
+               System.out.println(1);
+       }
+}
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationServiceTest.java
 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationServiceTest.java
new file mode 100644
index 000000000..87c0645b2
--- /dev/null
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationServiceTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eventmesh.api.connector.storage.reply;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.eventmesh.api.RequestReplyCallback;
+import org.apache.eventmesh.api.connector.storage.CloudEventUtils;
+import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+public class ReplyOperationServiceTest {
+
+       private ReplyOperationService replyOperationService = new 
ReplyOperationService();
+
+       private List<ReplyOperation> replyOperationList = new ArrayList<>();
+
+       private List<String> topicList = new ArrayList<>();
+
+       private ReplyOperation replyOperation = 
Mockito.mock(ReplyOperation.class);
+
+       private ReplyOperation twoReplyOperation = 
Mockito.mock(ReplyOperation.class);
+
+       private RequestReplyCallback requestReplyCallback = 
Mockito.mock(RequestReplyCallback.class);
+
+       private String topic1 = "topic1";
+
+       private String topic2 = "topic2";
+
+       private String topic3 = "topic3";
+
+       private Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> 
replyOperationMap;
+
+       @SuppressWarnings("unchecked")
+       @Before
+       public void init() throws IllegalArgumentException, 
IllegalAccessException {
+               replyOperationList.add(replyOperation);
+               replyOperationList.add(twoReplyOperation);
+               topicList.add(topic1);
+               topicList.add(topic2);
+               topicList.add(topic3);
+               Field field = FieldUtils.getField(ReplyOperationService.class, 
"replyOperationMap", true);
+               field.setAccessible(true);
+               replyOperationMap = (Map<ReplyOperation, Map<String, Map<Long, 
RequestReplyInfo>>>) field
+                               .get(replyOperationService);
+       }
+
+       @Test
+       public void setRequestReplyInfo_test() {
+               for (ReplyOperation replyOperation : replyOperationList) {
+                       for (String topic : topicList) {
+                               for (long i = 0; i < 20; i++) {
+                                       
replyOperationService.setRequestReplyInfo(replyOperation, topic, i, new 
RequestReplyInfo());
+                               }
+                       }
+               }
+               for (ReplyOperation replyOperation : replyOperationList) {
+                       Map<String, Map<Long, RequestReplyInfo>> 
requestReplyInfoMap = replyOperationMap.get(replyOperation);
+                       Assert.assertEquals(topicList.size(), 
requestReplyInfoMap.size());
+                       for (String topic : topicList) {
+                               Map<Long, RequestReplyInfo> map = 
requestReplyInfoMap.get(topic);
+                               Assert.assertEquals(20, map.size());
+                       }
+               }
+       }
+
+       private Map<String, Map<Long, RequestReplyInfo>> createReplyMap(boolean 
timeout) {
+               long time = System.currentTimeMillis();
+               Map<String, Map<Long, RequestReplyInfo>> replyMap = new 
ConcurrentHashMap<>();
+               for (String topic : topicList) {
+                       Map<Long, RequestReplyInfo> requestReplyInfoMap = new 
ConcurrentHashMap<>();
+                       replyMap.put(topic, requestReplyInfoMap);
+                       for (long i = 0; i < 10; i++) {
+                               RequestReplyInfo requestReplyInfo = new 
RequestReplyInfo();
+                               if (timeout) {
+                                       requestReplyInfo.setTimeOut(time - 
1000);
+                               } else {
+                                       requestReplyInfo.setTimeOut(time + 
4000);
+                               }
+                               
requestReplyInfo.setRequestReplyCallback(requestReplyCallback);
+                               requestReplyInfoMap.put(i, requestReplyInfo);
+                       }
+               }
+               return replyMap;
+       }
+
+       @Test
+       public void checkReply_timeout_result_null() {
+               List<ReplyRequest> replyRequests = 
replyOperationService.checkReply(this.createReplyMap(true));
+               Assert.assertTrue(replyRequests.isEmpty());
+               Mockito.verify(requestReplyCallback, 
Mockito.times(30)).onException(Mockito.any());
+       }
+
+       @Test
+       public void checkReply_timeout_result_ok() {
+               List<ReplyRequest> replyRequests = 
replyOperationService.checkReply(this.createReplyMap(false));
+               for (ReplyRequest replyRequest : replyRequests) {
+                       Assert.assertEquals(replyRequest.getIdList().size(), 
10);
+               }
+       }
+
+       @Test
+       public void callback_onSuccess() throws URISyntaxException {
+               CloudEvent cloudEvent = 
CloudEventBuilder.v03().withExtension("cloudeventid", 
"1").withId("1").withType("1231")
+                               .withSource(new 
URI("")).withSubject("1").withExtension("id", 
"1").withData("1".getBytes()).build();
+               String cloudEventData = new 
String(CloudEventUtils.eventFormat.serialize(cloudEvent), 
Charset.forName("UTF-8"));
+               List<CloudEventInfo> cloudEventList = new ArrayList<>();
+               for (String topic : topicList) {
+                       for (long i = 0; i < 10; i++) {
+                               CloudEventInfo cloudEventInfo = new 
CloudEventInfo();
+                               cloudEventInfo.setCloudEventTopic(topic);
+                               cloudEventInfo.setCloudEventInfoId(i);
+                               
cloudEventInfo.setCloudEventReplyData(cloudEventData);
+                               cloudEventList.add(cloudEventInfo);
+                       }
+               }
+               Map<String, Map<Long, RequestReplyInfo>> replyMap = 
this.createReplyMap(false);
+
+               replyOperationService.callback(cloudEventList, replyMap);
+       }
+}
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java
index 4e9194de2..2f658181e 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.connector.storage.jdbc;
 
+import org.apache.eventmesh.api.connector.storage.Constant;
 import org.apache.eventmesh.connector.storage.jdbc.SQL.BaseSQLOperation;
 import org.apache.eventmesh.connector.storage.jdbc.SQL.CloudEventSQLOperation;
 import 
org.apache.eventmesh.connector.storage.jdbc.SQL.ConsumerGroupSQLOperation;
@@ -38,105 +39,103 @@ import com.alibaba.druid.pool.DruidPooledConnection;
 
 public abstract class AbstractJDBCStorageConnector {
 
-    protected static final Logger messageLogger = 
LoggerFactory.getLogger("message");
-
-    protected DruidDataSource druidDataSource;
-
-    protected CloudEventSQLOperation cloudEventSQLOperation;
-
-    protected BaseSQLOperation baseSQLOperation;
-
-    protected ConsumerGroupSQLOperation consumerGroupSQLOperation;
-
-
-    public void init(Properties properties) throws Exception {
-        StorageSQLService storageSQLService = new StorageSQLService("");
-        this.cloudEventSQLOperation = storageSQLService.getObject();
-        this.baseSQLOperation = storageSQLService.getObject();
-        this.consumerGroupSQLOperation = storageSQLService.getObject();
-        this.initdatabases(properties);
-        this.createDataSource(properties);
-    }
-
-    protected void createDataSource(Properties properties) throws Exception {
-
-        druidDataSource = new DruidDataSource();
-        druidDataSource.setUrl(properties.getProperty("url"));
-        druidDataSource.setUsername(properties.getProperty("username"));
-        druidDataSource.setPassword(properties.getProperty("password"));
-        druidDataSource.setValidationQuery("select 1");
-        
druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty("maxActive")));
-        
druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty("maxWait")));
-        druidDataSource.init();
-    }
-
-    protected void initdatabases(Properties properties) throws SQLException {
-        //TODO
-        druidDataSource = new DruidDataSource();
-        druidDataSource.setUrl(properties.getProperty("url"));
-        druidDataSource.setUsername(properties.getProperty("username"));
-        druidDataSource.setPassword(properties.getProperty("password"));
-        druidDataSource.setValidationQuery("select 1");
-        
druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty("maxActive")));
-        
druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty("maxWait")));
-        druidDataSource.init();
-
-        List<String> tableName = 
this.query(this.baseSQLOperation.queryConsumerGroupTableSQL(), 
ResultSetTransformUtils::transformTableName);
-        if (Objects.isNull(tableName) || tableName.isEmpty()) {
-            // create databases
-            this.execute(this.baseSQLOperation.createDatabases(), null);
-            // create tables;
-            
this.execute(this.consumerGroupSQLOperation.createConsumerGroupSQL(), null);
-        }
-    }
-
-    protected long execute(String sql, List<Object> parameter) throws 
SQLException {
-        return this.execute(sql, parameter, false);
-    }
-
-    protected long execute(String sql, List<Object> parameter, boolean 
generatedKeys) throws SQLException {
-        try (DruidPooledConnection pooledConnection = 
druidDataSource.getConnection();
-             PreparedStatement preparedStatement = 
pooledConnection.prepareStatement(sql)) {
-            this.setObject(preparedStatement, parameter);
-            long value = preparedStatement.executeUpdate();
-            if (generatedKeys) {
-                try (ResultSet resulSet = 
preparedStatement.getGeneratedKeys()) {
-                    resulSet.next();
-                    value = resulSet.getLong(1);
-                }
-            }
-            return value;
-        }
-    }
-
-    protected <T> List<T> query(String sql, ResultSetTransform<T> 
resultSetTransform) throws SQLException {
-        return this.query(sql, null, resultSetTransform);
-    }
-
-    @SuppressWarnings("unchecked")
-    protected <T> List<T> query(String sql, List<?> parameter, 
ResultSetTransform<T> resultSetTransform)
-        throws SQLException {
-        try (DruidPooledConnection pooledConnection = 
druidDataSource.getConnection();
-             PreparedStatement preparedStatement = 
pooledConnection.prepareStatement(sql)) {
-            this.setObject(preparedStatement, parameter);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                List<Object> resultList = new ArrayList<>();
-                while (resultSet.next()) {
-                    Object object = resultSetTransform.transform(resultSet);
-                    resultList.add(object);
-                }
-                return (List<T>) resultList;
-            }
-        }
-    }
-
-    protected void setObject(PreparedStatement preparedStatement, List<?> 
parameter) throws SQLException {
-        if (Objects.isNull(parameter) || parameter.isEmpty()) {
-            return;
-        }
-        int index = 1;
-        for (Object object : parameter) {
-            preparedStatement.setObject(index++, object);
-        }
-    }
+       protected static final Logger messageLogger = 
LoggerFactory.getLogger("message");
+
+       protected DruidDataSource druidDataSource;
+
+       protected CloudEventSQLOperation cloudEventSQLOperation;
+
+       protected BaseSQLOperation baseSQLOperation;
+
+       protected ConsumerGroupSQLOperation consumerGroupSQLOperation;
+
+       public void init(Properties properties) throws Exception {
+               StorageSQLService storageSQLService = new 
StorageSQLService(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_TYPE));
+               this.cloudEventSQLOperation = storageSQLService.getObject();
+               this.baseSQLOperation = storageSQLService.getObject();
+               this.consumerGroupSQLOperation = storageSQLService.getObject();
+               this.initdatabases(properties, "information_schema");
+               this.createDataSource(properties, "event_mesh_storage");
+       }
+
+       protected void createDataSource(Properties properties, String 
databases) throws Exception {
+               druidDataSource = new DruidDataSource();
+               druidDataSource.setUrl(this.createUrl(properties, databases));
+               
druidDataSource.setUsername(properties.getProperty(Constant.STORAGE_CONFIG_USER_NAME));
+               
druidDataSource.setPassword(properties.getProperty(Constant.STORAGE_CONFIG_PASSWORD));
+               druidDataSource.setValidationQuery("select 1");
+               
druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_MAXACTIVE)));
+               
druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_MAXWAIT)));
+               druidDataSource.init();
+       }
+
+       private String createUrl(Properties properties, String databases) {
+               StringBuffer stringBuffer = new StringBuffer("jdbc:");
+               
stringBuffer.append(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_TYPE)).append("://")
+                               
.append(properties.get(Constant.STORAGE_NODE_ADDRESS)).append("/").append(databases).append("?")
+                               
.append(properties.get(Constant.STORAGE_CONFIG_JDBC_PARAMETER));
+               return stringBuffer.toString();
+       }
+
+       protected void initdatabases(Properties properties, String databases) 
throws Exception {
+               this.createDataSource(properties, databases);
+               List<String> tableName = 
this.query(this.baseSQLOperation.queryDataBases(),
+                               ResultSetTransformUtils::transformTableName);
+               if (Objects.isNull(tableName) || tableName.isEmpty()) {
+                       this.execute(this.baseSQLOperation.createDatabases(), 
null);
+               }
+               // create tables;
+               // 
this.execute(this.consumerGroupSQLOperation.createConsumerGroupSQL(), null);
+       }
+
+       protected long execute(String sql, List<Object> parameter) throws 
SQLException {
+               return this.execute(sql, parameter, false);
+       }
+
+       protected long execute(String sql, List<Object> parameter, boolean 
generatedKeys) throws SQLException {
+               try (DruidPooledConnection pooledConnection = 
druidDataSource.getConnection();
+                               PreparedStatement preparedStatement = 
pooledConnection.prepareStatement(sql,
+                                               
PreparedStatement.RETURN_GENERATED_KEYS)) {
+                       this.setObject(preparedStatement, parameter);
+                       long value = preparedStatement.executeUpdate();
+                       if (generatedKeys) {
+                               try (ResultSet resulSet = 
preparedStatement.getGeneratedKeys()) {
+                                       resulSet.next();
+                                       value = resulSet.getLong(1);
+                               }
+                       }
+                       return value;
+               }
+       }
+
+       protected <T> List<T> query(String sql, ResultSetTransform<T> 
resultSetTransform) throws SQLException {
+               return this.query(sql, null, resultSetTransform);
+       }
+
+       @SuppressWarnings("unchecked")
+       protected <T> List<T> query(String sql, List<?> parameter, 
ResultSetTransform<T> resultSetTransform)
+                       throws SQLException {
+               try (DruidPooledConnection pooledConnection = 
druidDataSource.getConnection();
+                               PreparedStatement preparedStatement = 
pooledConnection.prepareStatement(sql)) {
+                       this.setObject(preparedStatement, parameter);
+                       try (ResultSet resultSet = 
preparedStatement.executeQuery()) {
+                               List<Object> resultList = new ArrayList<>();
+                               while (resultSet.next()) {
+                                       Object object = 
resultSetTransform.transform(resultSet);
+                                       resultList.add(object);
+                               }
+                               return (List<T>) resultList;
+                       }
+               }
+       }
+
+       protected void setObject(PreparedStatement preparedStatement, List<?> 
parameter) throws SQLException {
+               if (Objects.isNull(parameter) || parameter.isEmpty()) {
+                       return;
+               }
+               int index = 1;
+               for (Object object : parameter) {
+                       preparedStatement.setObject(index++, object);
+               }
+       }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java
index 14e3f3153..1af0d98fa 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java
@@ -17,12 +17,12 @@
 
 package org.apache.eventmesh.connector.storage.jdbc;
 
+import org.apache.eventmesh.api.connector.storage.CloudEventUtils;
 import org.apache.eventmesh.api.connector.storage.StorageConnectorMetedata;
 import org.apache.eventmesh.api.connector.storage.data.ConsumerGroupInfo;
 import org.apache.eventmesh.api.connector.storage.data.PullRequest;
 import org.apache.eventmesh.api.connector.storage.data.TopicInfo;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -45,17 +45,32 @@ public class AbstractJDBCStorageConnectorMetadata extends 
AbstractJDBCStorageCon
     @Override
     public List<TopicInfo> geTopicInfos(List<PullRequest> pullRequests) throws 
Exception {
         StringBuffer sqlsb = new StringBuffer();
-        int index = 0;
-        List<String> tableNames = new ArrayList<>();
+        int index = 1;
         for (PullRequest pullRequest : pullRequests) {
-            String sql = 
this.cloudEventSQLOperation.selectLastMessageSQL(pullRequest.getTopicName());
+            String sql = 
this.cloudEventSQLOperation.selectNoConsumptionMessageSQL(pullRequest.getTopicName(),pullRequest.getConsumerGroupName());
             sqlsb.append(sql);
-            if (index < pullRequests.size()) {
+            if (index++ < pullRequests.size()) {
                 sqlsb.append(" union all ");
             }
-            tableNames.add(pullRequest.getTopicName());
         }
-        return this.query(sqlsb.toString(), tableNames, 
ResultSetTransformUtils::transformTopicInfo);
+        return this.query(sqlsb.toString(), null, 
ResultSetTransformUtils::transformTopicInfo);
+    }
+    
+    public List<TopicInfo> geTopicInfos(Set<String> topics,String key) throws 
Exception {
+       key = CloudEventUtils.checkConsumerGroupName(key);
+        StringBuffer sqlsb = new StringBuffer();
+        int index = 1;
+        for (String topic : topics) {
+               if(topic.startsWith("cloud_event_")) {
+                       topic = topic.substring(12);
+               }
+            String sql = 
this.cloudEventSQLOperation.selectNoConsumptionMessageSQL(topic,key);
+            sqlsb.append(sql);
+            if (index++ < topics.size()) {
+                sqlsb.append(" union all ");
+            }
+        }
+        return this.query(sqlsb.toString(), null, 
ResultSetTransformUtils::transformTopicInfo);
     }
 
     @Override
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java
index 911a26300..18bae1c94 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java
@@ -21,114 +21,118 @@ import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.api.RequestReplyCallback;
 import org.apache.eventmesh.api.SendCallback;
 import org.apache.eventmesh.api.connector.storage.CloudEventUtils;
+import org.apache.eventmesh.api.connector.storage.Constant;
 import org.apache.eventmesh.api.connector.storage.StorageConnector;
 import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
 import org.apache.eventmesh.api.connector.storage.data.PullRequest;
 import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation;
 import org.apache.eventmesh.api.connector.storage.reply.ReplyRequest;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import io.cloudevents.CloudEvent;
 
-public class JDBCStorageConnector extends AbstractJDBCStorageConnectorMetadata 
implements StorageConnector, ReplyOperation {
-
-    private String getTableName(CloudEvent cloudEvent) {
-        return cloudEvent.getType();
-    }
-
-    @Override
-    public void publish(CloudEvent cloudEvent, SendCallback sendCallback) 
throws Exception {
-        String topic = this.getTableName(cloudEvent);
-        String sql = this.cloudEventSQLOperation.insertCloudEventSQL(topic);
-        List<Object> parameterList = new ArrayList<>();
-        this.execute(sql, parameterList);
-    }
-
-    @Override
-    public void request(CloudEvent cloudEvent, RequestReplyCallback 
rrCallback, long timeout) throws Exception {
-        String topic = this.getTableName(cloudEvent);
-        String sql = this.cloudEventSQLOperation.insertCloudEventSQL(topic);
-        List<Object> parameterList = new ArrayList<>();
-        this.execute(sql, parameterList);
-    }
-
-    @Override
-    public List<CloudEvent> pull(PullRequest pullRequest) throws Exception {
-        String locationEventSQL = 
this.cloudEventSQLOperation.locationEventSQL(pullRequest.getTopicName());
-        //TODO 1. consumerGroup  2.  example_id 3. id 4.consumerGroup
-        List<Object> parameter = new ArrayList<>();
-
-        parameter.add(pullRequest.getConsumerGroupName());
-        parameter.add(pullRequest.getProcessSign());
-        parameter.add(pullRequest.getNextId());
-        parameter.add(pullRequest.getConsumerGroupName());
-        parameter.clear();
-        parameter.add(pullRequest.getNextId());
-        parameter.add(pullRequest.getProcessSign());
-
-        long num = this.execute(locationEventSQL, parameter);
-        if (num == 0) {
-            return null;
-        }
-        String queryLocationEventSQL = 
this.cloudEventSQLOperation.queryLocationEventSQL(pullRequest.getTopicName());
-
-        this.query(queryLocationEventSQL, parameter, 
ResultSetTransformUtils::transformCloudEvent);
-        return null;
-    }
-
-    @Override
-    public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext 
context) {
-        List<Object> parameterList = new ArrayList<>(cloudEvents.size());
-        for (CloudEvent cloudEvent : cloudEvents) {
-            try {
-                String topic = this.getTableName(cloudEvent);
-                String sql = 
this.cloudEventSQLOperation.updateCloudEventOffsetSQL(topic);
-                parameterList.add(cloudEvent.getExtension("cloudEventInfoId"));
-                long i = this.execute(sql, parameterList);
-                if (i != cloudEvents.size()) {
-                    messageLogger.warn("");
-                }
-                parameterList.clear();
-            } catch (Exception e) {
-                messageLogger.error(e.getMessage(), e);
-            }
-        }
-    }
-
-
-    @Override
-    public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) 
throws Exception {
-        String sql = 
this.cloudEventSQLOperation.updateCloudEventReplySQL(CloudEventUtils.getTopic(cloudEvent));
-        List<Object> parameterList = new ArrayList<>();
-        parameterList.add(CloudEventUtils.serializeReplyData(cloudEvent));
-        parameterList.add(CloudEventUtils.getId(cloudEvent));
-        return this.execute(sql, parameterList) == 1;
-    }
-
-
-    @Override
-    public void start() {
-
-    }
-
-    @Override
-    public void shutdown() {
-        druidDataSource.close();
-    }
-
-    @Override
-    public List<CloudEventInfo> queryReplyCloudEvent(ReplyRequest 
replyRequest) throws Exception {
-        List<Object> parameter = new ArrayList<>();
-        StringBuffer stringBuffer = new StringBuffer();
-        for (int i = 1; i < replyRequest.getIdList().size(); i++) {
-            stringBuffer.append('?');
-            if (i++ != replyRequest.getIdList().size()) {
-                stringBuffer.append(',');
-            }
-        }
-        String sql = 
this.cloudEventSQLOperation.selectCloudEventByReplySQL(replyRequest.getTopic(), 
stringBuffer.toString());
-        return this.query(sql, parameter, 
ResultSetTransformUtils::transformCloudEvent);
-    }
+public class JDBCStorageConnector extends AbstractJDBCStorageConnectorMetadata
+               implements StorageConnector, ReplyOperation {
+
+       public void init(Properties properties) throws Exception {
+               super.init(properties);
+       }
+
+       @Override
+       public void publish(CloudEvent cloudEvent, SendCallback sendCallback) 
throws Exception {
+               String topic = CloudEventUtils.getTableName(cloudEvent);
+               String sql = 
this.cloudEventSQLOperation.insertCloudEventSQL(topic);
+               Long id = this.execute(sql, 
CloudEventUtils.getParameterToCloudEvent(cloudEvent), true);
+               CloudEventUtils.setValue(cloudEvent, Constant.STORAGE_ID, 
id.toString());
+       }
+
+       @Override
+       public void request(CloudEvent cloudEvent, RequestReplyCallback 
rrCallback, long timeout) throws Exception {
+               this.publish(cloudEvent, null);
+       }
+
+       @Override
+       public List<CloudEvent> pull(PullRequest pullRequest) throws Exception {
+               String locationEventSQL = 
this.cloudEventSQLOperation.locationEventSQL(pullRequest.getTopicName(),
+                               
CloudEventUtils.checkConsumerGroupName(pullRequest.getConsumerGroupName()), 
pullRequest.getProcessSign(), 
CloudEventUtils.checkConsumerGroupName(pullRequest.getConsumerGroupName()));
+               List<Object> parameter = new ArrayList<>();
+               parameter.add(pullRequest.getNextId());
+               long num = this.execute(locationEventSQL, parameter);
+               if (num == 0) {
+                       return null;
+               }
+               String queryLocationEventSQL = 
this.cloudEventSQLOperation.queryLocationEventSQL(pullRequest.getTopicName(),
+                               
CloudEventUtils.checkConsumerGroupName(pullRequest.getConsumerGroupName()), 
pullRequest.getProcessSign());
+               List<CloudEventInfo> cloudEventInfoList = 
this.query(queryLocationEventSQL, parameter,
+                               ResultSetTransformUtils::transformCloudEvent);
+
+               List<CloudEvent> cloudEventList = new ArrayList<>();
+               if (cloudEventInfoList.size() != 0) {
+                       for (CloudEventInfo cloudEventInfo : 
cloudEventInfoList) {
+                               CloudEvent cloudEvent = 
CloudEventUtils.eventFormat
+                                               
.deserialize(cloudEventInfo.getCloudEventData().getBytes(Charset.forName("UTF-8")));
+                               cloudEventList.add(cloudEvent);
+                       }
+               }
+               
pullRequest.setNextId(cloudEventInfoList.get(cloudEventInfoList.size() - 
1).getCloudEventInfoId().toString());
+               return cloudEventList;
+       }
+
+       @Override
+       public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext 
context) {
+               List<Object> parameterList = new 
ArrayList<>(cloudEvents.size());
+               for (CloudEvent cloudEvent : cloudEvents) {
+                       try {
+                               String topic = 
CloudEventUtils.getTableName(cloudEvent);
+                               String sql = 
this.cloudEventSQLOperation.updateCloudEventOffsetSQL(topic);
+                               
parameterList.add(cloudEvent.getExtension("cloudEventInfoId"));
+                               long i = this.execute(sql, parameterList);
+                               if (i != cloudEvents.size()) {
+                                       messageLogger.warn("");
+                               }
+                               parameterList.clear();
+                       } catch (Exception e) {
+                               messageLogger.error(e.getMessage(), e);
+                       }
+               }
+       }
+
+       @Override
+       public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) 
throws Exception {
+               String sql = 
this.cloudEventSQLOperation.updateCloudEventReplySQL(CloudEventUtils.getTopic(cloudEvent));
+               List<Object> parameterList = new ArrayList<>();
+               
parameterList.add(CloudEventUtils.serializeReplyData(cloudEvent));
+               parameterList.add(CloudEventUtils.getId(cloudEvent));
+               this.execute(sql, parameterList, true);
+               return true;
+       }
+
+       @Override
+       public void start() {
+
+       }
+
+       @Override
+       public void shutdown() {
+               druidDataSource.close();
+       }
+
+       @Override
+       public List<CloudEventInfo> queryReplyCloudEvent(ReplyRequest 
replyRequest) throws Exception {
+               List<Object> parameter = new ArrayList<>();
+               StringBuffer stringBuffer = new StringBuffer();
+               for (int i = 1; i < replyRequest.getIdList().size(); i++) {
+                       stringBuffer.append('?');
+                       if (i++ != replyRequest.getIdList().size()) {
+                               stringBuffer.append(',');
+                       }
+               }
+               String sql = 
this.cloudEventSQLOperation.selectCloudEventByReplySQL(replyRequest.getTopic(),
+                               stringBuffer.toString());
+               return this.query(sql, parameter, 
ResultSetTransformUtils::transformCloudEvent);
+       }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java
index ffceb76e2..7c4b769e6 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java
@@ -27,19 +27,36 @@ import java.sql.SQLException;
 public class ResultSetTransformUtils {
 
     public static String transformTableName(ResultSet resultSet) throws 
SQLException {
-        return resultSet.getString(1);
+       String topic = resultSet.getString(1);
+        return topic.substring(12);
     }
 
     public static ConsumerGroupInfo transformConsumerGroup(ResultSet 
resultSet) {
         return new ConsumerGroupInfo();
     }
 
-    public static CloudEventInfo transformCloudEvent(ResultSet resultSet) {
-        return null;
+    public static CloudEventInfo transformCloudEvent(ResultSet resultSet) 
throws SQLException {
+       CloudEventInfo cloudEventInfo = new CloudEventInfo();
+       
cloudEventInfo.setCloudEventInfoId(resultSet.getLong("cloud_event_info_id"));
+       
cloudEventInfo.setCloudEventTopic(resultSet.getString("cloud_event_topic"));
+        //cloudEventInfo.setCloudEventId(resultSet.getLong("cloud_event_id"));
+       
cloudEventInfo.setCloudEventStorageNodeAdress(resultSet.getString("cloud_event_storage_node_adress"));
+       
cloudEventInfo.setCloudEventType(resultSet.getString("cloud_event_type"));
+       
cloudEventInfo.setCloudEventProducerGroupName(resultSet.getString("cloud_event_producer_group_name"));
+       
cloudEventInfo.setCloudEventSource(resultSet.getString("cloud_event_source"));
+       
cloudEventInfo.setCloudEventContentType(resultSet.getString("cloud_event_content_type"));
+       
cloudEventInfo.setCloudEventData(resultSet.getString("cloud_event_data"));
+       
cloudEventInfo.setCloudEventReplyData(resultSet.getString("cloud_event_reply_data"));
+       return cloudEventInfo;
     }
 
 
-    public static TopicInfo transformTopicInfo(ResultSet resultSet) {
+    public static TopicInfo transformTopicInfo(ResultSet resultSet) throws 
SQLException {
+       TopicInfo topicInfo = new TopicInfo();
+       topicInfo.setCurrentId(resultSet.getLong("cloud_event_info_id"));
+       topicInfo.setTopicName(resultSet.getString("cloud_event_topic"));
+       topicInfo.setTopicName(topicInfo.getTopicName().substring(12));
+       topicInfo.setDbTablesName(resultSet.getString("cloud_event_topic"));
         return null;
     }
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java
index 2f95b0d2a..e4d66fb91 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java
@@ -23,6 +23,8 @@ import lombok.Data;
 public class BaseSQL {
 
     private String createDatabases;
+    
+    private String queryDataBases;
 
     private String queryConsumerGroupTableSQL;
 
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java
index 9eeb48043..e4c2ab138 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java
@@ -20,6 +20,8 @@ package org.apache.eventmesh.connector.storage.jdbc.SQL;
 public interface BaseSQLOperation {
 
     public String createDatabases();
+    
+    public String queryDataBases();
 
     public String queryConsumerGroupTableSQL();
 
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java
index fac24ff5b..e20ea765f 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java
@@ -29,15 +29,15 @@ public interface CloudEventSQLOperation {
 
     public String selectCloudEventByReplySQL(String table, String idNum);
 
-    public String locationEventSQL(String table);
+    public String locationEventSQL(String table,String 
consumerGroupName,String processSign,String consumerGroup);
 
-    public String queryLocationEventSQL(String table);
+    public String queryLocationEventSQL(String table,String 
consumerGroupName,String processSign);
 
     public String selectFastMessageSQL(String table);
 
     public String selectLastMessageSQL(String table);
 
-    public String selectNoConsumptionMessageSQL(String table);
+    public String selectNoConsumptionMessageSQL(String table,String key);
 
     public String selectAppointTimeMessageSQL(String table);
 }
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java
index 680eaf63c..5f0076610 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java
@@ -48,10 +48,9 @@ public class SQLServiceInvocationHandler implements 
InvocationHandler {
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
         String argsKey;
-        if (args.length == 0) {
+        if (Objects.isNull(args)||args.length == 0) {
             argsKey = "";
-        }
-        if (args.length == 1) {
+        }else if (args.length == 1) {
             argsKey = (String) args[0];
         } else {
             StringBuffer stringBuffer = new StringBuffer();
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
index 04d1a953b..5ce70c639 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-jdbc=JDBCStorageConnector
\ No newline at end of file
+jdbc=org.apache.eventmesh.connector.storage.jdbc.JDBCStorageConnector
\ No newline at end of file
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml
index af591573f..2efb9b342 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml
@@ -1,3 +1,4 @@
+queryDataBases: "select SCHEMA_NAME from information_schema.SCHEMATA where 
SCHEMA_NAME = 'event_mesh_storage'"
 createDatabases: "create database if not exists event_mesh_storage"
-queryConsumerGroupTableSQL: "select table_name from information_schema.tables 
where TABLE_SCHEMA = ? and table_name = ?"
-queryCloudEventTablesSQL: "select table_name from information_schema.tables 
where TABLE_SCHEMA = ? and table_name like 'cloud_event_%'"
\ No newline at end of file
+queryConsumerGroupTableSQL: ""
+queryCloudEventTablesSQL: "select table_name from information_schema.tables 
where TABLE_SCHEMA = 'event_mesh_storage' and table_name like 'cloud_event_%'"
\ No newline at end of file
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml
index 19d7a9e6d..282569c2e 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml
@@ -10,7 +10,7 @@ createCloudEventSQL: "create table `cloud_event_{table}`(
                                                                                
                        `cloud_event_tag` json not null default (json_array()) 
comment '事件标签',
                                                                                
                        `cloud_event_extensions` text not null  comment 
'存储扩展信息',
                                                                                
                        `cloud_event_data` longtext not null comment '事件数据',
-                                                                               
                        `cloud_event_reply_data`  not null  comment '事件reply数据',
+                                                                               
                        `cloud_event_reply_data` longtext  not null   comment 
'事件reply数据',
                                                                                
                        `cloud_event_consume_location` json not null default 
(json_object()) comment '扩展信息',
                                                                                
                        `cloud_event_state` varchar(31) not null default '' 
comment '存储节点地址',
                                                                                
                        `cloud_event_reply_state` varchar(31) not null default 
'NOTHING' comment '存储节点地址',
@@ -20,14 +20,14 @@ createCloudEventSQL: "create table `cloud_event_{table}`(
                                                                                
                        PRIMARY KEY (`cloud_event_info_id`),
                                                                                
                        key (`cloud_event_create_time`)
                                                                                
        )"
-insertCloudEventSQL: "insert into 
cloud_event_{table}(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,
-cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data)
 values(?,?,?,?,?,?,?,?,?,?)"
-updateCloudEventOffsetSQL: "update cloud_event_{table} set cloud_event_state = 
'SUCCESS'  where cloud_event_info_id = ?"
-updateCloudEventReplySQL: "update cloud_event_{table}  set  
cloud_event_reply_data = ? , cloud_event_reply_state = 'NOTHING' where 
cloud_event_info_id = ?"
+insertCloudEventSQL: "insert into 
`cloud_event_{table}`(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name,
+cloud_event_source,cloud_event_content_type,cloud_event_extensions,cloud_event_data,cloud_event_reply_data)
 values(?,?,?,?,?,?,?,?,?,?)"
+updateCloudEventOffsetSQL: "update `cloud_event_{table}` set cloud_event_state 
= 'SUCCESS'  where cloud_event_info_id = ?"
+updateCloudEventReplySQL: "update `cloud_event_{table}`  set  
cloud_event_reply_data = ? , cloud_event_reply_state = 'RESULT' where 
cloud_event_info_id = ?"
 selectCloudEventByReplySQL: "select * from cloud_event_{table} where 
cloud_event_info_id in({id}) and cloud_event_reply_data is not null"
-locationEventSQL: "update cloud_event_{table} set json_set( 
cloud_event_consume_location , ? ,? ) where cloud_event_info_id > ? and 
json_extract(cloud_event_consume_location, ?) is null limit 200"
-queryLocationEventSQL: "select * from cloud_event_{table} where 
cloud_event_info_id > ? and JSON_CONTAINS_PATH(cloud_event_consume_location, 
'one', ?)"
-selectFastMessageSQL: "select 'cloud_event_test' as tableName , 
cloud_event_info_id from cloud_event_{table}  order by  cloud_event_info_id 
limit 1"
-selectLastMessageSQL: "select 'cloud_event_test' as tableName , 
cloud_event_info_id from cloud_event_{table}  order by  cloud_event_info_id 
desc  limit 1"
-selectNoConsumptionMessageSQL: "select 'cloud_event_test' as tableName , 
cloud_event_info_id from cloud_event_{table} where  
json_extract(cloud_event_consume_location, ?) is not null order by  
cloud_event_info_id desc  limit 1"
+locationEventSQL: "update `cloud_event_{table}` set 
cloud_event_consume_location = json_set( cloud_event_consume_location , 
'$.{value}' ,'{value }' ) where cloud_event_info_id > ? and 
json_extract(cloud_event_consume_location, '$.{value}') is null limit 200"
+queryLocationEventSQL: "select * from `cloud_event_{table}` where 
cloud_event_info_id >= ? and JSON_EXTRACT(cloud_event_consume_location, 
'$.{value}') = '{value}'"
+selectFastMessageSQL: "select * from `cloud_event_{table}`  order by  
cloud_event_info_id limit 1"
+selectLastMessageSQL: "select * from `cloud_event_{table}`  order by  
cloud_event_info_id desc  limit 1"
+selectNoConsumptionMessageSQL: "( select * from `cloud_event_{table}` where  
json_extract(cloud_event_consume_location, '$.{value}') is not null order by  
cloud_event_info_id desc  limit 1 )"
 selectAppointTimeMessageSQL: ""
\ No newline at end of file
diff --git 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java
 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java
index b6e035799..6e07873c3 100644
--- 
a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java
+++ 
b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java
@@ -17,22 +17,159 @@
 
 package org.apache.eventmesh.connector.storage.jdbc;
 
+import org.apache.eventmesh.api.SendCallback;
+import org.apache.eventmesh.api.SendResult;
+import org.apache.eventmesh.api.connector.storage.data.PullRequest;
+import org.apache.eventmesh.api.connector.storage.data.TopicInfo;
+import org.apache.eventmesh.api.exception.OnExceptionContext;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-public class JDBCStorageConnectorTest  {
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+public class JDBCStorageConnectorTest {
+
+       private String topicOne = "test_1";
+
+       private String topicTwo = "test_2";
+
+       private String consumerGroupA = "consumerGroupA";
+
+       private String consumerGroupB = "consumerGroupB";
+
+       private PullRequest oneConsumerGroupA;
+
+       private PullRequest oneConsumerGroupB;
+
+       private PullRequest twoConsumerGroupA;
+
+       private PullRequest twoConsumerGroupB;
+
+       private Set<String> topicSet;
 
        JDBCStorageConnector connector = new JDBCStorageConnector();
-       
+
+       @Before
+       public void init() throws Exception {
+               
+               Properties properties = new Properties();
+               properties.put("dbType", "mysql");
+               properties.put("address", "127.0.0.1:3306");
+               properties.put("parameter",
+                               
"useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull");
+               properties.put("username", "root");
+               properties.put("password", "Ab123123@");
+               properties.put("maxActive", "20");
+               properties.put("maxWait", "10000");
+               connector.init(properties);
+
+               oneConsumerGroupA = new PullRequest();
+               oneConsumerGroupA.setConsumerGroupName(consumerGroupA);
+               oneConsumerGroupA.setProcessSign("A1");
+               oneConsumerGroupA.setTopicName(topicOne);
+
+               oneConsumerGroupB = new PullRequest();
+               oneConsumerGroupB.setConsumerGroupName(consumerGroupA);
+               oneConsumerGroupB.setProcessSign("A2");
+               oneConsumerGroupB.setTopicName(topicOne);
+
+               twoConsumerGroupA = new PullRequest();
+               twoConsumerGroupA.setConsumerGroupName(consumerGroupB);
+               twoConsumerGroupA.setProcessSign("B1");
+               twoConsumerGroupA.setTopicName(topicOne);
+
+               twoConsumerGroupB = new PullRequest();
+               twoConsumerGroupB.setConsumerGroupName(consumerGroupB);
+               twoConsumerGroupB.setProcessSign("B2");
+               twoConsumerGroupB.setTopicName(topicOne);
+
+               topicSet = connector.getTopic();
+               this.createTopic(topicOne);
+               this.createTopic(topicTwo);
+               List<PullRequest> pullRequestList = new ArrayList<>();
+               pullRequestList.add(oneConsumerGroupA);
+               pullRequestList.add(oneConsumerGroupB);
+               pullRequestList.add(twoConsumerGroupA);
+               pullRequestList.add(twoConsumerGroupB);
+               List<TopicInfo> topicInfoList = 
connector.geTopicInfos(pullRequestList);
+               for (TopicInfo topicInfo : topicInfoList) {
+                       Assert.assertEquals(Long.valueOf(0), 
topicInfo.getCurrentId());
+               }
+
+       }
+
+       private void createTopic(String topic) throws Exception {
+               if (!topicSet.contains("cloud_event_" + topic)) {
+                       TopicInfo topicInfo = new TopicInfo();
+                       topicInfo.setTopicName(topic);
+                       connector.createTopic(topicInfo);
+               }
+       }
+
        @Test
        public void testInit() throws Exception {
                Properties properties = new Properties();
-               properties.put("url", 
"jdbc:mysql://127.0.0.1:3306/electron?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull");
+               properties.put("url",
+                               
"jdbc:mysql://127.0.0.1:3306/electron?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull");
                properties.put("username", "root");
                properties.put("password", "Ab123123@");
                properties.put("maxActive", "20");
                properties.put("maxWait", "10000");
                connector.init(properties);
        }
+
+       @Test
+       public void test_publish_and_pull() throws Exception {
+               this.publish(topicOne, "0");
+               List<CloudEvent> cloudEventList = 
this.connector.pull(oneConsumerGroupA);
+               //Assert.assertNull(cloudEventList);
+               cloudEventList = this.connector.pull(oneConsumerGroupA);
+               Assert.assertNull(cloudEventList);
+               cloudEventList = this.connector.pull(oneConsumerGroupB);
+               Assert.assertNull(cloudEventList);
+               cloudEventList = this.connector.pull(twoConsumerGroupA);
+
+               this.publish_bach(topicOne);
+               this.publish_bach(topicTwo);
+
+               connector.pull(oneConsumerGroupA);
+       }
+
+       public void publish_bach(String topic) throws Exception {
+               for (int i = 1; i <= 900; i++) {
+                       this.publish(topic, i + "");
+               }
+       }
+
+       public void publish(String topic, String id) throws Exception {
+
+               CloudEvent cloudEvent = 
CloudEventBuilder.v03().withExtension("cloudeventid", 
id).withId(id).withType("1231").withSource(new 
URI("")).withSubject(topic).withExtension("id", id)
+                               .withData(id.getBytes()).build();
+
+               connector.publish(cloudEvent, new SendCallback() {
+
+                       @Override
+                       public void onSuccess(SendResult sendResult) {
+                               System.out.println("111");
+                       }
+
+                       @Override
+                       public void onException(OnExceptionContext context) {
+                               System.out.println("111");
+                       }
+               });
+       }
+
+       public void test_reply() throws Exception {
+               this.publish_bach(topicOne);
+       }
 }
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 9685f1ca5..fad9acd5a 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -76,6 +76,7 @@ dependencies {
     implementation project(":eventmesh-webhook:eventmesh-webhook-admin")
     implementation project(":eventmesh-webhook:eventmesh-webhook-api")
     implementation project(":eventmesh-webhook:eventmesh-webhook-receive")
+    implementation 
project(":eventmesh-connector-plugin:eventmesh-connector-storage-jdbc")
 
     testImplementation "org.mockito:mockito-core"
     testImplementation "org.mockito:mockito-inline"
diff --git a/eventmesh-runtime/conf/eventmesh.properties 
b/eventmesh-runtime/conf/eventmesh.properties
index 17bfab067..a2cbc7612 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -72,7 +72,16 @@ 
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255
 eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8
 
 #connector plugin
-eventMesh.connector.plugin.type=standalone
+eventMesh.connector.plugin.type=storage
+eventMesh.connector.plugin.storage.nodeaddress=127.0.0.1:3306
+eventMesh.connector.plugin.storage.username=root
+eventMesh.connector.plugin.storage.password=Ab123123@
+eventMesh.connector.plugin.storage.type=jdbc
+eventMesh.connector.plugin.storage.jdbc.dbType=mysql
+eventMesh.connector.plugin.storage.jdbc.parameter=useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull
+eventMesh.connector.plugin.storage.jdbc.maxActive=200
+eventMesh.connector.plugin.storage.jdbc.maxWait=20000
+
 
 #security plugin
 eventMesh.server.security.enabled=false


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to