pandaapo commented on code in PR #4599:
URL: https://github.com/apache/eventmesh/pull/4599#discussion_r1415832718
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java:
##########
@@ -85,31 +88,75 @@ public void setup() throws Exception {
@Test
public void testRegularSink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ regularSink();
+ }
+
+ @Test
+ public void testRegularSinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ regularSink();
+ }
+
+ private void regularSink() throws Exception {
final int times = 3;
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- imServiceHandler.sink(connectRecord);
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ long duration = retryDelayInMills * times;
+ Thread.sleep(duration);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
}
-
- verify(message, times(times)).create(any(), any());
}
@Test
public void testRetrySink() throws Exception {
- doThrow(new Exception()).when(message).create(any(), any());
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ retrySink();
+ }
+
+ @Test
+ public void testRetrySinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ retrySink();
+ }
+
+ private void retrySink() throws Exception {
+ CreateMessageResp resp = new CreateMessageResp();
+ resp.setCode(1);
+ doReturn(resp).when(message).create(any(), any());
final int times = 3;
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes());
+ // (maxRetryTimes + 1) event are actually sent
+ int sinkTimes = times * (maxRetryTimes + 1);
+ long duration = retryDelayInMills * sinkTimes;
+
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
- }
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
- // (maxRetryTimes + 1) event are actually sent
- verify(message, times(times *
(Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1))).create(any(),
any());
+ Thread.sleep(duration);
Review Comment:
Oh! I missed the last line of code. retrySink() is already sufficient for
me. This review should be written under regularSink().
Oh! 我错过了最后一行代码,retrySink() 已经可以了。这段review应该写在regularSink()下。
##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java:
##########
@@ -93,76 +102,154 @@ public static ImServiceHandler create(SinkConnectorConfig
sinkConnectorConfig) {
.build()
.im();
- Map<String, List<String>> headers = new HashMap<>();
- headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
-
- imServiceHandler.requestOptions = RequestOptions.newBuilder()
-
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
- .headers(headers)
- .build();
-
long fixedWait =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
- imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
- .retryIfException()
- .retryIfResult(Objects::nonNull)
- .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
- .withRetryListener(new RetryListener() {
- @SneakyThrows
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- long times = attempt.getAttemptNumber();
- if (times > 1) {
- log.warn("Retry sink event to lark | times=[{}]",
attempt.getAttemptNumber() - 1);
- }
- if (times == maxRetryTimes) {
- UN_ACK_REQ.remove(attempt.get());
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ int availableProcessors =
Runtime.getRuntime().availableProcessors();
+ imServiceHandler.sinkAsyncWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+ return thread;
+ });
+
+ imServiceHandler.cleanerWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-cleanerWorker");
+ return thread;
+ });
+
+ imServiceHandler.retryWorker =
Executors.newScheduledThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-retryWorker");
+ return thread;
+ });
+ } else {
+ imServiceHandler.retryer =
RetryerBuilder.<ConnectRecord>newBuilder()
+ .retryIfException()
+ .retryIfResult(Objects::nonNull)
+ .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+ .withRetryListener(new RetryListener() {
+ @SneakyThrows
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+
+ long times = attempt.getAttemptNumber();
+ if (times > 1) {
+ redoSinkNum.increment();
+ log.info("Total redo sink task num : [{}]",
redoSinkNum.sum());
+ log.warn("Retry sink event to lark |
times=[{}]", attempt.getAttemptNumber() - 1);
+ }
}
- }
- })
- .build();
+ })
+ .build();
+ }
+
return imServiceHandler;
}
public void sink(ConnectRecord connectRecord) throws ExecutionException,
RetryException {
- retryer.call(() -> {
- CreateMessageReq createMessageReq =
UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> {
- CreateMessageReqBody.Builder bodyBuilder =
CreateMessageReqBody.newBuilder()
- .receiveId(sinkConnectorConfig.getReceiveId())
- .uuid(UUID.randomUUID().toString());
-
- String templateTypeKey =
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
- if (null == templateTypeKey || "null".equals(templateTypeKey))
{
- templateTypeKey =
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
- }
- LarkMessageTemplateType templateType =
LarkMessageTemplateType.of(templateTypeKey);
- if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
- bodyBuilder.content(createTextContent(connectRecord))
- .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
- } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
- String title =
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
- .orElse("EventMesh-Message");
-
bodyBuilder.content(createInteractiveContent(connectRecord, title))
-
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
- }
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
- return CreateMessageReq.newBuilder()
- .receiveIdType(sinkConnectorConfig.getReceiveIdType())
- .createMessageReqBody(bodyBuilder.build())
- .build();
- });
+ RequestOptions requestOptions = RequestOptions.newBuilder()
+
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
+ .headers(headers)
+ .build();
+
+ retryer.call(() -> {
+ CreateMessageReq createMessageReq =
convertCreateMessageReq(connectRecord);
CreateMessageResp resp =
imService.message().create(createMessageReq, requestOptions);
if (resp.getCode() != 0) {
log.warn("Sinking event to lark failure | code:[{}] | msg:[{}]
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
return connectRecord;
}
-
- UN_ACK_REQ.remove(connectRecord);
return null;
});
}
+ public void sinkAsync(ConnectRecord connectRecord) {
Review Comment:
There is no hidden dangers, just more efficiency. Now, for each data send,
create a Runnable task and submit it to the thread pool. If you directly put
the data or some failed data into the queue, you only need to create a Runnable
task to submit to the thread pool, and then loop through the task to retrieve
the data. If there are many tasks in the thread pool and processing time is
insufficient, there may be frequent switching of thread processing tasks after
a certain accumulation. This is my opinion, what do you think?
没有隐藏的不安全,只是更高效。现在每发一次数据,创建一个Runnable任务,然后提交到线程池。如果直接把数据或部分失败数据放到队列里,只需要创建一个Runnable任务提交给线程池,然后任务中循环取数据。如果线程池中任务较多且处理不及时时,有一定累积后,可能会频繁切换线程处理任务。这是我的观点,您怎么看?
##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java:
##########
@@ -93,76 +102,154 @@ public static ImServiceHandler create(SinkConnectorConfig
sinkConnectorConfig) {
.build()
.im();
- Map<String, List<String>> headers = new HashMap<>();
- headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
-
- imServiceHandler.requestOptions = RequestOptions.newBuilder()
-
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
- .headers(headers)
- .build();
-
long fixedWait =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
- imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
- .retryIfException()
- .retryIfResult(Objects::nonNull)
- .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
- .withRetryListener(new RetryListener() {
- @SneakyThrows
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- long times = attempt.getAttemptNumber();
- if (times > 1) {
- log.warn("Retry sink event to lark | times=[{}]",
attempt.getAttemptNumber() - 1);
- }
- if (times == maxRetryTimes) {
- UN_ACK_REQ.remove(attempt.get());
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ int availableProcessors =
Runtime.getRuntime().availableProcessors();
+ imServiceHandler.sinkAsyncWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+ return thread;
+ });
+
+ imServiceHandler.cleanerWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-cleanerWorker");
+ return thread;
+ });
+
+ imServiceHandler.retryWorker =
Executors.newScheduledThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-retryWorker");
+ return thread;
+ });
+ } else {
+ imServiceHandler.retryer =
RetryerBuilder.<ConnectRecord>newBuilder()
+ .retryIfException()
+ .retryIfResult(Objects::nonNull)
+ .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+ .withRetryListener(new RetryListener() {
+ @SneakyThrows
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+
+ long times = attempt.getAttemptNumber();
+ if (times > 1) {
+ redoSinkNum.increment();
+ log.info("Total redo sink task num : [{}]",
redoSinkNum.sum());
+ log.warn("Retry sink event to lark |
times=[{}]", attempt.getAttemptNumber() - 1);
+ }
}
- }
- })
- .build();
+ })
+ .build();
+ }
+
return imServiceHandler;
}
public void sink(ConnectRecord connectRecord) throws ExecutionException,
RetryException {
- retryer.call(() -> {
- CreateMessageReq createMessageReq =
UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> {
- CreateMessageReqBody.Builder bodyBuilder =
CreateMessageReqBody.newBuilder()
- .receiveId(sinkConnectorConfig.getReceiveId())
- .uuid(UUID.randomUUID().toString());
-
- String templateTypeKey =
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
- if (null == templateTypeKey || "null".equals(templateTypeKey))
{
- templateTypeKey =
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
- }
- LarkMessageTemplateType templateType =
LarkMessageTemplateType.of(templateTypeKey);
- if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
- bodyBuilder.content(createTextContent(connectRecord))
- .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
- } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
- String title =
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
- .orElse("EventMesh-Message");
-
bodyBuilder.content(createInteractiveContent(connectRecord, title))
-
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
- }
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
- return CreateMessageReq.newBuilder()
- .receiveIdType(sinkConnectorConfig.getReceiveIdType())
- .createMessageReqBody(bodyBuilder.build())
- .build();
- });
+ RequestOptions requestOptions = RequestOptions.newBuilder()
+
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
+ .headers(headers)
+ .build();
+
+ retryer.call(() -> {
+ CreateMessageReq createMessageReq =
convertCreateMessageReq(connectRecord);
CreateMessageResp resp =
imService.message().create(createMessageReq, requestOptions);
if (resp.getCode() != 0) {
log.warn("Sinking event to lark failure | code:[{}] | msg:[{}]
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
return connectRecord;
}
-
- UN_ACK_REQ.remove(connectRecord);
return null;
});
}
+ public void sinkAsync(ConnectRecord connectRecord) {
Review Comment:
There is no hidden dangers, just more efficiency. Now, for each data send,
create a Runnable task and submit it to the thread pool. If you directly put
the data or some failed data into the queue, you only need to create a Runnable
task to submit to the thread pool, and then loop through the task to retrieve
the data. If there are many tasks in the thread pool and processing time is
insufficient, there may be frequent switching of thread processing tasks after
a certain accumulation. This is my opinion, what do you think?
没有隐藏的不安全,只是更高效。现在每发一次数据,创建一个Runnable任务,然后提交到线程池。如果直接把数据或部分失败数据放到队列里,只需要创建一个Runnable任务提交给线程池,然后任务中循环取数据。如果线程池中任务较多且处理不及时时,有一定累积后,可能会频繁切换线程处理任务。这是我的观点,您怎么看?
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java:
##########
@@ -85,31 +88,75 @@ public void setup() throws Exception {
@Test
public void testRegularSink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ regularSink();
+ }
+
+ @Test
+ public void testRegularSinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ regularSink();
+ }
+
+ private void regularSink() throws Exception {
final int times = 3;
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- imServiceHandler.sink(connectRecord);
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ long duration = retryDelayInMills * times;
+ Thread.sleep(duration);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
}
-
- verify(message, times(times)).create(any(), any());
}
@Test
public void testRetrySink() throws Exception {
- doThrow(new Exception()).when(message).create(any(), any());
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ retrySink();
+ }
+
+ @Test
+ public void testRetrySinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ retrySink();
+ }
+
+ private void retrySink() throws Exception {
+ CreateMessageResp resp = new CreateMessageResp();
+ resp.setCode(1);
+ doReturn(resp).when(message).create(any(), any());
final int times = 3;
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes());
+ // (maxRetryTimes + 1) event are actually sent
+ int sinkTimes = times * (maxRetryTimes + 1);
+ long duration = retryDelayInMills * sinkTimes;
+
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
- }
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
- // (maxRetryTimes + 1) event are actually sent
- verify(message, times(times *
(Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1))).create(any(),
any());
+ Thread.sleep(duration);
Review Comment:
Oh! I missed the last line of code. retrySink() is already sufficient for
me. This review should be written under regularSink().
Oh! 我错过了最后一行代码,retrySink() 已经可以了。这段review应该写在regularSink()下。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]