pandaapo commented on code in PR #4599: URL: https://github.com/apache/eventmesh/pull/4599#discussion_r1413329673
########## eventmesh-connectors/eventmesh-connector-lark/src/main/resources/sink-config.yml: ########## @@ -17,14 +17,18 @@ pubSubConfig: meshAddress: 127.0.0.1:10000 - subject: TopicTest + subject: TEST-TOPIC-LARK idc: FT env: PRD - group: feishuSink + group: larkSink appId: 5031 - userName: feishuSinkUser - passWord: feishuPassWord -connectorConfig: - connectorName: feishuSink - reciveId: reciveIdValue - reciveType: open_id + userName: larkSinkUser + passWord: larkPassWord +sinkConnectorConfig: + connectorName: larkSink + appId: appId + appSecret: appSecret + receiveIdType: open_id + receiveId: receiveId Review Comment: Have you noticed that Feishu Sink was able to function normally when there were no issues here before? 您是否注意到之前Feishu Sink当这里没有问题的时候能否正常工作? ########## eventmesh-connectors/eventmesh-connector-lark/src/main/resources/sink-config.yml: ########## @@ -17,14 +17,18 @@ pubSubConfig: meshAddress: 127.0.0.1:10000 - subject: TopicTest + subject: TEST-TOPIC-LARK idc: FT env: PRD - group: feishuSink + group: larkSink appId: 5031 - userName: feishuSinkUser - passWord: feishuPassWord -connectorConfig: - connectorName: feishuSink - reciveId: reciveIdValue - reciveType: open_id + userName: larkSinkUser + passWord: larkPassWord +sinkConnectorConfig: + connectorName: larkSink + appId: appId + appSecret: appSecret + receiveIdType: open_id + receiveId: receiveId Review Comment: Have you noticed that Feishu Sink was able to function normally when there were no issues here before? 您是否注意到之前Feishu Sink当这里没有问题的时候能否正常工作? ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink.connector; + +import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create; + +import org.apache.eventmesh.connector.lark.sink.ImServiceHandler; +import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.github.rholder.retry.RetryException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.lark.oapi.Client; +import com.lark.oapi.core.enums.AppType; +import com.lark.oapi.core.request.SelfBuiltTenantAccessTokenReq; +import com.lark.oapi.core.response.TenantAccessTokenResp; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class LarkSinkConnector implements Sink { + + public static final String TENANT_ACCESS_TOKEN = "tenant_access_token"; + + /** + * replace lark build-in tokenCache + */ + public static final Cache<String, String> AUTH_CACHE = CacheBuilder.newBuilder() + .initialCapacity(12) + .maximumSize(10) + .concurrencyLevel(5) + .expireAfterWrite(30, TimeUnit.MINUTES) + .build(); + + private LarkSinkConfig sinkConfig; + + private ImServiceHandler imServiceHandler; + + private final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public Class<? extends Config> configClass() { + return LarkSinkConfig.class; + } + + @Override + public void init(Config config) { + // Deprecated Review Comment: You can directly annotate the `@Deprecated` annotation onto Connector#init() method. 可以直接将`@Deprecated`注解标注到Connector#init()方法上。 ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink.connector; + +import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create; + +import org.apache.eventmesh.connector.lark.sink.ImServiceHandler; +import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.github.rholder.retry.RetryException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.lark.oapi.Client; +import com.lark.oapi.core.enums.AppType; +import com.lark.oapi.core.request.SelfBuiltTenantAccessTokenReq; +import com.lark.oapi.core.response.TenantAccessTokenResp; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class LarkSinkConnector implements Sink { + + public static final String TENANT_ACCESS_TOKEN = "tenant_access_token"; + + /** + * replace lark build-in tokenCache + */ + public static final Cache<String, String> AUTH_CACHE = CacheBuilder.newBuilder() + .initialCapacity(12) + .maximumSize(10) + .concurrencyLevel(5) + .expireAfterWrite(30, TimeUnit.MINUTES) + .build(); + + private LarkSinkConfig sinkConfig; + + private ImServiceHandler imServiceHandler; + + private final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public Class<? extends Config> configClass() { + return LarkSinkConfig.class; + } + + @Override + public void init(Config config) { + // Deprecated + } + + @Override + public void init(ConnectorContext connectorContext) { + // init config for lark sink connector + SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; + this.sinkConfig = (LarkSinkConfig) sinkConnectorContext.getSinkConfig(); + + SinkConnectorConfig sinkConnectorConfig = sinkConfig.getSinkConnectorConfig(); + sinkConnectorConfig.validateSinkConfiguration(); + + imServiceHandler = create(sinkConnectorConfig); + } + + @Override + public void start() { + if (!started.compareAndSet(false, true)) { + log.info("LarkSinkConnector has been started."); + } + } + + @Override + public void commit(ConnectRecord record) { + // Sink does not need to implement + } + + @Override + public String name() { + return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); + } + + @Override + public void stop() { + if (!started.compareAndSet(true, false)) { + log.info("LarkSinkConnector has not started yet."); + } + } + + @Override + public void put(List<ConnectRecord> sinkRecords) { + for (ConnectRecord connectRecord : sinkRecords) { + try { + imServiceHandler.sink(connectRecord); + } catch (ExecutionException | RetryException e) { + log.error("Failed to sink event to lark", e); + } + } + } + + @SneakyThrows + public static String getTenantAccessToken(String appId, String appSecret) { Review Comment: Can this method be directly defined in ImServiceHandler? 能否直接将该方法定义在ImServiceHandler中? ########## eventmesh-connectors/eventmesh-connector-lark/build.gradle: ########## @@ -15,19 +15,20 @@ * limitations under the License. */ -List feishu = [ - "com.larksuite.oapi:oapi-sdk:$feishu_version", +List lark = [ + "com.larksuite.oapi:oapi-sdk:$lark_version", "com.github.rholder:guava-retrying:$guava_retrying_version", "org.apache.httpcomponents:httpclient", project(":eventmesh-common") ] dependencies { api project(":eventmesh-openconnect:eventmesh-openconnect-java") - implementation feishu + implementation lark compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' - testImplementation "org.mockito:mockito-core" + //testImplementation "org.mockito:mockito-core" Review Comment: Remove unnecessary dependencies directly 不需要的依赖就直接移除吧。 ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink; + +import static org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.getTenantAccessToken; + +import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys; +import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.text.StringEscapeUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.lark.oapi.Client; +import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum; +import com.lark.oapi.card.model.MessageCard; +import com.lark.oapi.card.model.MessageCardConfig; +import com.lark.oapi.card.model.MessageCardElement; +import com.lark.oapi.card.model.MessageCardHeader; +import com.lark.oapi.card.model.MessageCardMarkdown; +import com.lark.oapi.card.model.MessageCardPlainText; +import com.lark.oapi.core.httpclient.OkHttpTransport; +import com.lark.oapi.core.request.RequestOptions; +import com.lark.oapi.core.utils.Lists; +import com.lark.oapi.okhttp.OkHttpClient; +import com.lark.oapi.service.im.v1.ImService; +import com.lark.oapi.service.im.v1.enums.MsgTypeEnum; +import com.lark.oapi.service.im.v1.model.CreateMessageReq; +import com.lark.oapi.service.im.v1.model.CreateMessageReqBody; +import com.lark.oapi.service.im.v1.model.CreateMessageResp; +import com.lark.oapi.service.im.v1.model.ext.MessageText; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ImServiceHandler { + + private static final ConcurrentHashMap<ConnectRecord, CreateMessageReq> UN_ACK_REQ = new ConcurrentHashMap<>(); Review Comment: I didn't understand the purpose of this Map, could you explain it? 没看明白这个集合的作用,能否解释一下? ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink; + +import static org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.getTenantAccessToken; + +import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys; +import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.text.StringEscapeUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.lark.oapi.Client; +import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum; +import com.lark.oapi.card.model.MessageCard; +import com.lark.oapi.card.model.MessageCardConfig; +import com.lark.oapi.card.model.MessageCardElement; +import com.lark.oapi.card.model.MessageCardHeader; +import com.lark.oapi.card.model.MessageCardMarkdown; +import com.lark.oapi.card.model.MessageCardPlainText; +import com.lark.oapi.core.httpclient.OkHttpTransport; +import com.lark.oapi.core.request.RequestOptions; +import com.lark.oapi.core.utils.Lists; +import com.lark.oapi.okhttp.OkHttpClient; +import com.lark.oapi.service.im.v1.ImService; +import com.lark.oapi.service.im.v1.enums.MsgTypeEnum; +import com.lark.oapi.service.im.v1.model.CreateMessageReq; +import com.lark.oapi.service.im.v1.model.CreateMessageReqBody; +import com.lark.oapi.service.im.v1.model.CreateMessageResp; +import com.lark.oapi.service.im.v1.model.ext.MessageText; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ImServiceHandler { + + private static final ConcurrentHashMap<ConnectRecord, CreateMessageReq> UN_ACK_REQ = new ConcurrentHashMap<>(); + + private SinkConnectorConfig sinkConnectorConfig; + + private ImService imService; + + private RequestOptions requestOptions; + + private Retryer<ConnectRecord> retryer; + + public ImServiceHandler() {} + + public static ImServiceHandler create(SinkConnectorConfig sinkConnectorConfig) { + ImServiceHandler imServiceHandler = new ImServiceHandler(); + imServiceHandler.sinkConnectorConfig = sinkConnectorConfig; + imServiceHandler.imService = Client.newBuilder(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()) + .httpTransport(new OkHttpTransport(new OkHttpClient().newBuilder() + .callTimeout(3L, TimeUnit.SECONDS) + .build()) + ) + .disableTokenCache() + .requestTimeout(3, TimeUnit.SECONDS) + .build() + .im(); + + Map<String, List<String>> headers = new HashMap<>(); + headers.put("Content-Type", Lists.newArrayList("application/json; charset=utf-8")); + + imServiceHandler.requestOptions = RequestOptions.newBuilder() + .tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret())) + .headers(headers) + .build(); + + long fixedWait = Long.parseLong(sinkConnectorConfig.getRetryDelayInMills()); + int maxRetryTimes = Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1; + imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder() + .retryIfException() + .retryIfResult(Objects::nonNull) + .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, TimeUnit.MILLISECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes)) + .withRetryListener(new RetryListener() { + @SneakyThrows + @Override + public <V> void onRetry(Attempt<V> attempt) { + long times = attempt.getAttemptNumber(); + if (times > 1) { + log.warn("Retry sink event to lark | times=[{}]", attempt.getAttemptNumber() - 1); + } + if (times == maxRetryTimes) { + UN_ACK_REQ.remove(attempt.get()); + } + } + }) + .build(); + return imServiceHandler; + } + + public void sink(ConnectRecord connectRecord) throws ExecutionException, RetryException { + retryer.call(() -> { + CreateMessageReq createMessageReq = UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> { + CreateMessageReqBody.Builder bodyBuilder = CreateMessageReqBody.newBuilder() + .receiveId(sinkConnectorConfig.getReceiveId()) + .uuid(UUID.randomUUID().toString()); + + String templateTypeKey = connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK); + if (null == templateTypeKey || "null".equals(templateTypeKey)) { + templateTypeKey = LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey(); + } + LarkMessageTemplateType templateType = LarkMessageTemplateType.of(templateTypeKey); + if (LarkMessageTemplateType.PLAIN_TEXT == templateType) { + bodyBuilder.content(createTextContent(connectRecord)) + .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue()); + } else if (LarkMessageTemplateType.MARKDOWN == templateType) { + String title = Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK)) + .orElse("EventMesh-Message"); + bodyBuilder.content(createInteractiveContent(connectRecord, title)) + .msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue()); + } + + return CreateMessageReq.newBuilder() + .receiveIdType(sinkConnectorConfig.getReceiveIdType()) + .createMessageReqBody(bodyBuilder.build()) + .build(); + }); + CreateMessageResp resp = imService.message().create(createMessageReq, requestOptions); + if (resp.getCode() != 0) { + log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", resp.getCode(), resp.getMsg(), resp.getError()); + return connectRecord; + } + + UN_ACK_REQ.remove(connectRecord); + return null; + }); + } + + private String createTextContent(ConnectRecord connectRecord) { + MessageText.Builder msgBuilder = MessageText.newBuilder(); + + if (needAtAll(connectRecord)) { + msgBuilder.atAll(); + } + String atUsers = needAtUser(connectRecord); + if (!atUsers.isEmpty()) { + String[] users = atUsers.split(";"); + + for (String user : users) { + String[] kv = user.split(","); + msgBuilder.atUser(kv[0], kv[1]); + } + } + + String escapedString = StringEscapeUtils.escapeJava(new String((byte[]) connectRecord.getData())); + return msgBuilder.text(escapedString).build(); + } + + private String createInteractiveContent(ConnectRecord connectRecord, String title) { + StringBuilder sb = new StringBuilder(); + if (needAtAll(connectRecord)) { + atAll(sb); + } + String atUsers = needAtUser(connectRecord); + if (!atUsers.isEmpty()) { + String[] users = atUsers.split(";"); + + for (String user : users) { + String[] kv = user.split(","); + atUser(sb, kv[0]); + } + } + sb.append(new String((byte[]) connectRecord.getData())); + + MessageCardConfig config = MessageCardConfig.newBuilder() + .enableForward(true) + .wideScreenMode(true) + .updateMulti(true) + .build(); + + // cardUrl, Only in need of PC, mobile side jump different links to use + /*MessageCardURL cardURL = MessageCardURL.newBuilder() + .pcUrl("http://www.baidu.com") + .iosUrl("http://www.google.com") + .url("http://open.feishu.com") + .androidUrl("http://www.jianshu.com") + .build();*/ Review Comment: Can this comment and the commented out code be removed? 这段注释和注释掉的代码能否移除掉? ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink.connector; + +import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create; + +import org.apache.eventmesh.connector.lark.sink.ImServiceHandler; +import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.github.rholder.retry.RetryException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.lark.oapi.Client; +import com.lark.oapi.core.enums.AppType; +import com.lark.oapi.core.request.SelfBuiltTenantAccessTokenReq; +import com.lark.oapi.core.response.TenantAccessTokenResp; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class LarkSinkConnector implements Sink { + + public static final String TENANT_ACCESS_TOKEN = "tenant_access_token"; + + /** + * replace lark build-in tokenCache + */ + public static final Cache<String, String> AUTH_CACHE = CacheBuilder.newBuilder() + .initialCapacity(12) + .maximumSize(10) + .concurrencyLevel(5) + .expireAfterWrite(30, TimeUnit.MINUTES) + .build(); + + private LarkSinkConfig sinkConfig; + + private ImServiceHandler imServiceHandler; + + private final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public Class<? extends Config> configClass() { + return LarkSinkConfig.class; + } + + @Override + public void init(Config config) { + // Deprecated Review Comment: You can directly annotate the `@Deprecated` annotation onto Connector#init() method. 可以直接将`@Deprecated`注解标注到Connector#init()方法上。 ########## eventmesh-connectors/eventmesh-connector-lark/build.gradle: ########## @@ -15,19 +15,20 @@ * limitations under the License. */ -List feishu = [ - "com.larksuite.oapi:oapi-sdk:$feishu_version", +List lark = [ + "com.larksuite.oapi:oapi-sdk:$lark_version", "com.github.rholder:guava-retrying:$guava_retrying_version", "org.apache.httpcomponents:httpclient", project(":eventmesh-common") ] dependencies { api project(":eventmesh-openconnect:eventmesh-openconnect-java") - implementation feishu + implementation lark compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' - testImplementation "org.mockito:mockito-core" + //testImplementation "org.mockito:mockito-core" Review Comment: Remove unnecessary dependencies directly 不需要的依赖就直接移除吧。 ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink; + +import static org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.getTenantAccessToken; + +import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys; +import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.text.StringEscapeUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.lark.oapi.Client; +import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum; +import com.lark.oapi.card.model.MessageCard; +import com.lark.oapi.card.model.MessageCardConfig; +import com.lark.oapi.card.model.MessageCardElement; +import com.lark.oapi.card.model.MessageCardHeader; +import com.lark.oapi.card.model.MessageCardMarkdown; +import com.lark.oapi.card.model.MessageCardPlainText; +import com.lark.oapi.core.httpclient.OkHttpTransport; +import com.lark.oapi.core.request.RequestOptions; +import com.lark.oapi.core.utils.Lists; +import com.lark.oapi.okhttp.OkHttpClient; +import com.lark.oapi.service.im.v1.ImService; +import com.lark.oapi.service.im.v1.enums.MsgTypeEnum; +import com.lark.oapi.service.im.v1.model.CreateMessageReq; +import com.lark.oapi.service.im.v1.model.CreateMessageReqBody; +import com.lark.oapi.service.im.v1.model.CreateMessageResp; +import com.lark.oapi.service.im.v1.model.ext.MessageText; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ImServiceHandler { + + private static final ConcurrentHashMap<ConnectRecord, CreateMessageReq> UN_ACK_REQ = new ConcurrentHashMap<>(); + + private SinkConnectorConfig sinkConnectorConfig; + + private ImService imService; + + private RequestOptions requestOptions; + + private Retryer<ConnectRecord> retryer; + + public ImServiceHandler() {} + + public static ImServiceHandler create(SinkConnectorConfig sinkConnectorConfig) { + ImServiceHandler imServiceHandler = new ImServiceHandler(); + imServiceHandler.sinkConnectorConfig = sinkConnectorConfig; + imServiceHandler.imService = Client.newBuilder(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()) + .httpTransport(new OkHttpTransport(new OkHttpClient().newBuilder() + .callTimeout(3L, TimeUnit.SECONDS) + .build()) + ) + .disableTokenCache() + .requestTimeout(3, TimeUnit.SECONDS) + .build() + .im(); + + Map<String, List<String>> headers = new HashMap<>(); + headers.put("Content-Type", Lists.newArrayList("application/json; charset=utf-8")); + + imServiceHandler.requestOptions = RequestOptions.newBuilder() + .tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret())) + .headers(headers) + .build(); + + long fixedWait = Long.parseLong(sinkConnectorConfig.getRetryDelayInMills()); + int maxRetryTimes = Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1; + imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder() + .retryIfException() + .retryIfResult(Objects::nonNull) + .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, TimeUnit.MILLISECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes)) + .withRetryListener(new RetryListener() { + @SneakyThrows + @Override + public <V> void onRetry(Attempt<V> attempt) { + long times = attempt.getAttemptNumber(); + if (times > 1) { + log.warn("Retry sink event to lark | times=[{}]", attempt.getAttemptNumber() - 1); + } + if (times == maxRetryTimes) { + UN_ACK_REQ.remove(attempt.get()); + } + } + }) + .build(); + return imServiceHandler; + } + + public void sink(ConnectRecord connectRecord) throws ExecutionException, RetryException { + retryer.call(() -> { + CreateMessageReq createMessageReq = UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> { + CreateMessageReqBody.Builder bodyBuilder = CreateMessageReqBody.newBuilder() + .receiveId(sinkConnectorConfig.getReceiveId()) + .uuid(UUID.randomUUID().toString()); + + String templateTypeKey = connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK); + if (null == templateTypeKey || "null".equals(templateTypeKey)) { + templateTypeKey = LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey(); + } + LarkMessageTemplateType templateType = LarkMessageTemplateType.of(templateTypeKey); + if (LarkMessageTemplateType.PLAIN_TEXT == templateType) { + bodyBuilder.content(createTextContent(connectRecord)) + .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue()); + } else if (LarkMessageTemplateType.MARKDOWN == templateType) { + String title = Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK)) + .orElse("EventMesh-Message"); + bodyBuilder.content(createInteractiveContent(connectRecord, title)) + .msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue()); + } + + return CreateMessageReq.newBuilder() + .receiveIdType(sinkConnectorConfig.getReceiveIdType()) + .createMessageReqBody(bodyBuilder.build()) + .build(); + }); + CreateMessageResp resp = imService.message().create(createMessageReq, requestOptions); + if (resp.getCode() != 0) { + log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", resp.getCode(), resp.getMsg(), resp.getError()); + return connectRecord; + } + + UN_ACK_REQ.remove(connectRecord); + return null; + }); + } + + private String createTextContent(ConnectRecord connectRecord) { + MessageText.Builder msgBuilder = MessageText.newBuilder(); + + if (needAtAll(connectRecord)) { + msgBuilder.atAll(); + } + String atUsers = needAtUser(connectRecord); + if (!atUsers.isEmpty()) { + String[] users = atUsers.split(";"); + + for (String user : users) { + String[] kv = user.split(","); + msgBuilder.atUser(kv[0], kv[1]); + } + } + + String escapedString = StringEscapeUtils.escapeJava(new String((byte[]) connectRecord.getData())); + return msgBuilder.text(escapedString).build(); + } + + private String createInteractiveContent(ConnectRecord connectRecord, String title) { + StringBuilder sb = new StringBuilder(); + if (needAtAll(connectRecord)) { + atAll(sb); + } + String atUsers = needAtUser(connectRecord); + if (!atUsers.isEmpty()) { + String[] users = atUsers.split(";"); + + for (String user : users) { + String[] kv = user.split(","); + atUser(sb, kv[0]); + } + } + sb.append(new String((byte[]) connectRecord.getData())); + + MessageCardConfig config = MessageCardConfig.newBuilder() + .enableForward(true) + .wideScreenMode(true) + .updateMulti(true) + .build(); + + // cardUrl, Only in need of PC, mobile side jump different links to use + /*MessageCardURL cardURL = MessageCardURL.newBuilder() + .pcUrl("http://www.baidu.com") + .iosUrl("http://www.google.com") + .url("http://open.feishu.com") + .androidUrl("http://www.jianshu.com") + .build();*/ Review Comment: Can this comment and the commented out code be removed? 这段注释和注释掉的代码能否移除掉? ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink; + +import static org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.getTenantAccessToken; + +import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys; +import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.text.StringEscapeUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.lark.oapi.Client; +import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum; +import com.lark.oapi.card.model.MessageCard; +import com.lark.oapi.card.model.MessageCardConfig; +import com.lark.oapi.card.model.MessageCardElement; +import com.lark.oapi.card.model.MessageCardHeader; +import com.lark.oapi.card.model.MessageCardMarkdown; +import com.lark.oapi.card.model.MessageCardPlainText; +import com.lark.oapi.core.httpclient.OkHttpTransport; +import com.lark.oapi.core.request.RequestOptions; +import com.lark.oapi.core.utils.Lists; +import com.lark.oapi.okhttp.OkHttpClient; +import com.lark.oapi.service.im.v1.ImService; +import com.lark.oapi.service.im.v1.enums.MsgTypeEnum; +import com.lark.oapi.service.im.v1.model.CreateMessageReq; +import com.lark.oapi.service.im.v1.model.CreateMessageReqBody; +import com.lark.oapi.service.im.v1.model.CreateMessageResp; +import com.lark.oapi.service.im.v1.model.ext.MessageText; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ImServiceHandler { + + private static final ConcurrentHashMap<ConnectRecord, CreateMessageReq> UN_ACK_REQ = new ConcurrentHashMap<>(); Review Comment: I didn't understand the purpose of this Map, could you explain it? 没看明白这个集合的作用,能否解释一下? ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink.connector; + +import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create; + +import org.apache.eventmesh.connector.lark.sink.ImServiceHandler; +import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.github.rholder.retry.RetryException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.lark.oapi.Client; +import com.lark.oapi.core.enums.AppType; +import com.lark.oapi.core.request.SelfBuiltTenantAccessTokenReq; +import com.lark.oapi.core.response.TenantAccessTokenResp; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class LarkSinkConnector implements Sink { + + public static final String TENANT_ACCESS_TOKEN = "tenant_access_token"; + + /** + * replace lark build-in tokenCache + */ + public static final Cache<String, String> AUTH_CACHE = CacheBuilder.newBuilder() + .initialCapacity(12) + .maximumSize(10) + .concurrencyLevel(5) + .expireAfterWrite(30, TimeUnit.MINUTES) + .build(); + + private LarkSinkConfig sinkConfig; + + private ImServiceHandler imServiceHandler; + + private final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public Class<? extends Config> configClass() { + return LarkSinkConfig.class; + } + + @Override + public void init(Config config) { + // Deprecated + } + + @Override + public void init(ConnectorContext connectorContext) { + // init config for lark sink connector + SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; + this.sinkConfig = (LarkSinkConfig) sinkConnectorContext.getSinkConfig(); + + SinkConnectorConfig sinkConnectorConfig = sinkConfig.getSinkConnectorConfig(); + sinkConnectorConfig.validateSinkConfiguration(); + + imServiceHandler = create(sinkConnectorConfig); + } + + @Override + public void start() { + if (!started.compareAndSet(false, true)) { + log.info("LarkSinkConnector has been started."); + } + } + + @Override + public void commit(ConnectRecord record) { + // Sink does not need to implement + } + + @Override + public String name() { + return this.sinkConfig.getSinkConnectorConfig().getConnectorName(); + } + + @Override + public void stop() { + if (!started.compareAndSet(true, false)) { + log.info("LarkSinkConnector has not started yet."); + } + } + + @Override + public void put(List<ConnectRecord> sinkRecords) { + for (ConnectRecord connectRecord : sinkRecords) { + try { + imServiceHandler.sink(connectRecord); + } catch (ExecutionException | RetryException e) { + log.error("Failed to sink event to lark", e); + } + } + } + + @SneakyThrows + public static String getTenantAccessToken(String appId, String appSecret) { Review Comment: Can this method be directly defined in ImServiceHandler? 能否直接将该方法定义在ImServiceHandler中? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
