hhuang1231 commented on code in PR #4599: URL: https://github.com/apache/eventmesh/pull/4599#discussion_r1413500689
########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.lark.sink; + +import static org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.getTenantAccessToken; + +import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys; +import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.text.StringEscapeUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import 
com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.lark.oapi.Client; +import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum; +import com.lark.oapi.card.model.MessageCard; +import com.lark.oapi.card.model.MessageCardConfig; +import com.lark.oapi.card.model.MessageCardElement; +import com.lark.oapi.card.model.MessageCardHeader; +import com.lark.oapi.card.model.MessageCardMarkdown; +import com.lark.oapi.card.model.MessageCardPlainText; +import com.lark.oapi.core.httpclient.OkHttpTransport; +import com.lark.oapi.core.request.RequestOptions; +import com.lark.oapi.core.utils.Lists; +import com.lark.oapi.okhttp.OkHttpClient; +import com.lark.oapi.service.im.v1.ImService; +import com.lark.oapi.service.im.v1.enums.MsgTypeEnum; +import com.lark.oapi.service.im.v1.model.CreateMessageReq; +import com.lark.oapi.service.im.v1.model.CreateMessageReqBody; +import com.lark.oapi.service.im.v1.model.CreateMessageResp; +import com.lark.oapi.service.im.v1.model.ext.MessageText; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ImServiceHandler { + + private static final ConcurrentHashMap<ConnectRecord, CreateMessageReq> UN_ACK_REQ = new ConcurrentHashMap<>(); Review Comment: @pandaapo With asynchronous requests, Feishu may fail to respond in time, causing the connectorServer to start retransmitting. This may lead to duplicate consumption on Feishu. However, my current implementation logic has a problem: `com.github.rholder.retry.Retryer` issues requests synchronously, and I mistakenly thought it was asynchronous. - For synchronous requests, this collection is not needed to store requests that have not yet received a response. - Or, looking at it from another angle, could this collection be used to store dead-letter events? My thoughts: 1. 
If there are performance requirements, I recommend removing `retryer` and using `ScheduledExecutorService` to implement asynchronous requests. 2. Continue to use `retryer` and remove the `UN_ACK_REQ` collection. 3. If retransmission is not necessary, this feature can simply be removed. Do you have any better suggestions in this regard? --- 如果是异步请求就可能会出现飞书未能及时响应,而connectorServer开始重传的情况。这可能会导致飞书重复消费。 但是目前我的实现逻辑存在问题,因为使用`com.github.rholder.retry.Retryer`的方式属于同步请求,而我误以为是异步请求。 - 对于同步请求来说,并不需要这种集合来存储未响应的请求。 - 或者可以换另一个角度思考,这里可以存储死信事件? 我的想法: 1. 有性能需求的话,我建议移除`retryer`,自行使用`ScheduledExecutorService`实现异步请求。 2. 继续使用`retryer`,移除`UN_ACK_REQ`集合 3. 没必要重传的话,可以直接移除该功能。 请问您在这方面有什么更好的建议吗? ########## eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eventmesh.connector.lark.sink; + +import static org.apache.eventmesh.connector.lark.sink.connector.LarkSinkConnector.getTenantAccessToken; + +import org.apache.eventmesh.connector.lark.ConnectRecordExtensionKeys; +import org.apache.eventmesh.connector.lark.config.LarkMessageTemplateType; +import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.text.StringEscapeUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.lark.oapi.Client; +import com.lark.oapi.card.enums.MessageCardHeaderTemplateEnum; +import com.lark.oapi.card.model.MessageCard; +import com.lark.oapi.card.model.MessageCardConfig; +import com.lark.oapi.card.model.MessageCardElement; +import com.lark.oapi.card.model.MessageCardHeader; +import com.lark.oapi.card.model.MessageCardMarkdown; +import com.lark.oapi.card.model.MessageCardPlainText; +import com.lark.oapi.core.httpclient.OkHttpTransport; +import com.lark.oapi.core.request.RequestOptions; +import com.lark.oapi.core.utils.Lists; +import com.lark.oapi.okhttp.OkHttpClient; +import com.lark.oapi.service.im.v1.ImService; +import com.lark.oapi.service.im.v1.enums.MsgTypeEnum; +import com.lark.oapi.service.im.v1.model.CreateMessageReq; +import com.lark.oapi.service.im.v1.model.CreateMessageReqBody; +import 
com.lark.oapi.service.im.v1.model.CreateMessageResp; +import com.lark.oapi.service.im.v1.model.ext.MessageText; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ImServiceHandler { + + private static final ConcurrentHashMap<ConnectRecord, CreateMessageReq> UN_ACK_REQ = new ConcurrentHashMap<>(); Review Comment: @pandaapo With asynchronous requests, Feishu may fail to respond in time, causing the connectorServer to start retransmitting. This may lead to duplicate consumption on Feishu. However, my current implementation logic has a problem: `com.github.rholder.retry.Retryer` issues requests synchronously, and I mistakenly thought it was asynchronous. - For synchronous requests, this collection is not needed to store requests that have not yet received a response. - Or, looking at it from another angle, could this collection be used to store dead-letter events? My thoughts: 1. If there are performance requirements, I recommend removing `retryer` and using `ScheduledExecutorService` to implement asynchronous requests. 2. Continue to use `retryer` and remove the `UN_ACK_REQ` collection. 3. If retransmission is not necessary, this feature can simply be removed. Do you have any better suggestions in this regard? --- 如果是异步请求就可能会出现飞书未能及时响应,而connectorServer开始重传的情况。这可能会导致飞书重复消费。 但是目前我的实现逻辑存在问题,因为使用`com.github.rholder.retry.Retryer`的方式属于同步请求,而我误以为是异步请求。 - 对于同步请求来说,并不需要这种集合来存储未响应的请求。 - 或者可以换另一个角度思考,这里可以存储死信事件? 我的想法: 1. 有性能需求的话,我建议移除`retryer`,自行使用`ScheduledExecutorService`实现异步请求。 2. 继续使用`retryer`,移除`UN_ACK_REQ`集合 3. 没必要重传的话,可以直接移除该功能。 请问您在这方面有什么更好的建议吗? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
