This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 6508df826 [ISSUE#4580] Connector extension supports spring environment variables. (#4582)
6508df826 is described below
commit 6508df8262c53fe6e85929d031a3fe2f28f4fe03
Author: yanrongzhen <[email protected]>
AuthorDate: Tue Nov 28 14:28:17 2023 +0800
[ISSUE#4580] Connector extension supports spring environment variables. (#4582)
* Connector extension supports spring environment variables.
* fix: cr
---
.../constants/ConnectRecordExtensionKeys.java | 4 +-
.../sink/connector/DingDingSinkConnector.java | 6 +--
.../sink/connector/DingDingSinkConnectorTest.java | 2 +-
.../source/connector/SpringSourceConnector.java | 49 +++++++++++++++++++++-
.../connector/SpringSourceConnectorTest.java | 16 +++++--
.../src/main/resources/application.properties | 3 +-
6 files changed, 69 insertions(+), 11 deletions(-)
diff --git
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
index 16efd5219..733de2c43 100644
---
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
+++
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/common/constants/ConnectRecordExtensionKeys.java
@@ -22,7 +22,7 @@ package
org.apache.eventmesh.connector.dingtalk.common.constants;
*/
public interface ConnectRecordExtensionKeys {
- String DINGDING_TEMPLATE_TYPE_KEY = "dingDingTemplateTypeKey";
+ String DINGTALK_TEMPLATE_TYPE_KEY = "dingtalktemplatetype";
- String DINGDING_MARKDOWN_MESSAGE_TITLE = "dingDingMarkdownMessageTitle";
+ String DINGTALK_MARKDOWN_MESSAGE_TITLE = "dingtalkmarkdownmessagetitle";
}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
index 592d1ad1b..864ac3ae6 100644
---
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java
@@ -126,7 +126,7 @@ public class DingDingSinkConnector implements Sink {
new OrgGroupSendHeaders();
orgGroupSendHeaders.xAcsDingtalkAccessToken = accessToken;
- String templateTypeKey =
record.getExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY);
+ String templateTypeKey =
record.getExtension(ConnectRecordExtensionKeys.DINGTALK_TEMPLATE_TYPE_KEY);
if (null == templateTypeKey || "null".equals(templateTypeKey))
{
templateTypeKey =
DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey();
}
@@ -136,10 +136,10 @@ public class DingDingSinkConnector implements Sink {
if (DingDingMessageTemplateType.PLAIN_TEXT == templateType) {
contentMap.put("content", new String((byte[])
record.getData()));
} else if (DingDingMessageTemplateType.MARKDOWN ==
templateType) {
- String title =
Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.DINGDING_MARKDOWN_MESSAGE_TITLE))
+ String title =
Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.DINGTALK_MARKDOWN_MESSAGE_TITLE))
.orElse("EventMesh-Message");
contentMap.put("title", title);
- contentMap.put("text", String.valueOf(record.getData()));
+ contentMap.put("text", new String((byte[])
record.getData()));
}
OrgGroupSendRequest orgGroupSendRequest =
diff --git
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
index 81e6fecf6..d24c51ffa 100644
---
a/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
+++
b/eventmesh-connectors/eventmesh-connector-dingtalk/src/test/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnectorTest.java
@@ -94,7 +94,7 @@ public class DingDingSinkConnectorTest {
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(), "Hello,
EventMesh!".getBytes(StandardCharsets.UTF_8));
-
connectRecord.addExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY,
+
connectRecord.addExtension(ConnectRecordExtensionKeys.DINGTALK_TEMPLATE_TYPE_KEY,
DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey());
records.add(connectRecord);
}
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index e57935d4f..5f4d5c89b 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -35,13 +35,25 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.springframework.beans.BeansException;
+import org.springframework.boot.env.OriginTrackedMapPropertySource;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.core.env.MutablePropertySources;
+import org.springframework.core.env.PropertySource;
+
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class SpringSourceConnector implements Source, MessageSendingOperations
{
+public class SpringSourceConnector implements Source,
MessageSendingOperations, ApplicationContextAware {
+
+ private static final String CONNECTOR_PROPERTY_PREFIX =
"eventmesh.connector.";
private static final int DEFAULT_BATCH_SIZE = 10;
+ private ApplicationContext applicationContext;
+
private SpringSourceConfig sourceConfig;
private BlockingQueue<ConnectRecord> queue;
@@ -116,6 +128,7 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord record = new ConnectRecord(partition, offset,
System.currentTimeMillis(), message);
+ addSpringEnvironmentPropertyExtensions(record);
queue.offer(record);
}
@@ -131,6 +144,40 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations {
RecordOffset offset = new RecordOffset();
ConnectRecord record = new ConnectRecord(partition, offset,
System.currentTimeMillis(), message);
record.addExtension(SourceWorker.CALLBACK_EXTENSION, workerCallback);
+ addSpringEnvironmentPropertyExtensions(record);
queue.offer(record);
}
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ private void addSpringEnvironmentPropertyExtensions(ConnectRecord
connectRecord) {
+ ConfigurableApplicationContext context =
(ConfigurableApplicationContext) applicationContext;
+ MutablePropertySources propertySources =
context.getEnvironment().getPropertySources();
+ for (PropertySource<?> propertySource : propertySources) {
+ if (!(propertySource instanceof OriginTrackedMapPropertySource)) {
+ continue;
+ }
+ OriginTrackedMapPropertySource originTrackedMapPropertySource =
+ (OriginTrackedMapPropertySource) propertySource;
+ String[] keys = originTrackedMapPropertySource.getPropertyNames();
+ for (String key : keys) {
+ if (!key.startsWith(CONNECTOR_PROPERTY_PREFIX)) {
+ continue;
+ }
+ Object value = null;
+ try {
+ value = originTrackedMapPropertySource.getProperty(key);
+ if (value != null) {
+
connectRecord.addExtension(key.replaceAll(CONNECTOR_PROPERTY_PREFIX,
"").toLowerCase(),
+ String.valueOf(value));
+ }
+ } catch (Throwable e) {
+ log.error("Put spring environment property to extension
failed, key=[{}], value=[{}]", key, value);
+ }
+ }
+ }
+ }
}
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
index 35e4accbc..65c71aea0 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/test/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnectorTest.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.connector.spring.source.connector;
+import static org.mockito.Mockito.doReturn;
+
import org.apache.eventmesh.connector.spring.source.config.SpringSourceConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
@@ -25,17 +27,25 @@ import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Spy;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.MutablePropertySources;
@ExtendWith(MockitoExtension.class)
public class SpringSourceConnectorTest {
- @Spy
private SpringSourceConnector connector;
@Test
public void testSpringSourceConnector() throws Exception {
+ ConfigurableApplicationContext context =
Mockito.mock(ConfigurableApplicationContext.class);
+ ConfigurableEnvironment environment =
Mockito.mock(ConfigurableEnvironment.class);
+ doReturn(new
MutablePropertySources()).when(environment).getPropertySources();
+ doReturn(environment).when(context).getEnvironment();
+ connector = new SpringSourceConnector();
+ connector.setApplicationContext(context);
SpringSourceConfig sourceConfig = new SpringSourceConfig();
connector.init(sourceConfig);
connector.start();
@@ -51,7 +61,7 @@ public class SpringSourceConnectorTest {
}
}
- private void writeMockedRecords(int count, String message) throws
Exception {
+ private void writeMockedRecords(int count, String message) {
for (int i = 0; i < count; i++) {
connector.send(message + i);
}
diff --git a/eventmesh-examples/src/main/resources/application.properties
b/eventmesh-examples/src/main/resources/application.properties
index 259e49e89..6c7b727d2 100644
--- a/eventmesh-examples/src/main/resources/application.properties
+++ b/eventmesh-examples/src/main/resources/application.properties
@@ -23,4 +23,5 @@ eventmesh.grpc.port=10205
eventmesh.selector.type=nacos
eventmesh.selector.nacos.address=127.0.0.1:8848
eventmesh.catalog.name=EVENTMESH-catalog
-eventmesh.workflow.name=EVENTMESH-workflow
\ No newline at end of file
+eventmesh.workflow.name=EVENTMESH-workflow
+eventmesh.connector.dingtalkTemplateType=sampleText
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]