This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/eventmesh.git

commit fd78f4828ffb861c54b80b41bec6b87fab62a510
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 9 14:15:59 2021 +0800

    [ISSUE #418]Refactor the plugin load code (#421)
    
    * [ISSUE #418]Refactor the plugin load code
    
    * fix ut
---
 .../eventmesh-runtime-quickstart.zh-CN.md          |  16 ++--
 .../instructions/eventmesh-runtime-quickstart.md   |  16 ++--
 docs/images/project-structure.png                  | Bin 63401 -> 54777 bytes
 .../common/config/CommonConfiguration.java         | 105 ++-------------------
 .../src/test/resources/configuration.properties    |   1 +
 eventmesh-connector-api/build.gradle               |   4 +-
 eventmesh-connector-api/gradle.properties          |   2 +-
 .../eventmesh/api/consumer/MeshMQPushConsumer.java |   2 +
 .../eventmesh/api/producer/MeshMQProducer.java     |   2 +
 ...pache.eventmesh.api.consumer.MeshMQPushConsumer |   2 +-
 ...rg.apache.eventmesh.api.producer.MeshMQProducer |   2 +-
 eventmesh-runtime/build.gradle                     |   4 +-
 eventmesh-runtime/conf/eventmesh.properties        |   5 +-
 .../runtime/core/plugin/MQConsumerWrapper.java     |  23 ++---
 .../runtime/core/plugin/MQProducerWrapper.java     |  22 ++---
 .../runtime/core/plugin/PluginFactory.java         |  39 ++++++++
 .../protocol/http/consumer/EventMeshConsumer.java  |   6 +-
 .../protocol/http/producer/EventMeshProducer.java  |   4 +-
 .../tcp/client/group/ClientGroupWrapper.java       |  12 ++-
 .../eventmesh/spi/EventMeshExtensionFactory.java   |   1 +
 .../eventmesh/spi/EventMeshExtensionLoader.java    |  10 +-
 .../eventmesh/http/demo/SyncRequestInstance.java   |  11 ++-
 22 files changed, 126 insertions(+), 163 deletions(-)

diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
index adc970379..ecafc29f6 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
@@ -62,8 +62,9 @@ sh start.sh
 - eventmesh-runtime : eventmesh运行时模块
 - eventmesh-sdk-java : eventmesh java客户端sdk
 - eventmesh-starter : eventmesh本地启动运行项目入口
+- eventmesh-spi : eventmesh SPI加载模块
 
-> 注:插件模块遵循java spi机制,需要在对应模块中的/main/resources/META-INF/services 下配置相关接口与实现类的映射文件
+> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件
 
 **2.3.2 配置VM启动参数**
 
@@ -75,18 +76,17 @@ sh start.sh
 ```
 > 注:如果操作系统为Windows, 可能需要将文件分隔符换成\
 
-**2.3.3 配置build.gradle文件**
+**2.3.3 配置插件**
 
-通过修改dependencies,compile project 项来指定项目启动后加载的插件
+在`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件
 
-修改`eventmesh-starter`模块下面的`build.gradle`文件
+修改`confPath`目录下面的`eventMesh.properties`文件
 
-加载**RocketMQ**插件配置:
+加载**RocketMQ Connector**插件配置:
 
 ```java
-dependencies {
-    compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+#connector plugin 
+eventMesh.connector.plugin.type=rocketmq
 ```
 
 **2.3.4 启动运行**
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index 0536e5b6e..dd89795f3 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -62,9 +62,10 @@ Same with 1.2
 - eventmesh-runtime : eventmesh runtime module
 - eventmesh-sdk-java : eventmesh java client sdk
 - eventmesh-starter : eventmesh project local start entry
+- eventmesh-spi : eventmesh SPI load module
 
-> ps: The loading of connector plugin follows the Java SPI mechanism, it's necessary to configure the mapping file of
-related interface and implementation class under /main/resources/meta-inf/services in the corresponding module
+> ps: The loading of connector plugin follows the eventmesh SPI mechanism, it's necessary to configure the mapping file of
+related interface and implementation class under /main/resources/meta-inf/eventmesh in the corresponding module
 
 **2.3.2 Configure VM Options**
 
@@ -76,18 +77,17 @@ related interface and implementation class under /main/resources/meta-inf/servic
 ```
 > ps: If you use Windows, you may need to replace the file separator to \
 
-**2.3.3 Configure build.gradle file**
+**2.3.3 Configure plugin**
 
-Specify the connector that will be loaded after the project start with updating compile project item in dependencies
+Specify the connector plugin that will be loaded after the project start by declaring in `eventMesh.properties`
 
-update `build.gradle` file under the `eventmesh-starter` module
+Modify the `eventMesh.properties` file in the `confPath` directory
 
 load **rocketmq connector** configuration:
 
 ```java
-dependencies {
-    compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
 ```
 
 **2.3.4 Run**
diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png
index 252a9536c..efc5249e5 100644
Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 08a44cb8e..1ae6b2604 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -17,17 +17,10 @@
 
 package org.apache.eventmesh.common.config;
 
-import java.net.Inet6Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-
 import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.IPUtil;
 
 public class CommonConfiguration {
     public String eventMeshEnv = "P";
@@ -35,7 +28,7 @@ public class CommonConfiguration {
     public String eventMeshCluster = "LS";
     public String eventMeshName = "";
     public String sysID = "5477";
-
+    public String eventMeshConnectorPluginType = "rocketmq";
 
     public String namesrvAddr = "";
     public String clientUserName = "username";
@@ -84,8 +77,11 @@ public class CommonConfiguration {
 
             eventMeshServerIp = 
configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
             if (StringUtils.isBlank(eventMeshServerIp)) {
-                eventMeshServerIp = getLocalAddr();
+                eventMeshServerIp = IPUtil.getLocalAddress();
             }
+
+            eventMeshConnectorPluginType = 
configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
+            
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), 
String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
         }
     }
 
@@ -105,94 +101,7 @@ public class CommonConfiguration {
         public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = 
"eventMesh.server.registry.registerIntervalInMills";
 
         public static String 
KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = 
"eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
-    }
-
-    public static String getLocalAddr() {
-        //priority of networkInterface when generating client ip
-        String priority = System.getProperty("networkInterface.priority", 
"bond1<eth1<eth0");
-        ArrayList<String> preferList = new ArrayList<String>();
-        for (String eth : priority.split("<")) {
-            preferList.add(eth);
-        }
-        NetworkInterface preferNetworkInterface = null;
-
-        try {
-            Enumeration<NetworkInterface> enumeration1 = 
NetworkInterface.getNetworkInterfaces();
-            while (enumeration1.hasMoreElements()) {
-                final NetworkInterface networkInterface = 
enumeration1.nextElement();
-                if (!preferList.contains(networkInterface.getName())) {
-                    continue;
-                } else if (preferNetworkInterface == null) {
-                    preferNetworkInterface = networkInterface;
-                }
-                //get the networkInterface that has higher priority
-                else if (preferList.indexOf(networkInterface.getName())
-                        > 
preferList.indexOf(preferNetworkInterface.getName())) {
-                    preferNetworkInterface = networkInterface;
-                }
-            }
-
-            // Traversal Network interface to get the first non-loopback and 
non-private address
-            ArrayList<String> ipv4Result = new ArrayList<String>();
-            ArrayList<String> ipv6Result = new ArrayList<String>();
-
-            if (preferNetworkInterface != null) {
-                final Enumeration<InetAddress> en = 
preferNetworkInterface.getInetAddresses();
-                getIpResult(ipv4Result, ipv6Result, en);
-            } else {
-                Enumeration<NetworkInterface> enumeration = 
NetworkInterface.getNetworkInterfaces();
-                while (enumeration.hasMoreElements()) {
-                    final NetworkInterface networkInterface = 
enumeration.nextElement();
-                    final Enumeration<InetAddress> en = 
networkInterface.getInetAddresses();
-                    getIpResult(ipv4Result, ipv6Result, en);
-                }
-            }
-
-            // prefer ipv4
-            if (!ipv4Result.isEmpty()) {
-                for (String ip : ipv4Result) {
-                    if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
-                        continue;
-                    }
-
-                    return ip;
-                }
 
-                return ipv4Result.get(ipv4Result.size() - 1);
-            } else if (!ipv6Result.isEmpty()) {
-                return ipv6Result.get(0);
-            }
-            //If failed to find,fall back to localhost
-            final InetAddress localHost = InetAddress.getLocalHost();
-            return normalizeHostAddress(localHost);
-        } catch (SocketException e) {
-            e.printStackTrace();
-        } catch (UnknownHostException e) {
-            e.printStackTrace();
-        }
-
-        return null;
-    }
-
-    public static String normalizeHostAddress(final InetAddress localHost) {
-        if (localHost instanceof Inet6Address) {
-            return "[" + localHost.getHostAddress() + "]";
-        } else {
-            return localHost.getHostAddress();
-        }
-    }
-
-    private static void getIpResult(ArrayList<String> ipv4Result, 
ArrayList<String> ipv6Result,
-                                    Enumeration<InetAddress> en) {
-        while (en.hasMoreElements()) {
-            final InetAddress address = en.nextElement();
-            if (!address.isLoopbackAddress()) {
-                if (address instanceof Inet6Address) {
-                    ipv6Result.add(normalizeHostAddress(address));
-                } else {
-                    ipv4Result.add(normalizeHostAddress(address));
-                }
-            }
-        }
+        public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = 
"eventMesh.connector.plugin.type";
     }
 }
\ No newline at end of file
diff --git a/eventmesh-common/src/test/resources/configuration.properties 
b/eventmesh-common/src/test/resources/configuration.properties
index d7c50968b..76f29f277 100644
--- a/eventmesh-common/src/test/resources/configuration.properties
+++ b/eventmesh-common/src/test/resources/configuration.properties
@@ -21,3 +21,4 @@ eventMesh.sysid=3
 eventMesh.server.cluster=value4
 eventMesh.server.name=value5
 eventMesh.server.hostIp=value6
+eventMesh.connector.plugin.type=rocketmq
diff --git a/eventmesh-connector-api/build.gradle 
b/eventmesh-connector-api/build.gradle
index 2d1205df3..157048ecc 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-api/build.gradle
@@ -20,6 +20,6 @@ List open_message = [
 ]
 
 dependencies {
-    implementation open_message,project(":eventmesh-common")
-    testImplementation open_message,project(":eventmesh-common")
+    implementation open_message,project(":eventmesh-common"), 
project(":eventmesh-spi")
+    testImplementation open_message,project(":eventmesh-common"), 
project(":eventmesh-spi")
 }
diff --git a/eventmesh-connector-api/gradle.properties 
b/eventmesh-connector-api/gradle.properties
index ae30087cf..9d1744e07 100644
--- a/eventmesh-connector-api/gradle.properties
+++ b/eventmesh-connector-api/gradle.properties
@@ -16,6 +16,6 @@
 #
 group=org.apache.eventmesh
 version=1.2.0-SNAPSHOT
-jdk=1.7
+jdk=1.8
 mavenUserName=
 mavenPassword=
\ No newline at end of file
diff --git 
a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
 
b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
index 5e60e0e0d..4ac1edbfc 100644
--- 
a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ 
b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
@@ -25,7 +25,9 @@ import io.openmessaging.api.Consumer;
 import io.openmessaging.api.Message;
 
 import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.spi.EventMeshSPI;
 
+@EventMeshSPI
 public interface MeshMQPushConsumer extends Consumer {
 
     void init(Properties keyValue) throws Exception;
diff --git 
a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
 
b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
index 82ca583ce..c717385e0 100644
--- 
a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ 
b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
@@ -24,7 +24,9 @@ import io.openmessaging.api.Producer;
 import io.openmessaging.api.SendCallback;
 
 import org.apache.eventmesh.api.RRCallback;
+import org.apache.eventmesh.spi.EventMeshSPI;
 
+@EventMeshSPI
 public interface MeshMQProducer extends Producer {
 
     void init(Properties properties) throws Exception;
diff --git 
a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
 
b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
similarity index 90%
rename from 
eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
rename to 
eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
index c98880a84..0df2e286d 100644
--- 
a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
+++ 
b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
+rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
diff --git 
a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
 
b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
similarity index 90%
rename from 
eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
rename to 
eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
index 28907ca17..ef4959d99 100644
--- 
a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
+++ 
b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
+rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 09dd79f9c..e5bb06565 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -31,6 +31,6 @@ List open_message = [
 
 
 dependencies {
-    implementation  metrics, 
open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
-    testImplementation 
metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
+    implementation  metrics, 
open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
+    testImplementation 
metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
 }
diff --git a/eventmesh-runtime/conf/eventmesh.properties 
b/eventmesh-runtime/conf/eventmesh.properties
index 035b950fc..45fc193b4 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106
 eventMesh.server.registry.registerIntervalInMills=10000
 eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
 #auto-ack
-#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
\ No newline at end of file
+#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
+
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
\ No newline at end of file
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index 080b7af40..b6298542b 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -19,7 +19,6 @@ package org.apache.eventmesh.runtime.core.plugin;
 
 import java.util.List;
 import java.util.Properties;
-import java.util.ServiceLoader;
 
 import io.openmessaging.api.AsyncMessageListener;
 import io.openmessaging.api.Message;
@@ -35,6 +34,14 @@ public class MQConsumerWrapper extends MQWrapper {
 
     protected MeshMQPushConsumer meshMQPushConsumer;
 
+    public MQConsumerWrapper(String connectorPluginType) {
+        this.meshMQPushConsumer = 
PluginFactory.getMeshMQPushConsumer(connectorPluginType);
+        if (meshMQPushConsumer == null) {
+            logger.error("can't load the meshMQPushConsumer plugin, please 
check.");
+            throw new RuntimeException("doesn't load the meshMQPushConsumer 
plugin, please check.");
+        }
+    }
+
     public void subscribe(String topic, AsyncMessageListener listener) throws 
Exception {
         meshMQPushConsumer.subscribe(topic, listener);
     }
@@ -44,24 +51,10 @@ public class MQConsumerWrapper extends MQWrapper {
     }
 
     public synchronized void init(Properties keyValue) throws Exception {
-        meshMQPushConsumer = getMeshMQPushConsumer();
-        if (meshMQPushConsumer == null) {
-            logger.error("can't load the meshMQPushConsumer plugin, please 
check.");
-            throw new RuntimeException("doesn't load the meshMQPushConsumer 
plugin, please check.");
-        }
-
         meshMQPushConsumer.init(keyValue);
         inited.compareAndSet(false, true);
     }
 
-    private MeshMQPushConsumer getMeshMQPushConsumer() {
-        ServiceLoader<MeshMQPushConsumer> meshMQPushConsumerServiceLoader = 
ServiceLoader.load(MeshMQPushConsumer.class);
-        if (meshMQPushConsumerServiceLoader.iterator().hasNext()) {
-            return meshMQPushConsumerServiceLoader.iterator().next();
-        }
-        return null;
-    }
-
     public synchronized void start() throws Exception {
         meshMQPushConsumer.start();
         started.compareAndSet(false, true);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
index 082ab3bc0..60fe8421a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
@@ -34,27 +34,21 @@ public class MQProducerWrapper extends MQWrapper {
 
     protected MeshMQProducer meshMQProducer;
 
-    public synchronized void init(Properties keyValue) throws Exception {
-        if (inited.get()) {
-            return;
-        }
-
-        meshMQProducer = getSpiMeshMQProducer();
+    public MQProducerWrapper(String connectorPluginType) {
+        this.meshMQProducer = 
PluginFactory.getMeshMQProducer(connectorPluginType);
         if (meshMQProducer == null) {
             logger.error("can't load the meshMQProducer plugin, please 
check.");
             throw new RuntimeException("doesn't load the meshMQProducer 
plugin, please check.");
         }
-        meshMQProducer.init(keyValue);
-
-        inited.compareAndSet(false, true);
     }
 
-    private MeshMQProducer getSpiMeshMQProducer() {
-        ServiceLoader<MeshMQProducer> meshMQProducerServiceLoader = 
ServiceLoader.load(MeshMQProducer.class);
-        if (meshMQProducerServiceLoader.iterator().hasNext()) {
-            return meshMQProducerServiceLoader.iterator().next();
+    public synchronized void init(Properties keyValue) throws Exception {
+        if (inited.get()) {
+            return;
         }
-        return null;
+        meshMQProducer.init(keyValue);
+
+        inited.compareAndSet(false, true);
     }
 
     public synchronized void start() throws Exception {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
new file mode 100644
index 000000000..b11495341
--- /dev/null
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.plugin;
+
+import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
+import org.apache.eventmesh.api.producer.MeshMQProducer;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+
+public class PluginFactory {
+
+    public static MeshMQProducer getMeshMQProducer(String connectorPluginName) 
{
+        return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, 
connectorPluginName);
+    }
+
+    public static MeshMQPushConsumer getMeshMQPushConsumer(String 
connectorPluginName) {
+        return 
EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, 
connectorPluginName);
+    }
+
+    private static <T> T getPlugin(Class<T> pluginType, String pluginName) {
+        return EventMeshExtensionFactory.getExtension(pluginType, pluginName);
+    }
+}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 8620e682a..ef051b03f 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -66,13 +66,15 @@ public class EventMeshConsumer {
 
     private ConsumerGroupConf consumerGroupConf;
 
-    private MQConsumerWrapper persistentMqConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper persistentMqConsumer;
 
-    private MQConsumerWrapper broadcastMqConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper broadcastMqConsumer;
 
     public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, 
ConsumerGroupConf consumerGroupConf) {
         this.eventMeshHTTPServer = eventMeshHTTPServer;
         this.consumerGroupConf = consumerGroupConf;
+        this.persistentMqConsumer = new 
MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
+        this.broadcastMqConsumer = new 
MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
     }
 
     private MessageHandler httpMessageHandler;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
index cf41ca2b9..fe3218087 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
@@ -69,7 +69,7 @@ public class EventMeshProducer {
         return true;
     }
 
-    protected MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
+    protected MQProducerWrapper mqProducerWrapper;
 
     public MQProducerWrapper getMqProducerWrapper() {
         return mqProducerWrapper;
@@ -85,7 +85,7 @@ public class EventMeshProducer {
 
         //TODO for defibus
         keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.eventMeshIDC);
-
+        mqProducerWrapper = new 
MQProducerWrapper(eventMeshHttpConfiguration.eventMeshConnectorPluginType);
         mqProducerWrapper.init(keyValue);
         inited.compareAndSet(false, true);
         logger.info("EventMeshProducer [{}] inited.............", 
producerGroupConfig.getGroupName());
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 5829e4941..310ea6dee 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -50,6 +50,7 @@ import 
org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
+import org.apache.eventmesh.runtime.core.plugin.PluginFactory;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import 
org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
@@ -96,14 +97,16 @@ public class ClientGroupWrapper {
 
     public AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE);
 
-    private MQConsumerWrapper persistentMsgConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper persistentMsgConsumer;
 
-    private MQConsumerWrapper broadCastMsgConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper broadCastMsgConsumer;
 
     private ConcurrentHashMap<String, Set<Session>> 
topic2sessionInGroupMapping = new ConcurrentHashMap<String, Set<Session>>();
 
     public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
 
+    private MQProducerWrapper mqProducerWrapper;
+
     public ClientGroupWrapper(String sysId, String producerGroup, String 
consumerGroup,
                               EventMeshTCPServer eventMeshTCPServer,
                               DownstreamDispatchStrategy 
downstreamDispatchStrategy) {
@@ -115,6 +118,9 @@ public class ClientGroupWrapper {
         this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
         this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor();
         this.downstreamDispatchStrategy = downstreamDispatchStrategy;
+        this.persistentMsgConsumer = new 
MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+        this.broadCastMsgConsumer = new 
MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+        this.mqProducerWrapper = new 
MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
     }
 
     public ConcurrentHashMap<String, Set<Session>> 
getTopic2sessionInGroupMapping() {
@@ -163,8 +169,6 @@ public class ClientGroupWrapper {
         return true;
     }
 
-    private MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
-
     public MQProducerWrapper getMqProducerWrapper() {
         return mqProducerWrapper;
     }
diff --git 
a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
 
b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
index 96e054bd0..6aea9db11 100644
--- 
a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
+++ 
b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
@@ -18,6 +18,7 @@
 package org.apache.eventmesh.spi;
 
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
 
 public enum EventMeshExtensionFactory {
     ;
diff --git 
a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
 
b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
index 740ecb3f9..89696e04d 100644
--- 
a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
+++ 
b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
@@ -17,6 +17,9 @@
 
 package org.apache.eventmesh.spi;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
@@ -27,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
 public enum EventMeshExtensionLoader {
     ;
 
+    private static final Logger logger = 
LoggerFactory.getLogger(EventMeshExtensionLoader.class);
+
     private static final ConcurrentHashMap<Class<?>, ConcurrentHashMap<String, 
Class<?>>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16);
 
     private static final ConcurrentHashMap<String, Object> 
EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16);
@@ -54,7 +59,9 @@ public enum EventMeshExtensionLoader {
         }
         Class<?> aClass = extensionClassMap.get(extensionName);
         try {
-            EXTENSION_INSTANCE_CACHE.put(extensionName, aClass.newInstance());
+            Object extensionObj = aClass.newInstance();
+            logger.info("initialize extension instance success, extensionType: 
{}, extensionName: {}", extensionType, extensionName);
+            EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj);
         } catch (InstantiationException | IllegalAccessException e) {
             throw new ExtensionException("Extension initialize error", e);
         }
@@ -87,6 +94,7 @@ public enum EventMeshExtensionLoader {
                 String extensionClassStr = (String) extensionClass;
                 try {
                     Class<?> targetClass = Class.forName(extensionClassStr);
+                    logger.info("load extension class success, extensionType: 
{}, extensionClass: {}", extensionType, targetClass);
                     if (!extensionType.isAssignableFrom(targetClass)) {
                         throw new ExtensionException(
                                 String.format("class: %s is not subClass of 
%s", targetClass, extensionType));
diff --git 
a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
 
b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
index 9d3af8ffe..329f2bc64 100644
--- 
a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
+++ 
b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
@@ -34,10 +34,15 @@ public class SyncRequestInstance {
     public static void main(String[] args) throws Exception {
 
         LiteProducer liteProducer = null;
+        String eventMeshIPPort = "127.0.0.1:10105";
+        String topic = "EventMesh.SyncRequestInstance";
         try {
-            String eventMeshIPPort = args[0];
-
-            final String topic = args[1];
+            if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
+                eventMeshIPPort = args[0];
+            }
+            if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
+                topic = args[1];
+            }
 
             if (StringUtils.isBlank(eventMeshIPPort)) {
                 // if has multi value, can config as: 
127.0.0.1:10105;127.0.0.2:10105


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to