This is an automated email from the ASF dual-hosted git repository. mikexue pushed a commit to branch storage-api in repository https://gitbox.apache.org/repos/asf/eventmesh.git
commit c0e182a0998f6d1a3f1a7f7240b2d7ade5417d69 Author: githublaohu <[email protected]> AuthorDate: Mon Apr 24 16:57:13 2023 +0800 Resolve conflicts with storage-api branch --- .../eventmesh-connector-api/build.gradle | 6 + .../eventmesh-connector-api/gradle.properties | 4 +- .../api/connector/storage/CloudEventUtils.java | 45 +++- ...va => ConnectorResourceServiceStorageImpl.java} | 19 +- .../eventmesh/api/connector/storage/Constant.java | 21 +- .../api/connector/storage/StorageConfig.java | 4 +- .../api/connector/storage/StorageConnector.java | 4 + .../storage/StorageConnectorMetedata.java | 2 + .../connector/storage/StorageConnectorProxy.java | 21 +- .../connector/storage/StorageConnectorService.java | 253 +++++++++++---------- .../api/connector/storage/StorageConsumer.java | 102 ++++++--- .../api/connector/storage/StorageProducer.java | 9 +- .../api/connector/storage/data/PullRequest.java | 61 ++--- .../api/connector/storage/data/TopicInfo.java | 2 + .../connector/storage/metadata/RouteHandler.java | 23 +- .../storage/metadata/StorageMetaServcie.java | 74 ++++-- .../connector/storage/pull/PullCallbackImpl.java | 6 +- .../connector/storage/pull/StoragePullService.java | 11 +- .../storage/reply/ReplyOperationService.java | 206 +++++++++-------- ...entmesh.api.connector.ConnectorResourceService} | 2 +- .../org.apache.eventmesh.api.consumer.Consumer} | 2 +- .../org.apache.eventmesh.api.producer.Producer} | 2 +- .../storage/StorageConnectorProxyTest.java | 132 +++++++++++ .../storage/pull/StoragePullServiceTest.java | 94 ++++++++ .../storage/reply/ReplyOperationServiceTest.java | 151 ++++++++++++ .../storage/jdbc/AbstractJDBCStorageConnector.java | 201 ++++++++-------- .../jdbc/AbstractJDBCStorageConnectorMetadata.java | 29 ++- .../storage/jdbc/JDBCStorageConnector.java | 202 ++++++++-------- .../storage/jdbc/ResultSetTransformUtils.java | 25 +- .../connector/storage/jdbc/SQL/BaseSQL.java | 2 + .../storage/jdbc/SQL/BaseSQLOperation.java | 2 + .../storage/jdbc/SQL/CloudEventSQLOperation.java | 6 +- .../jdbc/SQL/SQLServiceInvocationHandler.java | 5 +- ...ventmesh.api.connector.storage.StorageConnector | 2 +- .../src/main/resource/mysql-base.yaml | 5 +- .../src/main/resource/mysql-cloudevent.yaml | 20 +- .../storage/jdbc/JDBCStorageConnectorTest.java | 143 +++++++++++- eventmesh-runtime/build.gradle | 1 + eventmesh-runtime/conf/eventmesh.properties | 11 +- 39 files changed, 1345 insertions(+), 565 deletions(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle index 25c68d12e..be494c74c 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle +++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle @@ -23,10 +23,16 @@ dependencies { api "io.dropwizard.metrics:metrics-healthchecks" api "io.dropwizard.metrics:metrics-annotation" api "io.dropwizard.metrics:metrics-json" + + implementation 'io.cloudevents:cloudevents-json-jackson:2.4.0' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' + implementation "org.mockito:mockito-core" + implementation "org.powermock:powermock-module-junit4" + implementation "org.powermock:powermock-api-mockito2" + testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties b/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties index a9fd83fea..41f4a5718 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties +++ b/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties @@ -13,4 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# \ No newline at end of file +# +pluginType=connector +pluginName=storage \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java index d935c5ba9..ecf289fc9 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java @@ -20,24 +20,31 @@ package org.apache.eventmesh.api.connector.storage; import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo; import java.lang.reflect.Field; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.impl.BaseCloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.core.v03.CloudEventV03; import io.cloudevents.core.v1.CloudEventV1; public class CloudEventUtils { + + public static EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat("application/cloudevents+json"); private static Field CLOUD_EVENT_EXTENSIONS_FIELD; static { try { - CLOUD_EVENT_EXTENSIONS_FIELD = BaseCloudEvent.class.getField("extensions"); + CLOUD_EVENT_EXTENSIONS_FIELD = BaseCloudEvent.class.getDeclaredField("extensions"); CLOUD_EVENT_EXTENSIONS_FIELD.setAccessible(true); } catch (NoSuchFieldException | SecurityException e) { - e.printStackTrace(); + throw new RuntimeException(e.getMessage() , e ); } } @@ -50,22 +57,48 @@ public class CloudEventUtils { extensions.put(key, value); return cloudEvent; } catch (Exception e) { - e.printStackTrace(); + throw new RuntimeException(e); } } return null; } + + public static String getTableName(CloudEvent cloudEvent) { + return cloudEvent.getSubject(); + } + + public static String checkConsumerGroupName(String topic) { + return topic.replace("-", "_"); + } + + public static List<Object> getParameterToCloudEvent(CloudEvent cloudEvent) { + List<Object> parameterList = new ArrayList<>(); + String id = (String) cloudEvent.getExtension("cloudeventid"); + parameterList.add(Objects.isNull(id)?"1":id);// id + parameterList.add(getTableName(cloudEvent));// topic + parameterList.add("");// cloud_event_storage_node_adress + parameterList.add("");// cloud_event_type + parameterList.add("");// cloud_event_producer_group_name + parameterList.add("");// cloud_event_source + parameterList.add("application/cloudevents+json");// cloud_event_content_type + // parameterList.add("");//cloud_event_tag + parameterList.add("");// cloud_event_extensions + String data = new String(CloudEventUtils.eventFormat.serialize(cloudEvent), Charset.forName("UTF-8")); + parameterList.add(data);// cloud_event_data + parameterList.add("{}"); + return parameterList; + } public static String getNodeAdress(CloudEvent cloudEvent) { - return (String) cloudEvent.getExtension(Constant.NODE_ADDRESS); + return (String) cloudEvent.getExtension(Constant.STORAGE_CONFIG_ADDRESS); } public static String getTopic(CloudEvent cloudEvent) { - return null; + return cloudEvent.getSubject(); } public static String getId(CloudEvent cloudEvent) { - return null; + return ""; } public static CloudEvent createCloudEvent(CloudEventInfo cloudEventInfo) { diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorInfo.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/ConnectorResourceServiceStorageImpl.java similarity index 71% rename from eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorInfo.java rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/ConnectorResourceServiceStorageImpl.java index 65a3fecd4..274cd0ad9 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorInfo.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/ConnectorResourceServiceStorageImpl.java @@ -14,13 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.eventmesh.api.connector.storage; -import lombok.Data; +import org.apache.eventmesh.api.connector.ConnectorResourceService; + +public class ConnectorResourceServiceStorageImpl implements ConnectorResourceService{ + + @Override + public void init() throws Exception { + // TODO Auto-generated method stub + + } -@Data -public class StorageConnectorInfo { + @Override + public void release() throws Exception { + // TODO Auto-generated method stub + + } - private boolean distinguishTopic; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java index f08d62cf0..ceef0e5e9 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java @@ -19,5 +19,24 @@ package org.apache.eventmesh.api.connector.storage; public class Constant { - public static final String NODE_ADDRESS = "nodeAddress"; + public static final String STORAGE_CONFIG_ADDRESS = "eventMesh.connector.plugin.storage.nodeaddress"; + + public static final String STORAGE_CONFIG_TYPE = "eventMesh.connector.plugin.storage.type"; + + public static final String STORAGE_CONFIG_JDBC_PARAMETER = "eventMesh.connector.plugin.storage.jdbc.parameter"; + + public static final String STORAGE_CONFIG_USER_NAME = "eventMesh.connector.plugin.storage.username"; + + public static final String STORAGE_CONFIG_PASSWORD = "eventMesh.connector.plugin.storage.password"; + + public static final String STORAGE_CONFIG_JDBC_TYPE = "eventMesh.connector.plugin.storage.jdbc.dbType"; + + public static final String STORAGE_CONFIG_JDBC_MAXACTIVE = "eventMesh.connector.plugin.storage.jdbc.maxActive"; + + public static final String STORAGE_CONFIG_JDBC_MAXWAIT = "eventMesh.connector.plugin.storage.jdbc.maxWait"; + + public static final String STORAGE_ID = "storageid"; + + public static final String STORAGE_NODE_ADDRESS = "address"; + } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java index 67eedb144..64c6fb88f 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java @@ -22,7 +22,7 @@ import lombok.Data; @Data public class StorageConfig { - private long pullInterval; + private long pullInterval = 200; - private long pullThresholdForQueue; + private long pullThresholdForQueue = 5; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java index 6e0493ca4..65756630a 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java @@ -61,6 +61,10 @@ public interface StorageConnector extends LifeCycle { public List<CloudEvent> pull(PullRequest pullRequest) throws Exception; void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context); + + public default String getTopic(String topic) { + return topic; + } public default int deleteCloudEvent(CloudEvent cloudEvent) { return 0; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java index 4843c25fb..84bb4edcc 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java @@ -35,5 +35,7 @@ public interface StorageConnectorMetedata { public int createTopic(TopicInfo topicInfo) throws Exception; public int createConsumerGroupInfo(ConsumerGroupInfo consumerGroupInfo) throws Exception; + + public List<TopicInfo> geTopicInfos(Set<String> topics,String key) throws Exception; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java index a0339c65e..211594faf 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java @@ -24,11 +24,13 @@ import org.apache.eventmesh.api.connector.storage.data.PullRequest; import org.apache.eventmesh.api.connector.storage.data.TopicInfo; import org.apache.eventmesh.api.connector.storage.metadata.RouteHandler; import org.apache.eventmesh.api.connector.storage.metadata.StorageMetaServcie; +import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation; import org.apache.eventmesh.api.connector.storage.reply.ReplyOperationService; import org.apache.eventmesh.api.connector.storage.reply.RequestReplyInfo; import org.apache.eventmesh.api.exception.ConnectorRuntimeException; import org.apache.eventmesh.api.exception.OnExceptionContext; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; @@ -36,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import io.cloudevents.CloudEvent; +import lombok.Setter; public class StorageConnectorProxy implements StorageConnector { @@ -45,10 +48,13 @@ public class StorageConnectorProxy implements StorageConnector { private RouteHandler routeHandler = new RouteHandler(); + @Setter private ReplyOperationService replyService; + @Setter private StorageMetaServcie storageMetaServcie; + @Setter private Executor executor; @Override @@ -63,7 +69,12 @@ public class StorageConnectorProxy implements StorageConnector { public void init(Properties properties) throws Exception { } + public Collection<StorageConnector> getStorageConnectorList(){ + return this.storageConnectorByKeyMap.values(); + } + public void setConnector(StorageConnector storageConnector, String key) { + routeHandler.addStorageConnector(storageConnector); storageConnectorMap.put(storageConnector, key); storageConnectorByKeyMap.put(key, storageConnector); } @@ -84,6 +95,7 @@ public class StorageConnectorProxy implements StorageConnector { if (storageConnector instanceof StorageConnectorMetedata && !storageMetaServcie.isTopic(storageConnector, CloudEventUtils.getTopic(cloudEvent))) { TopicInfo topicInfo = new TopicInfo(); + topicInfo.setTopicName(CloudEventUtils.getTopic(cloudEvent)); StorageConnectorMetedata storageConnectorMetedata = (StorageConnectorMetedata) storageConnector; storageConnectorMetedata.createTopic(topicInfo); } @@ -114,15 +126,18 @@ public class StorageConnectorProxy implements StorageConnector { public void doRequest(CloudEvent cloudEvent, RequestReplyCallback requestReplyCallback, long timeout) { try { StorageConnector storageConnector = routeHandler.select(); + if(!(storageConnector instanceof ReplyOperation)) { + return; + } String key = storageConnectorMap.get(storageConnector); - CloudEventUtils.setValue(cloudEvent, "nodeAddress", key); + CloudEventUtils.setValue(cloudEvent, Constant.STORAGE_CONFIG_ADDRESS, key); storageConnector.request(cloudEvent, requestReplyCallback, timeout); - Long storageId = (Long) cloudEvent.getExtension("storageId"); + Long storageId = Long.valueOf( cloudEvent.getExtension(Constant.STORAGE_ID).toString()); RequestReplyInfo requestReplyInfo = new RequestReplyInfo(); requestReplyInfo.setStorageId(storageId); requestReplyInfo.setTimeOut(System.currentTimeMillis() + timeout); requestReplyInfo.setRequestReplyCallback(requestReplyCallback); - replyService.setRequestReplyInfo(null, cloudEvent.getType(), storageId, requestReplyInfo); + replyService.setRequestReplyInfo((ReplyOperation)storageConnector, cloudEvent.getType(), storageId, requestReplyInfo); } catch (Exception e) { requestReplyCallback.onException(e); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java index 287c2b915..e4e51a215 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorService.java @@ -22,9 +22,9 @@ import org.apache.eventmesh.api.connector.storage.data.PullRequest; import org.apache.eventmesh.api.connector.storage.metadata.StorageMetaServcie; import org.apache.eventmesh.api.connector.storage.pull.StoragePullService; import org.apache.eventmesh.api.connector.storage.reply.ReplyOperationService; +import org.apache.eventmesh.common.config.ConfigurationWrapper; import org.apache.eventmesh.spi.EventMeshExtensionFactory; -import java.net.URL; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,122 +42,137 @@ import lombok.Getter; public class StorageConnectorService implements LifeCycle { - private static final StorageConnectorService instance = new StorageConnectorService(); - - private StoragePullService pullService = new StoragePullService(); - - private StorageMetaServcie storageMetaServcie = new StorageMetaServcie(); - - private ReplyOperationService replyService = new ReplyOperationService(); - - private Map<String, StorageConnector> storageConnectorMap = new HashMap<>(); - - private Executor executor; - - private ScheduledExecutorService scheduledExecutor; - - @Getter - private StorageConnector storageConnector = new StorageConnectorProxy(); - - public static StorageConnectorService getInstance() { - return instance; - } - - private StorageConnectorService() { - this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10, - Runtime.getRuntime().availableProcessors() * 100, 1000 * 60 * 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), new ThreadFactory() { - AtomicInteger index = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "storage-connent-" + index.getAndIncrement()); - } - }); - this.scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 10, - new ThreadFactory() { - AtomicInteger index = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "storage-connent-shceduled-" + index.getAndIncrement()); - } - }); - this.storageMetaServcie = new StorageMetaServcie(); - this.storageMetaServcie.setScheduledExecutor(scheduledExecutor); - this.storageMetaServcie.setStoragePullService(pullService); - this.replyService.setExecutor(executor); - this.pullService.setExecutor(executor); - this.executor.execute(pullService); - this.scheduled(); - - } - - public void scheduled() { - scheduledExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - storageMetaServcie.pullMeteData(); - } - }, 5, 1000, TimeUnit.MILLISECONDS); - scheduledExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - replyService.execute(); - } - }, 5, 5, TimeUnit.MILLISECONDS); - } - - public StorageConnector createConsumerByStorageConnector(Properties properties) { - StorageConnector storageConnector = this.createConsumerByStorageConnector(properties); - if (storageConnector instanceof StorageConnectorMetedata) { - this.storageMetaServcie.registerStorageConnector((StorageConnectorMetedata) storageConnector); - } - - return storageConnector; - } - - public StorageConnector createProducerByStorageConnector(Properties properties, List<PullRequest> pullRequests) { - StorageConnector storageConnector = this.createConsumerByStorageConnector(properties); - this.storageMetaServcie.registerPullRequest(pullRequests, storageConnector); - return storageConnector; - } - - public StorageConnector createStorageConnector(Properties properties) throws Exception { - URL url = new URL(properties.getProperty("")); - String host = url.getHost() + ":" + url.getPort(); - String[] hosts = host.split(","); - StorageConnectorProxy connectorProxy = new StorageConnectorProxy(); - for (String address : hosts) { - StorageConnector storageConnector = EventMeshExtensionFactory.getExtension(StorageConnector.class, - url.getProtocol()); - properties.setProperty("nodeAddress", address); - properties.setProperty("protocol", url.getProtocol()); - storageConnector.init(properties); - String key = url.getProtocol() + "://" + address; - connectorProxy.setConnector(storageConnector, key); - storageConnectorMap.put(key, storageConnector); - } - - return connectorProxy; - } - - @Override - public boolean isStarted() { - return true; - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void start() { - } - - @Override - public void shutdown() { - storageConnectorMap.values().forEach(value -> value.shutdown()); - } + private static final StorageConnectorService instance = new StorageConnectorService(); + + private StoragePullService pullService = new StoragePullService(); + + private StorageMetaServcie storageMetaServcie = new StorageMetaServcie(); + + private ReplyOperationService replyService = new ReplyOperationService(); + + private Map<String, StorageConnector> storageConnectorMap = new HashMap<>(); + + @Getter + private Executor executor; + + private ScheduledExecutorService scheduledExecutor; + + @Getter + private StorageConnector storageConnector = new StorageConnectorProxy(); + + public static StorageConnectorService getInstance() { + return instance; + } + + private StorageConnectorService() { + this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10, + Runtime.getRuntime().availableProcessors() * 300, 1000 * 60 * 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), new ThreadFactory() { + AtomicInteger index = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "storage-connent-" + index.getAndIncrement()); + } + }); + this.scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 10, + new ThreadFactory() { + AtomicInteger index = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "storage-connent-shceduled-" + index.getAndIncrement()); + } + }); + this.storageMetaServcie = new StorageMetaServcie(); + this.storageMetaServcie.setScheduledExecutor(scheduledExecutor); + this.storageMetaServcie.setStoragePullService(pullService); + this.storageMetaServcie.setExecutor(executor); + this.replyService.setExecutor(executor); + this.pullService.setExecutor(executor); + this.pullService.setScheduledExecutor(scheduledExecutor); + StorageConfig storageConfig = new StorageConfig(); + this.pullService.setStorageConfig(storageConfig); + this.executor.execute(pullService); + this.scheduled(); + + } + + public void scheduled() { + scheduledExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + storageMetaServcie.pullMeteData(); + } + }, 5, 1000, TimeUnit.MILLISECONDS); + scheduledExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + replyService.execute(); + } + }, 5, 5, TimeUnit.MILLISECONDS); + } + + public StorageConnector createProducerByStorageConnector(Properties properties) { + StorageConnector storageConnector = this.createStorageConnector(properties); + this.storageMetaServcie.registerStorageConnector(storageConnector); + return storageConnector; + } + + public StorageConnector createConsumerByStorageConnector(Properties properties, List<PullRequest> pullRequests) { + try { + StorageConnector storageConnector = this.createProducerByStorageConnector(properties); + this.storageMetaServcie.registerPullRequest(pullRequests, storageConnector); + return storageConnector; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public StorageConnector createStorageConnector(Properties properties) { + try { + ConfigurationWrapper configurationWrapper = new ConfigurationWrapper( + System.getProperty("confPath", System.getenv("confPath")), "eventmesh.properties", false); + configurationWrapper.getProperties().putAll(properties); + properties = configurationWrapper.getProperties(); + String storageType = properties.getProperty(Constant.STORAGE_CONFIG_TYPE); + String[] hosts = properties.getProperty(Constant.STORAGE_CONFIG_ADDRESS).split(";"); + StorageConnectorProxy connectorProxy = new StorageConnectorProxy(); + connectorProxy.setExecutor(executor); + connectorProxy.setReplyService(replyService); + connectorProxy.setStorageMetaServcie(storageMetaServcie); + for (String address : hosts) { + StorageConnector storageConnector = EventMeshExtensionFactory.getExtension(StorageConnector.class, + storageType); + properties.setProperty(Constant.STORAGE_NODE_ADDRESS, address); + storageConnector.init(properties); + storageConnector.start(); + String key = storageType + "://" + address; + connectorProxy.setConnector(storageConnector, key); + storageConnectorMap.put(key, storageConnector); + } + return connectorProxy; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public boolean isStarted() { + return true; + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void start() { + } + + @Override + public void shutdown() { + storageConnectorMap.values().forEach(value -> value.shutdown()); + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java index 49c7fcde1..616a84ca1 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConsumer.java @@ -18,60 +18,94 @@ package org.apache.eventmesh.api.connector.storage; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; +import org.apache.eventmesh.api.connector.storage.data.PullRequest; +import org.apache.eventmesh.api.connector.storage.pull.PullCallbackImpl; import org.apache.eventmesh.api.consumer.Consumer; +import org.apache.eventmesh.common.Constants; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Properties; +import java.util.Set; import io.cloudevents.CloudEvent; public class StorageConsumer implements Consumer { - private StorageConnector storageOperation; + private StorageConnector storageOperation; - @Override - public boolean isStarted() { - return storageOperation.isStarted(); - } + private EventListener listener; - @Override - public boolean isClosed() { - return storageOperation.isClosed(); - } + private Properties keyValue; - @Override - public void start() { - storageOperation.start(); - } + private Set<String> topicSet = new HashSet<>(); - @Override - public void shutdown() { - storageOperation.shutdown(); - } + @Override + public boolean isStarted() { + return storageOperation.isStarted(); + } - @Override - public void init(Properties keyValue) throws Exception { - storageOperation.init(keyValue); - } + @Override + public boolean isClosed() { + return storageOperation.isClosed(); + } - @Override - public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) { - storageOperation.updateOffset(cloudEvents, context); - } + @Override + public void start() { + if (Objects.isNull(listener)) { - @Override - public void subscribe(String topic) throws Exception { + } - } + if (topicSet.isEmpty()) { - @Override - public void unsubscribe(String topic) { + } - } + StorageConnectorService storageConnectorService = StorageConnectorService.getInstance(); + List<PullRequest> pullRequestList = new ArrayList<PullRequest>(topicSet.size()); + for (String topic : topicSet) { + PullRequest pullRequest = new PullRequest(); + pullRequest.setTopicName(topic); + pullRequest.setConsumerGroupName(keyValue.getProperty(Constants.CONSUMER_GROUP)); + PullCallbackImpl pullCallback = new PullCallbackImpl(); + pullCallback.setEventListener(listener); + pullCallback.setExecutor(storageConnectorService.getExecutor()); + pullRequest.setPullCallback(pullCallback); + pullRequestList.add(pullRequest); + } + storageOperation = storageConnectorService.createConsumerByStorageConnector(this.keyValue, pullRequestList); - @Override - public void registerEventListener(EventListener listener) { + } - } + @Override + public void shutdown() { + storageOperation.shutdown(); + } + + @Override + public void init(Properties keyValue) throws Exception { + this.keyValue = keyValue; + } + + @Override + public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) { + storageOperation.updateOffset(cloudEvents, context); + } + + @Override + public void subscribe(String topic) throws Exception { + topicSet.add(topic); + } + + @Override + public void unsubscribe(String topic) { + topicSet.remove(topic); + } + + @Override + public void registerEventListener(EventListener listener) { + this.listener = listener; + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java index e43be785f..12ab9ea15 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageProducer.java @@ -27,6 +27,8 @@ import io.cloudevents.CloudEvent; public class StorageProducer implements Producer { private StorageConnector storageOperation; + + private Properties keyValue; @Override public boolean isStarted() { @@ -40,7 +42,8 @@ public class StorageProducer implements Producer { @Override public void start() { - storageOperation.start(); + StorageConnectorService storageConnectorService = StorageConnectorService.getInstance(); + storageOperation = storageConnectorService.createProducerByStorageConnector(this.keyValue); } @Override @@ -50,7 +53,7 @@ public class StorageProducer implements Producer { @Override public void init(Properties keyValue) throws Exception { - storageOperation.init(keyValue); + this.keyValue = keyValue; } @@ -76,7 +79,7 @@ public class StorageProducer implements Producer { @Override public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { - return false; + return storageOperation.reply(cloudEvent, sendCallback); } @Override diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java index 48e712e6a..9e10147c9 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/PullRequest.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.api.connector.storage.data; +import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.api.connector.storage.StorageConnector; import org.apache.eventmesh.api.connector.storage.pull.PullCallback; @@ -33,48 +34,48 @@ import lombok.Data; @Data public class PullRequest { - private static final AtomicLong INCREASING_ID = new AtomicLong(); + private static final AtomicLong INCREASING_ID = new AtomicLong(); - private long id = INCREASING_ID.incrementAndGet(); + private long id = INCREASING_ID.incrementAndGet(); - private String topicName; + private String topicName; - private String consumerGroupName; + private String consumerGroupName; - private String nextId; + private String nextId = "0"; - private String processSign; + private String processSign; - private StorageConnector storageConnector; + private StorageConnector storageConnector; - private AtomicBoolean isEliminate = new AtomicBoolean(true); + private AtomicBoolean isEliminate = new AtomicBoolean(true); - private AtomicInteger stock = new AtomicInteger(); + private AtomicInteger stock = new AtomicInteger(); - private PullCallback pullCallback; + private PullCallback pullCallback; - private List<PullRequest> pullRequests; + private List<PullRequest> pullRequests; - private Map<String, PullRequest> topicAndPullRequests; + private Map<String, PullRequest> topicAndPullRequests; - public synchronized void setPullRequests(List<PullRequest> pullRequests) { - this.pullRequests = pullRequests; - this.topicAndPullRequests = null; - } + public synchronized void setPullRequests(List<PullRequest> pullRequests) { + this.pullRequests = pullRequests; + this.topicAndPullRequests = null; + } - public List<PullRequest> getPullRequests() { - List<PullRequest> pullRequests = this.pullRequests; - return pullRequests; - } + public List<PullRequest> getPullRequests() { + List<PullRequest> pullRequests = this.pullRequests; + return pullRequests; + } - public Map<String, PullRequest> getTopicAndPullRequests() { - if (Objects.isNull(this.topicAndPullRequests)) { - Map<String, PullRequest> map = new HashMap<>(); - for (PullRequest pullRequest : pullRequests) { - map.put(pullRequest.getTopicName(), pullRequest); - } - this.topicAndPullRequests = map; - } - return this.topicAndPullRequests; - } + public Map<String, PullRequest> getTopicAndPullRequests() { + if (Objects.isNull(this.topicAndPullRequests)) { + Map<String, PullRequest> map = new HashMap<>(); + for (PullRequest pullRequest : pullRequests) { + map.put(pullRequest.getTopicName(), pullRequest); + } + this.topicAndPullRequests = map; + } + return this.topicAndPullRequests; + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java index 310af9f98..098c43118 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/data/TopicInfo.java @@ -28,6 +28,8 @@ import lombok.Data; public class TopicInfo { private String topicName; + + private String dbTablesName; private int writeQueueNums; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java index c11c346ff..601eba94a 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/RouteHandler.java @@ -19,20 +19,37 @@ package org.apache.eventmesh.api.connector.storage.metadata; import org.apache.eventmesh.api.connector.storage.StorageConnector; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import lombok.Setter; public class RouteHandler { @Setter - List<StorageConnector> storageConnector; + List<StorageConnector> storageConnector = new ArrayList<>(); + private RouteSelect souteSelect = new PollRouteSelect(); - private RouteSelect souteSelect; - + public void addStorageConnector(StorageConnector storageConnector) { + this.storageConnector.add(storageConnector); + } public StorageConnector select() { return souteSelect.select(storageConnector); } + + + public class PollRouteSelect implements RouteSelect{ + + private AtomicLong index = new AtomicLong(); + + @Override + public StorageConnector select(List<StorageConnector> storageConnector) { + int value = (int)(index.getAndIncrement()%storageConnector.size()); + return storageConnector.get(value); + } + + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java index faaa2f648..2d6d26a01 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/metadata/StorageMetaServcie.java @@ -19,6 +19,7 @@ package org.apache.eventmesh.api.connector.storage.metadata; import org.apache.eventmesh.api.connector.storage.StorageConnector; import org.apache.eventmesh.api.connector.storage.StorageConnectorMetedata; +import org.apache.eventmesh.api.connector.storage.StorageConnectorProxy; import org.apache.eventmesh.api.connector.storage.data.ConsumerGroupInfo; import org.apache.eventmesh.api.connector.storage.data.Metadata; import org.apache.eventmesh.api.connector.storage.data.PullRequest; @@ -31,10 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,7 @@ public class StorageMetaServcie { protected static final Logger messageLogger = LoggerFactory.getLogger("message"); - private static final String PROCESS_SIGN = Long.toString(System.currentTimeMillis()); + private static final String PROCESS_SIGN = UUID.randomUUID().toString(); @Setter private ScheduledExecutorService scheduledExecutor; @@ -59,23 +60,42 @@ public class StorageMetaServcie { private Map<StorageConnectorMetedata, Metadata> metaDataMap = new ConcurrentHashMap<>(); public void init() { - scheduledExecutor.scheduleWithFixedDelay(new Runnable() { + /*scheduledExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { StorageMetaServcie.this.pullMeteData(); } - }, 5, 1000, TimeUnit.MILLISECONDS); + }, 5, 1000, TimeUnit.MILLISECONDS);*/ } - public void registerStorageConnector(StorageConnectorMetedata storageConnector) { - metaDataMap.put(storageConnector, new Metadata()); + public void registerStorageConnector(Object storageConnector) { + if(storageConnector instanceof StorageConnectorProxy) { + StorageConnectorProxy storageConnectorProxy = (StorageConnectorProxy)storageConnector; + for(StorageConnector connector : storageConnectorProxy.getStorageConnectorList()) { + if(connector instanceof StorageConnectorMetedata) { + metaDataMap.put((StorageConnectorMetedata)connector, new Metadata()); + } + } + }else { + if(storageConnector instanceof StorageConnectorMetedata) { + metaDataMap.put((StorageConnectorMetedata)storageConnector, new Metadata()); + } + } + } public void registerPullRequest(List<PullRequest> pullRequests, StorageConnector storageConnector) { executor.execute(new Runnable() { @Override public void run() { - StorageMetaServcie.this.doRegisterPullRequest(pullRequests, storageConnector); + if(storageConnector instanceof StorageConnectorProxy) { + StorageConnectorProxy storageConnectorProxy = (StorageConnectorProxy)storageConnector; + for(StorageConnector connector : storageConnectorProxy.getStorageConnectorList()) { + StorageMetaServcie.this.doRegisterPullRequest(pullRequests, connector); + } + }else { + StorageMetaServcie.this.doRegisterPullRequest(pullRequests, storageConnector); + } } }); @@ -83,6 +103,11 @@ public class StorageMetaServcie { public void doRegisterPullRequest(List<PullRequest> pullRequests, StorageConnector storageConnector) { try { + if(Objects.isNull(pullRequests) || pullRequests.isEmpty()) { + //TODO + return; + } + StorageConnectorMetedata storageConnectorMetedata = null; if (storageConnector instanceof StorageConnectorMetedata) { storageConnectorMetedata = (StorageConnectorMetedata) storageConnector; @@ -92,29 +117,31 @@ public class StorageMetaServcie { Set<String> topicSet = new HashSet<>(); Map<String, TopicInfo> topicInfoMap = new HashMap<>(); if (Objects.nonNull(storageConnectorMetedata)) { - List<ConsumerGroupInfo> consumerGroupInfos = storageConnectorMetedata.getConsumerGroupInfo(); - consumerGroupInfos.forEach(value -> consumerGroupInfoMap.put(value.getConsumerGroupName(), value)); + //List<ConsumerGroupInfo> consumerGroupInfos = storageConnectorMetedata.getConsumerGroupInfo(); + //consumerGroupInfos.forEach(value -> consumerGroupInfoMap.put(value.getConsumerGroupName(), value)); topicSet = storageConnectorMetedata.getTopic(); - storageConnectorMetedata.geTopicInfos(pullRequests) + storageConnectorMetedata.geTopicInfos(topicSet,pullRequests.get(0).getConsumerGroupName()) .forEach(value -> topicInfoMap.put(value.getTopicName(), value)); } for (PullRequest pullRequest : pullRequests) { if (Objects.nonNull(storageConnectorMetedata) && !topicSet.contains(pullRequest.getTopicName())) { - try { - if (!topicSet.contains(pullRequest.getTopicName())) { + String topic = pullRequest.getTopicName(); + if (!topicSet.contains(topic)) { TopicInfo topicInfo = new TopicInfo(); + topicInfo.setTopicName(pullRequest.getTopicName()); storageConnectorMetedata.createTopic(topicInfo); + }else { + TopicInfo topicInfo = topicInfoMap.get(topic); + if(Objects.nonNull(topicInfo)) { + pullRequest.setNextId(Long.toString(topicInfo.getCurrentId())); + } } - if (!consumerGroupInfoMap.containsKey(pullRequest.getConsumerGroupName())) { + if (consumerGroupInfoMap.containsKey(pullRequest.getConsumerGroupName())) { ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo(); - storageConnectorMetedata.createConsumerGroupInfo(consumerGroupInfo); + consumerGroupInfo.setConsumerGroupName(pullRequest.getConsumerGroupName()); + //storageConnectorMetedata.createConsumerGroupInfo(consumerGroupInfo); } - TopicInfo topicInfo = topicInfoMap.get(pullRequest.getTopicName()); - pullRequest.setNextId(Long.toString(topicInfo.getCurrentId())); - } catch (Exception e) { - - } - + } pullRequest.setProcessSign(PROCESS_SIGN); pullRequest.setStorageConnector(storageConnector); @@ -125,6 +152,7 @@ public class StorageMetaServcie { } } + public void pullMeteData() { for (StorageConnectorMetedata storageConnectorMetedata : metaDataMap.keySet()) { executor.execute(new Runnable() { @@ -148,7 +176,11 @@ public class StorageMetaServcie { public boolean isTopic(StorageConnector storageConnector, String topic) { if (storageConnector instanceof StorageConnectorMetedata) { - return metaDataMap.get((StorageConnectorMetedata) storageConnector).getTopicSet().contains(topic); + Metadata metadata = metaDataMap.get((StorageConnectorMetedata) storageConnector); + if(Objects.isNull(metadata) || metadata.getTopicSet().isEmpty()) { + return false; + } + return metadata.getTopicSet().contains(storageConnector.getTopic(topic)); } return true; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java index 4d668c83d..ab0592284 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/PullCallbackImpl.java @@ -28,7 +28,9 @@ import java.util.concurrent.Executor; import io.cloudevents.CloudEvent; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +@Slf4j(topic = "message") public class PullCallbackImpl implements PullCallback { @Setter @@ -55,7 +57,7 @@ public class PullCallbackImpl implements PullCallback { eventListener.consume(cloudEvent, context); pullRequest.getStock().decrementAndGet(); } catch (Exception e) { - e.printStackTrace(); + log.error(e.getMessage(),e); } } }); @@ -63,7 +65,7 @@ public class PullCallbackImpl implements PullCallback { } } catch (Exception e) { - e.printStackTrace(); + log.error(e.getMessage(),e); } } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java index a3ada5f38..df906daee 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullService.java @@ -45,6 +45,7 @@ public class StoragePullService implements Runnable { private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>(); + @Setter private ScheduledExecutorService scheduledExecutor; @Setter @@ -54,7 +55,7 @@ public class StoragePullService implements Runnable { private Executor executor; private boolean isStopped() { - return true; + return false; } public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { @@ -86,7 +87,7 @@ public class StoragePullService implements Runnable { while (!this.isStopped()) { try { final PullRequest pullRequest = this.pullRequestQueue.take(); - if (pullRequest.getStock().get() < this.storageConfig.getPullThresholdForQueue()) { + if (pullRequest.getStock().get() > this.storageConfig.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, storageConfig.getPullInterval()); continue; } @@ -109,10 +110,10 @@ public class StoragePullService implements Runnable { try { List<CloudEvent> cloudEventList = pullRequest.getStorageConnector().pull(pullRequest); if (Objects.isNull(cloudEventList) || cloudEventList.isEmpty()) { - logger.info(""); + logger.info("pull resquest get data is null , consumerGroupName is {} , topicName is {}",pullRequest.getConsumerGroupName(),pullRequest.getTopicName()); return; } - if (Objects.nonNull(pullRequest.getPullRequests())) { + if (Objects.isNull(pullRequest.getPullRequests())) { this.setNextId(pullRequest, cloudEventList); pullRequest.getPullCallback().onSuccess(pullRequest, cloudEventList); @@ -135,7 +136,7 @@ public class StoragePullService implements Runnable { } } } catch (Exception e) { - logger.error("pull", e); + logger.error(e.getMessage(), e); } finally { this.executePullRequestLater(pullRequest, storageConfig.getPullInterval()); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java index 1d93ed4f0..8c37e0c18 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationService.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.api.connector.storage.reply; +import org.apache.eventmesh.api.connector.storage.CloudEventUtils; import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo; import java.util.ArrayList; @@ -30,101 +31,120 @@ import java.util.concurrent.Executor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.cloudevents.CloudEvent; import lombok.Setter; +/** + * + * @author laohu + * + */ public class ReplyOperationService { - protected static final Logger messageLogger = LoggerFactory.getLogger("message"); - - @Setter - private Executor executor; - - protected Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> replyOperationMap = new ConcurrentHashMap<>(); - - public void setRequestReplyInfo(ReplyOperation replyOperation, String topic, Long id, - RequestReplyInfo requestReplyInfo) { - Map<String, Map<Long, RequestReplyInfo>> replyMap = replyOperationMap.get(replyOperation); - if (Objects.isNull(replyMap)) { - replyMap = replyOperationMap.computeIfAbsent(replyOperation, k -> new ConcurrentHashMap<>()); - } - Map<Long, RequestReplyInfo> requestReplyInfoMap = replyMap.get(topic); - if (Objects.isNull(requestReplyInfoMap)) { - requestReplyInfoMap = replyMap.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()); - } - requestReplyInfoMap.put(id, requestReplyInfo); - } - - public void reply(ReplyOperation replyOperation, Map<String, Map<Long, RequestReplyInfo>> replyMap) { - if (replyMap.isEmpty()) { - return; - } - long time = System.currentTimeMillis(); - List<ReplyRequest> replyRequestList = new ArrayList<>(); - for (Entry<String, Map<Long, RequestReplyInfo>> entry : replyMap.entrySet()) { - if (entry.getValue().isEmpty()) { - continue; - } - ReplyRequest replyRequest = new ReplyRequest(); - List<Long> list = new ArrayList<>(); - for (Entry<Long, RequestReplyInfo> entry2 : entry.getValue().entrySet()) { - if (entry2.getValue().getTimeOut() > time) { - list.add(entry2.getKey()); - } else { - replyMap.remove(entry.getKey()); - messageLogger.warn(""); - RuntimeException runtimeException = new RuntimeException(); - entry2.getValue().getRequestReplyCallback().onException(runtimeException); - } - } - replyRequest.setTopic(entry.getKey()); - replyRequest.setIdList(list); - replyRequestList.add(replyRequest); - } - if (replyRequestList.isEmpty()) { - messageLogger.info(""); - return; - } - try { - List<CloudEventInfo> cloudEventList = replyOperation.queryReplyCloudEvent(replyRequestList); - if (cloudEventList.isEmpty()) { - messageLogger.warn(""); - } - for (CloudEventInfo cloudEventInfo : cloudEventList) { - RequestReplyInfo replyInfo = null; - try { - cloudEventInfo.getCloudEventInfoId(); - - replyInfo = replyMap.get(cloudEventInfo.getCloudEventTopic()).remove(Long.valueOf(cloudEventInfo.getCloudEventInfoId())); - if (Objects.isNull(replyInfo)) { - continue; - } - cloudEventInfo.getCloudEventReplyData(); - replyInfo.getRequestReplyCallback().onSuccess(null); - } catch (Exception e) { - if (Objects.nonNull(replyInfo)) { - replyInfo.getRequestReplyCallback().onException(e); - } - messageLogger.error(e.getMessage(), e); - } - } - } catch (Exception e) { - messageLogger.error(e.getMessage(), e); - } - } - - public void execute() { - if (replyOperationMap.isEmpty()) { - return; - } - for (Entry<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> entry : replyOperationMap.entrySet()) { - executor.execute(new Runnable() { - @Override - public void run() { - reply(entry.getKey(), entry.getValue()); - } - }); - - } - - } + protected static final Logger messageLogger = LoggerFactory.getLogger("message"); + + @Setter + private Executor executor; + + protected Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> replyOperationMap = new ConcurrentHashMap<>(); + + public void setRequestReplyInfo(ReplyOperation replyOperation, String topic, Long id, + RequestReplyInfo requestReplyInfo) { + Map<String, Map<Long, RequestReplyInfo>> replyMap = replyOperationMap.get(replyOperation); + if (Objects.isNull(replyMap)) { + replyMap = replyOperationMap.computeIfAbsent(replyOperation, k -> new ConcurrentHashMap<>()); + } + Map<Long, RequestReplyInfo> requestReplyInfoMap = replyMap.get(topic); + if (Objects.isNull(requestReplyInfoMap)) { + requestReplyInfoMap = replyMap.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()); + } + requestReplyInfoMap.put(id, requestReplyInfo); + } + + public List<ReplyRequest> checkReply(Map<String, Map<Long, RequestReplyInfo>> replyMap) { + long time = System.currentTimeMillis(); + List<ReplyRequest> replyRequestList = new ArrayList<>(); + for (Entry<String, Map<Long, RequestReplyInfo>> entry : replyMap.entrySet()) { + if (entry.getValue().isEmpty()) { + continue; + } + ReplyRequest replyRequest = new ReplyRequest(); + List<Long> list = new ArrayList<>(); + for (Entry<Long, RequestReplyInfo> entry2 : entry.getValue().entrySet()) { + if (entry2.getValue().getTimeOut() > time) { + list.add(entry2.getKey()); + } else { + entry.getValue().remove(entry2.getKey()); + messageLogger.warn(""); + RuntimeException runtimeException = new RuntimeException(); + entry2.getValue().getRequestReplyCallback().onException(runtimeException); + } + } + if (!list.isEmpty()) { + replyRequest.setTopic(entry.getKey()); + replyRequest.setIdList(list); + replyRequestList.add(replyRequest); + } + } + return replyRequestList; + } + + public void callback(List<CloudEventInfo> cloudEventList, Map<String, Map<Long, RequestReplyInfo>> replyMap) { + for (CloudEventInfo cloudEventInfo : cloudEventList) { + RequestReplyInfo replyInfo = null; + try { + replyInfo = replyMap.get(cloudEventInfo.getCloudEventTopic()) + .remove(Long.valueOf(cloudEventInfo.getCloudEventInfoId())); + if (Objects.isNull(replyInfo)) { + continue; + } + CloudEvent cloudEvent = CloudEventUtils.eventFormat + .deserialize(cloudEventInfo.getCloudEventReplyData().getBytes("UTF-8")); + replyInfo.getRequestReplyCallback().onSuccess(cloudEvent); + } catch (Exception e) { + if (Objects.nonNull(replyInfo)) { + replyInfo.getRequestReplyCallback().onException(e); + } + messageLogger.error(e.getMessage(), e); + } + } + } + + public void reply(ReplyOperation replyOperation, Map<String, Map<Long, RequestReplyInfo>> replyMap) { + + List<ReplyRequest> replyRequestList = this.checkReply(replyMap); + if (replyRequestList.isEmpty()) { + messageLogger.info(""); + return; + } + try { + List<CloudEventInfo> cloudEventList = replyOperation.queryReplyCloudEvent(replyRequestList); + if (cloudEventList.isEmpty()) { + messageLogger.warn(""); + return; + } + this.callback(cloudEventList, replyMap); + } catch (Exception e) { + messageLogger.error(e.getMessage(), e); + } + } + + public void execute() { + if (replyOperationMap.isEmpty()) { + return; + } + for (Entry<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> entry : replyOperationMap.entrySet()) { + if (entry.getValue().isEmpty()) { + continue; + } + executor.execute(new Runnable() { + @Override + public void run() { + reply(entry.getKey(), entry.getValue()); + } + }); + + } + + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.connector.ConnectorResourceService similarity index 90% copy from eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector copy to eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.connector.ConnectorResourceService index 04d1a953b..ad95b7c4d 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.connector.ConnectorResourceService @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -jdbc=JDBCStorageConnector \ No newline at end of file +storage=org.apache.eventmesh.api.connector.storage.ConnectorResourceServiceStorageImpl \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer similarity index 92% copy from eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector copy to eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer index 04d1a953b..a699a16a8 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.Consumer @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -jdbc=JDBCStorageConnector \ No newline at end of file +storage=org.apache.eventmesh.api.connector.storage.StorageConsumer \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer similarity index 92% copy from eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector copy to eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer index 04d1a953b..ab8d6dc66 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.Producer @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -jdbc=JDBCStorageConnector \ No newline at end of file +storage=org.apache.eventmesh.api.connector.storage.StorageProducer \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxyTest.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxyTest.java new file mode 100644 index 000000000..c235d95d1 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxyTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api.connector.storage; + +import org.apache.eventmesh.api.RequestReplyCallback; +import org.apache.eventmesh.api.SendCallback; +import org.apache.eventmesh.api.connector.storage.metadata.RouteHandler; +import org.apache.eventmesh.api.connector.storage.metadata.StorageMetaServcie; +import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation; +import org.apache.eventmesh.api.connector.storage.reply.ReplyOperationService; + +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +public class StorageConnectorProxyTest { + + private StorageConnectorProxy storageConnectorProxy = new StorageConnectorProxy(); + + private RouteHandler routeHandler = new RouteHandler(); + + private StorageConnector one = Mockito.mock(StorageConnectorMetedataAndConnector.class); + + private StorageConnector two = Mockito.mock(StorageConnectorMetedataAndConnector.class); + + private String key = "127.0.0.1"; + + private Executor executor; + + private StorageMetaServcie storageMetaServcie = Mockito.mock(StorageMetaServcie.class); + + private ReplyOperationService replyOperationService = Mockito.mock(ReplyOperationService.class); + + CloudEvent cloudEvent; + + @Before + public void init() throws URISyntaxException { + this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10, + Runtime.getRuntime().availableProcessors() * 300, 1000 * 60 * 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), new ThreadFactory() { + AtomicInteger index = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "storage-connent-" + index.getAndIncrement()); + } + }); + storageConnectorProxy.setExecutor(executor); + storageConnectorProxy.setStorageMetaServcie(storageMetaServcie); + storageConnectorProxy.setReplyService(replyOperationService); + cloudEvent = CloudEventBuilder.v03().withExtension("cloudeventid", "1").withId("1").withType("1231") + .withSource(new URI("")).withSubject("1").withExtension(Constant.STORAGE_ID, "1").withData("1".getBytes()).build(); + } + + @Test + public void test_routehandler() { + routeHandler.addStorageConnector(one); + routeHandler.addStorageConnector(two); + for (int i = 0; i < 100; i++) { + StorageConnector select = routeHandler.select(); + StorageConnector check = i % 2 == 0 ? one : two; + Assert.assertEquals(select, check); + } + } + + @Test + public void tst_setConnector() { + storageConnectorProxy.setConnector(one, key); + } + + @Test + public void test_publish() throws Exception { + storageConnectorProxy.setConnector(one, key); + + Mockito.when(storageMetaServcie.isTopic(Mockito.any(), Mockito.anyString())).thenReturn(false).thenReturn(true).thenThrow(RuntimeException.class); + SendCallback sendCallback = Mockito.mock(SendCallback.class); + storageConnectorProxy.publish(cloudEvent, sendCallback); + storageConnectorProxy.publish(cloudEvent, sendCallback); + storageConnectorProxy.publish(cloudEvent, sendCallback); + Thread.sleep(10); + Mockito.verify(storageMetaServcie, Mockito.times(3)).isTopic(Mockito.any(), Mockito.any()); + Mockito.verify(sendCallback, Mockito.atLeastOnce()).onException(Mockito.any()); + } + + @Test + public void test_request() throws Exception { + storageConnectorProxy.setConnector(one, key); + RequestReplyCallback requestReplyCallback = Mockito.mock(RequestReplyCallback.class); + storageConnectorProxy.request(cloudEvent, requestReplyCallback, 1000); + Thread.sleep(10); + } + + @Test + public void test() throws UnsupportedEncodingException { + String ddd = URLEncoder.encode("EventMeshTest/consumerGroup-_"); + System.out.println(ddd); + } + + public interface StorageConnectorMetedataAndConnector extends StorageConnectorMetedata,StorageConnector,ReplyOperation{ + + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullServiceTest.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullServiceTest.java new file mode 100644 index 000000000..02428dd1f --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/pull/StoragePullServiceTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api.connector.storage.pull; + +import org.apache.eventmesh.api.connector.storage.StorageConfig; +import org.apache.eventmesh.api.connector.storage.StorageConnector; +import org.apache.eventmesh.api.connector.storage.data.PullRequest; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import io.cloudevents.CloudEvent; + +public class StoragePullServiceTest { + + private StorageConfig storageConfig; + + private Executor executor; + + private ScheduledExecutorService scheduledExecutor; + + private StoragePullService storagePullService = new StoragePullService(); + + private PullRequest pullRequest = new PullRequest(); + + @Before + public void init() throws Exception { + this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 10, + Runtime.getRuntime().availableProcessors() * 300, 1000 * 60 * 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), new ThreadFactory() { + AtomicInteger index = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "storage-connent-" + index.getAndIncrement()); + } + }); + this.scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 10, + new ThreadFactory() { + AtomicInteger index = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "storage-connent-shceduled-" + index.getAndIncrement()); + } + }); + storageConfig = new StorageConfig(); + storagePullService.setStorageConfig(storageConfig); + storagePullService.setExecutor(executor); + storagePullService.setScheduledExecutor(scheduledExecutor); + + StorageConnector storageConnector = Mockito.mock(StorageConnector.class); + List<CloudEvent> cloudEventList = new ArrayList<>(); + cloudEventList.add(Mockito.mock(CloudEvent.class)); + Mockito.when(storageConnector.pull(Mockito.any())).thenReturn(null).thenReturn(new ArrayList<>()) + .thenReturn(cloudEventList); + pullRequest.setStorageConnector(storageConnector); + pullRequest.setPullCallback(Mockito.mock(PullCallback.class)); + + } + + @Test + public void test_run_thread() { + this.executor.execute(storagePullService); + this.storagePullService.executePullRequestImmediately(pullRequest); + System.out.println(1); + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationServiceTest.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationServiceTest.java new file mode 100644 index 000000000..87c0645b2 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/test/java/org/apache/eventmesh/api/connector/storage/reply/ReplyOperationServiceTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api.connector.storage.reply; + +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.eventmesh.api.RequestReplyCallback; +import org.apache.eventmesh.api.connector.storage.CloudEventUtils; +import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo; + +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +public class ReplyOperationServiceTest { + + private ReplyOperationService replyOperationService = new ReplyOperationService(); + + private List<ReplyOperation> replyOperationList = new ArrayList<>(); + + private List<String> topicList = new ArrayList<>(); + + private ReplyOperation replyOperation = Mockito.mock(ReplyOperation.class); + + private ReplyOperation twoReplyOperation = Mockito.mock(ReplyOperation.class); + + private RequestReplyCallback requestReplyCallback = Mockito.mock(RequestReplyCallback.class); + + private String topic1 = "topic1"; + + private String topic2 = "topic2"; + + private String topic3 = "topic3"; + + private Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>> replyOperationMap; + + @SuppressWarnings("unchecked") + @Before + public void init() throws IllegalArgumentException, IllegalAccessException { + replyOperationList.add(replyOperation); + replyOperationList.add(twoReplyOperation); + topicList.add(topic1); + topicList.add(topic2); + topicList.add(topic3); + Field field = FieldUtils.getField(ReplyOperationService.class, "replyOperationMap", true); + field.setAccessible(true); + replyOperationMap = (Map<ReplyOperation, Map<String, Map<Long, RequestReplyInfo>>>) field + .get(replyOperationService); + } + + @Test + public void setRequestReplyInfo_test() { + for (ReplyOperation replyOperation : replyOperationList) { + for (String topic : topicList) { + for (long i = 0; i < 20; i++) { + replyOperationService.setRequestReplyInfo(replyOperation, topic, i, new RequestReplyInfo()); + } + } + } + for (ReplyOperation replyOperation : replyOperationList) { + Map<String, Map<Long, RequestReplyInfo>> requestReplyInfoMap = replyOperationMap.get(replyOperation); + Assert.assertEquals(topicList.size(), requestReplyInfoMap.size()); + for (String topic : topicList) { + Map<Long, RequestReplyInfo> map = requestReplyInfoMap.get(topic); + Assert.assertEquals(20, map.size()); + } + } + } + + private Map<String, Map<Long, RequestReplyInfo>> createReplyMap(boolean timeout) { + long time = System.currentTimeMillis(); + Map<String, Map<Long, RequestReplyInfo>> replyMap = new ConcurrentHashMap<>(); + for (String topic : topicList) { + Map<Long, RequestReplyInfo> requestReplyInfoMap = new ConcurrentHashMap<>(); + replyMap.put(topic, requestReplyInfoMap); + for (long i = 0; i < 10; i++) { + RequestReplyInfo requestReplyInfo = new RequestReplyInfo(); + if (timeout) { + requestReplyInfo.setTimeOut(time - 1000); + } else { + requestReplyInfo.setTimeOut(time + 4000); + } + requestReplyInfo.setRequestReplyCallback(requestReplyCallback); + requestReplyInfoMap.put(i, requestReplyInfo); + } + } + return replyMap; + } + + @Test + public void checkReply_timeout_result_null() { + List<ReplyRequest> replyRequests = replyOperationService.checkReply(this.createReplyMap(true)); + Assert.assertTrue(replyRequests.isEmpty()); + Mockito.verify(requestReplyCallback, Mockito.times(30)).onException(Mockito.any()); + } + + @Test + public void checkReply_timeout_result_ok() { + List<ReplyRequest> replyRequests = replyOperationService.checkReply(this.createReplyMap(false)); + for (ReplyRequest replyRequest : replyRequests) { + Assert.assertEquals(replyRequest.getIdList().size(), 10); + } + } + + @Test + public void callback_onSuccess() throws URISyntaxException { + CloudEvent cloudEvent = CloudEventBuilder.v03().withExtension("cloudeventid", "1").withId("1").withType("1231") + .withSource(new URI("")).withSubject("1").withExtension("id", "1").withData("1".getBytes()).build(); + String cloudEventData = new String(CloudEventUtils.eventFormat.serialize(cloudEvent), Charset.forName("UTF-8")); + List<CloudEventInfo> cloudEventList = new ArrayList<>(); + for (String topic : topicList) { + for (long i = 0; i < 10; i++) { + CloudEventInfo cloudEventInfo = new CloudEventInfo(); + cloudEventInfo.setCloudEventTopic(topic); + cloudEventInfo.setCloudEventInfoId(i); + cloudEventInfo.setCloudEventReplyData(cloudEventData); + cloudEventList.add(cloudEventInfo); + } + } + Map<String, Map<Long, RequestReplyInfo>> replyMap = this.createReplyMap(false); + + replyOperationService.callback(cloudEventList, replyMap); + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java index 4e9194de2..2f658181e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnector.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.connector.storage.jdbc; +import org.apache.eventmesh.api.connector.storage.Constant; import org.apache.eventmesh.connector.storage.jdbc.SQL.BaseSQLOperation; import org.apache.eventmesh.connector.storage.jdbc.SQL.CloudEventSQLOperation; import org.apache.eventmesh.connector.storage.jdbc.SQL.ConsumerGroupSQLOperation; @@ -38,105 +39,103 @@ import com.alibaba.druid.pool.DruidPooledConnection; public abstract class AbstractJDBCStorageConnector { - protected static final Logger messageLogger = LoggerFactory.getLogger("message"); - - protected DruidDataSource druidDataSource; - - protected CloudEventSQLOperation cloudEventSQLOperation; - - protected BaseSQLOperation baseSQLOperation; - - protected ConsumerGroupSQLOperation consumerGroupSQLOperation; - - - public void init(Properties properties) throws Exception { - StorageSQLService storageSQLService = new StorageSQLService(""); - this.cloudEventSQLOperation = storageSQLService.getObject(); - this.baseSQLOperation = storageSQLService.getObject(); - this.consumerGroupSQLOperation = storageSQLService.getObject(); - this.initdatabases(properties); - this.createDataSource(properties); - } - - protected void createDataSource(Properties properties) throws Exception { - - druidDataSource = new DruidDataSource(); - druidDataSource.setUrl(properties.getProperty("url")); - druidDataSource.setUsername(properties.getProperty("username")); - druidDataSource.setPassword(properties.getProperty("password")); - druidDataSource.setValidationQuery("select 1"); - druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty("maxActive"))); - druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty("maxWait"))); - druidDataSource.init(); - } - - protected void initdatabases(Properties properties) throws SQLException { - //TODO - druidDataSource = new DruidDataSource(); - druidDataSource.setUrl(properties.getProperty("url")); - druidDataSource.setUsername(properties.getProperty("username")); - druidDataSource.setPassword(properties.getProperty("password")); - druidDataSource.setValidationQuery("select 1"); - druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty("maxActive"))); - druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty("maxWait"))); - druidDataSource.init(); - - List<String> tableName = this.query(this.baseSQLOperation.queryConsumerGroupTableSQL(), ResultSetTransformUtils::transformTableName); - if (Objects.isNull(tableName) || tableName.isEmpty()) { - // create databases - this.execute(this.baseSQLOperation.createDatabases(), null); - // create tables; - this.execute(this.consumerGroupSQLOperation.createConsumerGroupSQL(), null); - } - } - - protected long execute(String sql, List<Object> parameter) throws SQLException { - return this.execute(sql, parameter, false); - } - - protected long execute(String sql, List<Object> parameter, boolean generatedKeys) throws SQLException { - try (DruidPooledConnection pooledConnection = druidDataSource.getConnection(); - PreparedStatement preparedStatement = pooledConnection.prepareStatement(sql)) { - this.setObject(preparedStatement, parameter); - long value = preparedStatement.executeUpdate(); - if (generatedKeys) { - try (ResultSet resulSet = preparedStatement.getGeneratedKeys()) { - resulSet.next(); - value = resulSet.getLong(1); - } - } - return value; - } - } - - protected <T> List<T> query(String sql, ResultSetTransform<T> resultSetTransform) throws SQLException { - return this.query(sql, null, resultSetTransform); - } - - @SuppressWarnings("unchecked") - protected <T> List<T> query(String sql, List<?> parameter, ResultSetTransform<T> resultSetTransform) - throws SQLException { - try (DruidPooledConnection pooledConnection = druidDataSource.getConnection(); - PreparedStatement preparedStatement = pooledConnection.prepareStatement(sql)) { - this.setObject(preparedStatement, parameter); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - List<Object> resultList = new ArrayList<>(); - while (resultSet.next()) { - Object object = resultSetTransform.transform(resultSet); - resultList.add(object); - } - return (List<T>) resultList; - } - } - } - - protected void setObject(PreparedStatement preparedStatement, List<?> parameter) throws SQLException { - if (Objects.isNull(parameter) || parameter.isEmpty()) { - return; - } - int index = 1; - for (Object object : parameter) { - preparedStatement.setObject(index++, object); - } - } + protected static final Logger messageLogger = LoggerFactory.getLogger("message"); + + protected DruidDataSource druidDataSource; + + protected CloudEventSQLOperation cloudEventSQLOperation; + + protected BaseSQLOperation baseSQLOperation; + + protected ConsumerGroupSQLOperation consumerGroupSQLOperation; + + public void init(Properties properties) throws Exception { + StorageSQLService storageSQLService = new StorageSQLService(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_TYPE)); + this.cloudEventSQLOperation = storageSQLService.getObject(); + this.baseSQLOperation = storageSQLService.getObject(); + this.consumerGroupSQLOperation = storageSQLService.getObject(); + this.initdatabases(properties, "information_schema"); + this.createDataSource(properties, "event_mesh_storage"); + } + + protected void createDataSource(Properties properties, String databases) throws Exception { + druidDataSource = new DruidDataSource(); + druidDataSource.setUrl(this.createUrl(properties, databases)); + druidDataSource.setUsername(properties.getProperty(Constant.STORAGE_CONFIG_USER_NAME)); + druidDataSource.setPassword(properties.getProperty(Constant.STORAGE_CONFIG_PASSWORD)); + druidDataSource.setValidationQuery("select 1"); + druidDataSource.setMaxActive(Integer.parseInt(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_MAXACTIVE))); + druidDataSource.setMaxWait(Integer.parseInt(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_MAXWAIT))); + druidDataSource.init(); + } + + private String createUrl(Properties properties, String databases) { + StringBuffer stringBuffer = new StringBuffer("jdbc:"); + stringBuffer.append(properties.getProperty(Constant.STORAGE_CONFIG_JDBC_TYPE)).append("://") + .append(properties.get(Constant.STORAGE_NODE_ADDRESS)).append("/").append(databases).append("?") + .append(properties.get(Constant.STORAGE_CONFIG_JDBC_PARAMETER)); + return stringBuffer.toString(); + } + + protected void initdatabases(Properties properties, String databases) throws Exception { + this.createDataSource(properties, databases); + List<String> tableName = this.query(this.baseSQLOperation.queryDataBases(), + ResultSetTransformUtils::transformTableName); + if (Objects.isNull(tableName) || tableName.isEmpty()) { + this.execute(this.baseSQLOperation.createDatabases(), null); + } + // create tables; + // this.execute(this.consumerGroupSQLOperation.createConsumerGroupSQL(), null); + } + + protected long execute(String sql, List<Object> parameter) throws SQLException { + return this.execute(sql, parameter, false); + } + + protected long execute(String sql, List<Object> parameter, boolean generatedKeys) throws SQLException { + try (DruidPooledConnection pooledConnection = druidDataSource.getConnection(); + PreparedStatement preparedStatement = pooledConnection.prepareStatement(sql, + PreparedStatement.RETURN_GENERATED_KEYS)) { + this.setObject(preparedStatement, parameter); + long value = preparedStatement.executeUpdate(); + if (generatedKeys) { + try (ResultSet resulSet = preparedStatement.getGeneratedKeys()) { + resulSet.next(); + value = resulSet.getLong(1); + } + } + return value; + } + } + + protected <T> List<T> query(String sql, ResultSetTransform<T> resultSetTransform) throws SQLException { + return this.query(sql, null, resultSetTransform); + } + + @SuppressWarnings("unchecked") + protected <T> List<T> query(String sql, List<?> parameter, ResultSetTransform<T> resultSetTransform) + throws SQLException { + try (DruidPooledConnection pooledConnection = druidDataSource.getConnection(); + PreparedStatement preparedStatement = pooledConnection.prepareStatement(sql)) { + this.setObject(preparedStatement, parameter); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + List<Object> resultList = new ArrayList<>(); + while (resultSet.next()) { + Object object = resultSetTransform.transform(resultSet); + resultList.add(object); + } + return (List<T>) resultList; + } + } + } + + protected void setObject(PreparedStatement preparedStatement, List<?> parameter) throws SQLException { + if (Objects.isNull(parameter) || parameter.isEmpty()) { + return; + } + int index = 1; + for (Object object : parameter) { + preparedStatement.setObject(index++, object); + } + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java index 14e3f3153..1af0d98fa 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/AbstractJDBCStorageConnectorMetadata.java @@ -17,12 +17,12 @@ package org.apache.eventmesh.connector.storage.jdbc; +import org.apache.eventmesh.api.connector.storage.CloudEventUtils; import org.apache.eventmesh.api.connector.storage.StorageConnectorMetedata; import org.apache.eventmesh.api.connector.storage.data.ConsumerGroupInfo; import org.apache.eventmesh.api.connector.storage.data.PullRequest; import org.apache.eventmesh.api.connector.storage.data.TopicInfo; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -45,17 +45,32 @@ public class AbstractJDBCStorageConnectorMetadata extends AbstractJDBCStorageCon @Override public List<TopicInfo> geTopicInfos(List<PullRequest> pullRequests) throws Exception { StringBuffer sqlsb = new StringBuffer(); - int index = 0; - List<String> tableNames = new ArrayList<>(); + int index = 1; for (PullRequest pullRequest : pullRequests) { - String sql = this.cloudEventSQLOperation.selectLastMessageSQL(pullRequest.getTopicName()); + String sql = this.cloudEventSQLOperation.selectNoConsumptionMessageSQL(pullRequest.getTopicName(),pullRequest.getConsumerGroupName()); sqlsb.append(sql); - if (index < pullRequests.size()) { + if (index++ < pullRequests.size()) { sqlsb.append(" union all "); } - tableNames.add(pullRequest.getTopicName()); } - return this.query(sqlsb.toString(), tableNames, ResultSetTransformUtils::transformTopicInfo); + return this.query(sqlsb.toString(), null, ResultSetTransformUtils::transformTopicInfo); + } + + public List<TopicInfo> geTopicInfos(Set<String> topics,String key) throws Exception { + key = CloudEventUtils.checkConsumerGroupName(key); + StringBuffer sqlsb = new StringBuffer(); + int index = 1; + for (String topic : topics) { + if(topic.startsWith("cloud_event_")) { + topic = topic.substring(12); + } + String sql = this.cloudEventSQLOperation.selectNoConsumptionMessageSQL(topic,key); + sqlsb.append(sql); + if (index++ < topics.size()) { + sqlsb.append(" union all "); + } + } + return this.query(sqlsb.toString(), null, ResultSetTransformUtils::transformTopicInfo); } @Override diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java index 911a26300..18bae1c94 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnector.java @@ -21,114 +21,118 @@ import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.RequestReplyCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.connector.storage.CloudEventUtils; +import org.apache.eventmesh.api.connector.storage.Constant; import org.apache.eventmesh.api.connector.storage.StorageConnector; import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo; import org.apache.eventmesh.api.connector.storage.data.PullRequest; import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation; import org.apache.eventmesh.api.connector.storage.reply.ReplyRequest; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import io.cloudevents.CloudEvent; -public class JDBCStorageConnector extends AbstractJDBCStorageConnectorMetadata implements StorageConnector, ReplyOperation { - - private String getTableName(CloudEvent cloudEvent) { - return cloudEvent.getType(); - } - - @Override - public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { - String topic = this.getTableName(cloudEvent); - String sql = this.cloudEventSQLOperation.insertCloudEventSQL(topic); - List<Object> parameterList = new ArrayList<>(); - this.execute(sql, parameterList); - } - - @Override - public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { - String topic = this.getTableName(cloudEvent); - String sql = this.cloudEventSQLOperation.insertCloudEventSQL(topic); - List<Object> parameterList = new ArrayList<>(); - this.execute(sql, parameterList); - } - - @Override - public List<CloudEvent> pull(PullRequest pullRequest) throws Exception { - String locationEventSQL = this.cloudEventSQLOperation.locationEventSQL(pullRequest.getTopicName()); - //TODO 1. consumerGroup 2. example_id 3. id 4.consumerGroup - List<Object> parameter = new ArrayList<>(); - - parameter.add(pullRequest.getConsumerGroupName()); - parameter.add(pullRequest.getProcessSign()); - parameter.add(pullRequest.getNextId()); - parameter.add(pullRequest.getConsumerGroupName()); - parameter.clear(); - parameter.add(pullRequest.getNextId()); - parameter.add(pullRequest.getProcessSign()); - - long num = this.execute(locationEventSQL, parameter); - if (num == 0) { - return null; - } - String queryLocationEventSQL = this.cloudEventSQLOperation.queryLocationEventSQL(pullRequest.getTopicName()); - - this.query(queryLocationEventSQL, parameter, ResultSetTransformUtils::transformCloudEvent); - return null; - } - - @Override - public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) { - List<Object> parameterList = new ArrayList<>(cloudEvents.size()); - for (CloudEvent cloudEvent : cloudEvents) { - try { - String topic = this.getTableName(cloudEvent); - String sql = this.cloudEventSQLOperation.updateCloudEventOffsetSQL(topic); - parameterList.add(cloudEvent.getExtension("cloudEventInfoId")); - long i = this.execute(sql, parameterList); - if (i != cloudEvents.size()) { - messageLogger.warn(""); - } - parameterList.clear(); - } catch (Exception e) { - messageLogger.error(e.getMessage(), e); - } - } - } - - - @Override - public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { - String sql = this.cloudEventSQLOperation.updateCloudEventReplySQL(CloudEventUtils.getTopic(cloudEvent)); - List<Object> parameterList = new ArrayList<>(); - parameterList.add(CloudEventUtils.serializeReplyData(cloudEvent)); - parameterList.add(CloudEventUtils.getId(cloudEvent)); - return this.execute(sql, parameterList) == 1; - } - - - @Override - public void start() { - - } - - @Override - public void shutdown() { - druidDataSource.close(); - } - - @Override - public List<CloudEventInfo> queryReplyCloudEvent(ReplyRequest replyRequest) throws Exception { - List<Object> parameter = new ArrayList<>(); - StringBuffer stringBuffer = new StringBuffer(); - for (int i = 1; i < replyRequest.getIdList().size(); i++) { - stringBuffer.append('?'); - if (i++ != replyRequest.getIdList().size()) { - stringBuffer.append(','); - } - } - String sql = this.cloudEventSQLOperation.selectCloudEventByReplySQL(replyRequest.getTopic(), stringBuffer.toString()); - return this.query(sql, parameter, ResultSetTransformUtils::transformCloudEvent); - } +public class JDBCStorageConnector extends AbstractJDBCStorageConnectorMetadata + implements StorageConnector, ReplyOperation { + + public void init(Properties properties) throws Exception { + super.init(properties); + } + + @Override + public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { + String topic = CloudEventUtils.getTableName(cloudEvent); + String sql = this.cloudEventSQLOperation.insertCloudEventSQL(topic); + Long id = this.execute(sql, CloudEventUtils.getParameterToCloudEvent(cloudEvent), true); + CloudEventUtils.setValue(cloudEvent, Constant.STORAGE_ID, id.toString()); + } + + @Override + public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception { + this.publish(cloudEvent, null); + } + + @Override + public List<CloudEvent> pull(PullRequest pullRequest) throws Exception { + String locationEventSQL = this.cloudEventSQLOperation.locationEventSQL(pullRequest.getTopicName(), + CloudEventUtils.checkConsumerGroupName(pullRequest.getConsumerGroupName()), pullRequest.getProcessSign(), CloudEventUtils.checkConsumerGroupName(pullRequest.getConsumerGroupName())); + List<Object> parameter = new ArrayList<>(); + parameter.add(pullRequest.getNextId()); + long num = this.execute(locationEventSQL, parameter); + if (num == 0) { + return null; + } + String queryLocationEventSQL = this.cloudEventSQLOperation.queryLocationEventSQL(pullRequest.getTopicName(), + CloudEventUtils.checkConsumerGroupName(pullRequest.getConsumerGroupName()), pullRequest.getProcessSign()); + List<CloudEventInfo> cloudEventInfoList = this.query(queryLocationEventSQL, parameter, + ResultSetTransformUtils::transformCloudEvent); + + List<CloudEvent> cloudEventList = new ArrayList<>(); + if (cloudEventInfoList.size() != 0) { + for (CloudEventInfo cloudEventInfo : cloudEventInfoList) { + CloudEvent cloudEvent = CloudEventUtils.eventFormat + .deserialize(cloudEventInfo.getCloudEventData().getBytes(Charset.forName("UTF-8"))); + cloudEventList.add(cloudEvent); + } + } + pullRequest.setNextId(cloudEventInfoList.get(cloudEventInfoList.size() - 1).getCloudEventInfoId().toString()); + return cloudEventList; + } + + @Override + public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) { + List<Object> parameterList = new ArrayList<>(cloudEvents.size()); + for (CloudEvent cloudEvent : cloudEvents) { + try { + String topic = CloudEventUtils.getTableName(cloudEvent); + String sql = this.cloudEventSQLOperation.updateCloudEventOffsetSQL(topic); + parameterList.add(cloudEvent.getExtension("cloudEventInfoId")); + long i = this.execute(sql, parameterList); + if (i != cloudEvents.size()) { + messageLogger.warn(""); + } + parameterList.clear(); + } catch (Exception e) { + messageLogger.error(e.getMessage(), e); + } + } + } + + @Override + public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception { + String sql = this.cloudEventSQLOperation.updateCloudEventReplySQL(CloudEventUtils.getTopic(cloudEvent)); + List<Object> parameterList = new ArrayList<>(); + parameterList.add(CloudEventUtils.serializeReplyData(cloudEvent)); + parameterList.add(CloudEventUtils.getId(cloudEvent)); + this.execute(sql, parameterList, true); + return true; + } + + @Override + public void start() { + + } + + @Override + public void shutdown() { + druidDataSource.close(); + } + + @Override + public List<CloudEventInfo> queryReplyCloudEvent(ReplyRequest replyRequest) throws Exception { + List<Object> parameter = new ArrayList<>(); + StringBuffer stringBuffer = new StringBuffer(); + for (int i = 1; i < replyRequest.getIdList().size(); i++) { + stringBuffer.append('?'); + if (i++ != replyRequest.getIdList().size()) { + stringBuffer.append(','); + } + } + String sql = this.cloudEventSQLOperation.selectCloudEventByReplySQL(replyRequest.getTopic(), + stringBuffer.toString()); + return this.query(sql, parameter, ResultSetTransformUtils::transformCloudEvent); + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java index ffceb76e2..7c4b769e6 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/ResultSetTransformUtils.java @@ -27,19 +27,36 @@ import java.sql.SQLException; public class ResultSetTransformUtils { public static String transformTableName(ResultSet resultSet) throws SQLException { - return resultSet.getString(1); + String topic = resultSet.getString(1); + return topic.substring(12); } public static ConsumerGroupInfo transformConsumerGroup(ResultSet resultSet) { return new ConsumerGroupInfo(); } - public static CloudEventInfo transformCloudEvent(ResultSet resultSet) { - return null; + public static CloudEventInfo transformCloudEvent(ResultSet resultSet) throws SQLException { + CloudEventInfo cloudEventInfo = new CloudEventInfo(); + cloudEventInfo.setCloudEventInfoId(resultSet.getLong("cloud_event_info_id")); + cloudEventInfo.setCloudEventTopic(resultSet.getString("cloud_event_topic")); + //cloudEventInfo.setCloudEventId(resultSet.getLong("cloud_event_id")); + cloudEventInfo.setCloudEventStorageNodeAdress(resultSet.getString("cloud_event_storage_node_adress")); + cloudEventInfo.setCloudEventType(resultSet.getString("cloud_event_type")); + cloudEventInfo.setCloudEventProducerGroupName(resultSet.getString("cloud_event_producer_group_name")); + cloudEventInfo.setCloudEventSource(resultSet.getString("cloud_event_source")); + cloudEventInfo.setCloudEventContentType(resultSet.getString("cloud_event_content_type")); + cloudEventInfo.setCloudEventData(resultSet.getString("cloud_event_data")); + cloudEventInfo.setCloudEventReplyData(resultSet.getString("cloud_event_reply_data")); + return cloudEventInfo; } - public static TopicInfo transformTopicInfo(ResultSet resultSet) { + public static TopicInfo transformTopicInfo(ResultSet resultSet) throws SQLException { + TopicInfo topicInfo = new TopicInfo(); + topicInfo.setCurrentId(resultSet.getLong("cloud_event_info_id")); + topicInfo.setTopicName(resultSet.getString("cloud_event_topic")); + topicInfo.setTopicName(topicInfo.getTopicName().substring(12)); + topicInfo.setDbTablesName(resultSet.getString("cloud_event_topic")); return null; } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java index 2f95b0d2a..e4d66fb91 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQL.java @@ -23,6 +23,8 @@ import lombok.Data; public class BaseSQL { private String createDatabases; + + private String queryDataBases; private String queryConsumerGroupTableSQL; diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java index 9eeb48043..e4c2ab138 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/BaseSQLOperation.java @@ -20,6 +20,8 @@ package org.apache.eventmesh.connector.storage.jdbc.SQL; public interface BaseSQLOperation { public String createDatabases(); + + public String queryDataBases(); public String queryConsumerGroupTableSQL(); diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java index fac24ff5b..e20ea765f 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/CloudEventSQLOperation.java @@ -29,15 +29,15 @@ public interface CloudEventSQLOperation { public String selectCloudEventByReplySQL(String table, String idNum); - public String locationEventSQL(String table); + public String locationEventSQL(String table,String consumerGroupName,String processSign,String consumerGroup); - public String queryLocationEventSQL(String table); + public String queryLocationEventSQL(String table,String consumerGroupName,String processSign); public String selectFastMessageSQL(String table); public String selectLastMessageSQL(String table); - public String selectNoConsumptionMessageSQL(String table); + public String selectNoConsumptionMessageSQL(String table,String key); public String selectAppointTimeMessageSQL(String table); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java index 680eaf63c..5f0076610 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/java/org/apache/eventmesh/connector/storage/jdbc/SQL/SQLServiceInvocationHandler.java @@ -48,10 +48,9 @@ public class SQLServiceInvocationHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String argsKey; - if (args.length == 0) { + if (Objects.isNull(args)||args.length == 0) { argsKey = ""; - } - if (args.length == 1) { + }else if (args.length == 1) { argsKey = (String) args[0]; } else { StringBuffer stringBuffer = new StringBuffer(); diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector index 04d1a953b..5ce70c639 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/META-INF/eventmesh/org.apache.eventmesh.api.connector.storage.StorageConnector @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -jdbc=JDBCStorageConnector \ No newline at end of file +jdbc=org.apache.eventmesh.connector.storage.jdbc.JDBCStorageConnector \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml index af591573f..2efb9b342 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-base.yaml @@ -1,3 +1,4 @@ +queryDataBases: "select SCHEMA_NAME from information_schema.SCHEMATA where SCHEMA_NAME = 'event_mesh_storage'" createDatabases: "create database if not exists event_mesh_storage" -queryConsumerGroupTableSQL: "select table_name from information_schema.tables where TABLE_SCHEMA = ? and table_name = ?" -queryCloudEventTablesSQL: "select table_name from information_schema.tables where TABLE_SCHEMA = ? and table_name like 'cloud_event_%'" \ No newline at end of file +queryConsumerGroupTableSQL: "" +queryCloudEventTablesSQL: "select table_name from information_schema.tables where TABLE_SCHEMA = 'event_mesh_storage' and table_name like 'cloud_event_%'" \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml index 19d7a9e6d..282569c2e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/main/resource/mysql-cloudevent.yaml @@ -10,7 +10,7 @@ createCloudEventSQL: "create table `cloud_event_{table}`( `cloud_event_tag` json not null default (json_array()) comment '事件标签', `cloud_event_extensions` text not null comment '存储扩展信息', `cloud_event_data` longtext not null comment '事件数据', - `cloud_event_reply_data` not null comment '事件reply数据', + `cloud_event_reply_data` longtext not null comment '事件reply数据', `cloud_event_consume_location` json not null default (json_object()) comment '扩展信息', `cloud_event_state` varchar(31) not null default '' comment '存储节点地址', `cloud_event_reply_state` varchar(31) not null default 'NOTHING' comment '存储节点地址', @@ -20,14 +20,14 @@ createCloudEventSQL: "create table `cloud_event_{table}`( PRIMARY KEY (`cloud_event_info_id`), key (`cloud_event_create_time`) )" -insertCloudEventSQL: "insert into cloud_event_{table}(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name, -cloud_event_source,cloud_event_content_type,cloud_event_tag,cloud_event_extensions,cloud_event_data) values(?,?,?,?,?,?,?,?,?,?)" -updateCloudEventOffsetSQL: "update cloud_event_{table} set cloud_event_state = 'SUCCESS' where cloud_event_info_id = ?" -updateCloudEventReplySQL: "update cloud_event_{table} set cloud_event_reply_data = ? , cloud_event_reply_state = 'NOTHING' where cloud_event_info_id = ?" +insertCloudEventSQL: "insert into `cloud_event_{table}`(cloud_event_id,cloud_event_topic,cloud_event_storage_node_adress,cloud_event_type,cloud_event_producer_group_name, +cloud_event_source,cloud_event_content_type,cloud_event_extensions,cloud_event_data,cloud_event_reply_data) values(?,?,?,?,?,?,?,?,?,?)" +updateCloudEventOffsetSQL: "update `cloud_event_{table}` set cloud_event_state = 'SUCCESS' where cloud_event_info_id = ?" +updateCloudEventReplySQL: "update `cloud_event_{table}` set cloud_event_reply_data = ? , cloud_event_reply_state = 'RESULT' where cloud_event_info_id = ?" selectCloudEventByReplySQL: "select * from cloud_event_{table} where cloud_event_info_id in({id}) and cloud_event_reply_data is not null" -locationEventSQL: "update cloud_event_{table} set json_set( cloud_event_consume_location , ? ,? ) where cloud_event_info_id > ? and json_extract(cloud_event_consume_location, ?) is null limit 200" -queryLocationEventSQL: "select * from cloud_event_{table} where cloud_event_info_id > ? and JSON_CONTAINS_PATH(cloud_event_consume_location, 'one', ?)" -selectFastMessageSQL: "select 'cloud_event_test' as tableName , cloud_event_info_id from cloud_event_{table} order by cloud_event_info_id limit 1" -selectLastMessageSQL: "select 'cloud_event_test' as tableName , cloud_event_info_id from cloud_event_{table} order by cloud_event_info_id desc limit 1" -selectNoConsumptionMessageSQL: "select 'cloud_event_test' as tableName , cloud_event_info_id from cloud_event_{table} where json_extract(cloud_event_consume_location, ?) is not null order by cloud_event_info_id desc limit 1" +locationEventSQL: "update `cloud_event_{table}` set cloud_event_consume_location = json_set( cloud_event_consume_location , '$.{value}' ,'{value }' ) where cloud_event_info_id > ? and json_extract(cloud_event_consume_location, '$.{value}') is null limit 200" +queryLocationEventSQL: "select * from `cloud_event_{table}` where cloud_event_info_id >= ? and JSON_EXTRACT(cloud_event_consume_location, '$.{value}') = '{value}'" +selectFastMessageSQL: "select * from `cloud_event_{table}` order by cloud_event_info_id limit 1" +selectLastMessageSQL: "select * from `cloud_event_{table}` order by cloud_event_info_id desc limit 1" +selectNoConsumptionMessageSQL: "( select * from `cloud_event_{table}` where json_extract(cloud_event_consume_location, '$.{value}') is not null order by cloud_event_info_id desc limit 1 )" selectAppointTimeMessageSQL: "" \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java index b6e035799..6e07873c3 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-storage-jdbc/src/test/java/org/apache/eventmesh/connector/storage/jdbc/JDBCStorageConnectorTest.java @@ -17,22 +17,159 @@ package org.apache.eventmesh.connector.storage.jdbc; +import org.apache.eventmesh.api.SendCallback; +import org.apache.eventmesh.api.SendResult; +import org.apache.eventmesh.api.connector.storage.data.PullRequest; +import org.apache.eventmesh.api.connector.storage.data.TopicInfo; +import org.apache.eventmesh.api.exception.OnExceptionContext; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import java.util.Set; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -public class JDBCStorageConnectorTest { +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +public class JDBCStorageConnectorTest { + + private String topicOne = "test_1"; + + private String topicTwo = "test_2"; + + private String consumerGroupA = "consumerGroupA"; + + private String consumerGroupB = "consumerGroupB"; + + private PullRequest oneConsumerGroupA; + + private PullRequest oneConsumerGroupB; + + private PullRequest twoConsumerGroupA; + + private PullRequest twoConsumerGroupB; + + private Set<String> topicSet; JDBCStorageConnector connector = new JDBCStorageConnector(); - + + @Before + public void init() throws Exception { + + Properties properties = new Properties(); + properties.put("dbType", "mysql"); + properties.put("address", "127.0.0.1:3306"); + properties.put("parameter", + "useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull"); + properties.put("username", "root"); + properties.put("password", "Ab123123@"); + properties.put("maxActive", "20"); + properties.put("maxWait", "10000"); + connector.init(properties); + + oneConsumerGroupA = new PullRequest(); + oneConsumerGroupA.setConsumerGroupName(consumerGroupA); + oneConsumerGroupA.setProcessSign("A1"); + oneConsumerGroupA.setTopicName(topicOne); + + oneConsumerGroupB = new PullRequest(); + oneConsumerGroupB.setConsumerGroupName(consumerGroupA); + oneConsumerGroupB.setProcessSign("A2"); + oneConsumerGroupB.setTopicName(topicOne); + + twoConsumerGroupA = new PullRequest(); + twoConsumerGroupA.setConsumerGroupName(consumerGroupB); + twoConsumerGroupA.setProcessSign("B1"); + twoConsumerGroupA.setTopicName(topicOne); + + twoConsumerGroupB = new PullRequest(); + twoConsumerGroupB.setConsumerGroupName(consumerGroupB); + twoConsumerGroupB.setProcessSign("B2"); + twoConsumerGroupB.setTopicName(topicOne); + + topicSet = connector.getTopic(); + this.createTopic(topicOne); + this.createTopic(topicTwo); + List<PullRequest> pullRequestList = new ArrayList<>(); + pullRequestList.add(oneConsumerGroupA); + pullRequestList.add(oneConsumerGroupB); + pullRequestList.add(twoConsumerGroupA); + pullRequestList.add(twoConsumerGroupB); + List<TopicInfo> topicInfoList = connector.geTopicInfos(pullRequestList); + for (TopicInfo topicInfo : topicInfoList) { + Assert.assertEquals(Long.valueOf(0), topicInfo.getCurrentId()); + } + + } + + private void createTopic(String topic) throws Exception { + if (!topicSet.contains("cloud_event_" + topic)) { + TopicInfo topicInfo = new TopicInfo(); + topicInfo.setTopicName(topic); + connector.createTopic(topicInfo); + } + } + @Test public void testInit() throws Exception { Properties properties = new Properties(); - properties.put("url", "jdbc:mysql://127.0.0.1:3306/electron?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull"); + properties.put("url", + "jdbc:mysql://127.0.0.1:3306/electron?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull"); properties.put("username", "root"); properties.put("password", "Ab123123@"); properties.put("maxActive", "20"); properties.put("maxWait", "10000"); connector.init(properties); } + + @Test + public void test_publish_and_pull() throws Exception { + this.publish(topicOne, "0"); + List<CloudEvent> cloudEventList = this.connector.pull(oneConsumerGroupA); + //Assert.assertNull(cloudEventList); + cloudEventList = this.connector.pull(oneConsumerGroupA); + Assert.assertNull(cloudEventList); + cloudEventList = this.connector.pull(oneConsumerGroupB); + Assert.assertNull(cloudEventList); + cloudEventList = this.connector.pull(twoConsumerGroupA); + + this.publish_bach(topicOne); + this.publish_bach(topicTwo); + + connector.pull(oneConsumerGroupA); + } + + public void publish_bach(String topic) throws Exception { + for (int i = 1; i <= 900; i++) { + this.publish(topic, i + ""); + } + } + + public void publish(String topic, String id) throws Exception { + + CloudEvent cloudEvent = CloudEventBuilder.v03().withExtension("cloudeventid", id).withId(id).withType("1231").withSource(new URI("")).withSubject(topic).withExtension("id", id) + .withData(id.getBytes()).build(); + + connector.publish(cloudEvent, new SendCallback() { + + @Override + public void onSuccess(SendResult sendResult) { + System.out.println("111"); + } + + @Override + public void onException(OnExceptionContext context) { + System.out.println("111"); + } + }); + } + + public void test_reply() throws Exception { + this.publish_bach(topicOne); + } } diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 9685f1ca5..fad9acd5a 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -76,6 +76,7 @@ dependencies { implementation project(":eventmesh-webhook:eventmesh-webhook-admin") implementation project(":eventmesh-webhook:eventmesh-webhook-api") implementation project(":eventmesh-webhook:eventmesh-webhook-receive") + implementation project(":eventmesh-connector-plugin:eventmesh-connector-storage-jdbc") testImplementation "org.mockito:mockito-core" testImplementation "org.mockito:mockito-inline" diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 17bfab067..a2cbc7612 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -72,7 +72,16 @@ eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255 eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 #connector plugin -eventMesh.connector.plugin.type=standalone +eventMesh.connector.plugin.type=storage +eventMesh.connector.plugin.storage.nodeaddress=127.0.0.1:3306 +eventMesh.connector.plugin.storage.username=root +eventMesh.connector.plugin.storage.password=Ab123123@ +eventMesh.connector.plugin.storage.type=jdbc +eventMesh.connector.plugin.storage.jdbc.dbType=mysql +eventMesh.connector.plugin.storage.jdbc.parameter=useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull +eventMesh.connector.plugin.storage.jdbc.maxActive=200 +eventMesh.connector.plugin.storage.jdbc.maxWait=20000 + #security plugin eventMesh.server.security.enabled=false --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
