This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 6508df826 [ISSUE#4580] Connector extension supports spring environment 
variables. (#4582)
6508df826 is described below

commit 6508df8262c53fe6e85929d031a3fe2f28f4fe03
Author: yanrongzhen <[email protected]>
AuthorDate: Tue Nov 28 14:28:17 2023 +0800

    [ISSUE#4580] Connector extension supports spring environment variables. 
(#4582)
    
    * Connector extension supports spring environment variables.
    
    * fix: cr
---
 .../constants/ConnectRecordExtensionKeys.java      |  4 +-
 .../sink/connector/DingDingSinkConnector.java      |  6 +--
 .../sink/connector/DingDingSinkConnectorTest.java  |  2 +-
 .../source/connector/SpringSourceConnector.java    | 49 +++++++++++++++++++++-
 .../connector/SpringSourceConnectorTest.java       | 16 +++++--
 .../src/main/resources/application.properties      |  3 +-
 6 files changed, 69 insertions(+), 11 deletions(-)

diff --git 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
index 16efd5219..733de2c43 100644
--- 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
+++ 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
@@ -22,7 +22,7 @@ package 
org.apache.eventmesh.connector.dingtalk.common.constants;
  */
 public interface ConnectRecordExtensionKeys {
 
-    String DINGDING_TEMPLATE_TYPE_KEY = "dingDingTemplateTypeKey";
+    String DINGTALK_TEMPLATE_TYPE_KEY = "dingtalktemplatetype";
 
-    String DINGDING_MARKDOWN_MESSAGE_TITLE = "dingDingMarkdownMessageTitle";
+    String DINGTALK_MARKDOWN_MESSAGE_TITLE = "dingtalkmarkdownmessagetitle";
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
index 592d1ad1b..864ac3ae6 100644
--- 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
@@ -126,7 +126,7 @@ public class DingDingSinkConnector implements Sink {
                     new OrgGroupSendHeaders();
                 orgGroupSendHeaders.xAcsDingtalkAccessToken = accessToken;
 
-                String templateTypeKey = 
record.getExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY);
+                String templateTypeKey = 
record.getExtension(ConnectRecordExtensionKeys.DINGTALK_TEMPLATE_TYPE_KEY);
                 if (null == templateTypeKey || "null".equals(templateTypeKey)) 
{
                     templateTypeKey = 
DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey();
                 }
@@ -136,10 +136,10 @@ public class DingDingSinkConnector implements Sink {
                 if (DingDingMessageTemplateType.PLAIN_TEXT == templateType) {
                     contentMap.put("content", new String((byte[]) 
record.getData()));
                 } else if (DingDingMessageTemplateType.MARKDOWN == 
templateType) {
-                    String title = 
Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.DINGDING_MARKDOWN_MESSAGE_TITLE))
+                    String title = 
Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.DINGTALK_MARKDOWN_MESSAGE_TITLE))
                         .orElse("EventMesh-Message");
                     contentMap.put("title", title);
-                    contentMap.put("text", String.valueOf(record.getData()));
+                    contentMap.put("text", new String((byte[]) 
record.getData()));
                 }
 
                 OrgGroupSendRequest orgGroupSendRequest =
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
index 81e6fecf6..d24c51ffa 100644
--- 
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
+++ 
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
@@ -94,7 +94,7 @@ public class DingDingSinkConnectorTest {
             RecordOffset offset = new RecordOffset();
             ConnectRecord connectRecord = new ConnectRecord(partition, offset,
                 System.currentTimeMillis(), "Hello, 
EventMesh!".getBytes(StandardCharsets.UTF_8));
-            
connectRecord.addExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY,
+            
connectRecord.addExtension(ConnectRecordExtensionKeys.DINGTALK_TEMPLATE_TYPE_KEY,
                 DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey());
             records.add(connectRecord);
         }
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index e57935d4f..5f4d5c89b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -35,13 +35,25 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.springframework.beans.BeansException;
+import org.springframework.boot.env.OriginTrackedMapPropertySource;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.core.env.MutablePropertySources;
+import org.springframework.core.env.PropertySource;
+
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class SpringSourceConnector implements Source, MessageSendingOperations 
{
+public class SpringSourceConnector implements Source, 
MessageSendingOperations, ApplicationContextAware {
+
+    private static final String CONNECTOR_PROPERTY_PREFIX = 
"eventmesh.connector.";
 
     private static final int DEFAULT_BATCH_SIZE = 10;
 
+    private ApplicationContext applicationContext;
+
     private SpringSourceConfig sourceConfig;
 
     private BlockingQueue<ConnectRecord> queue;
@@ -116,6 +128,7 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations {
         RecordPartition partition = new RecordPartition();
         RecordOffset offset = new RecordOffset();
         ConnectRecord record = new ConnectRecord(partition, offset, 
System.currentTimeMillis(), message);
+        addSpringEnvironmentPropertyExtensions(record);
         queue.offer(record);
     }
 
@@ -131,6 +144,40 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations {
         RecordOffset offset = new RecordOffset();
         ConnectRecord record = new ConnectRecord(partition, offset, 
System.currentTimeMillis(), message);
         record.addExtension(SourceWorker.CALLBACK_EXTENSION, workerCallback);
+        addSpringEnvironmentPropertyExtensions(record);
         queue.offer(record);
     }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    private void addSpringEnvironmentPropertyExtensions(ConnectRecord 
connectRecord) {
+        ConfigurableApplicationContext context = 
(ConfigurableApplicationContext) applicationContext;
+        MutablePropertySources propertySources = 
context.getEnvironment().getPropertySources();
+        for (PropertySource<?> propertySource : propertySources) {
+            if (!(propertySource instanceof OriginTrackedMapPropertySource)) {
+                continue;
+            }
+            OriginTrackedMapPropertySource originTrackedMapPropertySource =
+                (OriginTrackedMapPropertySource) propertySource;
+            String[] keys = originTrackedMapPropertySource.getPropertyNames();
+            for (String key : keys) {
+                if (!key.startsWith(CONNECTOR_PROPERTY_PREFIX)) {
+                    continue;
+                }
+                Object value = null;
+                try {
+                    value = originTrackedMapPropertySource.getProperty(key);
+                    if (value != null) {
+                        
connectRecord.addExtension(key.replaceAll(CONNECTOR_PROPERTY_PREFIX, 
"").toLowerCase(),
+                            String.valueOf(value));
+                    }
+                } catch (Throwable e) {
+                    log.error("Put spring environment property to extension 
failed, key=[{}], value=[{}]", key, value);
+                }
+            }
+        }
+    }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
index 35e4accbc..65c71aea0 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.eventmesh.connector.spring.source.connector;
 
+import static org.mockito.Mockito.doReturn;
+
 import org.apache.eventmesh.connector.spring.source.config.SpringSourceConfig;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
@@ -25,17 +27,25 @@ import java.util.List;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Spy;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.MutablePropertySources;
 
 @ExtendWith(MockitoExtension.class)
 public class SpringSourceConnectorTest {
 
-    @Spy
     private SpringSourceConnector connector;
 
     @Test
     public void testSpringSourceConnector() throws Exception {
+        ConfigurableApplicationContext context = 
Mockito.mock(ConfigurableApplicationContext.class);
+        ConfigurableEnvironment environment = 
Mockito.mock(ConfigurableEnvironment.class);
+        doReturn(new 
MutablePropertySources()).when(environment).getPropertySources();
+        doReturn(environment).when(context).getEnvironment();
+        connector = new SpringSourceConnector();
+        connector.setApplicationContext(context);
         SpringSourceConfig sourceConfig = new SpringSourceConfig();
         connector.init(sourceConfig);
         connector.start();
@@ -51,7 +61,7 @@ public class SpringSourceConnectorTest {
         }
     }
 
-    private void writeMockedRecords(int count, String message) throws 
Exception {
+    private void writeMockedRecords(int count, String message) {
         for (int i = 0; i < count; i++) {
             connector.send(message + i);
         }
diff --git a/eventmesh-examples/src/main/resources/application.properties 
b/eventmesh-examples/src/main/resources/application.properties
index 259e49e89..6c7b727d2 100644
--- a/eventmesh-examples/src/main/resources/application.properties
+++ b/eventmesh-examples/src/main/resources/application.properties
@@ -23,4 +23,5 @@ eventmesh.grpc.port=10205
 eventmesh.selector.type=nacos
 eventmesh.selector.nacos.address=127.0.0.1:8848
 eventmesh.catalog.name=EVENTMESH-catalog
-eventmesh.workflow.name=EVENTMESH-workflow
\ No newline at end of file
+eventmesh.workflow.name=EVENTMESH-workflow
+eventmesh.connector.dingtalkTemplateType=sampleText
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to