ruanwenjun commented on code in PR #1798: URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865683157
########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source.util; + +import java.io.Serializable; + +public class HttpClientResult implements Serializable { + + private static final long serialVersionUID = 2168152194164783950L; + + /** + * response status code + */ + private int code; + + /** + * response body + */ + private String content; + + public HttpClientResult() { + } + + public HttpClientResult(int code) { + this.code = code; + } + + public HttpClientResult(String content) { + this.content = content; + } Review Comment: This three constructor is unused. ########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source; + +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.flink.FlinkEnvironment; +import org.apache.seatunnel.flink.batch.FlinkBatchSource; +import org.apache.seatunnel.flink.http.source.constant.Settings; +import org.apache.seatunnel.flink.http.source.util.HttpClientResult; +import org.apache.seatunnel.flink.http.source.util.HttpClientUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Http implements FlinkBatchSource { + + private static final String GET = "GET"; + private static final String POST = "POST"; + private static final int INITIAL_CAPACITY = 16; + + private static Logger LOG = LoggerFactory.getLogger(Http.class); + + private Config config; + + @Override + public void setConfig(Config config) { + this.config = config; + } + + @Override + public Config getConfig() { + return config; + } + + @Override + public CheckResult checkConfig() { + return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL); + } + + @Override + public DataSet<Row> getData(FlinkEnvironment env) { + String url = config.getString(Settings.SOURCE_HTTP_URL); + String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET); + String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, ""); + String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, ""); + String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, ""); + + Map requestMap = jsonToMap(requestParams); Review Comment: It's better to put these code related to config in `prepare` method, and throw the exception when parse requestParams error. ########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source.util; + +import java.io.Serializable; + +public class HttpClientResult implements Serializable { + + private static final long serialVersionUID = 2168152194164783950L; + + /** + * response status code + */ + private int code; + + /** + * response body + */ + private String content; + + public HttpClientResult() { + } + + public HttpClientResult(int code) { + this.code = code; + } + + public HttpClientResult(String content) { + this.content = content; + } + + public HttpClientResult(int code, String content) { + this.code = code; + this.content = content; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getContent() { + return content; + } + + public void setContent(String content) { + this.content = content; + } Review Comment: Maybe we don't need to provide the set method. ########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source.util; + +import org.apache.http.HttpStatus; +import org.apache.http.NameValuePair; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +public class HttpClientUtils { + + private HttpClientUtils() { + throw new IllegalStateException("Utility class"); + } + + private static final String ENCODING = "UTF-8"; + private static final int CONNECT_TIMEOUT = 6000 * 2; + private static final int SOCKET_TIMEOUT = 6000 * 10; + private static final int INITIAL_CAPACITY = 16; + + /** + * Send a get request without request headers and request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doGet(String url) throws Exception { + return doGet(url, null, null); + } + + /** + * Send a get request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception { + return doGet(url, null, params); + } + + /** + * Send a get request with request headers and request parameters + * + * @param url request address + * @param headers request header map + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception { + // Create access address + URIBuilder uriBuilder = new URIBuilder(url); + if (params != null) { + Set<Entry<String, String>> entrySet = params.entrySet(); + for (Entry<String, String> entry : entrySet) { + uriBuilder.setParameter(entry.getKey(), entry.getValue()); + } + } + + /** + * setConnectTimeout:Set the connection timeout, in milliseconds. + * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds. + * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned. + */ + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + HttpGet httpGet = new HttpGet(uriBuilder.build()); + httpGet.setConfig(requestConfig); + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + // set request header + packageHeader(headers, httpGet); + // Execute the request and get the response result + return getHttpClientResult(httpClient, httpGet); + } + } + + /** + * Send a post request without request headers and request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPost(String url) throws Exception { + return doPost(url, null, null); + } + + /** + * Send post request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception { + return doPost(url, null, params); + } + + /** + * Send a post request with request headers and request parameters + * + * @param url request address + * @param headers request header map + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception { + HttpPost httpPost = new HttpPost(url); + /** + * setConnectTimeout:Set the connection timeout, in milliseconds. + * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds. + * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned. + */ + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + httpPost.setConfig(requestConfig); + // set request header + packageHeader(headers, httpPost); + + // Encapsulate request parameters + packageParam(params, httpPost); + + try (CloseableHttpClient httpClient = HttpClients.createDefault();) { + // Execute the request and get the response result + return getHttpClientResult(httpClient, httpPost); + } + } + + /** + * Send a put request without request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPut(String url) throws Exception { + return doPut(url, null); + } + + /** + * Send a put request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception { + + HttpPut httpPut = new HttpPut(url); + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + httpPut.setConfig(requestConfig); + + packageParam(params, httpPut); + + try (CloseableHttpClient httpClient = HttpClients.createDefault();) { + return getHttpClientResult(httpClient, httpPut); + } + } + + /** + * Send delete request without request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doDelete(String url) throws Exception { + + HttpDelete httpDelete = new HttpDelete(url); + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + httpDelete.setConfig(requestConfig); + try (CloseableHttpClient httpClient = HttpClients.createDefault();) { + return getHttpClientResult(httpClient, httpDelete); + } + } + + /** + * Send delete request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception { + if (params == null) { + params = new HashMap<>(INITIAL_CAPACITY); + } + + params.put("_method", "delete"); + return doPost(url, params); + } + + /** + * encapsulate request header + * + * @param params request header map + * @param httpMethod http request method + */ + public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) { + // encapsulate request header + if (params != null) { + Set<Entry<String, String>> entrySet = params.entrySet(); + for (Entry<String, String> entry : entrySet) { + // Set to the request header to the HttpRequestBase object + httpMethod.setHeader(entry.getKey(), entry.getValue()); + } + } + } + + /** + * Encapsulate request parameters + * + * @param params request parameter map + * @param httpMethod http request method + * @throws UnsupportedEncodingException exception information + */ + public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException { + // Encapsulate request parameters + if (params != null) { + List<NameValuePair> nvps = new ArrayList<>(); + Set<Entry<String, String>> entrySet = params.entrySet(); + for (Entry<String, String> entry : entrySet) { + nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue())); + } + + // Set to the request's http object + httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING)); + } + } + + /** + * get response result + * + * @param httpClient http client object + * @param httpMethod http method onject + * @return http response result + * @throws Exception information + */ + public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception { + // execute request + try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) { + // get return result + if (httpResponse != null && httpResponse.getStatusLine() != null) { + String content = ""; + if (httpResponse.getEntity() != null) { + content = EntityUtils.toString(httpResponse.getEntity(), ENCODING); + } + return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content); + } + } + return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR); + } + + /** + * release resources + * + * @param httpResponse http response object + * @param httpClient http client objet + * @throws IOException information + */ + public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException { + // release resources + if (httpResponse != null) { + httpResponse.close(); + } + if (httpClient != null) { + httpClient.close(); + } + } Review Comment: Remove the unused method. ########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source; + +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.flink.FlinkEnvironment; +import org.apache.seatunnel.flink.batch.FlinkBatchSource; +import org.apache.seatunnel.flink.http.source.constant.Settings; +import org.apache.seatunnel.flink.http.source.util.HttpClientResult; +import org.apache.seatunnel.flink.http.source.util.HttpClientUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Http implements FlinkBatchSource { + + private static final String GET = "GET"; + private static final String POST = "POST"; + private static final int INITIAL_CAPACITY = 16; + + private static Logger LOG = LoggerFactory.getLogger(Http.class); + + private Config config; + + @Override + public void setConfig(Config config) { + this.config = config; + } + + @Override + public Config getConfig() { + return config; + } + + @Override + public CheckResult checkConfig() { + return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL); + } + + @Override + public DataSet<Row> getData(FlinkEnvironment env) { + String url = config.getString(Settings.SOURCE_HTTP_URL); + String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET); + String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, ""); + String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, ""); + String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, ""); + + Map requestMap = jsonToMap(requestParams); + String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath); + LOG.info("sync values->{}", syncValues); Review Comment: This log is just used to debug? ########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source.util; + +import org.apache.http.HttpStatus; +import org.apache.http.NameValuePair; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +public class HttpClientUtils { + + private HttpClientUtils() { + throw new IllegalStateException("Utility class"); + } + + private static final String ENCODING = "UTF-8"; + private static final int CONNECT_TIMEOUT = 6000 * 2; + private static final int SOCKET_TIMEOUT = 6000 * 10; + private static final int INITIAL_CAPACITY = 16; + + /** + * Send a get request without request headers and request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doGet(String url) throws Exception { + return doGet(url, null, null); + } + + /** + * Send a get request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception { + return doGet(url, null, params); + } + + /** + * Send a get request with request headers and request parameters + * + * @param url request address + * @param headers request header map + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception { + // Create access address + URIBuilder uriBuilder = new URIBuilder(url); + if (params != null) { + Set<Entry<String, String>> entrySet = params.entrySet(); + for (Entry<String, String> entry : entrySet) { + uriBuilder.setParameter(entry.getKey(), entry.getValue()); + } + } + + /** + * setConnectTimeout:Set the connection timeout, in milliseconds. + * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds. + * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned. + */ + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + HttpGet httpGet = new HttpGet(uriBuilder.build()); + httpGet.setConfig(requestConfig); + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + // set request header + packageHeader(headers, httpGet); + // Execute the request and get the response result + return getHttpClientResult(httpClient, httpGet); + } + } + + /** + * Send a post request without request headers and request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPost(String url) throws Exception { + return doPost(url, null, null); + } + + /** + * Send post request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception { + return doPost(url, null, params); + } + + /** + * Send a post request with request headers and request parameters + * + * @param url request address + * @param headers request header map + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception { + HttpPost httpPost = new HttpPost(url); + /** + * setConnectTimeout:Set the connection timeout, in milliseconds. + * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds. + * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned. + */ + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + httpPost.setConfig(requestConfig); + // set request header + packageHeader(headers, httpPost); + + // Encapsulate request parameters + packageParam(params, httpPost); + + try (CloseableHttpClient httpClient = HttpClients.createDefault();) { + // Execute the request and get the response result + return getHttpClientResult(httpClient, httpPost); + } + } + + /** + * Send a put request without request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPut(String url) throws Exception { + return doPut(url, null); + } + + /** + * Send a put request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception { + + HttpPut httpPut = new HttpPut(url); + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + httpPut.setConfig(requestConfig); + + packageParam(params, httpPut); + + try (CloseableHttpClient httpClient = HttpClients.createDefault();) { + return getHttpClientResult(httpClient, httpPut); + } + } + + /** + * Send delete request without request parameters + * + * @param url request address + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doDelete(String url) throws Exception { + + HttpDelete httpDelete = new HttpDelete(url); + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build(); + httpDelete.setConfig(requestConfig); + try (CloseableHttpClient httpClient = HttpClients.createDefault();) { + return getHttpClientResult(httpClient, httpDelete); + } + } + + /** + * Send delete request with request parameters + * + * @param url request address + * @param params request parameter map + * @return http response result + * @throws Exception information + */ + public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception { + if (params == null) { + params = new HashMap<>(INITIAL_CAPACITY); + } + + params.put("_method", "delete"); + return doPost(url, params); + } + + /** + * encapsulate request header + * + * @param params request header map + * @param httpMethod http request method + */ + public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) { + // encapsulate request header + if (params != null) { + Set<Entry<String, String>> entrySet = params.entrySet(); + for (Entry<String, String> entry : entrySet) { + // Set to the request header to the HttpRequestBase object + httpMethod.setHeader(entry.getKey(), entry.getValue()); + } + } + } + + /** + * Encapsulate request parameters + * + * @param params request parameter map + * @param httpMethod http request method + * @throws UnsupportedEncodingException exception information + */ + public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException { + // Encapsulate request parameters + if (params != null) { + List<NameValuePair> nvps = new ArrayList<>(); + Set<Entry<String, String>> entrySet = params.entrySet(); + for (Entry<String, String> entry : entrySet) { + nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue())); + } + + // Set to the request's http object + httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING)); + } + } + + /** + * get response result + * + * @param httpClient http client object + * @param httpMethod http method onject + * @return http response result + * @throws Exception information + */ + public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception { + // execute request + try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) { + // get return result + if (httpResponse != null && httpResponse.getStatusLine() != null) { + String content = ""; + if (httpResponse.getEntity() != null) { + content = EntityUtils.toString(httpResponse.getEntity(), ENCODING); + } + return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content); + } + } + return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR); Review Comment: This line is just used to make the compiler pass, so you can just add a comment here and return null, then you can remove this constructor. ########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source; + +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.flink.FlinkEnvironment; +import org.apache.seatunnel.flink.batch.FlinkBatchSource; +import org.apache.seatunnel.flink.http.source.constant.Settings; +import org.apache.seatunnel.flink.http.source.util.HttpClientResult; +import org.apache.seatunnel.flink.http.source.util.HttpClientUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Http implements FlinkBatchSource { + + private static final String GET = "GET"; + private static final String POST = "POST"; + private static final int INITIAL_CAPACITY = 16; + + private static Logger LOG = LoggerFactory.getLogger(Http.class); + + private Config config; + + @Override + public void setConfig(Config config) { + this.config = config; + } + + @Override + public Config getConfig() { + return config; + } + + @Override + public CheckResult checkConfig() { + return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL); + } + + @Override + public DataSet<Row> getData(FlinkEnvironment env) { + String url = config.getString(Settings.SOURCE_HTTP_URL); + String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET); + String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, ""); + String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, ""); + String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, ""); + + Map requestMap = jsonToMap(requestParams); + String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath); + LOG.info("sync values->{}", syncValues); + Map syncMap = jsonToMap(syncValues); + if (!syncMap.isEmpty()) { + requestMap.putAll(syncMap); + } + + HttpClientResult response = new HttpClientResult(); + try { + Map headerMap = jsonToMap(header); + if (POST.equals(method)) { + response = HttpClientUtils.doPost(url, headerMap, requestMap); + } else { + response = HttpClientUtils.doGet(url, headerMap, requestMap); + } + } catch (Exception e) { + LOG.error("http call error!", e); + } + + LOG.info("http respond code->{}", response.getCode()); + LOG.info("http respond body->{}", response.getContent()); + + Row row = Row.of(response.getContent()); Review Comment: Do we need to use `getBatchTableEnvironment` table api to add data, if we use `env.getBatchEnvironment().fromElements(row);` here, it seems we cannot use sql transformer? ########## seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.flink.http.source; + +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.flink.FlinkEnvironment; +import org.apache.seatunnel.flink.batch.FlinkBatchSource; +import org.apache.seatunnel.flink.http.source.constant.Settings; +import org.apache.seatunnel.flink.http.source.util.HttpClientResult; +import org.apache.seatunnel.flink.http.source.util.HttpClientUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Http implements FlinkBatchSource { + + private static final String GET = "GET"; + private static final String POST = "POST"; + private static final int INITIAL_CAPACITY = 16; + + private static Logger LOG = LoggerFactory.getLogger(Http.class); + + private Config config; + + @Override + public void setConfig(Config config) { + this.config = config; + } + + @Override + public Config getConfig() { + return config; + } + + @Override + public CheckResult checkConfig() { + return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL); + } + + @Override + public DataSet<Row> getData(FlinkEnvironment env) { + String url = config.getString(Settings.SOURCE_HTTP_URL); + String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET); + String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, ""); + String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, ""); + String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, ""); + + Map requestMap = jsonToMap(requestParams); + String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath); + LOG.info("sync values->{}", syncValues); + Map syncMap = jsonToMap(syncValues); + if (!syncMap.isEmpty()) { + requestMap.putAll(syncMap); + } + + HttpClientResult response = new HttpClientResult(); + try { + Map headerMap = jsonToMap(header); + if (POST.equals(method)) { + response = HttpClientUtils.doPost(url, headerMap, requestMap); + } else { + response = HttpClientUtils.doGet(url, headerMap, requestMap); + } + } catch (Exception e) { + LOG.error("http call error!", e); Review Comment: You need to throw this exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
