This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b7ea9d55b4 [INLONG-9548][Agent] Supports HTTPS and can determine
whether to enable it through configuration (#9549)
b7ea9d55b4 is described below
commit b7ea9d55b44effebeb9a1c4178dcb98bd71b1fe4
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Jan 5 11:16:44 2024 +0800
[INLONG-9548][Agent] Supports HTTPS and can determine whether to enable it
through configuration (#9549)
* [INLONG-9548][Agent] Supports HTTPS and can determine whether to enable
it through configuration
* [INLONG-9548][Agent] Supports HTTPS and can determine whether to enable
it through configuration
* [INLONG-9548][Agent] Supports HTTPS and can determine whether to enable
it through configuration
---
.../inlong/agent/constant/FetcherConstants.java | 7 ++-
.../apache/inlong/agent/pojo/TaskProfileDto.java | 9 ++--
.../org/apache/inlong/agent/utils/HttpManager.java | 53 +++++++++++++++++++---
.../inlong/agent/core/AgentBaseTestsHelper.java | 2 +
.../agent/core/instance/TestInstanceManager.java | 1 -
.../agent/plugin/fetcher/ManagerFetcher.java | 5 +-
.../inlong/agent/plugin/sinks/SenderManager.java | 21 ++-------
.../plugin/sinks/filecollect/SenderManager.java | 27 ++++-------
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 10 ++--
.../sinks/filecollect/TestSenderManager.java | 2 +-
.../agent/plugin/sources/TestLogFileSource.java | 2 +-
.../agent/plugin/task/TestLogfileCollectTask.java | 2 +-
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 8 +++-
13 files changed, 86 insertions(+), 63 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index 642fac298f..c336bce2cd 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -31,9 +31,12 @@ public class FetcherConstants {
// default is 30s
public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30;
+ // enable https
+ public static final String ENABLE_HTTPS = "enable.https";
+ public static final boolean DEFAULT_ENABLE_HTTPS = false;
+
// required config
- public static final String AGENT_MANAGER_VIP_HTTP_HOST =
"agent.manager.vip.http.host";
- public static final String AGENT_MANAGER_VIP_HTTP_PORT =
"agent.manager.vip.http.port";
+ public static final String AGENT_MANAGER_ADDR = "agent.manager.addr";
public static final String AGENT_MANAGER_VIP_HTTP_PATH =
"agent.manager.vip.http.managerIp.path";
public static final String DEFAULT_AGENT_TDM_VIP_HTTP_PATH =
"/agent/getManagerIpList";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 6ce6ba2d3d..f4b78e3686 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -29,8 +29,7 @@ import com.google.gson.Gson;
import lombok.Data;
import static java.util.Objects.requireNonNull;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN;
import static
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_DATAPROXY;
@@ -376,8 +375,7 @@ public class TaskProfileDto {
Proxy proxy = new Proxy();
Manager manager = new Manager();
AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
- manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST));
- manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT));
+ manager.setAddr(agentConf.get(AGENT_MANAGER_ADDR));
proxy.setInlongGroupId(dataConfigs.getInlongGroupId());
proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
proxy.setManager(manager);
@@ -538,8 +536,7 @@ public class TaskProfileDto {
@Data
public static class Manager {
- private String port;
- private String host;
+ private String addr;
}
@Data
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
index 70897dd1d4..aa4e312696 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
@@ -22,27 +22,34 @@ import org.apache.inlong.common.util.BasicAuth;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+
import java.nio.charset.Charset;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_APPLICATION_JSON;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_SUCCESS_CODE;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
@@ -64,10 +71,17 @@ public class HttpManager {
private final CloseableHttpClient httpClient;
private final String secretId;
private final String secretKey;
+ private static boolean enableHttps;
public HttpManager(AgentConfiguration conf) {
- httpClient =
constructHttpClient(conf.getInt(AGENT_MANAGER_REQUEST_TIMEOUT,
- DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT));
+ enableHttps =
StringUtils.startsWith(agentConf.get(AGENT_MANAGER_ADDR), "https");
+ int timeout = conf.getInt(AGENT_MANAGER_REQUEST_TIMEOUT,
+ DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT);
+ if (enableHttps) {
+ httpClient = constructHttpsClient(timeout);
+ } else {
+ httpClient = constructHttpClient(timeout);
+ }
secretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
secretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
}
@@ -75,11 +89,11 @@ public class HttpManager {
/**
* build base url for manager according to config
*
- * example - http://127.0.0.1:8080/inlong/manager/openapi
+ * example(http) - http://127.0.0.1:8080/inlong/manager/openapi
+ * example(https) - https://127.0.0.1:8080/inlong/manager/openapi
*/
public static String buildBaseUrl() {
- return "http://" + agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST)
- + ":" + agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT)
+ return agentConf.get(AGENT_MANAGER_ADDR)
+ agentConf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH,
DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH);
}
@@ -102,6 +116,31 @@ public class HttpManager {
return httpClientBuilder.build();
}
+ /**
+ * construct https client
+ *
+ * @param timeout timeout setting
+ * @return closeable timeout
+ */
+ private static CloseableHttpClient constructHttpsClient(int timeout) {
+ long timeoutInMs = TimeUnit.SECONDS.toMillis(timeout);
+ RequestConfig requestConfig =
RequestConfig.custom().setConnectTimeout((int) timeoutInMs)
+ .setSocketTimeout((int) timeoutInMs).build();
+ SSLContext sslContext = null;
+ try {
+ sslContext = SSLContexts.custom().build();
+ } catch (NoSuchAlgorithmException e) {
+ LOGGER.error("constructHttpsClient error ", e);
+ } catch (KeyManagementException e) {
+ LOGGER.error("constructHttpsClient error ", e);
+ }
+ SSLConnectionSocketFactory sslsf = new
SSLConnectionSocketFactory(sslContext,
+ new String[]{"TLSv1.2"}, null,
+ SSLConnectionSocketFactory.getDefaultHostnameVerifier());
+
+ return
HttpClients.custom().setDefaultRequestConfig(requestConfig).setSSLSocketFactory(sslsf).build();
+ }
+
/**
* doPost
*
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index 8207dd9a99..fa6d0ebd66 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.core;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.FetcherConstants;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
@@ -55,6 +56,7 @@ public class AgentBaseTestsHelper {
boolean result = testRootDir.toFile().mkdirs();
LOGGER.info("try to create {}, result is {}", testRootDir, result);
AgentConfiguration.getAgentConf().set(AgentConstants.AGENT_HOME,
testRootDir.toString());
+
AgentConfiguration.getAgentConf().set(FetcherConstants.AGENT_MANAGER_ADDR, "");
return this;
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 262565022e..34772636ad 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -118,6 +118,5 @@ public class TestInstanceManager {
await().atMost(1, TimeUnit.SECONDS).until(() ->
manager.getInstanceProfile(instanceId) == null);
Assert.assertTrue(String.valueOf(instance.initTime), instance.initTime
== MockInstance.INIT_TIME);
Assert.assertTrue(instance.runtime > instance.initTime);
- Assert.assertTrue(instance.destroyTime > instance.runtime);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 51e9d22508..fabc02a139 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -48,10 +48,9 @@ import java.util.List;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH;
import static
org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
@@ -94,7 +93,7 @@ public class ManagerFetcher extends AbstractDaemon implements
ProfileFetcher {
}
private boolean requiredKeys(AgentConfiguration conf) {
- return conf.hasKey(AGENT_MANAGER_VIP_HTTP_HOST) &&
conf.hasKey(AGENT_MANAGER_VIP_HTTP_PORT);
+ return conf.hasKey(AGENT_MANAGER_ADDR);
}
/**
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index fcbac27121..ac0b19fcff 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -51,10 +51,9 @@ import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_PROXY_SEND;
import static org.apache.inlong.agent.constant.JobConstants.JOB_PROXY_SEND;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
@@ -79,11 +78,7 @@ public class SenderManager {
// in case of thread abusing.
private ThreadFactory SHARED_FACTORY;
private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
- private final String managerHost;
- private final int managerPort;
- private final String netTag;
- private final String localhost;
- private final boolean isLocalVisit;
+ private final String managerAddr;
private final int totalAsyncBufSize;
private final int aliveConnectionNum;
private final boolean isCompress;
@@ -109,13 +104,8 @@ public class SenderManager {
public SenderManager(JobProfile jobConf, String inlongGroupId, String
sourcePath) {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
- managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
- managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
+ managerAddr = conf.get(AGENT_MANAGER_ADDR);
proxySend = jobConf.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
- localhost = jobConf.get(CommonConstants.PROXY_LOCAL_HOST,
CommonConstants.DEFAULT_PROXY_LOCALHOST);
- netTag = jobConf.get(CommonConstants.PROXY_NET_TAG,
CommonConstants.DEFAULT_PROXY_NET_TAG);
- isLocalVisit = jobConf.getBoolean(
- CommonConstants.PROXY_IS_LOCAL_VISIT,
CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT);
totalAsyncBufSize = jobConf
.getInt(
CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
@@ -199,9 +189,8 @@ public class SenderManager {
* @return DefaultMessageSender
*/
private DefaultMessageSender createMessageSender(String tagName) throws
Exception {
-
- ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
- localhost, isLocalVisit, managerHost, managerPort, tagName,
authSecretId, authSecretKey);
+ ProxyClientConfig proxyClientConfig = new
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
+ authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 933ed23583..17c1d3ff8a 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -52,10 +52,9 @@ import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
@@ -81,11 +80,7 @@ public class SenderManager {
// in case of thread abusing.
private ThreadFactory SHARED_FACTORY;
private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
- private final String managerHost;
- private final int managerPort;
- private final String netTag;
- private final String localhost;
- private final boolean isLocalVisit;
+ private final String managerAddr;
private final int totalAsyncBufSize;
private final int aliveConnectionNum;
private final boolean isCompress;
@@ -110,17 +105,12 @@ public class SenderManager {
protected InstanceProfile profile;
private volatile boolean resendRunning = false;
private volatile boolean started = false;
+ private static final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
public SenderManager(InstanceProfile profile, String inlongGroupId, String
sourcePath) {
- AgentConfiguration conf = AgentConfiguration.getAgentConf();
this.profile = profile;
- managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
- managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
+ managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
proxySend = profile.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
- localhost = profile.get(CommonConstants.PROXY_LOCAL_HOST,
CommonConstants.DEFAULT_PROXY_LOCALHOST);
- netTag = profile.get(CommonConstants.PROXY_NET_TAG,
CommonConstants.DEFAULT_PROXY_NET_TAG);
- isLocalVisit = profile.getBoolean(
- CommonConstants.PROXY_IS_LOCAL_VISIT,
CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT);
totalAsyncBufSize = profile
.getInt(
CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
@@ -145,8 +135,8 @@ public class SenderManager {
enableBusyWait =
profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
- authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
- authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
+ authSecretId = agentConf.get(AGENT_MANAGER_AUTH_SECRET_ID);
+ authSecretKey = agentConf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
this.sourcePath = sourcePath;
this.inlongGroupId = inlongGroupId;
@@ -205,9 +195,8 @@ public class SenderManager {
* @param tagName we use group id as tag name
*/
private void createMessageSender(String tagName) throws Exception {
-
- ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
- localhost, isLocalVisit, managerHost, managerPort, tagName,
authSecretId, authSecretKey);
+ ProxyClientConfig proxyClientConfig = new
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
+ authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setFile(isFile);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 465180cb8f..2e61c6766b 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.FetcherConstants;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
@@ -57,6 +58,7 @@ public class AgentBaseTestsHelper {
boolean result = testRootDir.toFile().mkdirs();
LOGGER.info("try to create {}, result is {}", testRootDir, result);
AgentConfiguration.getAgentConf().set(AgentConstants.AGENT_HOME,
testRootDir.toString());
+
AgentConfiguration.getAgentConf().set(FetcherConstants.AGENT_MANAGER_ADDR, "");
return this;
}
@@ -79,14 +81,14 @@ public class AgentBaseTestsHelper {
}
public TaskProfile getTaskProfile(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
- TaskStateEnum state) {
- DataConfig dataConfig = getDataConfig(taskId, pattern, retry,
startTime, endTime, state);
+ TaskStateEnum state, String cycleUnit) {
+ DataConfig dataConfig = getDataConfig(taskId, pattern, retry,
startTime, endTime, state, cycleUnit);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}
private DataConfig getDataConfig(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
- TaskStateEnum state) {
+ TaskStateEnum state, String cycleUnit) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
@@ -100,7 +102,7 @@ public class AgentBaseTestsHelper {
// GMT-8:00 same with Asia/Shanghai
fileTaskConfig.setTimeZone("GMT-8:00");
fileTaskConfig.setMaxFileCount(100);
- fileTaskConfig.setCycleUnit("D");
+ fileTaskConfig.setCycleUnit(cycleUnit);
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 4e8ce6b413..c350f0275a 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -70,7 +70,7 @@ public class TestSenderManager {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new
AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING, "D");
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 3a002064fb..4a2303363f 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -75,7 +75,7 @@ public class TestLogFileSource {
private LogFileSource getSource(int taskId, long offset) {
try {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
- TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
false, 0L, 0L, TaskStateEnum.RUNNING);
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern,
false, 0L, 0L, TaskStateEnum.RUNNING, "D");
String fileName =
LOADER.getResource("test/20230928_1.txt").getPath();
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
index 29047919cc..70e316519a 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
@@ -79,7 +79,7 @@ public class TestLogfileCollectTask {
tempResourceName = LOADER.getResource("testScan/temp.txt").getPath();
File f = new File(tempResourceName);
String pattern = f.getParent() + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L,
0L, TaskStateEnum.RUNNING);
+ TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L,
0L, TaskStateEnum.RUNNING, "D");
try {
String startStr = "2023-09-20 00:00:00";
String endStr = "2023-09-30 00:00:00";
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 10d4baac4d..9168c3a1f7 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -139,12 +139,16 @@ public class ProxyClientConfig {
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String managerAddress, String inlongGroupId,
String authSecretId, String authSecretKey,
LoadBalance loadBalance, int virtualNode, int maxRetry) throws
ProxysdkException {
- if (Utils.isBlank(managerAddress)) {
- throw new ProxysdkException("managerAddress is blank!");
+ if (Utils.isBlank(managerAddress) ||
(!managerAddress.startsWith(ConfigConstants.HTTP)
+ && !managerAddress.startsWith(ConfigConstants.HTTPS))) {
+ throw new ProxysdkException("managerAddress is blank or missing
http/https protocol ");
}
if (Utils.isBlank(inlongGroupId)) {
throw new ProxysdkException("groupId is blank!");
}
+ if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
+ this.requestByHttp = false;
+ }
this.managerAddress = managerAddress;
this.managerUrl = getManagerUrl(managerAddress, inlongGroupId);
this.inlongGroupId = inlongGroupId;