hhuang1231 commented on code in PR #4599:
URL: https://github.com/apache/eventmesh/pull/4599#discussion_r1416459332
##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java:
##########
@@ -93,76 +102,154 @@ public static ImServiceHandler create(SinkConnectorConfig
sinkConnectorConfig) {
.build()
.im();
- Map<String, List<String>> headers = new HashMap<>();
- headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
-
- imServiceHandler.requestOptions = RequestOptions.newBuilder()
-
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
- .headers(headers)
- .build();
-
long fixedWait =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
- imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
- .retryIfException()
- .retryIfResult(Objects::nonNull)
- .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
- .withRetryListener(new RetryListener() {
- @SneakyThrows
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- long times = attempt.getAttemptNumber();
- if (times > 1) {
- log.warn("Retry sink event to lark | times=[{}]",
attempt.getAttemptNumber() - 1);
- }
- if (times == maxRetryTimes) {
- UN_ACK_REQ.remove(attempt.get());
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ int availableProcessors =
Runtime.getRuntime().availableProcessors();
+ imServiceHandler.sinkAsyncWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+ return thread;
+ });
+
+ imServiceHandler.cleanerWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-cleanerWorker");
+ return thread;
+ });
+
+ imServiceHandler.retryWorker =
Executors.newScheduledThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-retryWorker");
+ return thread;
+ });
+ } else {
+ imServiceHandler.retryer =
RetryerBuilder.<ConnectRecord>newBuilder()
+ .retryIfException()
+ .retryIfResult(Objects::nonNull)
+ .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+ .withRetryListener(new RetryListener() {
+ @SneakyThrows
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+
+ long times = attempt.getAttemptNumber();
+ if (times > 1) {
+ redoSinkNum.increment();
+ log.info("Total redo sink task num : [{}]",
redoSinkNum.sum());
+ log.warn("Retry sink event to lark |
times=[{}]", attempt.getAttemptNumber() - 1);
+ }
}
- }
- })
- .build();
+ })
+ .build();
+ }
+
return imServiceHandler;
}
public void sink(ConnectRecord connectRecord) throws ExecutionException,
RetryException {
- retryer.call(() -> {
- CreateMessageReq createMessageReq =
UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> {
- CreateMessageReqBody.Builder bodyBuilder =
CreateMessageReqBody.newBuilder()
- .receiveId(sinkConnectorConfig.getReceiveId())
- .uuid(UUID.randomUUID().toString());
-
- String templateTypeKey =
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
- if (null == templateTypeKey || "null".equals(templateTypeKey))
{
- templateTypeKey =
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
- }
- LarkMessageTemplateType templateType =
LarkMessageTemplateType.of(templateTypeKey);
- if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
- bodyBuilder.content(createTextContent(connectRecord))
- .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
- } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
- String title =
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
- .orElse("EventMesh-Message");
-
bodyBuilder.content(createInteractiveContent(connectRecord, title))
-
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
- }
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
- return CreateMessageReq.newBuilder()
- .receiveIdType(sinkConnectorConfig.getReceiveIdType())
- .createMessageReqBody(bodyBuilder.build())
- .build();
- });
+ RequestOptions requestOptions = RequestOptions.newBuilder()
+
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
+ .headers(headers)
+ .build();
+
+ retryer.call(() -> {
+ CreateMessageReq createMessageReq =
convertCreateMessageReq(connectRecord);
CreateMessageResp resp =
imService.message().create(createMessageReq, requestOptions);
if (resp.getCode() != 0) {
log.warn("Sinking event to lark failure | code:[{}] | msg:[{}]
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
return connectRecord;
}
-
- UN_ACK_REQ.remove(connectRecord);
return null;
});
}
+ public void sinkAsync(ConnectRecord connectRecord) {
Review Comment:
It's okay; I haven't started this work yet. Thank you for your attention to
this PR. I have learned a lot.
没关系,我还没有开始这项工作。感谢您对这个pr的关注,让我学到了很多东西。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]