hhuang1231 commented on code in PR #4599:
URL: https://github.com/apache/eventmesh/pull/4599#discussion_r1413516292


##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.lark.sink.connector;
+
+import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create;
+
+import org.apache.eventmesh.connector.lark.sink.ImServiceHandler;
+import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig;
+import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.github.rholder.retry.RetryException;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.lark.oapi.Client;
+import com.lark.oapi.core.enums.AppType;
+import com.lark.oapi.core.request.SelfBuiltTenantAccessTokenReq;
+import com.lark.oapi.core.response.TenantAccessTokenResp;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class LarkSinkConnector implements Sink {
+
+    public static final String TENANT_ACCESS_TOKEN = "tenant_access_token";
+
+    /**
+     * replace lark build-in tokenCache
+     */
+    public static final Cache<String, String> AUTH_CACHE = 
CacheBuilder.newBuilder()
+            .initialCapacity(12)
+            .maximumSize(10)
+            .concurrencyLevel(5)
+            .expireAfterWrite(30, TimeUnit.MINUTES)
+            .build();
+
+    private LarkSinkConfig sinkConfig;
+
+    private ImServiceHandler imServiceHandler;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return LarkSinkConfig.class;
+    }
+
+    @Override
+    public void init(Config config) {
+        // Deprecated
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        // init config for lark sink connector
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) 
connectorContext;
+        this.sinkConfig = (LarkSinkConfig) 
sinkConnectorContext.getSinkConfig();
+
+        SinkConnectorConfig sinkConnectorConfig = 
sinkConfig.getSinkConnectorConfig();
+        sinkConnectorConfig.validateSinkConfiguration();
+
+        imServiceHandler = create(sinkConnectorConfig);
+    }
+
+    @Override
+    public void start() {
+        if (!started.compareAndSet(false, true)) {
+            log.info("LarkSinkConnector has been started.");
+        }
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+        // Sink does not need to implement
+    }
+
+    @Override
+    public String name() {
+        return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() {
+        if (!started.compareAndSet(true, false)) {
+            log.info("LarkSinkConnector has not started yet.");
+        }
+    }
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) {
+        for (ConnectRecord connectRecord : sinkRecords) {
+            try {
+                imServiceHandler.sink(connectRecord);
+            } catch (ExecutionException | RetryException e) {
+                log.error("Failed to sink event to lark", e);
+            }
+        }
+    }
+
+    @SneakyThrows
+    public static String getTenantAccessToken(String appId, String appSecret) {

Review Comment:
   Lark's access credentials do expire, but the expiration time can be 
refreshed by requesting new credentials during the last half hour before 
they expire. Lark's documentation states:
   > The maximum validity period of tenant_access_token is 2 hours. If this 
interface is called when the validity period is less than 30 minutes, a new 
tenant_access_token will be returned, and there will be two valid 
tenant_access_tokens at the same time.
   
   _I don't think token expiration justifies using a Cache here. If only a 
single kind of credential needs to be managed, there is no need for a 
collection to maintain the validity of that credential._
   
   ---
   
   飞书的访问凭证存在过期机制,但是在过期前半个小时内重新获取新的凭证即可刷新过期时间。飞书提供的文档说明如下:
   >  tenant_access_token 的最大有效期是 2 小时。如果在有效期小于 30 分钟的情况下,调用本接口,会返回一个新的 
tenant_access_token,这会同时存在两个有效的 tenant_access_token。
   
   _我认为过期并不是使用Cache的原因,如果仅需要管理一种凭证,那么并不需要使用一个集合来维护凭证的有效性。_



##########
eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.lark.sink.connector;
+
+import static org.apache.eventmesh.connector.lark.sink.ImServiceHandler.create;
+
+import org.apache.eventmesh.connector.lark.sink.ImServiceHandler;
+import org.apache.eventmesh.connector.lark.sink.config.LarkSinkConfig;
+import org.apache.eventmesh.connector.lark.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.github.rholder.retry.RetryException;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.lark.oapi.Client;
+import com.lark.oapi.core.enums.AppType;
+import com.lark.oapi.core.request.SelfBuiltTenantAccessTokenReq;
+import com.lark.oapi.core.response.TenantAccessTokenResp;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class LarkSinkConnector implements Sink {
+
+    public static final String TENANT_ACCESS_TOKEN = "tenant_access_token";
+
+    /**
+     * replace lark build-in tokenCache
+     */
+    public static final Cache<String, String> AUTH_CACHE = 
CacheBuilder.newBuilder()
+            .initialCapacity(12)
+            .maximumSize(10)
+            .concurrencyLevel(5)
+            .expireAfterWrite(30, TimeUnit.MINUTES)
+            .build();
+
+    private LarkSinkConfig sinkConfig;
+
+    private ImServiceHandler imServiceHandler;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return LarkSinkConfig.class;
+    }
+
+    @Override
+    public void init(Config config) {
+        // Deprecated
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        // init config for lark sink connector
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) 
connectorContext;
+        this.sinkConfig = (LarkSinkConfig) 
sinkConnectorContext.getSinkConfig();
+
+        SinkConnectorConfig sinkConnectorConfig = 
sinkConfig.getSinkConnectorConfig();
+        sinkConnectorConfig.validateSinkConfiguration();
+
+        imServiceHandler = create(sinkConnectorConfig);
+    }
+
+    @Override
+    public void start() {
+        if (!started.compareAndSet(false, true)) {
+            log.info("LarkSinkConnector has been started.");
+        }
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+        // Sink does not need to implement
+    }
+
+    @Override
+    public String name() {
+        return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() {
+        if (!started.compareAndSet(true, false)) {
+            log.info("LarkSinkConnector has not started yet.");
+        }
+    }
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) {
+        for (ConnectRecord connectRecord : sinkRecords) {
+            try {
+                imServiceHandler.sink(connectRecord);
+            } catch (ExecutionException | RetryException e) {
+                log.error("Failed to sink event to lark", e);
+            }
+        }
+    }
+
+    @SneakyThrows
+    public static String getTenantAccessToken(String appId, String appSecret) {

Review Comment:
   Lark's access credentials do expire, but the expiration time can be 
refreshed by requesting new credentials during the last half hour before 
they expire. Lark's documentation states:
   > The maximum validity period of tenant_access_token is 2 hours. If this 
interface is called when the validity period is less than 30 minutes, a new 
tenant_access_token will be returned, and there will be two valid 
tenant_access_tokens at the same time.
   
   _I don't think token expiration justifies using a Cache here. If only a 
single kind of credential needs to be managed, there is no need for a 
collection to maintain the validity of that credential._
   
   ---
   
   飞书的访问凭证存在过期机制,但是在过期前半个小时内重新获取新的凭证即可刷新过期时间。飞书提供的文档说明如下:
   >  tenant_access_token 的最大有效期是 2 小时。如果在有效期小于 30 分钟的情况下,调用本接口,会返回一个新的 
tenant_access_token,这会同时存在两个有效的 tenant_access_token。
   
   _我认为过期并不是使用Cache的原因,如果仅需要管理一种凭证,那么并不需要使用一个集合来维护凭证的有效性。_



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to