This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 1e533d1df [ISSUE #4598] Improve eventmesh-connector-lark (#4599)
1e533d1df is described below
commit 1e533d1dfa36e6a567082a6f4e824072bab10651
Author: hhuang <[email protected]>
AuthorDate: Sun Dec 10 12:38:50 2023 +0800
[ISSUE #4598] Improve eventmesh-connector-lark (#4599)
* refactor : v1
* feat : v2
* feat : v3
* style : improve naming
* style : improve naming
* style :
* fix : javadoc
* docs : split into README.md and README_CN.md
* fix : token will expire
* feat : sync and async
* fix :
* fix :
* fix : add verify in regularSink method
---
.../org/apache/eventmesh/common/Constants.java | 11 -
eventmesh-connectors/README.md | 4 +-
.../feishu/sink/connector/FeishuSinkConnector.java | 149 ---------
.../s3/source/FeishuSinkConnectorTest.java | 110 -------
.../eventmesh-connector-lark/README.md | 35 +++
.../eventmesh-connector-lark/README_CN.md | 35 +++
.../build.gradle | 8 +-
.../gradle.properties | 2 +-
.../lark/ConnectRecordExtensionKeys.java} | 30 +-
.../lark/config/LarkConnectServerConfig.java} | 17 +-
.../lark/config/LarkMessageTemplateType.java} | 27 +-
.../connector/lark/server/LarkConnectServer.java} | 21 +-
.../connector/lark/sink/ImServiceHandler.java | 346 +++++++++++++++++++++
.../lark/sink/config/LarkSinkConfig.java} | 6 +-
.../lark/sink/config/SinkConnectorConfig.java | 81 +++++
.../lark/sink/connector/LarkSinkConnector.java | 151 +++++++++
.../src/main/resources/server-config.yml} | 4 +-
.../src/main/resources/sink-config.yml | 21 +-
.../connector/lark/sink/ImServiceHandlerTest.java | 163 ++++++++++
.../connector/lark/sink/LarkSinkConnectorTest.java | 105 +++++++
.../src/test/resources/server-config.yml} | 4 +-
.../src/test}/resources/sink-config.yml | 21 +-
.../src/main/resources/application.properties | 5 +-
.../openconnect/api/connector/Connector.java | 4 +
settings.gradle | 2 +-
25 files changed, 1023 insertions(+), 339 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index f60ba0f26..867d50b43 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -205,15 +205,4 @@ public class Constants {
public static final String OS_WIN_PREFIX = "win";
public static final String DEFAULT = "default";
-
- public static final String FEISHU_SEND_MESSAGE_API =
"https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=";
-
- public static final String FEISHU_RECEIVE_ID = "receive_id";
-
- public static final String FEISHU_MSG_TYPE = "msg_type";
-
- public static final String FEISHU_CONTENT = "content";
-
- public static final String FEISHU_UUID = "uuid";
-
}
diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md
index 1fc836f3a..ccd1e7dd0 100644
--- a/eventmesh-connectors/README.md
+++ b/eventmesh-connectors/README.md
@@ -30,8 +30,8 @@ Add a new connector by implementing the source/sink interface
using :
| [Dingtalk](eventmesh-connector-dingtalk) | Sink | ✅ |
| Email | Source | ⬜ |
| Email | Sink | ⬜ |
-| FeiShu | Source | ⬜ |
-| [FeiShu](eventmesh-connector-feishu) | Sink | ✅ |
+| Lark | Source | ⬜ |
+| [Lark](eventmesh-connector-lark) | Sink | ✅ |
| [File](eventmesh-connector-file) | Source | ✅ |
| [File](eventmesh-connector-file) | Sink | ✅ |
| Github | Source | ⬜ |
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/connector/FeishuSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/connector/FeishuSinkConnector.java
deleted file mode 100644
index ea4032ebc..000000000
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/connector/FeishuSinkConnector.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.feishu.sink.connector;
-
-import static org.apache.eventmesh.common.Constants.FEISHU_CONTENT;
-import static org.apache.eventmesh.common.Constants.FEISHU_MSG_TYPE;
-import static org.apache.eventmesh.common.Constants.FEISHU_RECEIVE_ID;
-import static org.apache.eventmesh.common.Constants.FEISHU_SEND_MESSAGE_API;
-import static org.apache.eventmesh.common.Constants.FEISHU_UUID;
-
-import org.apache.eventmesh.connector.feishu.sink.config.FeishuSinkConfig;
-import org.apache.eventmesh.connector.feishu.sink.config.SinkConnectorConfig;
-import org.apache.eventmesh.openconnect.api.config.Config;
-import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
-import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
-import org.apache.eventmesh.openconnect.api.sink.Sink;
-import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.github.rholder.retry.Attempt;
-import com.github.rholder.retry.RetryListener;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
-import com.lark.oapi.Client;
-import com.lark.oapi.core.response.RawResponse;
-import com.lark.oapi.core.token.AccessTokenType;
-import com.lark.oapi.service.im.v1.enums.MsgTypeEnum;
-import com.lark.oapi.service.im.v1.model.ext.MessageText;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class FeishuSinkConnector implements Sink {
-
- private FeishuSinkConfig sinkConfig;
-
- private Client feishuClient;
-
- private static final int MAX_RETRY_TIME = 3;
-
- private static final int FIXED_WAIT_SECOND = 1;
-
- private final Retryer<Boolean> retryer =
RetryerBuilder.<Boolean>newBuilder().retryIfException().retryIfResult(res ->
!res)
- .withWaitStrategy(WaitStrategies.fixedWait(FIXED_WAIT_SECOND,
TimeUnit.SECONDS))
-
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIME)).withRetryListener(new
RetryListener() {
-
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- long times = attempt.getAttemptNumber();
- log.warn("retry invoke http,times={}", times);
- }
- }).build();
-
- @Override
- public Class<? extends Config> configClass() {
- return FeishuSinkConfig.class;
- }
-
- @Override
- public void init(Config config) throws Exception {
- // init config for feishu sink connector
- this.sinkConfig = (FeishuSinkConfig) config;
- doInit();
- }
-
- @Override
- public void init(ConnectorContext connectorContext) throws Exception {
- // init config for feishu sink connector
- SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)
connectorContext;
- this.sinkConfig = (FeishuSinkConfig)
sinkConnectorContext.getSinkConfig();
- doInit();
- }
-
- private void doInit() {
- this.feishuClient = Client.newBuilder(this.getConfig().getAppId(),
this.getConfig().getAppSecret()).requestTimeout(3, TimeUnit.SECONDS)
- .build();
- }
-
- @Override
- public void start() {
- }
-
- @Override
- public void commit(ConnectRecord record) {
-
- }
-
- @Override
- public String name() {
- return this.sinkConfig.getConnectorConfig().getConnectorName();
- }
-
- @Override
- public void stop() {
- }
-
- @Override
- public void put(List<ConnectRecord> sinkRecords) {
- SinkConnectorConfig connectorConfig = getConfig();
- try {
- for (ConnectRecord connectRecord : sinkRecords) {
- AtomicReference<RawResponse> response = new
AtomicReference<>();
- retryer.call(() -> {
- Map<String, Object> body = new HashMap<>();
- body.put(FEISHU_RECEIVE_ID,
connectorConfig.getReceiveId());
- body.put(FEISHU_CONTENT,
MessageText.newBuilder().text(connectRecord.getData().toString()).build());
- body.put(FEISHU_MSG_TYPE,
MsgTypeEnum.MSG_TYPE_TEXT.getValue());
- body.put(FEISHU_UUID, UUID.randomUUID().toString());
- response.set(feishuClient.post(FEISHU_SEND_MESSAGE_API +
connectorConfig.getReceiveIdType(), body, AccessTokenType.Tenant));
- if (response.get().getStatusCode() != 200) {
- log.error("request feishu open api err{}", new
String(response.get().getBody()));
- return false;
- }
- return true;
- });
- }
- } catch (Exception e) {
- log.error("failed to put message to feishu", e);
- }
- }
-
- public SinkConnectorConfig getConfig() {
- return this.sinkConfig.getConnectorConfig();
- }
-
-}
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/test/java/org/apache/eventmesh/connector/s3/source/FeishuSinkConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-feishu/src/test/java/org/apache/eventmesh/connector/s3/source/FeishuSinkConnectorTest.java
deleted file mode 100644
index 40f6db500..000000000
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/test/java/org/apache/eventmesh/connector/s3/source/FeishuSinkConnectorTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.s3.source;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.apache.eventmesh.connector.feishu.sink.config.FeishuSinkConfig;
-import org.apache.eventmesh.connector.feishu.sink.config.SinkConnectorConfig;
-import
org.apache.eventmesh.connector.feishu.sink.connector.FeishuSinkConnector;
-import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.platform.commons.support.HierarchyTraversalMode;
-import org.junit.platform.commons.support.ReflectionSupport;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import com.lark.oapi.Client;
-import com.lark.oapi.core.response.RawResponse;
-
-@ExtendWith(MockitoExtension.class)
-public class FeishuSinkConnectorTest {
-
- private static final FeishuSinkConfig sinkConfig;
-
- private static final SinkConnectorConfig SINK_CONNECTOR_CONFIG;
-
- static {
- sinkConfig = new FeishuSinkConfig();
- SINK_CONNECTOR_CONFIG = new SinkConnectorConfig();
- SINK_CONNECTOR_CONFIG.setConnectorName("FeishuSinkConnector");
- SINK_CONNECTOR_CONFIG.setAppId("xxx");
- SINK_CONNECTOR_CONFIG.setAppSecret("xxx");
- SINK_CONNECTOR_CONFIG.setReceiveId("xxx");
- SINK_CONNECTOR_CONFIG.setReceiveIdType("open_id");
- sinkConfig.setConnectorConfig(SINK_CONNECTOR_CONFIG);
- }
-
- private FeishuSinkConnector feishuSinkConnector;
-
- @Mock
- private Client feishuClient;
-
- @BeforeEach
- public void setup() throws Exception {
- feishuSinkConnector = new FeishuSinkConnector();
- RawResponse response = new RawResponse();
- response.setStatusCode(200);
- Mockito.doReturn(response).when(feishuClient).post(Mockito.any(),
Mockito.any(), Mockito.any());
-
- Field feishuClientField =
ReflectionSupport.findFields(feishuSinkConnector.getClass(),
- (f) -> f.getName().equals("feishuClient"),
- HierarchyTraversalMode.BOTTOM_UP).get(0);
-
- feishuClientField.setAccessible(true);
- feishuClientField.set(feishuSinkConnector, feishuClient);
-
- Field sinkConfigField =
ReflectionSupport.findFields(feishuSinkConnector.getClass(),
- (f) -> f.getName().equals("sinkConfig"),
- HierarchyTraversalMode.BOTTOM_UP).get(0);
-
- sinkConfigField.setAccessible(true);
- sinkConfigField.set(feishuSinkConnector, sinkConfig);
- }
-
- @Test
- public void testFeishuSinkConnector() throws Exception {
- feishuSinkConnector.start();
- final int times = 3;
- List<ConnectRecord> connectRecords = new ArrayList<>();
- for (int i = 0; i < times; i++) {
- connectRecords.add(new ConnectRecord(null, null, 0L, "test"));
- }
- feishuSinkConnector.put(connectRecords);
-
- verify(feishuClient, times(times)).post(any(), any(), any());
-
- }
-
- @AfterEach
- public void tearDown() {
- feishuSinkConnector.stop();
- }
-
-}
diff --git a/eventmesh-connectors/eventmesh-connector-lark/README.md
b/eventmesh-connectors/eventmesh-connector-lark/README.md
new file mode 100644
index 000000000..39c76ae11
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-lark/README.md
@@ -0,0 +1,35 @@
+# eventmesh-connector-lark
+
+## Lark Sink Server Config And Start
+
+Before using eventmesh-connector-lark to sink events, you need to configure
the server.
+- Please customize `sinkEnable``=`true`/`false` in
`/resource/server-config.yml` to turn on/off the sink function.
+- Regarding `/resource/sink-config.yml`, only the configuration under
`sinkConnectorConfig` is explained here:
+ - `connectorName`, specify the connector name
+ - (required) `appId`, the appId obtained from lark
+ - (required) `appSecret`, the appSecret obtained from lark
+ - `receiveIdType`, the type of receiving Id, the default and recommended
use is `open_id`. Optional open_id/user_id/union_id/email/chat_id.
+ - (Required) `receiveId`, receive Id, needs to correspond to
`receiveIdType`.
+ - `sinkAsync`, whether to asynchronously sink events
+ - `maxRetryTimes`, the maximum number of retransmissions when the sink
event fails. The default is 3 times.
+ - `retryDelayInMills`, when the sink event fails, the time interval for
retransmitting the event. Default is 1s, unit is milliseconds.
+
+
+## Sink CloudEvent To Lark
+
+When using the eventmesh-connector-lark sinking event, you need to add the
corresponding extension filed in CloudEvent:
+- When key=`templatetype4lark`, value=`text`/`markdown`, indicating the text
type of the event
+- When the text type is markdown, you can add extension:
key=`markdownmessagetitle4lark`, value indicates the title of the event.
+- When key=`atusers4lark`, value=`id-0,name-0;id-1,name-1`, indicating that
the event requires `@`certain users
+ - It is recommended to use **open_id** for id.
+ - When the text is of text type, the id can be
**open_id/union_id/user_id**; when the text is of markdown type, the id can be
**open_id/user_id**. In particular, when the application type is [custom
robot](https://open.larksuite.com/document/ukTMukTMukTM/ucTM5YjL3ETO24yNxkjN)
and the text is of markdown type, only the use of **open_id** to `@` the user
is supported.
+ - When the text is of text type and the id is invalid, name will be used
instead for display; when the text is of markdown type and the id is invalid,
an exception will be thrown directly (you should try to ensure the correctness
of the id, and name can be considered omitted).
+- When key=`atall4lark`, value=`true`/`false`, indicating that the event
requires `@` everyone.
+
+
+## Lark Open Platform API
+
+For the Lark open platform API involved in this module, please click the
following link:
+- **Send Message**, please [view
here](https://open.larksuite.com/document/server-docs/im-v1/message/create?appId=cli_a5e1bc31507ed00c)
+- **text**, please [view
here](https://open.larksuite.com/document/server-docs/im-v1/message-content-description/create_json#c9e08671)
+- **markdown**, please [view
here](https://open.larksuite.com/document/common-capabilities/message-card/message-cards-content/using-markdown-tags)
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-lark/README_CN.md
b/eventmesh-connectors/eventmesh-connector-lark/README_CN.md
new file mode 100644
index 000000000..f20aa94d0
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-lark/README_CN.md
@@ -0,0 +1,35 @@
+# eventmesh-connector-lark
+
+## Lark Sink Server的配置与启动
+
+使用eventmesh-connector-lark下沉事件之前,需要进行server的配置。
+- 请在`/resource/server-config.yml`中自定义`sinkEnable``=`true`/`false`以开启/关闭sink功能。
+- 关于`/resource/sink-config.yml`,在此仅说明`sinkConnectorConfig`下的配置:
+ - `connectorName`, 指定connector名称
+ - (必需)`appId`, lark中获取的appId
+ - (必需)`appSecret`, lark中获取的appSecret
+ - `receiveIdType`,
接收Id的类型,默认且推荐使用`open_id`。可选open_id/user_id/union_id/email/chat_id。
+ - (必需)`receiveId`, 接收Id,需要和`receiveIdType`对应。
+ - `sinkAsync`, 是否异步下沉事件
+ - `maxRetryTimes`, sink事件失败时,最大重传的次数。默认3次。
+ - `retryDelayInMills`, sink事件失败时,重传事件的时间间隔。默认1s,单位为毫秒。
+
+
+## 可下沉飞书的CLoudEvent
+
+使用eventmesh-connector-lark下沉事件时,需要在CloudEvent中添加对应的extension filed:
+- 当key=`templatetype4lark`时,value=`text`/`markdown`,表明该事件的文本类型
+- 当文本类型为markdown时,可以添加extension:key=`markdownmessagetitle4lark`,value表明该事件的标题。
+- 当key=`atusers4lark`时,value=`id-0,name-0;id-1,name-1`,表明该事件需要`@`某些用户
+ - id推荐使用**open_id**。
+ -
当文本属于text类型时,id可以是**open_id/union_id/user_id**;当文本属于markdown类型时,id可以是**open_id/user_id**。特别地,当应用类型为[自定义机器人](https://open.feishu.cn/document/ukTMukTMukTM/ucTM5YjL3ETO24yNxkjN)且文本属于markdown类型,则仅支持使用**open_id**来`@`用户。
+ -
当文本属于text类型且id无效时,将利用name代替展示;当文本属于markdown类型时且id无效时,直接抛出异常(您应该尽量保证id的正确性,而name则可以考虑省略)。
+- 当key=`atall4lark`时,value=`true`/`false`,表明该事件需要`@`所有人。
+
+
+## 飞书开放平台API
+
+有关该模块涉及到的飞书开放平台API,请点击以下链接:
+-
**发送消息**,请[查看这里](https://open.feishu.cn/document/server-docs/im-v1/message/create?appId=cli_a5e1bc31507ed00c)
+-
**text**,请[查看这里](https://open.feishu.cn/document/server-docs/im-v1/message-content-description/create_json#c9e08671)
+-
**markdown**,请[查看这里](https://open.feishu.cn/document/common-capabilities/message-card/message-cards-content/using-markdown-tags)
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-feishu/build.gradle
b/eventmesh-connectors/eventmesh-connector-lark/build.gradle
similarity index 89%
rename from eventmesh-connectors/eventmesh-connector-feishu/build.gradle
rename to eventmesh-connectors/eventmesh-connector-lark/build.gradle
index b5bd61258..7cdd35b05 100644
--- a/eventmesh-connectors/eventmesh-connector-feishu/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-lark/build.gradle
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-List feishu = [
- "com.larksuite.oapi:oapi-sdk:$feishu_version",
+List lark = [
+ "com.larksuite.oapi:oapi-sdk:$lark_version",
"com.github.rholder:guava-retrying:$guava_retrying_version",
"org.apache.httpcomponents:httpclient",
project(":eventmesh-common")
@@ -24,10 +24,10 @@ List feishu = [
dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
- implementation feishu
+ implementation lark
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
- testImplementation "org.mockito:mockito-core"
+ testImplementation "org.mockito:mockito-inline"
testImplementation "org.mockito:mockito-junit-jupiter"
}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
b/eventmesh-connectors/eventmesh-connector-lark/gradle.properties
similarity index 94%
copy from eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
copy to eventmesh-connectors/eventmesh-connector-lark/gradle.properties
index d2238bfc9..43dc7b297 100644
--- a/eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
+++ b/eventmesh-connectors/eventmesh-connector-lark/gradle.properties
@@ -14,5 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-feishu_version=2.0.28
+lark_version=2.0.28
guava_retrying_version=2.0.0
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/ConnectRecordExtensionKeys.java
similarity index 53%
copy from
eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
copy to
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/ConnectRecordExtensionKeys.java
index 996ccdd31..42274199e 100644
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/ConnectRecordExtensionKeys.java
@@ -15,19 +15,29 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.feishu.server;
+package org.apache.eventmesh.connector.lark;
-import
org.apache.eventmesh.connector.feishu.sink.connector.FeishuSinkConnector;
-import org.apache.eventmesh.openconnect.Application;
+/**
+ * Constants of record extension key.
+ */
+public interface ConnectRecordExtensionKeys {
-import lombok.extern.slf4j.Slf4j;
+ /**
+ * {@code text} or {@code markdown}, otherwise use {@code text} to replace.
+ */
+ String TEMPLATE_TYPE_4_LARK = "templatetype4lark";
-@Slf4j
-public class FeishuConnectServer {
+ /**
+ * The value format is {@code id,name;id,name;}.Recommend to use {@code
open_id} as {@code id}.
+ * <p>
+ * To prevent bad situations, you should ensure that the {@code id} is
valid
+ */
+ String AT_USERS_4_LARK = "atusers4lark";
- public static void main(String[] args) throws Exception {
+ /**
+ * true or false
+ */
+ String AT_ALL_4_LARK = "atall4lark";
- Application feishuSinkApp = new Application();
- feishuSinkApp.run(FeishuSinkConnector.class);
- }
+ String MARKDOWN_MESSAGE_TITLE_4_LARK = "markdownmessagetitle4lark";
}
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/config/SinkConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/config/LarkConnectServerConfig.java
similarity index 75%
rename from
eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/config/SinkConnectorConfig.java
rename to
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/config/LarkConnectServerConfig.java
index 8bb76533a..6d0428056 100644
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/config/SinkConnectorConfig.java
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/config/LarkConnectServerConfig.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.feishu.sink.config;
+package org.apache.eventmesh.connector.lark.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
import lombok.Data;
+import lombok.EqualsAndHashCode;
@Data
-public class SinkConnectorConfig {
-
- private String connectorName;
-
- private String receiveId;
-
- private String appId;
+@EqualsAndHashCode(callSuper = true)
+public class LarkConnectServerConfig extends Config {
- private String appSecret;
+ private boolean sinkEnable;
- private String receiveIdType;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/config/LarkMessageTemplateType.java
similarity index 56%
copy from
eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
copy to
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/config/LarkMessageTemplateType.java
index 996ccdd31..e99afcaa6 100644
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/config/LarkMessageTemplateType.java
@@ -15,19 +15,28 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.feishu.server;
+package org.apache.eventmesh.connector.lark.config;
-import
org.apache.eventmesh.connector.feishu.sink.connector.FeishuSinkConnector;
-import org.apache.eventmesh.openconnect.Application;
+import java.util.Arrays;
-import lombok.extern.slf4j.Slf4j;
+import lombok.Getter;
-@Slf4j
-public class FeishuConnectServer {
+@Getter
+public enum LarkMessageTemplateType {
- public static void main(String[] args) throws Exception {
+ PLAIN_TEXT("text"),
+ MARKDOWN("markdown");
- Application feishuSinkApp = new Application();
- feishuSinkApp.run(FeishuSinkConnector.class);
+ private final String templateKey;
+
+ LarkMessageTemplateType(String templateKey) {
+ this.templateKey = templateKey;
+ }
+
+ public static LarkMessageTemplateType of(String templateKey) {
+ return Arrays.stream(values())
+ .filter(v -> v.getTemplateKey().equals(templateKey))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("TemplateKey: " +
templateKey + " not found."));
}
}
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/server/LarkConnectServer.java
similarity index 57%
rename from
eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
rename to
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/server/LarkConnectServer.java
index 996ccdd31..656beb8cc 100644
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/server/FeishuConnectServer.java
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/server/LarkConnectServer.java
@@ -15,19 +15,24 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.feishu.server;
+package org.apache.eventmesh.connector.lark.server;
-import
org.apache.eventmesh.connector.feishu.sink.connector.FeishuSinkConnector;
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.lark.config.LarkConnectServerConfig;
+import org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector;
import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class FeishuConnectServer {
+public class LarkConnectServer {
public static void main(String[] args) throws Exception {
- Application feishuSinkApp = new Application();
- feishuSinkApp.run(FeishuSinkConnector.class);
+ LarkConnectServerConfig larkConnectServerConfig =
ConfigUtil.parse(LarkConnectServerConfig.class,
+ Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+ if (larkConnectServerConfig.isSinkEnable()) {
+ Application application = new Application();
+ application.run(LarkSinkConnector.class);
+ }
}
}
diff --git
a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java
new file mode 100644
index 000000000..f97452e67
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.lark.sink;
+
+import static
org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.getTenantAccessToken;
+
+import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys;
+import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType;
+import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.text.StringEscapeUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.lark.oapi.Client;
+import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum;
+import com.lark.oapi.card.model.MessageCard;
+import com.lark.oapi.card.model.MessageCardConfig;
+import com.lark.oapi.card.model.MessageCardElement;
+import com.lark.oapi.card.model.MessageCardHeader;
+import com.lark.oapi.card.model.MessageCardMarkdown;
+import com.lark.oapi.card.model.MessageCardPlainText;
+import com.lark.oapi.core.httpclient.OkHttpTransport;
+import com.lark.oapi.core.request.RequestOptions;
+import com.lark.oapi.core.utils.Lists;
+import com.lark.oapi.okhttp.OkHttpClient;
+import com.lark.oapi.service.im.v1.ImService;
+import com.lark.oapi.service.im.v1.enums.MsgTypeEnum;
+import com.lark.oapi.service.im.v1.model.CreateMessageReq;
+import com.lark.oapi.service.im.v1.model.CreateMessageReqBody;
+import com.lark.oapi.service.im.v1.model.CreateMessageResp;
+import com.lark.oapi.service.im.v1.model.ext.MessageText;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ImServiceHandler {
+
+ private SinkConnectorConfig sinkConnectorConfig;
+
+ private ImService imService;
+
+ private Retryer<ConnectRecord> retryer;
+
+ private ExecutorService sinkAsyncWorker;
+ private ExecutorService cleanerWorker;
+ private ScheduledExecutorService retryWorker;
+
+ private static final LongAdder redoSinkNum = new LongAdder();
+
+ public ImServiceHandler() {
+ }
+
+ public static ImServiceHandler create(SinkConnectorConfig
sinkConnectorConfig) {
+ ImServiceHandler imServiceHandler = new ImServiceHandler();
+ imServiceHandler.sinkConnectorConfig = sinkConnectorConfig;
+ imServiceHandler.imService =
Client.newBuilder(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret())
+ .httpTransport(new OkHttpTransport(new
OkHttpClient().newBuilder()
+ .callTimeout(3L, TimeUnit.SECONDS)
+ .build())
+ )
+ .disableTokenCache()
+ .requestTimeout(3, TimeUnit.SECONDS)
+ .build()
+ .im();
+
+ long fixedWait =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ int availableProcessors =
Runtime.getRuntime().availableProcessors();
+ imServiceHandler.sinkAsyncWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+ return thread;
+ });
+
+ imServiceHandler.cleanerWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-cleanerWorker");
+ return thread;
+ });
+
+ imServiceHandler.retryWorker =
Executors.newScheduledThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-retryWorker");
+ return thread;
+ });
+ } else {
+ imServiceHandler.retryer =
RetryerBuilder.<ConnectRecord>newBuilder()
+ .retryIfException()
+ .retryIfResult(Objects::nonNull)
+ .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+ .withRetryListener(new RetryListener() {
+ @SneakyThrows
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+
+ long times = attempt.getAttemptNumber();
+ if (times > 1) {
+ redoSinkNum.increment();
+ log.info("Total redo sink task num : [{}]",
redoSinkNum.sum());
+ log.warn("Retry sink event to lark |
times=[{}]", attempt.getAttemptNumber() - 1);
+ }
+ }
+ })
+ .build();
+ }
+
+ return imServiceHandler;
+ }
+
+ public void sink(ConnectRecord connectRecord) throws ExecutionException,
RetryException {
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
+
+ RequestOptions requestOptions = RequestOptions.newBuilder()
+
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
+ .headers(headers)
+ .build();
+
+ retryer.call(() -> {
+ CreateMessageReq createMessageReq =
convertCreateMessageReq(connectRecord);
+ CreateMessageResp resp =
imService.message().create(createMessageReq, requestOptions);
+ if (resp.getCode() != 0) {
+ log.warn("Sinking event to lark failure | code:[{}] | msg:[{}]
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
+ return connectRecord;
+ }
+ return null;
+ });
+ }
+
+ public void sinkAsync(ConnectRecord connectRecord) {
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
+
+ RequestOptions requestOptions = RequestOptions.newBuilder()
+
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
+ .headers(headers)
+ .build();
+
+ CreateMessageReq createMessageReq =
convertCreateMessageReq(connectRecord);
+
+ long fixedWait =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
+ LongAdder cnt = new LongAdder();
+ AtomicBoolean isAck = new AtomicBoolean(false);
+ Runnable task = () -> CompletableFuture
+ .supplyAsync(() -> {
+ try {
+ cnt.increment();
+ return imService.message().create(createMessageReq,
requestOptions);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, sinkAsyncWorker)
+ .whenCompleteAsync((resp, e) -> {
+ if (cnt.sum() > 1) {
+ redoSinkNum.increment();
+ log.info("Total redo sink task num : [{}]",
redoSinkNum.sum());
+ log.warn("Retry sink event to lark | times=[{}]",
cnt.sum() - 1);
+ }
+ if (Objects.nonNull(e)) {
+ log.error("eventmesh-connector-lark internal
exception.", e);
+ return;
+ }
+ if (resp.getCode() != 0) {
+ log.warn("Sinking event to lark failure | code:[{}] |
msg:[{}] | err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
+ return;
+ }
+ isAck.set(true);
+ });
+
+ ScheduledFuture<?> future = retryWorker.scheduleAtFixedRate(task, 0L,
fixedWait, TimeUnit.MILLISECONDS);
+ cleanerWorker.submit(() -> {
+ while (true) {
+ // complete task
+ if (isAck.get() || cnt.sum() >= maxRetryTimes) {
+ future.cancel(true);
+ return;
+ }
+ }
+ });
+ }
+
+ private CreateMessageReq convertCreateMessageReq(ConnectRecord
connectRecord) {
+ CreateMessageReqBody.Builder bodyBuilder =
CreateMessageReqBody.newBuilder()
+ .receiveId(sinkConnectorConfig.getReceiveId())
+ .uuid(UUID.randomUUID().toString());
+
+ String templateTypeKey =
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
+ if (null == templateTypeKey || "null".equals(templateTypeKey)) {
+ templateTypeKey =
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
+ }
+ LarkMessageTemplateType templateType =
LarkMessageTemplateType.of(templateTypeKey);
+ if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
+ bodyBuilder.content(createTextContent(connectRecord))
+ .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
+ } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
+ String title =
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
+ .orElse("EventMesh-Message");
+ bodyBuilder.content(createInteractiveContent(connectRecord, title))
+ .msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
+ }
+
+ return CreateMessageReq.newBuilder()
+ .receiveIdType(sinkConnectorConfig.getReceiveIdType())
+ .createMessageReqBody(bodyBuilder.build())
+ .build();
+ }
+
+ private String createTextContent(ConnectRecord connectRecord) {
+ MessageText.Builder msgBuilder = MessageText.newBuilder();
+
+ if (needAtAll(connectRecord)) {
+ msgBuilder.atAll();
+ }
+ String atUsers = needAtUser(connectRecord);
+ if (!atUsers.isEmpty()) {
+ String[] users = atUsers.split(";");
+
+ for (String user : users) {
+ String[] kv = user.split(",");
+ msgBuilder.atUser(kv[0], kv[1]);
+ }
+ }
+
+ String escapedString = StringEscapeUtils.escapeJava(new
String((byte[]) connectRecord.getData()));
+ return msgBuilder.text(escapedString).build();
+ }
+
+ private String createInteractiveContent(ConnectRecord connectRecord,
String title) {
+ StringBuilder sb = new StringBuilder();
+ if (needAtAll(connectRecord)) {
+ atAll(sb);
+ }
+ String atUsers = needAtUser(connectRecord);
+ if (!atUsers.isEmpty()) {
+ String[] users = atUsers.split(";");
+
+ for (String user : users) {
+ String[] kv = user.split(",");
+ atUser(sb, kv[0]);
+ }
+ }
+ sb.append(new String((byte[]) connectRecord.getData()));
+
+ MessageCardConfig config = MessageCardConfig.newBuilder()
+ .enableForward(true)
+ .wideScreenMode(true)
+ .updateMulti(true)
+ .build();
+
+ // header
+ MessageCardHeader header = MessageCardHeader.newBuilder()
+ .template(MessageCardHeaderTemplateEnum.BLUE)
+ .title(MessageCardPlainText.newBuilder()
+ .content(title)
+ .build())
+ .build();
+
+ MessageCard content = MessageCard.newBuilder()
+ .config(config)
+ .header(header)
+ .elements(new MessageCardElement[]{
+
MessageCardMarkdown.newBuilder().content(sb.toString()).build()
+ })
+ .build();
+
+ return content.String();
+ }
+
+ private boolean needAtAll(ConnectRecord connectRecord) {
+ String atAll =
connectRecord.getExtension(ConnectRecordExtensionKeys.AT_ALL_4_LARK);
+ return null != atAll && !"null".equals(atAll) &&
Boolean.parseBoolean(atAll);
+ }
+
+ private String needAtUser(ConnectRecord connectRecord) {
+ String atUsers =
connectRecord.getExtension(ConnectRecordExtensionKeys.AT_USERS_4_LARK);
+ return null != atUsers && !"null".equals(atUsers) ? atUsers : "";
+ }
+
+ /**
+ * For markdown template type.
+ *
+ * @param sb StringBuilder
+ */
+ private void atAll(StringBuilder sb) {
+ sb.append("<at id=all>")
+ .append("</at>");
+ }
+
+ /**
+ * For markdown template type
+ *
+ * @param sb StringBuilder
+ * @param userId open_id/union_id/user_id, recommend to use open_id.
Custom robots can only be used open_id,
+ */
+ private void atUser(StringBuilder sb, String userId) {
+ sb.append("<at id=")
+ .append(userId)
+ .append(">")
+ .append("</at>");
+ }
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/config/FeishuSinkConfig.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/LarkSinkConfig.java
similarity index 85%
rename from
eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/config/FeishuSinkConfig.java
rename to
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/LarkSinkConfig.java
index 0fcc3d42c..a97ece91b 100644
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/java/org/apache/eventmesh/connector/feishu/sink/config/FeishuSinkConfig.java
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/LarkSinkConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.feishu.sink.config;
+package org.apache.eventmesh.connector.lark.sink.config;
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
@@ -24,8 +24,8 @@ import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
-public class FeishuSinkConfig extends SinkConfig {
+public class LarkSinkConfig extends SinkConfig {
- public SinkConnectorConfig connectorConfig;
+ public SinkConnectorConfig sinkConnectorConfig;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..84d4cc64e
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.lark.sink.config;
+
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.lark.oapi.service.im.v1.enums.ReceiveIdTypeEnum;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+ private String connectorName = "larkSink";
+
+ /**
+ * Can not be blank
+ */
+ private String appId;
+
+ /**
+ * Can not be blank
+ */
+ private String appSecret;
+
+ /**
+ * The value is {@code open_id/user_id/union_id/email/chat_id}.
+ * Recommend to use open_id.
+ */
+ private String receiveIdType = "open_id";
+
+ /**
+ * Can not be blank.And it needs to correspond to {@code receiveIdType}
+ */
+ private String receiveId;
+
+ /**
+ * When sinking CouldEvent to lark, choose to call
+ * {@link
org.apache.eventmesh.connector.lark.sink.ImServiceHandler#sink(ConnectRecord)}
+ * or {@link
org.apache.eventmesh.connector.lark.sink.ImServiceHandler#sinkAsync(ConnectRecord)}
+ */
+ private String sinkAsync = "true";
+
+ private String maxRetryTimes = "3";
+
+ private String retryDelayInMills = "1000";
+
+ public void validateSinkConfiguration() {
+ // validate blank
+ if (StringUtils.isAnyBlank(appId, appSecret, receiveId)) {
+ throw new IllegalArgumentException("appId or appSecret or
receiveId is blank,please check it.");
+ }
+
+ // validate receiveIdType
+ if (!StringUtils.containsAny(receiveIdType,
ReceiveIdTypeEnum.CHAT_ID.getValue(),
+ ReceiveIdTypeEnum.EMAIL.getValue(),
+ ReceiveIdTypeEnum.OPEN_ID.getValue(),
+ ReceiveIdTypeEnum.USER_ID.getValue(),
+ ReceiveIdTypeEnum.UNION_ID.getValue())) {
+ throw new IllegalArgumentException(
+ String.format("sinkConnectorConfig.receiveIdType=[%s],
Invalid.", receiveIdType));
+ }
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
new file mode 100644
index 000000000..b5c134f66
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.lark.sink.connector;
+
+import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create;
+
+import org.apache.eventmesh.connector.lark.sink.ImServiceHandler;
+import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig;
+import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.github.rholder.retry.RetryException;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.lark.oapi.Client;
+import com.lark.oapi.core.enums.AppType;
+import com.lark.oapi.core.request.SelfBuiltTenantAccessTokenReq;
+import com.lark.oapi.core.response.TenantAccessTokenResp;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class LarkSinkConnector implements Sink {
+
+ public static final String TENANT_ACCESS_TOKEN = "tenant_access_token";
+
+ /**
+ * Global Access Credential Manager to replace lark build-in tokenCache.
+ * <p>
+ * If you plan to extend the method of obtaining other credentials,
+ * you can refer to the implementation of {@link
#getTenantAccessToken(String, String)}
+ * <p>
+ * If the expiration mechanism provided by lark conflicts with the
expiration time set in AUTH_CACHE,
+ * you can try to modify it.
+ */
+ public static final Cache<String, String> AUTH_CACHE =
CacheBuilder.newBuilder()
+ .initialCapacity(12)
+ .maximumSize(10)
+ .concurrencyLevel(5)
+ .expireAfterWrite(30, TimeUnit.MINUTES)
+ .build();
+
+ private LarkSinkConfig sinkConfig;
+
+ private ImServiceHandler imServiceHandler;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return LarkSinkConfig.class;
+ }
+
+ @Override
+ public void init(Config config) {}
+
+ @Override
+ public void init(ConnectorContext connectorContext) {
+ // init config for lark sink connector
+ SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)
connectorContext;
+ this.sinkConfig = (LarkSinkConfig)
sinkConnectorContext.getSinkConfig();
+
+ SinkConnectorConfig sinkConnectorConfig =
sinkConfig.getSinkConnectorConfig();
+ sinkConnectorConfig.validateSinkConfiguration();
+
+ imServiceHandler = create(sinkConnectorConfig);
+ }
+
+ @Override
+ public void start() {
+ if (!started.compareAndSet(false, true)) {
+ log.info("LarkSinkConnector has been started.");
+ }
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+ // Sink does not need to implement
+ }
+
+ @Override
+ public String name() {
+ return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() {
+ if (!started.compareAndSet(true, false)) {
+ log.info("LarkSinkConnector has not started yet.");
+ }
+ }
+
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) {
+ for (ConnectRecord connectRecord : sinkRecords) {
+ try {
+ if
(Boolean.parseBoolean(sinkConfig.getSinkConnectorConfig().getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
+ } catch (ExecutionException | RetryException e) {
+ log.error("Failed to sink event to lark", e);
+ }
+ }
+ }
+
+ @SneakyThrows
+ public static String getTenantAccessToken(String appId, String appSecret) {
+ return AUTH_CACHE.get(TENANT_ACCESS_TOKEN, () -> {
+
+ Client client = Client.newBuilder(appId, appSecret)
+ .appType(AppType.SELF_BUILT)
+ .logReqAtDebug(true)
+ .build();
+
+ TenantAccessTokenResp resp =
client.ext().getTenantAccessTokenBySelfBuiltApp(
+ SelfBuiltTenantAccessTokenReq.newBuilder()
+ .appSecret(appSecret)
+ .appId(appId)
+ .build());
+ return resp.getTenantAccessToken();
+ });
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
b/eventmesh-connectors/eventmesh-connector-lark/src/main/resources/server-config.yml
similarity index 94%
copy from eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
copy to
eventmesh-connectors/eventmesh-connector-lark/src/main/resources/server-config.yml
index d2238bfc9..20d6e6d59 100644
--- a/eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/resources/server-config.yml
@@ -14,5 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-feishu_version=2.0.28
-guava_retrying_version=2.0.0
\ No newline at end of file
+
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-lark/src/main/resources/sink-config.yml
similarity index 74%
copy from
eventmesh-connectors/eventmesh-connector-feishu/src/main/resources/sink-config.yml
copy to
eventmesh-connectors/eventmesh-connector-lark/src/main/resources/sink-config.yml
index 01cb95335..b5962fd99 100644
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/resources/sink-config.yml
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/main/resources/sink-config.yml
@@ -17,14 +17,19 @@
pubSubConfig:
meshAddress: 127.0.0.1:10000
- subject: TopicTest
+ subject: TEST-TOPIC-LARK
idc: FT
env: PRD
- group: feishuSink
+ group: larkSink
appId: 5031
- userName: feishuSinkUser
- passWord: feishuPassWord
-connectorConfig:
- connectorName: feishuSink
- reciveId: reciveIdValue
- reciveType: open_id
+ userName: larkSinkUser
+ passWord: larkPassWord
+sinkConnectorConfig:
+ connectorName: larkSink
+ appId: appId
+ appSecret: appSecret
+ receiveIdType: open_id
+ receiveId: receiveId
+ sinkAsync: true
+ maxRetryTimes: 3
+ retryDelayInMills: 1000
diff --git
a/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java
b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java
new file mode 100644
index 000000000..1b5fefa42
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.lark.sink;
+
+import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create;
+import static
org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.AUTH_CACHE;
+import static
org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.TENANT_ACCESS_TOKEN;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig;
+import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import com.lark.oapi.service.im.v1.ImService;
+import com.lark.oapi.service.im.v1.model.CreateMessageResp;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class ImServiceHandlerTest {
+
+ private SinkConnectorConfig sinkConnectorConfig;
+
+ private ImServiceHandler imServiceHandler;
+
+ @Mock
+ private ImService imService;
+
+ @Mock
+ private ImService.Message message;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ sinkConnectorConfig = ((LarkSinkConfig)
ConfigUtil.parse(LarkSinkConfig.class)).getSinkConnectorConfig();
+ }
+
+ private void init() throws Exception {
+ // prevent rely on Lark's ExtService
+ AUTH_CACHE.put(TENANT_ACCESS_TOKEN, "test-TenantAccessToken");
+
+ imServiceHandler = create(sinkConnectorConfig);
+
+ // prevent rely on Lark's ImService
+ when(message.create(any(), any())).thenReturn(new CreateMessageResp());
+ when(imService.message()).thenReturn(message);
+ Field imServiceField =
ReflectionSupport.findFields(imServiceHandler.getClass(),
+ (f) -> f.getName().equals("imService"),
+ HierarchyTraversalMode.BOTTOM_UP).get(0);
+ imServiceField.setAccessible(true);
+ imServiceField.set(imServiceHandler, imService);
+ }
+
+ @Test
+ public void testRegularSink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ regularSink();
+ }
+
+ @Test
+ public void testRegularSinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ regularSink();
+ }
+
+ private void regularSink() throws Exception {
+ final int times = 3;
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ long duration = retryDelayInMills * times;
+ Thread.sleep(duration);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
+ }
+ verify(message, times(times)).create(any(), any());
+ }
+
+ @Test
+ public void testRetrySink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ retrySink();
+ }
+
+ @Test
+ public void testRetrySinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ retrySink();
+ }
+
+ private void retrySink() throws Exception {
+ CreateMessageResp resp = new CreateMessageResp();
+ resp.setCode(1);
+ doReturn(resp).when(message).create(any(), any());
+ final int times = 3;
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes());
+ // (maxRetryTimes + 1) event are actually sent
+ int sinkTimes = times * (maxRetryTimes + 1);
+ long duration = retryDelayInMills * sinkTimes;
+
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+
+ Thread.sleep(duration);
+ } else {
+ Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
+
+ }
+ }
+ verify(message, times(sinkTimes)).create(any(), any());
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java
new file mode 100644
index 000000000..566bc1f27
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.lark.sink;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig;
+import org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class LarkSinkConnectorTest {
+
+ private LarkSinkConnector larkSinkConnector;
+
+ private LarkSinkConfig sinkConfig;
+
+ /**
+ * more test see {@link ImServiceHandlerTest}
+ */
+ @Mock
+ private ImServiceHandler imServiceHandler;
+
+ private MockedStatic<ImServiceHandler> imServiceWrapperMockedStatic;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ imServiceWrapperMockedStatic = mockStatic(ImServiceHandler.class);
+ when(ImServiceHandler.create(any())).thenReturn(imServiceHandler);
+ doNothing().when(imServiceHandler).sink(any(ConnectRecord.class));
+ doNothing().when(imServiceHandler).sinkAsync(any(ConnectRecord.class));
+
+ larkSinkConnector = new LarkSinkConnector();
+ sinkConfig = (LarkSinkConfig)
ConfigUtil.parse(larkSinkConnector.configClass());
+ SinkConnectorContext sinkConnectorContext = new SinkConnectorContext();
+ sinkConnectorContext.setSinkConfig(sinkConfig);
+ larkSinkConnector.init(sinkConnectorContext);
+ larkSinkConnector.start();
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ final int times = 3;
+ List<ConnectRecord> connectRecords = new ArrayList<>();
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
+ connectRecords.add(connectRecord);
+ }
+ larkSinkConnector.put(connectRecords);
+ if
(Boolean.parseBoolean(sinkConfig.getSinkConnectorConfig().getSinkAsync())) {
+ verify(imServiceHandler,
times(times)).sinkAsync(any(ConnectRecord.class));
+ } else {
+ verify(imServiceHandler,
times(times)).sink(any(ConnectRecord.class));
+ }
+ }
+
+ @AfterEach
+ public void tearDown() {
+
LarkSinkConnector.AUTH_CACHE.invalidate(LarkSinkConnector.TENANT_ACCESS_TOKEN);
+ larkSinkConnector.stop();
+ imServiceWrapperMockedStatic.close();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
b/eventmesh-connectors/eventmesh-connector-lark/src/test/resources/server-config.yml
similarity index 94%
rename from eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
rename to
eventmesh-connectors/eventmesh-connector-lark/src/test/resources/server-config.yml
index d2238bfc9..20d6e6d59 100644
--- a/eventmesh-connectors/eventmesh-connector-feishu/gradle.properties
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/test/resources/server-config.yml
@@ -14,5 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-feishu_version=2.0.28
-guava_retrying_version=2.0.0
\ No newline at end of file
+
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-lark/src/test/resources/sink-config.yml
similarity index 74%
rename from
eventmesh-connectors/eventmesh-connector-feishu/src/main/resources/sink-config.yml
rename to
eventmesh-connectors/eventmesh-connector-lark/src/test/resources/sink-config.yml
index 01cb95335..b5962fd99 100644
---
a/eventmesh-connectors/eventmesh-connector-feishu/src/main/resources/sink-config.yml
+++
b/eventmesh-connectors/eventmesh-connector-lark/src/test/resources/sink-config.yml
@@ -17,14 +17,19 @@
pubSubConfig:
meshAddress: 127.0.0.1:10000
- subject: TopicTest
+ subject: TEST-TOPIC-LARK
idc: FT
env: PRD
- group: feishuSink
+ group: larkSink
appId: 5031
- userName: feishuSinkUser
- passWord: feishuPassWord
-connectorConfig:
- connectorName: feishuSink
- reciveId: reciveIdValue
- reciveType: open_id
+ userName: larkSinkUser
+ passWord: larkPassWord
+sinkConnectorConfig:
+ connectorName: larkSink
+ appId: appId
+ appSecret: appSecret
+ receiveIdType: open_id
+ receiveId: receiveId
+ sinkAsync: true
+ maxRetryTimes: 3
+ retryDelayInMills: 1000
diff --git a/eventmesh-examples/src/main/resources/application.properties
b/eventmesh-examples/src/main/resources/application.properties
index 726d05d19..f53b1fb19 100644
--- a/eventmesh-examples/src/main/resources/application.properties
+++ b/eventmesh-examples/src/main/resources/application.properties
@@ -25,4 +25,7 @@ eventmesh.selector.nacos.address=127.0.0.1:8848
eventmesh.catalog.name=EVENTMESH-catalog
eventmesh.workflow.name=EVENTMESH-workflow
eventmesh.connector.dingtalkTemplateType=text
-eventmesh.connector.wecomTemplateType=text
\ No newline at end of file
+eventmesh.connector.wecomTemplateType=text
+eventmesh.connector.templateType4Lark=text
+eventmesh.connector.atUsers4lark=id,name;id,name;
+eventmesh.connector.atAll4Lark=false
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
index 7004a3943..82993b198 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
@@ -33,11 +33,15 @@ public interface Connector {
Class<? extends Config> configClass();
/**
+ * This init method is obsolete. For detailed discussion,
+ * please see <a
href="https://github.com/apache/eventmesh/issues/4565">here</a>
+ * <p>
* Initializes the Connector with the provided configuration.
*
* @param config Configuration object
* @throws Exception if initialization fails
*/
+ @Deprecated
void init(Config config) throws Exception;
/**
diff --git a/settings.gradle b/settings.gradle
index a60f8d862..ac5446664 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -45,7 +45,7 @@ include 'eventmesh-connectors:eventmesh-connector-file'
include 'eventmesh-connectors:eventmesh-connector-spring'
include 'eventmesh-connectors:eventmesh-connector-prometheus'
include 'eventmesh-connectors:eventmesh-connector-dingtalk'
-include 'eventmesh-connectors:eventmesh-connector-feishu'
+include 'eventmesh-connectors:eventmesh-connector-lark'
include 'eventmesh-connectors:eventmesh-connector-wecom'
include 'eventmesh-connectors:eventmesh-connector-slack'
include 'eventmesh-connectors:eventmesh-connector-wechat'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]