2011shenlin commented on a change in pull request #5:
URL: https://github.com/apache/rocketmq-connect/pull/5#discussion_r821289924



##########
File path: 
connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkConnector.java
##########
@@ -0,0 +1,82 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FcSinkConnector extends SinkConnector {
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValueList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(FcConstant.REGION_CONSTANT, region);
+        keyValue.put(FcConstant.ACCESS_KEY_CONSTANT, accessKey);
+        keyValue.put(FcConstant.ACCESS_SECRET_KEY_CONSTANT, accessSecretKey);
+        keyValue.put(FcConstant.ACCOUNT_ID_CONSTANT, accountId);
+        keyValue.put(FcConstant.SERVICE_NAME_CONSTANT, serviceName);
+        keyValue.put(FcConstant.FUNCTION_NAME_CONSTANT, functionName);
+        keyValue.put(FcConstant.INVOCATION_TYPE_CONSTANT, invocationType);
+        keyValue.put(FcConstant.QUALIFIER_CONSTANT, qualifier);
+        keyValueList.add(keyValue);
+        return keyValueList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return FcSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        region = config.getString(FcConstant.REGION_CONSTANT);
+        accessKey = config.getString(FcConstant.ACCESS_KEY_CONSTANT);
+        accessSecretKey = 
config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT);
+        accountId = config.getString(FcConstant.ACCOUNT_ID_CONSTANT);
+        serviceName = config.getString(FcConstant.SERVICE_NAME_CONSTANT);
+        functionName = config.getString(FcConstant.FUNCTION_NAME_CONSTANT);
+        invocationType = config.getString(FcConstant.INVOCATION_TYPE_CONSTANT, 
null);

Review comment:
       If the configuration items of SinkConnector and SinkTask are the same, 
could return the SinkConnector config directly,Avoid duplicate code between 
SinkConnector and SinkTask about configuration item extraction

##########
File path: 
connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = 
LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new 
InvokeFunctionRequest(serviceName, functionName);
+                
invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    
invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = 
functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == 
invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for 
execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {

Review comment:
       should check the access key and the secret key efficient.

##########
File path: 
connectors/aliyun/rocketmq-connect-fc/src/main/java/com/aliyun/rocketmq/connect/fc/sink/FcSinkTask.java
##########
@@ -0,0 +1,131 @@
+package com.aliyun.rocketmq.connect.fc.sink;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.rocketmq.connect.fc.sink.constant.FcConstant;
+import com.aliyuncs.fc.client.FunctionComputeClient;
+import com.aliyuncs.fc.constants.Const;
+import com.aliyuncs.fc.exceptions.ClientException;
+import com.aliyuncs.fc.request.GetFunctionRequest;
+import com.aliyuncs.fc.request.GetServiceRequest;
+import com.aliyuncs.fc.request.InvokeFunctionRequest;
+import com.aliyuncs.fc.response.InvokeFunctionResponse;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class FcSinkTask extends SinkTask {
+
+    private static final Logger log = 
LoggerFactory.getLogger(FcSinkTask.class);
+
+    private String region;
+
+    private String accessKey;
+
+    private String accessSecretKey;
+
+    private String accountId;
+
+    private String serviceName;
+
+    private String functionName;
+
+    private String invocationType;
+
+    private String qualifier;
+
+    private FunctionComputeClient functionComputeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        try {
+            sinkRecords.forEach(connectRecord -> {
+                InvokeFunctionRequest invokeFunctionRequest = new 
InvokeFunctionRequest(serviceName, functionName);
+                
invokeFunctionRequest.setPayload(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+                if (!StringUtils.isBlank(invocationType)) {
+                    
invokeFunctionRequest.setInvocationType(Const.INVOCATION_TYPE_ASYNC);
+                }
+                invokeFunctionRequest.setQualifier(qualifier);
+                InvokeFunctionResponse invokeFunctionResponse = 
functionComputeClient.invokeFunction(invokeFunctionRequest);
+                if (Const.INVOCATION_TYPE_ASYNC.equals(invocationType)) {
+                    if (HttpURLConnection.HTTP_ACCEPTED == 
invokeFunctionResponse.getStatus()) {
+                        log.info("Async invocation has been queued for 
execution, request ID: {}", invokeFunctionResponse.getRequestId());
+                    }else {
+                        log.info("Async invocation was not accepted");
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.error("FcSinkTask | put | error => ", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(FcConstant.REGION_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.ACCESS_SECRET_KEY_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
+            || 
StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
+            throw new RuntimeException("fc required parameter is null !");
+        }
+        try {
+            GetServiceRequest getServiceRequest = new 
GetServiceRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT));
+            
getServiceRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getService(getServiceRequest);
+            GetFunctionRequest getFunctionRequest = new 
GetFunctionRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT), 
config.getString(FcConstant.FUNCTION_NAME_CONSTANT));
+            
getFunctionRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
+            functionComputeClient.getFunction(getFunctionRequest);
+        } catch (ClientException e) {
+            log.error("FcSinkTask | validate | error => ", e);

Review comment:
       should check the serviceName exist, function name exist.

##########
File path: connectors/aliyun/rocketmq-connect-fc/README.md
##########
@@ -0,0 +1,48 @@
+# rocketmq-connect-fc
+* **rocketmq-connect-fc** 说明
+```
+Be responsible for consuming messages from producer and writing data to 
function calculation FC.
+```
+
+## rocketmq-connect-fc 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-fc 启动
+
+* **fc-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}",
 "qualifier":"${qualifier}"}
+```
+
+例子 
+```
+http://localhost:8081/connectors/fcConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster";,
+"connector-class":"com.aliyun.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"",
 "qualifier":"LATEST"}
+```
+
+>**注:** `rocketmq-fc-connect` 
的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-fc 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-connector-name}/stop
+```
+
+## rocketmq-connect-fc 参数说明
+* **fc-sink-connector 参数说明**
+
+|         KEY            |  TYPE   | Must be filled | Description              
        | Example
+|------------------------|---------|----------------|----------------------------------|--|
+|region                  | String  | YES            | 地域                       
        | cn-hangzhou|
+|accessKey               | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取  
                  | xxxx |
+|accessSecretKey         | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取  
                   | xxx |
+|accountId               | String  | YES            | 阿里云yourAccountId         
             | xxxx |
+|serviceName             | String  | YES            | 服务名称 | xxxx |
+|functionName            | String  | YES            | 函数名称 | xxxx |
+|invocationType    | String | NO             | 同步或者异步                          
 | null |
+|qualifier        | String | NO             | 服务版本和别名                          
| LATEST |
+

Review comment:
       FC's Payload needs to support trasform configuration.

##########
File path: connectors/aliyun/rocketmq-connect-fc/src/main/resources/logback.xml
##########
@@ -0,0 +1,90 @@
+<configuration>

Review comment:
       why need logback config?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to