pandaapo commented on code in PR #4599:
URL: https://github.com/apache/eventmesh/pull/4599#discussion_r1415674144
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java:
##########
@@ -43,12 +43,17 @@
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
Review Comment:
It seems unnecessary to add this annotation because Strictness.LENIENT is
the default execution mode of Mockito.
似乎没有必要增加该注解,因为Strictness.LENIENT是Mockito默认的执行模式。
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java:
##########
@@ -85,31 +88,75 @@ public void setup() throws Exception {
@Test
public void testRegularSink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ regularSink();
+ }
+
+ @Test
+ public void testRegularSinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ regularSink();
+ }
+
+ private void regularSink() throws Exception {
final int times = 3;
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- imServiceHandler.sink(connectRecord);
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ long duration = retryDelayInMills * times;
+ Thread.sleep(duration);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
}
-
- verify(message, times(times)).create(any(), any());
}
@Test
public void testRetrySink() throws Exception {
- doThrow(new Exception()).when(message).create(any(), any());
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ retrySink();
+ }
+
+ @Test
+ public void testRetrySinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ retrySink();
+ }
+
+ private void retrySink() throws Exception {
+ CreateMessageResp resp = new CreateMessageResp();
+ resp.setCode(1);
+ doReturn(resp).when(message).create(any(), any());
final int times = 3;
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes());
+ // (maxRetryTimes + 1) event are actually sent
+ int sinkTimes = times * (maxRetryTimes + 1);
+ long duration = retryDelayInMills * sinkTimes;
+
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
- }
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
- // (maxRetryTimes + 1) event are actually sent
- verify(message, times(times *
(Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1))).create(any(),
any());
+ Thread.sleep(duration);
Review Comment:
In this unit test class, the effectiveness (I don't know if the term is
appropriate, meaning there is no checksum assertion) is relatively weak for
both situations where there is no retry and asynchronous retry. It would be
best if optimization could continue in the future.
该单元测试类中,对于不会重试的情况和异步重试的情况的测试,有效性(我不知道这个词是否恰当,意思是没有校验和断言)都比较弱。如果后续能继续优化那最好不过。
##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java:
##########
@@ -93,76 +102,154 @@ public static ImServiceHandler create(SinkConnectorConfig
sinkConnectorConfig) {
.build()
.im();
- Map<String, List<String>> headers = new HashMap<>();
- headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
-
- imServiceHandler.requestOptions = RequestOptions.newBuilder()
-
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
- .headers(headers)
- .build();
-
long fixedWait =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
- imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
- .retryIfException()
- .retryIfResult(Objects::nonNull)
- .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
- .withRetryListener(new RetryListener() {
- @SneakyThrows
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- long times = attempt.getAttemptNumber();
- if (times > 1) {
- log.warn("Retry sink event to lark | times=[{}]",
attempt.getAttemptNumber() - 1);
- }
- if (times == maxRetryTimes) {
- UN_ACK_REQ.remove(attempt.get());
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ int availableProcessors =
Runtime.getRuntime().availableProcessors();
+ imServiceHandler.sinkAsyncWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+ return thread;
+ });
+
+ imServiceHandler.cleanerWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-cleanerWorker");
+ return thread;
+ });
+
+ imServiceHandler.retryWorker =
Executors.newScheduledThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-retryWorker");
+ return thread;
+ });
+ } else {
+ imServiceHandler.retryer =
RetryerBuilder.<ConnectRecord>newBuilder()
+ .retryIfException()
+ .retryIfResult(Objects::nonNull)
+ .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+ .withRetryListener(new RetryListener() {
+ @SneakyThrows
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+
+ long times = attempt.getAttemptNumber();
+ if (times > 1) {
+ redoSinkNum.increment();
+ log.info("Total redo sink task num : [{}]",
redoSinkNum.sum());
+ log.warn("Retry sink event to lark |
times=[{}]", attempt.getAttemptNumber() - 1);
+ }
}
- }
- })
- .build();
+ })
+ .build();
+ }
+
return imServiceHandler;
}
public void sink(ConnectRecord connectRecord) throws ExecutionException,
RetryException {
- retryer.call(() -> {
- CreateMessageReq createMessageReq =
UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> {
- CreateMessageReqBody.Builder bodyBuilder =
CreateMessageReqBody.newBuilder()
- .receiveId(sinkConnectorConfig.getReceiveId())
- .uuid(UUID.randomUUID().toString());
-
- String templateTypeKey =
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
- if (null == templateTypeKey || "null".equals(templateTypeKey))
{
- templateTypeKey =
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
- }
- LarkMessageTemplateType templateType =
LarkMessageTemplateType.of(templateTypeKey);
- if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
- bodyBuilder.content(createTextContent(connectRecord))
- .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
- } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
- String title =
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
- .orElse("EventMesh-Message");
-
bodyBuilder.content(createInteractiveContent(connectRecord, title))
-
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
- }
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
- return CreateMessageReq.newBuilder()
- .receiveIdType(sinkConnectorConfig.getReceiveIdType())
- .createMessageReqBody(bodyBuilder.build())
- .build();
- });
+ RequestOptions requestOptions = RequestOptions.newBuilder()
+
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
+ .headers(headers)
+ .build();
+
+ retryer.call(() -> {
+ CreateMessageReq createMessageReq =
convertCreateMessageReq(connectRecord);
CreateMessageResp resp =
imService.message().create(createMessageReq, requestOptions);
if (resp.getCode() != 0) {
log.warn("Sinking event to lark failure | code:[{}] | msg:[{}]
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
return connectRecord;
}
-
- UN_ACK_REQ.remove(connectRecord);
return null;
});
}
+ public void sinkAsync(ConnectRecord connectRecord) {
Review Comment:
Can the asynchronous mechanism in this method be taken outside of the
sinkAsync() method? For example, storing failed messages or each message in a
queue, and then asynchronously retry or process the messages in the queue. This
way, there is no need to submit a task to the thread pool every time a message
is sent. If you agree but want to continue optimizing in the new PR, you can
also do so.
能否将该方法中的异步机制拿到sinkAsync()方法外面?比如将发送失败的消息或者每个消息先存到一个队列中,然后对队列中的消息进行异步重试或者异步处理。这样就不用每发一个消息都向线程池提交一次任务。如果您赞同但想在新的PR中继续优化,也可以。
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java:
##########
@@ -85,31 +88,75 @@ public void setup() throws Exception {
@Test
public void testRegularSink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ regularSink();
+ }
+
+ @Test
+ public void testRegularSinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ regularSink();
+ }
+
+ private void regularSink() throws Exception {
final int times = 3;
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- imServiceHandler.sink(connectRecord);
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ long duration = retryDelayInMills * times;
+ Thread.sleep(duration);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
}
-
- verify(message, times(times)).create(any(), any());
}
@Test
public void testRetrySink() throws Exception {
- doThrow(new Exception()).when(message).create(any(), any());
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ retrySink();
+ }
+
+ @Test
+ public void testRetrySinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ retrySink();
+ }
+
+ private void retrySink() throws Exception {
+ CreateMessageResp resp = new CreateMessageResp();
+ resp.setCode(1);
+ doReturn(resp).when(message).create(any(), any());
final int times = 3;
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes());
+ // (maxRetryTimes + 1) event are actually sent
+ int sinkTimes = times * (maxRetryTimes + 1);
+ long duration = retryDelayInMills * sinkTimes;
+
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
- }
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
- // (maxRetryTimes + 1) event are actually sent
- verify(message, times(times *
(Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1))).create(any(),
any());
+ Thread.sleep(duration);
+ } else {
+ Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
Review Comment:
This is my question: Is this exception thrown internally by
om.github.rholder.retry.Retryer?
这是我的疑问:该异常是com.github.rholder.retry.Retryer内部抛出来的?
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java:
##########
@@ -43,12 +43,17 @@
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
Review Comment:
It seems unnecessary to add this annotation because Strictness.LENIENT is
the default execution mode of Mockito.
似乎没有必要增加该注解,因为Strictness.LENIENT是Mockito默认的执行模式。
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java:
##########
@@ -85,31 +88,75 @@ public void setup() throws Exception {
@Test
public void testRegularSink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ regularSink();
+ }
+
+ @Test
+ public void testRegularSinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ regularSink();
+ }
+
+ private void regularSink() throws Exception {
final int times = 3;
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- imServiceHandler.sink(connectRecord);
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ long duration = retryDelayInMills * times;
+ Thread.sleep(duration);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
}
-
- verify(message, times(times)).create(any(), any());
}
@Test
public void testRetrySink() throws Exception {
- doThrow(new Exception()).when(message).create(any(), any());
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ retrySink();
+ }
+
+ @Test
+ public void testRetrySinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ retrySink();
+ }
+
+ private void retrySink() throws Exception {
+ CreateMessageResp resp = new CreateMessageResp();
+ resp.setCode(1);
+ doReturn(resp).when(message).create(any(), any());
final int times = 3;
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes());
+ // (maxRetryTimes + 1) event are actually sent
+ int sinkTimes = times * (maxRetryTimes + 1);
+ long duration = retryDelayInMills * sinkTimes;
+
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
- }
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
- // (maxRetryTimes + 1) event are actually sent
- verify(message, times(times *
(Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1))).create(any(),
any());
+ Thread.sleep(duration);
Review Comment:
In this unit test class, the effectiveness (I don't know if the term is
appropriate, meaning there is no checksum assertion) is relatively weak for
both situations where there is no retry and asynchronous retry. It would be
best if optimization could continue in the future.
该单元测试类中,对于不会重试的情况和异步重试的情况的测试,有效性(我不知道这个词是否恰当,意思是没有校验和断言)都比较弱。如果后续能继续优化那最好不过。
##########
eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java:
##########
@@ -85,31 +88,75 @@ public void setup() throws Exception {
@Test
public void testRegularSink() throws Exception {
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ regularSink();
+ }
+
+ @Test
+ public void testRegularSinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ regularSink();
+ }
+
+ private void regularSink() throws Exception {
final int times = 3;
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- imServiceHandler.sink(connectRecord);
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ long duration = retryDelayInMills * times;
+ Thread.sleep(duration);
+ } else {
+ imServiceHandler.sink(connectRecord);
+ }
}
-
- verify(message, times(times)).create(any(), any());
}
@Test
public void testRetrySink() throws Exception {
- doThrow(new Exception()).when(message).create(any(), any());
+ sinkConnectorConfig.setSinkAsync("false");
+ init();
+ retrySink();
+ }
+
+ @Test
+ public void testRetrySinkAsync() throws Exception {
+ sinkConnectorConfig.setSinkAsync("true");
+ init();
+ retrySink();
+ }
+
+ private void retrySink() throws Exception {
+ CreateMessageResp resp = new CreateMessageResp();
+ resp.setCode(1);
+ doReturn(resp).when(message).create(any(), any());
final int times = 3;
+ long retryDelayInMills =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
+ int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes());
+ // (maxRetryTimes + 1) event are actually sent
+ int sinkTimes = times * (maxRetryTimes + 1);
+ long duration = retryDelayInMills * sinkTimes;
+
for (int i = 0; i < times; i++) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(),
"test-lark".getBytes(StandardCharsets.UTF_8));
- Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
- }
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ imServiceHandler.sinkAsync(connectRecord);
- // (maxRetryTimes + 1) event are actually sent
- verify(message, times(times *
(Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1))).create(any(),
any());
+ Thread.sleep(duration);
+ } else {
+ Assertions.assertThrows(Exception.class, () ->
imServiceHandler.sink(connectRecord));
Review Comment:
This is my question: Is this exception thrown internally by
om.github.rholder.retry.Retryer?
这是我的疑问:该异常是com.github.rholder.retry.Retryer内部抛出来的?
##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java:
##########
@@ -93,76 +102,154 @@ public static ImServiceHandler create(SinkConnectorConfig
sinkConnectorConfig) {
.build()
.im();
- Map<String, List<String>> headers = new HashMap<>();
- headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
-
- imServiceHandler.requestOptions = RequestOptions.newBuilder()
-
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
- .headers(headers)
- .build();
-
long fixedWait =
Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
int maxRetryTimes =
Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
- imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
- .retryIfException()
- .retryIfResult(Objects::nonNull)
- .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
- .withRetryListener(new RetryListener() {
- @SneakyThrows
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- long times = attempt.getAttemptNumber();
- if (times > 1) {
- log.warn("Retry sink event to lark | times=[{}]",
attempt.getAttemptNumber() - 1);
- }
- if (times == maxRetryTimes) {
- UN_ACK_REQ.remove(attempt.get());
+ if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
+ int availableProcessors =
Runtime.getRuntime().availableProcessors();
+ imServiceHandler.sinkAsyncWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-sinkAsyncWorker");
+ return thread;
+ });
+
+ imServiceHandler.cleanerWorker =
Executors.newFixedThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-cleanerWorker");
+ return thread;
+ });
+
+ imServiceHandler.retryWorker =
Executors.newScheduledThreadPool(availableProcessors, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("eventmesh-connector-lark-retryWorker");
+ return thread;
+ });
+ } else {
+ imServiceHandler.retryer =
RetryerBuilder.<ConnectRecord>newBuilder()
+ .retryIfException()
+ .retryIfResult(Objects::nonNull)
+ .withWaitStrategy(WaitStrategies.fixedWait(fixedWait,
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
+ .withRetryListener(new RetryListener() {
+ @SneakyThrows
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+
+ long times = attempt.getAttemptNumber();
+ if (times > 1) {
+ redoSinkNum.increment();
+ log.info("Total redo sink task num : [{}]",
redoSinkNum.sum());
+ log.warn("Retry sink event to lark |
times=[{}]", attempt.getAttemptNumber() - 1);
+ }
}
- }
- })
- .build();
+ })
+ .build();
+ }
+
return imServiceHandler;
}
public void sink(ConnectRecord connectRecord) throws ExecutionException,
RetryException {
- retryer.call(() -> {
- CreateMessageReq createMessageReq =
UN_ACK_REQ.computeIfAbsent(connectRecord, (k) -> {
- CreateMessageReqBody.Builder bodyBuilder =
CreateMessageReqBody.newBuilder()
- .receiveId(sinkConnectorConfig.getReceiveId())
- .uuid(UUID.randomUUID().toString());
-
- String templateTypeKey =
connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
- if (null == templateTypeKey || "null".equals(templateTypeKey))
{
- templateTypeKey =
LarkMessageTemplateType.PLAIN_TEXT.getTemplateKey();
- }
- LarkMessageTemplateType templateType =
LarkMessageTemplateType.of(templateTypeKey);
- if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
- bodyBuilder.content(createTextContent(connectRecord))
- .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
- } else if (LarkMessageTemplateType.MARKDOWN == templateType) {
- String title =
Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
- .orElse("EventMesh-Message");
-
bodyBuilder.content(createInteractiveContent(connectRecord, title))
-
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
- }
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put("Content-Type", Lists.newArrayList("application/json;
charset=utf-8"));
- return CreateMessageReq.newBuilder()
- .receiveIdType(sinkConnectorConfig.getReceiveIdType())
- .createMessageReqBody(bodyBuilder.build())
- .build();
- });
+ RequestOptions requestOptions = RequestOptions.newBuilder()
+
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(),
sinkConnectorConfig.getAppSecret()))
+ .headers(headers)
+ .build();
+
+ retryer.call(() -> {
+ CreateMessageReq createMessageReq =
convertCreateMessageReq(connectRecord);
CreateMessageResp resp =
imService.message().create(createMessageReq, requestOptions);
if (resp.getCode() != 0) {
log.warn("Sinking event to lark failure | code:[{}] | msg:[{}]
| err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
return connectRecord;
}
-
- UN_ACK_REQ.remove(connectRecord);
return null;
});
}
+ public void sinkAsync(ConnectRecord connectRecord) {
Review Comment:
Can the asynchronous mechanism in this method be taken outside of the
sinkAsync() method? For example, storing failed messages or each message in a
queue, and then asynchronously retry or process the messages in the queue. This
way, there is no need to submit a task to the thread pool every time a message
is sent. If you agree but want to continue optimizing in the new PR, you can
also do so.
能否将该方法中的异步机制拿到sinkAsync()方法外面?比如将发送失败的消息或者每个消息先存到一个队列中,然后对队列中的消息进行异步重试或者异步处理。这样就不用每发一个消息都向线程池提交一次任务。如果您赞同但想在新的PR中继续优化,也可以。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]