zhaohai1299002788 commented on a change in pull request #2:
URL: https://github.com/apache/rocketmq-connect/pull/2#discussion_r834883646
##########
File path:
connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/common/OkHttpUtils.java
##########
@@ -0,0 +1,281 @@
+package org.apache.rocketmq.connect.dingtalk.sink.common;
+
+import com.alibaba.fastjson.JSON;
+import okhttp3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class OkHttpUtils {
+ private static final Logger log = LoggerFactory.getLogger(OkHttpUtils.class);
+
+ private static volatile OkHttpClient okHttpClient = null;
+ private static volatile Semaphore semaphore = null;
+ private Map<String, String> headerMap;
+ private Map<String, String> paramMap;
+ private String url;
+ private Request.Builder request;
+
+ private OkHttpUtils() {
+ if (okHttpClient == null) {
+ synchronized (OkHttpUtils.class) {
+ if (okHttpClient == null) {
+ TrustManager[] trustManagers = buildTrustManagers();
+ okHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(15, TimeUnit.SECONDS)
+ .writeTimeout(20, TimeUnit.SECONDS)
+ .readTimeout(20, TimeUnit.SECONDS)
+ .sslSocketFactory(createSSLSocketFactory(trustManagers), (X509TrustManager) trustManagers[0])
+ .hostnameVerifier((hostName, session) -> true)
+ .retryOnConnectionFailure(true)
+ .build();
+ addHeader("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
+ }
+ }
+ }
+ }
+
+ private static Semaphore getSemaphoreInstance() {
+ synchronized (OkHttpUtils.class) {
+ if (semaphore == null) {
+ semaphore = new Semaphore(0);
+ }
+ }
+ return semaphore;
+ }
+
+ public static OkHttpUtils builder() {
+ return new OkHttpUtils();
+ }
+
+ public OkHttpUtils url(String url) {
+ this.url = url;
+ return this;
+ }
+
+ /**
+ * 添加参数
+ *
+ * @param key 参数名
+ * @param value 参数值
+ * @return
+ */
+ public OkHttpUtils addParam(String key, String value) {
+ if (paramMap == null) {
+ paramMap = new LinkedHashMap<>(16);
+ }
+ paramMap.put(key, value);
+ return this;
+ }
+
+ /**
+ * 添加请求头
+ *
+ * @param key 参数名
+ * @param value 参数值
+ * @return
+ */
+ public OkHttpUtils addHeader(String key, String value) {
+ if (headerMap == null) {
+ headerMap = new LinkedHashMap<>(16);
+ }
+ headerMap.put(key, value);
+ return this;
+ }
+
+ public OkHttpUtils get() {
+ request = new Request.Builder().get();
+ StringBuilder urlBuilder = new StringBuilder(url);
+ if (paramMap != null) {
+ urlBuilder.append("?");
+ try {
+ for (Map.Entry<String, String> entry : paramMap.entrySet()) {
+ urlBuilder.append(URLEncoder.encode(entry.getKey(), "utf-8")).
+ append("=").
+ append(URLEncoder.encode(entry.getValue(), "utf-8")).
+ append("&");
+ }
+ } catch (Exception e) {
+ log.error("OkHttpUtils | get | error => ", e);
+ }
+ urlBuilder.deleteCharAt(urlBuilder.length() - 1);
+ }
+ request.url(urlBuilder.toString());
+ return this;
+ }
+
+ /**
+ * 初始化post方法
+ *
+ * @param isJsonPost true等于json的方式提交数据,类似postman里post方法的raw
+ * false等于普通的表单提交
+ * @return
+ */
+ public OkHttpUtils post(boolean isJsonPost) {
+ RequestBody requestBody;
+ if (isJsonPost) {
+ String json = "";
+ if (paramMap != null) {
+ json = JSON.toJSONString(paramMap);
+ }
+ requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json);
+ } else {
+ FormBody.Builder formBody = new FormBody.Builder();
+ if (paramMap != null) {
+ paramMap.forEach(formBody::add);
+ }
+ requestBody = formBody.build();
+ }
+ request = new Request.Builder().post(requestBody).url(url);
+ return this;
+ }
+
+ public OkHttpUtils postForStringBody(Object data) {
+ String json = JSON.toJSONString(data);
Review comment:
已修改
##########
File path:
connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkConnector.java
##########
@@ -0,0 +1,71 @@
+package org.apache.rocketmq.connect.dingtalk.sink;
+
+import org.apache.rocketmq.connect.dingtalk.sink.constant.DingTalkConstant;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DingTalkSinkConnector extends SinkConnector {
+
+ private String webHook;
+
+ private String secretKey;
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ List<KeyValue> taskConfigList = new ArrayList<>(11);
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(DingTalkConstant.WEB_HOOK, webHook);
+ keyValue.put(DingTalkConstant.SECRET_KEY, secretKey);
+ taskConfigList.add(keyValue);
+ return taskConfigList;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return DingTalkSinkTask.class;
+ }
+
+ @Override
+ public void validate(KeyValue config) {
Review comment:
已添加
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]