This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bd9ce03a34 [INLONG-11815][Agent] Add a unified reporting point for
events (#11816)
bd9ce03a34 is described below
commit bd9ce03a34b60e4cf10b8698d6bfe770051cf0b1
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Apr 2 14:09:50 2025 +0800
[INLONG-11815][Agent] Add a unified reporting point for events (#11816)
* [INLONG-11815][Agent] Add a unified reporting point for events
* [INLONG-11815][Agent] Close all senders when creating sender error
---
.../inlong/agent/constant/CommonConstants.java | 2 +-
.../inlong/agent/metrics/AgentEventMetricItem.java | 72 ++++++++++++++
.../agent/metrics/AgentEventMetricItemSet.java | 39 ++++++++
.../inlong/agent/utils/EventReportUtils.java | 108 +++++++++++++++++++++
.../inlong/agent/core/AgentStatusManager.java | 15 ++-
.../inlong/agent/core/FileStaticManager.java | 15 ++-
.../apache/inlong/agent/core/HeartbeatManager.java | 7 ++
.../apache/inlong/agent/core/task/TaskManager.java | 13 +++
.../agent/plugin/fetcher/ManagerFetcher.java | 59 ++++++++---
.../agent/plugin/sinks/dataproxy/Sender.java | 15 ++-
10 files changed, 325 insertions(+), 20 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index db0e509718..a20f281c63 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -41,7 +41,7 @@ public class CommonConstants {
public static final boolean DEFAULT_PROXY_IS_COMPRESS = true;
public static final String PROXY_MAX_SENDER_PER_GROUP =
"proxy.max.sender.per.group";
- public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 10;
+ public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 3;
// max size of message list
public static final String PROXY_PACKAGE_MAX_SIZE =
"proxy.package.maxSize";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java
new file mode 100644
index 0000000000..275326b611
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.common.metric.CountMetric;
+import org.apache.inlong.common.metric.Dimension;
+import org.apache.inlong.common.metric.MetricDomain;
+import org.apache.inlong.common.metric.MetricItem;
+
+import java.text.SimpleDateFormat;
+import java.util.concurrent.atomic.AtomicLong;
+
+@MetricDomain(name = "AgentEvent")
+public class AgentEventMetricItem extends MetricItem {
+
+ public final static SimpleDateFormat FORMAT = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ public static final String KEY_INLONG_EVENT_TIME = "eventTime";
+ public static final String KEY_INLONG_GROUP_ID = "groupId";
+ public static final String KEY_INLONG_STREAM_ID = "streamId";
+ public static final String KEY_INLONG_COMPONENT_TYPE = "componentType";
+ public static final String KEY_INLONG_COMPONENT_NAME = "componentName";
+ public static final String KEY_INLONG_AGENT_IP = "agentIp";
+ public static final String KEY_INLONG_COMPONENT_VERSION =
"componentVersion";
+ public static final String KEY_INLONG_EVENT_TYPE = "eventType";
+ public static final String KEY_INLONG_EVENT_LEVEL = "eventLevel";
+ public static final String KEY_INLONG_EVENT_CODE = "eventCode";
+ public static final String KEY_INLONG_EXT = "ext";
+ public static final String KEY_INLONG_EVENT_DESC = "eventDesc";
+
+ @Dimension
+ public String eventTime;
+ @Dimension
+ public String groupId;
+ @Dimension
+ public String streamId;
+ @Dimension
+ public String componentType;
+ @Dimension
+ public String componentName;
+ @Dimension
+ public String agentIp;
+ @Dimension
+ public String componentVersion;
+ @Dimension
+ public String eventType;
+ @Dimension
+ public String eventLevel;
+ @Dimension
+ public String eventCode;
+ @Dimension
+ public String ext;
+ @Dimension
+ public String eventDesc;
+
+ @CountMetric
+ public AtomicLong count = new AtomicLong(0);
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java
new file mode 100644
index 0000000000..48f4f25e42
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.common.metric.MetricDomain;
+import org.apache.inlong.common.metric.MetricItemSet;
+
+@MetricDomain(name = "AgentEvent")
+public class AgentEventMetricItemSet extends
MetricItemSet<AgentEventMetricItem> {
+
+ /**
+ * Constructor
+ *
+ * @param name
+ */
+ public AgentEventMetricItemSet(String name) {
+ super(name);
+ }
+
+ @Override
+ protected AgentEventMetricItem createItem() {
+ return new AgentEventMetricItem();
+ }
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java
new file mode 100644
index 0000000000..3c3863fc1f
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.utils;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.AgentEventMetricItem;
+import org.apache.inlong.agent.metrics.AgentEventMetricItemSet;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_AGENT_IP;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_NAME;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_TYPE;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_VERSION;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_CODE;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_DESC;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_LEVEL;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TIME;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TYPE;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EXT;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_STREAM_ID;
+
+/**
+ * DiagUtils
+ */
+public class EventReportUtils {
+
+ public enum EvenCodeEnum {
+
+ CONFIG_UPDATE_SUC(0, "config update suc"),
+ CONFIG_NO_UPDATE(1, "config no update"),
+ CONFIG_UPDATE_VERSION_NO_CHANGE(2, "config update version no change"),
+ CONFIG_INVALID_RET_CODE(3, "config invalid ret code"),
+ CONFIG_INVALID_RESULT(4, "config invalid result maybe visit manager
failed"),
+ TASK_ADD(5, "task add"),
+ TASK_DELETE(6, "task delete");
+
+ private final int code;
+ private final String message;
+
+ EvenCodeEnum(int code, String message) {
+ this.code = code;
+ this.message = message;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+ }
+
+ private final static String COMPONENT_TYPE_AGENT = "AGENT";
+ private final static String COMPONENT_NAME_AGENT = "AGENT";
+ public static final String EVENT_TYPE_CONFIG_UPDATE = "CONFIG_UPDATE";
+ public static final String EVENT_LEVEL_INFO = "INFO";
+ public static final String EVENT_LEVEL_WARN = "WARN";
+ public static final String EVENT_LEVEL_ERROR = "ERROR";
+ private static AgentEventMetricItemSet metricItemSet;
+
+ private EventReportUtils() {
+ }
+
+ public static void init() {
+ metricItemSet = new AgentEventMetricItemSet(COMPONENT_NAME_AGENT);
+ MetricRegister.register(metricItemSet);
+ }
+
+ public static void report(String groupId, String streamId, long eventTime,
String eventType,
+ String eventLevel, EvenCodeEnum evenCode, String ext, String desc)
{
+ Map<String, String> dims = new HashMap<>();
+ dims.put(KEY_INLONG_GROUP_ID, groupId);
+ dims.put(KEY_INLONG_STREAM_ID, streamId);
+ dims.put(KEY_INLONG_COMPONENT_TYPE, COMPONENT_TYPE_AGENT);
+ dims.put(KEY_INLONG_COMPONENT_NAME, COMPONENT_NAME_AGENT);
+ dims.put(KEY_INLONG_AGENT_IP,
AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP));
+ dims.put(KEY_INLONG_COMPONENT_VERSION,
EventReportUtils.class.getPackage().getImplementationVersion());
+ dims.put(KEY_INLONG_EVENT_TIME, AgentEventMetricItem.FORMAT.format(new
Date(eventTime)));
+ dims.put(KEY_INLONG_EVENT_TYPE, eventType);
+ dims.put(KEY_INLONG_EVENT_LEVEL, eventLevel);
+ dims.put(KEY_INLONG_EVENT_CODE, String.valueOf(evenCode.getCode()));
+ dims.put(KEY_INLONG_EXT, ext.replaceAll("\\|", "-"));
+ dims.put(KEY_INLONG_EVENT_DESC, desc);
+ metricItemSet.findMetricItem(dims).count.addAndGet(1);
+ }
+}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index 50085fa1bd..314fe5e669 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ExcuteLinux;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
@@ -167,10 +168,20 @@ public class AgentStatusManager {
}
try {
ProcessResult procResult = new ProcessResult();
+ long dataTime = AgentUtils.getCurrentTime();
+ byte[] body =
data.getFieldsString().getBytes(StandardCharsets.UTF_8);
if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
- INLONG_AGENT_STATUS, AgentUtils.getCurrentTime(), null,
- data.getFieldsString().getBytes(StandardCharsets.UTF_8)),
procResult)) {
+ INLONG_AGENT_STATUS, dataTime, null, body), procResult)) {
LOGGER.error("send status failed: ret = {}", procResult);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED,
INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS,
+ dataTime, 1, body.length);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME,
INLONG_AGENT_SYSTEM,
+ INLONG_AGENT_STATUS, dataTime, 1, body.length);
+ } else {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS,
+ dataTime, 1, body.length);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME,
INLONG_AGENT_SYSTEM,
+ INLONG_AGENT_STATUS, dataTime, 1, body.length);
}
} catch (Throwable ex) {
LOGGER.error("send status throw exception", ex);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
index b3a47f2c6a..e598fa0092 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.core;
import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
@@ -134,10 +135,20 @@ public class FileStaticManager {
}
try {
ProcessResult procResult = new ProcessResult();
+ long dataTime = AgentUtils.getCurrentTime();
+ byte[] body =
data.getFieldsString().getBytes(StandardCharsets.UTF_8);
if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
- INLONG_FILE_STATIC, AgentUtils.getCurrentTime(), null,
-
data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) {
+ INLONG_FILE_STATIC, dataTime, null, body),
procResult)) {
LOGGER.error("send static failed: ret = {}", procResult);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED,
INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC,
+ dataTime, 1, body.length);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME,
INLONG_AGENT_SYSTEM,
+ INLONG_FILE_STATIC, dataTime, 1, body.length);
+ } else {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC,
+ dataTime, 1, body.length);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME,
INLONG_AGENT_SYSTEM,
+ INLONG_FILE_STATIC, dataTime, 1, body.length);
}
} catch (Throwable ex) {
LOGGER.error("send static throw exception", ex);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index d1d759cd0a..7693eebed5 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -77,6 +77,9 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
httpManager = new HttpManager(conf);
baseManagerUrl = httpManager.getBaseUrl();
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
+ createMessageSender();
+ AgentStatusManager.init(agentManager);
+ FileStaticManager.init();
}
public static HeartbeatManager getInstance(AgentManager agentManager) {
@@ -123,6 +126,9 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(" {} report heartbeat to manager",
heartbeatMsg);
}
+ if (sender == null) {
+ createMessageSender();
+ }
AgentStatusManager.sendStatusMsg(sender);
FileStaticManager.sendStaticMsg(sender);
} catch (Throwable e) {
@@ -212,6 +218,7 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
// start sender object
ProcessResult procResult = new ProcessResult();
if (!sender.start(procResult)) {
+ sender.close();
throw new ProxySdkException("Sender start failure, " +
procResult);
}
} catch (Throwable ex) {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 2da20cde9b..e34e1ae2b3 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -27,6 +27,8 @@ import org.apache.inlong.agent.plugin.file.Task;
import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.EventReportUtils;
+import org.apache.inlong.agent.utils.EventReportUtils.EvenCodeEnum;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -144,6 +146,7 @@ public class TaskManager extends AbstractDaemon {
pendingTasks = new LinkedBlockingQueue<>(taskMaxLimit);
configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+ EventReportUtils.init();
}
public static TaskStore getTaskStore() {
@@ -299,6 +302,11 @@ public class TaskManager extends AbstractDaemon {
profileFromManager.getTaskId(),
profileFromManager.isRetry(),
profileFromManager.getState());
addTask(profileFromManager);
+ EventReportUtils.report(profileFromManager.getInlongGroupId(),
+ profileFromManager.getInlongStreamId(),
AgentUtils.getCurrentTime(),
+ EventReportUtils.EVENT_TYPE_CONFIG_UPDATE,
EventReportUtils.EVENT_LEVEL_INFO,
+ EvenCodeEnum.TASK_ADD, profileFromManager.toJsonStr(),
+ EvenCodeEnum.TASK_ADD.getMessage());
} else {
TaskStateEnum managerState = profileFromManager.getState();
TaskStateEnum storeState = taskFromStore.getState();
@@ -331,6 +339,11 @@ public class TaskManager extends AbstractDaemon {
taskStore.getTasks().forEach((profileFromStore) -> {
if (!tasksFromManager.containsKey(profileFromStore.getTaskId())) {
LOGGER.info("traverseStoreTasksToManager try to delete task
{}", profileFromStore.getTaskId());
+ EventReportUtils.report(profileFromStore.getInlongGroupId(),
+ profileFromStore.getInlongStreamId(),
AgentUtils.getCurrentTime(),
+ EventReportUtils.EVENT_TYPE_CONFIG_UPDATE,
EventReportUtils.EVENT_LEVEL_INFO,
+ EvenCodeEnum.TASK_DELETE, profileFromStore.toJsonStr(),
+ EvenCodeEnum.TASK_DELETE.getMessage());
deleteTask(profileFromStore);
}
});
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index dc07605e3e..da2315e480 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -25,6 +25,8 @@ import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.EventReportUtils;
+import org.apache.inlong.agent.utils.EventReportUtils.EvenCodeEnum;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.PullJobTypeEnum;
@@ -132,10 +134,10 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
LOGGER.info("Get static config end");
if (element != null) {
- LOGGER.info("Get static config not null {}", resultData);
+ LOGGER.info("Get static config not null {}", resultData);
return GSON.fromJson(element.getAsJsonObject(), TaskResult.class);
} else {
- LOGGER.info("Get static config nothing to do");
+ LOGGER.info("Get static config nothing to do");
return null;
}
}
@@ -192,16 +194,45 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
while (isRunnable()) {
try {
TaskResult taskResult = getStaticConfig();
- if (taskResult != null &&
taskResult.getCode().equals(AgentResponseCode.SUCCESS)
- &&
agentManager.getTaskManager().getTaskResultVersion() < taskResult.getVersion())
{
- List<TaskProfile> taskProfiles = new ArrayList<>();
- taskResult.getDataConfigs().forEach((config) -> {
- TaskProfile profile =
TaskProfile.convertToTaskProfile(config);
- taskProfiles.add(profile);
- });
-
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
-
agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5());
-
agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion());
+ if (taskResult != null) {
+ if
(taskResult.getCode().equals(AgentResponseCode.SUCCESS)) {
+ if
(agentManager.getTaskManager().getTaskResultVersion() <
taskResult.getVersion()) {
+ EventReportUtils.report("", "",
AgentUtils.getCurrentTime(),
+
EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO,
+ EvenCodeEnum.CONFIG_UPDATE_SUC,
taskResult.toString(),
+
EvenCodeEnum.CONFIG_UPDATE_SUC.getMessage());
+ List<TaskProfile> taskProfiles = new
ArrayList<>();
+ taskResult.getDataConfigs().forEach((config)
-> {
+ TaskProfile profile =
TaskProfile.convertToTaskProfile(config);
+ taskProfiles.add(profile);
+ });
+
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
+
agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5());
+
agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion());
+ } else {
+ EventReportUtils.report("", "",
AgentUtils.getCurrentTime(),
+
EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_WARN,
+
EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE, taskResult.toString(),
+
EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE.getMessage());
+ LOGGER.warn("%s: %s",
EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE.getMessage(),
+ taskResult);
+ }
+ } else if
(taskResult.getCode().equals(AgentResponseCode.NO_UPDATE)) {
+ EventReportUtils.report("", "",
AgentUtils.getCurrentTime(),
+ EventReportUtils.EVENT_TYPE_CONFIG_UPDATE,
EventReportUtils.EVENT_LEVEL_INFO,
+ EvenCodeEnum.CONFIG_NO_UPDATE,
taskResult.toString(),
+
EvenCodeEnum.CONFIG_NO_UPDATE.getMessage());
+ } else {
+ EventReportUtils.report("", "",
AgentUtils.getCurrentTime(),
+ EventReportUtils.EVENT_TYPE_CONFIG_UPDATE,
EventReportUtils.EVENT_LEVEL_ERROR,
+ EvenCodeEnum.CONFIG_INVALID_RET_CODE,
taskResult.toString(),
+
EvenCodeEnum.CONFIG_INVALID_RET_CODE.getMessage());
+ }
+ } else {
+ EventReportUtils.report("", "",
AgentUtils.getCurrentTime(),
+ EventReportUtils.EVENT_TYPE_CONFIG_UPDATE,
EventReportUtils.EVENT_LEVEL_ERROR,
+ EvenCodeEnum.CONFIG_INVALID_RESULT,
taskResult.toString(),
+
EvenCodeEnum.CONFIG_INVALID_RESULT.getMessage());
}
AgentConfigInfo config = getAgentConfigInfo();
if (config != null &&
config.getCode().equals(AgentResponseCode.SUCCESS)) {
@@ -211,6 +242,10 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
}
}
} catch (Throwable ex) {
+ EventReportUtils.report("", "",
AgentUtils.getCurrentTime(),
+ EventReportUtils.EVENT_TYPE_CONFIG_UPDATE,
EventReportUtils.EVENT_LEVEL_ERROR,
+ EvenCodeEnum.CONFIG_INVALID_RESULT,
ex.getMessage(),
+ EvenCodeEnum.CONFIG_INVALID_RESULT.getMessage());
LOGGER.warn("exception caught", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
} finally {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
index 7385cd3729..00a69f5a1b 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
@@ -214,14 +214,23 @@ public class Sender {
proxyClientConfig.setEnableDataCompress(isCompress);
SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" +
sourcePath,
Thread.currentThread().isDaemon());
+ boolean hasError = false;
+ ProcessResult procResult = null;
for (int i = 0; i < maxSenderPerGroup; i++) {
InLongTcpMsgSender sender = new
InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
- ProcessResult procResult = new ProcessResult();
+ procResult = new ProcessResult();
if (!sender.start(procResult)) {
- throw new ProxySdkException("Start sender failure, " +
procResult);
+ hasError = true;
+ break;
}
senders.add(sender);
}
+ if (hasError) {
+ senders.forEach(sender -> {
+ sender.close();
+ });
+ throw new ProxySdkException("Start sender failure, " + procResult);
+ }
}
public void sendBatch(SenderMessage message) {
@@ -376,7 +385,7 @@ public class Sender {
AgentStatusManager.sendPackageCount.addAndGet(message.getMsgCnt());
AgentStatusManager.sendDataLen.addAndGet(message.getTotalSize());
} else {
- LOGGER.warn("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ LOGGER.error("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);
getMetricItem(groupId,
streamId).pluginSendFailCount.addAndGet(msgCnt);
putInResendQueue(new AgentSenderCallback(message, retry));