pandaapo commented on code in PR #4599:
URL: https://github.com/apache/eventmesh/pull/4599#discussion_r1415975391


##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java:
##########
@@ -93,76 +102,154 @@ public static ImServiceHandler create(SinkConnectorConfig 
sinkConnectorConfig) {
                 .build()
                 .im();
 
-        Map<String, List<String>> headers = new HashMap<>();
-        headers.put("Content-Type", Lists.newArrayList("application/json; 
charset=utf-8"));
-
-        imServiceHandler.requestOptions = RequestOptions.newBuilder()
-                
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), 
sinkConnectorConfig.getAppSecret()))
-                .headers(headers)
-                .build();
-
         long fixedWait = 
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
         int maxRetryTimes = 
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
-        imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
-                .retryIfException()
-                .retryIfResult(Objects::nonNull)
-                .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, 
TimeUnit.MILLISECONDS))
-                
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
-                .withRetryListener(new RetryListener() {
-                    @SneakyThrows
-                    @Override
-                    public <V> void onRetry(Attempt<V> attempt) {
-                        long times = attempt.getAttemptNumber();
-                        if (times > 1) {
-                            log.warn("Retry sink event to lark | times=[{}]", 
attempt.getAttemptNumber() - 1);
-                        }
-                        if (times == maxRetryTimes) {
-                            UN_ACK_REQ.remove(attempt.get());
+        if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+            int availableProcessors = 
Runtime.getRuntime().availableProcessors();
+            imServiceHandler.sinkAsyncWorker = 
Executors.newFixedThreadPool(availableProcessors, r -> {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+                return thread;
+            });
+
+            imServiceHandler.cleanerWorker = 
Executors.newFixedThreadPool(availableProcessors, r -> {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                thread.setName("eventmesh-connector-lark-cleanerWorker");
+                return thread;
+            });
+
+            imServiceHandler.retryWorker = 
Executors.newScheduledThreadPool(availableProcessors, r -> {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                thread.setName("eventmesh-connector-lark-retryWorker");
+                return thread;
+            });
+        } else {
+            imServiceHandler.retryer = 
RetryerBuilder.<ConnectRecord>newBuilder()
+                    .retryIfException()
+                    .retryIfResult(Objects::nonNull)
+                    .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, 
TimeUnit.MILLISECONDS))
+                    
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+                    .withRetryListener(new RetryListener() {
+                        @SneakyThrows
+                        @Override
+                        public <V> void onRetry(Attempt<V> attempt) {
+
+                            long times = attempt.getAttemptNumber();
+                            if (times > 1) {
+                                redoSinkNum.increment();
+                                log.info("Total redo sink task num : [{}]", 
redoSinkNum.sum());
+                                log.warn("Retry sink event to lark | 
times=[{}]", attempt.getAttemptNumber() - 1);
+                            }
                         }
-                    }
-                })
-                .build();
+                    })
+                    .build();
+        }
+
         return imServiceHandler;
     }
 
     public void sink(ConnectRecord connectRecord) throws ExecutionException, 
RetryException {
-        retryer.call(() -> {
-            CreateMessageReq createMessageReq = 
UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> {
-                CreateMessageReqBody.Builder bodyBuilder = 
CreateMessageReqBody.newBuilder()
-                        .receiveId(sinkConnectorConfig.getReceiveId())
-                        .uuid(UUID.randomUUID().toString());
-
-                String templateTypeKey = 
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
-                if (null == templateTypeKey || "null".equals(templateTypeKey)) 
{
-                    templateTypeKey = 
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
-                }
-                LarkMessageTemplateType templateType = 
LarkMessageTemplateType.of(templateTypeKey);
-                if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
-                    bodyBuilder.content(createTextContent(connectRecord))
-                            .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
-                } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
-                    String title = 
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
-                            .orElse("EventMesh-Message");
-                    
bodyBuilder.content(createInteractiveContent(connectRecord, title))
-                            
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
-                }
+        Map<String, List<String>> headers = new HashMap<>();
+        headers.put("Content-Type", Lists.newArrayList("application/json; 
charset=utf-8"));
 
-                return CreateMessageReq.newBuilder()
-                        .receiveIdType(sinkConnectorConfig.getReceiveIdType())
-                        .createMessageReqBody(bodyBuilder.build())
-                        .build();
-            });
+        RequestOptions requestOptions = RequestOptions.newBuilder()
+                
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), 
sinkConnectorConfig.getAppSecret()))
+                .headers(headers)
+                .build();
+
+        retryer.call(() -> {
+            CreateMessageReq createMessageReq = 
convertCreateMessageReq(connectRecord);
             CreateMessageResp resp = 
imService.message().create(createMessageReq, requestOptions);
             if (resp.getCode() != 0) {
                 log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] 
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
                 return connectRecord;
             }
-
-            UN_ACK_REQ.remove(connectRecord);
             return null;
         });
     }
 
+    public void sinkAsync(ConnectRecord connectRecord) {

Review Comment:
   When I realized that this connector sends messages to Lark, it occurred to
me that in practice it will mostly be used for event notifications, so it is
unlikely to see the kind of high-volume message traffic that flows between
message middlewares. With that in mind, my earlier suggestion (submit a single
task to the thread pool and process messages in a loop inside that task) may
not be any more efficient than your current solution (submit one task per
message to the thread pool). I hope you don't mind my going back and forth on
this.
   
   
当我意识到这是给飞书发消息时,我想实际应用中,可能大多数场景是用来作为一种事件通知,所以也许并没有比如消息中间件之间那种大量消息的传输。这样考虑时,我的建议中(给线程池提交一个任务,在任务中循环处理消息)的效率也许并不优于您现在的方案(每个消息给线程池提交一个任务)。希望您不要介意我的反反复复。



##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java:
##########
@@ -93,76 +102,154 @@ public static ImServiceHandler create(SinkConnectorConfig 
sinkConnectorConfig) {
                 .build()
                 .im();
 
-        Map<String, List<String>> headers = new HashMap<>();
-        headers.put("Content-Type", Lists.newArrayList("application/json; 
charset=utf-8"));
-
-        imServiceHandler.requestOptions = RequestOptions.newBuilder()
-                
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), 
sinkConnectorConfig.getAppSecret()))
-                .headers(headers)
-                .build();
-
         long fixedWait = 
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
         int maxRetryTimes = 
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
-        imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
-                .retryIfException()
-                .retryIfResult(Objects::nonNull)
-                .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, 
TimeUnit.MILLISECONDS))
-                
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
-                .withRetryListener(new RetryListener() {
-                    @SneakyThrows
-                    @Override
-                    public <V> void onRetry(Attempt<V> attempt) {
-                        long times = attempt.getAttemptNumber();
-                        if (times > 1) {
-                            log.warn("Retry sink event to lark | times=[{}]", 
attempt.getAttemptNumber() - 1);
-                        }
-                        if (times == maxRetryTimes) {
-                            UN_ACK_REQ.remove(attempt.get());
+        if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+            int availableProcessors = 
Runtime.getRuntime().availableProcessors();
+            imServiceHandler.sinkAsyncWorker = 
Executors.newFixedThreadPool(availableProcessors, r -> {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+                return thread;
+            });
+
+            imServiceHandler.cleanerWorker = 
Executors.newFixedThreadPool(availableProcessors, r -> {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                thread.setName("eventmesh-connector-lark-cleanerWorker");
+                return thread;
+            });
+
+            imServiceHandler.retryWorker = 
Executors.newScheduledThreadPool(availableProcessors, r -> {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                thread.setName("eventmesh-connector-lark-retryWorker");
+                return thread;
+            });
+        } else {
+            imServiceHandler.retryer = 
RetryerBuilder.<ConnectRecord>newBuilder()
+                    .retryIfException()
+                    .retryIfResult(Objects::nonNull)
+                    .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, 
TimeUnit.MILLISECONDS))
+                    
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+                    .withRetryListener(new RetryListener() {
+                        @SneakyThrows
+                        @Override
+                        public <V> void onRetry(Attempt<V> attempt) {
+
+                            long times = attempt.getAttemptNumber();
+                            if (times > 1) {
+                                redoSinkNum.increment();
+                                log.info("Total redo sink task num : [{}]", 
redoSinkNum.sum());
+                                log.warn("Retry sink event to lark | 
times=[{}]", attempt.getAttemptNumber() - 1);
+                            }
                         }
-                    }
-                })
-                .build();
+                    })
+                    .build();
+        }
+
         return imServiceHandler;
     }
 
     public void sink(ConnectRecord connectRecord) throws ExecutionException, 
RetryException {
-        retryer.call(() -> {
-            CreateMessageReq createMessageReq = 
UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> {
-                CreateMessageReqBody.Builder bodyBuilder = 
CreateMessageReqBody.newBuilder()
-                        .receiveId(sinkConnectorConfig.getReceiveId())
-                        .uuid(UUID.randomUUID().toString());
-
-                String templateTypeKey = 
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
-                if (null == templateTypeKey || "null".equals(templateTypeKey)) 
{
-                    templateTypeKey = 
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
-                }
-                LarkMessageTemplateType templateType = 
LarkMessageTemplateType.of(templateTypeKey);
-                if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
-                    bodyBuilder.content(createTextContent(connectRecord))
-                            .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
-                } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
-                    String title = 
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
-                            .orElse("EventMesh-Message");
-                    
bodyBuilder.content(createInteractiveContent(connectRecord, title))
-                            
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
-                }
+        Map<String, List<String>> headers = new HashMap<>();
+        headers.put("Content-Type", Lists.newArrayList("application/json; 
charset=utf-8"));
 
-                return CreateMessageReq.newBuilder()
-                        .receiveIdType(sinkConnectorConfig.getReceiveIdType())
-                        .createMessageReqBody(bodyBuilder.build())
-                        .build();
-            });
+        RequestOptions requestOptions = RequestOptions.newBuilder()
+                
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), 
sinkConnectorConfig.getAppSecret()))
+                .headers(headers)
+                .build();
+
+        retryer.call(() -> {
+            CreateMessageReq createMessageReq = 
convertCreateMessageReq(connectRecord);
             CreateMessageResp resp = 
imService.message().create(createMessageReq, requestOptions);
             if (resp.getCode() != 0) {
                 log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] 
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
                 return connectRecord;
             }
-
-            UN_ACK_REQ.remove(connectRecord);
             return null;
         });
     }
 
+    public void sinkAsync(ConnectRecord connectRecord) {

Review Comment:
   When I realized that this connector sends messages to Lark, it occurred to
me that in practice it will mostly be used for event notifications, so it is
unlikely to see the kind of high-volume message traffic that flows between
message middlewares. With that in mind, my earlier suggestion (submit a single
task to the thread pool and process messages in a loop inside that task) may
not be any more efficient than your current solution (submit one task per
message to the thread pool). I hope you don't mind my going back and forth on
this.
   
   
当我意识到这是给飞书发消息时,我想实际应用中,可能大多数场景是用来作为一种事件通知,所以也许并没有比如消息中间件之间那种大量消息的传输。这样考虑时,我的建议中(给线程池提交一个任务,在任务中循环处理消息)的效率也许并不优于您现在的方案(每个消息给线程池提交一个任务)。希望您不要介意我的反反复复。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to