This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 28340ca90 [INLONG-6022][Agent] Fix lost read and send count (#6023)
28340ca90 is described below
commit 28340ca905107f91777a01e17f3285d325bc3cda
Author: Lucas <[email protected]>
AuthorDate: Tue Sep 27 20:08:53 2022 +0800
[INLONG-6022][Agent] Fix lost read and send count (#6023)
---
.../inlong/agent/metrics/AgentMetricItem.java | 7 ++-
.../inlong/agent/plugin/channel/MemoryChannel.java | 70 ++++++++++++++--------
.../inlong/agent/plugin/sinks/SenderManager.java | 6 ++
.../agent/plugin/sources/reader/BinlogReader.java | 3 +-
.../agent/plugin/sources/reader/KafkaReader.java | 1 +
.../agent/plugin/sources/reader/MongoDBReader.java | 4 +-
.../plugin/sources/reader/PostgreSQLReader.java | 3 +-
.../plugin/sources/reader/SQLServerReader.java | 2 +
.../agent/plugin/sources/reader/SqlReader.java | 2 +
.../sources/reader/file/FileReaderOperator.java | 1 +
.../agent/plugin/sources/TestSQLServerReader.java | 4 ++
11 files changed, 70 insertions(+), 33 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
index dbec354ff..96d35f97c 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.metrics;
import org.apache.inlong.common.metric.CountMetric;
import org.apache.inlong.common.metric.Dimension;
+import org.apache.inlong.common.metric.GaugeMetric;
import org.apache.inlong.common.metric.MetricDomain;
import org.apache.inlong.common.metric.MetricItem;
@@ -61,13 +62,13 @@ public class AgentMetricItem extends MetricItem {
@Dimension
public String inlongStreamId;
- @CountMetric
+ @GaugeMetric
public AtomicLong jobRunningCount = new AtomicLong(0);
@CountMetric
public AtomicLong jobFatalCount = new AtomicLong(0);
- @CountMetric
+ @GaugeMetric
public AtomicLong taskRunningCount = new AtomicLong(0);
- @CountMetric
+ @GaugeMetric
public AtomicLong taskRetryingCount = new AtomicLong(0);
@CountMetric
public AtomicLong taskFatalCount = new AtomicLong(0);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
index 9125b5852..086d45b8b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
@@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.channel;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.Channel;
@@ -35,7 +34,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
/**
@@ -49,38 +52,31 @@ public class MemoryChannel implements Channel {
//metric
private AgentMetricItemSet metricItemSet;
private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+ private String inlongGroupId;
+ private String inlongStreamId;
public MemoryChannel() {
}
@Override
public void push(Message message) {
- String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
try {
if (message != null) {
- if (message instanceof ProxyMessage) {
- groupId = ((ProxyMessage) message).getInlongGroupId();
- }
- AgentMetricItem metricItem =
getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+ AgentMetricItem metricItem = getMetricItem(new HashMap<String,
String>());
metricItem.pluginReadCount.incrementAndGet();
queue.put(message);
metricItem.pluginReadSuccessCount.incrementAndGet();
}
} catch (InterruptedException ex) {
- getMetricItem(KEY_INLONG_GROUP_ID,
groupId).pluginReadFailCount.incrementAndGet();
- Thread.currentThread().interrupt();
+ this.metricItemReadFailed();
}
}
@Override
public boolean push(Message message, long timeout, TimeUnit unit) {
- String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
try {
if (message != null) {
- if (message instanceof ProxyMessage) {
- groupId = ((ProxyMessage) message).getInlongGroupId();
- }
- AgentMetricItem metricItem =
getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+ AgentMetricItem metricItem = getMetricItem(new HashMap<String,
String>());
metricItem.pluginReadCount.incrementAndGet();
boolean result = queue.offer(message, timeout, unit);
if (result) {
@@ -91,36 +87,31 @@ public class MemoryChannel implements Channel {
return result;
}
} catch (InterruptedException ex) {
- AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID,
groupId);
- metricItem.pluginReadFailCount.incrementAndGet();
- Thread.currentThread().interrupt();
+ this.metricItemReadFailed();
}
return false;
}
@Override
public Message pull(long timeout, TimeUnit unit) {
- String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
try {
Message message = queue.poll(timeout, unit);
if (message != null) {
- if (message instanceof ProxyMessage) {
- groupId = ((ProxyMessage) message).getInlongGroupId();
- }
- AgentMetricItem metricItem =
getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+ AgentMetricItem metricItem = getMetricItem(new HashMap<String,
String>());
metricItem.pluginSendSuccessCount.incrementAndGet();
+ metricItem.pluginSendCount.incrementAndGet();
}
return message;
} catch (InterruptedException ex) {
- AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID,
groupId);
- metricItem.pluginSendFailCount.incrementAndGet();
- Thread.currentThread().interrupt();
+ this.metricItemSendFailed();
throw new IllegalStateException(ex);
}
}
@Override
public void init(JobProfile jobConf) {
+ inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID,
DEFAULT_PROXY_INLONG_GROUP_ID);
+ inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID,
DEFAULT_PROXY_INLONG_STREAM_ID);
queue = new LinkedBlockingQueue<>(
jobConf.getInt(AgentConstants.CHANNEL_MEMORY_CAPACITY,
AgentConstants.DEFAULT_CHANNEL_MEMORY_CAPACITY));
@@ -138,10 +129,37 @@ public class MemoryChannel implements Channel {
LOGGER.info("destroy channel, show memory channel metric:");
}
- private AgentMetricItem getMetricItem(String otherKey, String value) {
+ private AgentMetricItem getMetricItem(Map<String, String> dimens) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
- dimensions.put(otherKey, value);
+ dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+ dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+ dimens.forEach((key, value) -> {
+ dimensions.put(key, value);
+ });
return this.metricItemSet.findMetricItem(dimensions);
}
+
+ private void metricItemReadFailed() {
+ Map<String, String> dimensions = new HashMap<String, String>();
+ dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+ dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+ AgentMetricItem metricItem = getMetricItem(dimensions);
+ metricItem.pluginReadFailCount.incrementAndGet();
+ LOGGER.debug("plugin read failed:{}", dimensions.toString());
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ private void metricItemSendFailed() {
+ Map<String, String> dimensions = new HashMap<String, String>();
+ dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+ dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+ AgentMetricItem metricItem = getMetricItem(dimensions);
+ metricItem.pluginSendFailCount.incrementAndGet();
+ metricItem.pluginSendCount.incrementAndGet();
+ LOGGER.debug("plugin send failed:{}", dimensions.toString());
+ Thread.currentThread().interrupt();
+ return;
+ }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index dd164847d..9cc10d604 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -256,6 +256,7 @@ public class SenderManager {
AgentUtils.silenceSleepInMs(retrySleepTime);
}
Map<String, String> dims = new HashMap<>();
+ dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
dims.put(KEY_INLONG_GROUP_ID, groupId);
dims.put(KEY_INLONG_STREAM_ID, streamId);
try {
@@ -264,11 +265,13 @@ public class SenderManager {
if (result == SendResult.OK) {
semaphore.release(bodyList.size());
getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
+ getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
long totalSize = bodyList.stream().mapToLong(body ->
body.length).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId, dataTime, bodyList.size(),
totalSize);
} else {
getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
+ getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
LOGGER.warn("send data to dataproxy error {}",
result.toString());
sendBatchSync(groupId, streamId, bodyList, retry + 1,
dataTime, extraMap);
}
@@ -279,6 +282,7 @@ public class SenderManager {
try {
TimeUnit.SECONDS.sleep(1);
getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
+ getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
sendBatchSync(groupId, streamId, bodyList, retry + 1,
dataTime, extraMap);
} catch (Exception ignored) {
// ignore it.
@@ -319,12 +323,14 @@ public class SenderManager {
}
semaphore.release(bodyList.size());
Map<String, String> dims = new HashMap<>();
+ dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
dims.put(KEY_INLONG_GROUP_ID, groupId);
dims.put(KEY_INLONG_STREAM_ID, streamId);
long totalSize = bodyList.stream().mapToLong(body ->
body.length).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId,
streamId, dataTime, bodyList.size(),
totalSize);
getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
+ getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
if (sourcePath != null) {
taskPositionManager.updateSinkPosition(jobId, sourcePath,
bodyList.size());
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 3fab3899c..38877951b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -111,7 +111,6 @@ public class BinlogReader extends AbstractReader {
@Override
public Message read() {
if (!binlogMessagesQueue.isEmpty()) {
- readerMetric.pluginReadCount.incrementAndGet();
return getBinlogMessage();
} else {
return null;
@@ -170,9 +169,11 @@ public class BinlogReader extends AbstractReader {
long dataSize = records.stream().mapToLong(r ->
r.value().length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis(), records.size(),
dataSize);
+
readerMetric.pluginReadSuccessCount.addAndGet(records.size());
readerMetric.pluginReadCount.addAndGet(records.size());
} catch (Exception e) {
readerMetric.pluginReadFailCount.addAndGet(records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
LOGGER.error("parse binlog message error", e);
}
})
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 79e5479ee..52e72ed10 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -121,6 +121,7 @@ public class KafkaReader<K, V> extends AbstractReader {
"partition:" + record.partition()
+ ", value:" + new String(recordValue) + ",
offset:" + record.offset());
// control speed
+ readerMetric.pluginReadSuccessCount.incrementAndGet();
readerMetric.pluginReadCount.incrementAndGet();
// commit succeed,then record current offset
snapshot = record.partition() +
JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
index c7f0341f5..3abe037bb 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -136,7 +136,6 @@ public class MongoDBReader extends AbstractReader {
@Override
public Message read() {
if (!bufferPool.isEmpty()) {
- super.readerMetric.pluginReadCount.incrementAndGet();
return this.pollMessage();
} else {
return null;
@@ -367,12 +366,13 @@ public class MongoDBReader extends AbstractReader {
long dataSize = records.stream().mapToLong(c ->
c.value().length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
super.inlongGroupId, super.inlongStreamId,
System.currentTimeMillis(), records.size(), dataSize);
+ readerMetric.pluginReadSuccessCount.addAndGet(records.size());
readerMetric.pluginReadCount.addAndGet(records.size());
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error("parse mongo message error", e);
-
readerMetric.pluginReadFailCount.addAndGet(records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
}
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index dc7587bc7..23bfa1fbe 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -108,7 +108,6 @@ public class PostgreSQLReader extends AbstractReader {
@Override
public Message read() {
if (!postgreSQLMessageQueue.isEmpty()) {
- readerMetric.pluginReadCount.incrementAndGet();
return getPostgreSQLMessage();
} else {
return null;
@@ -165,9 +164,11 @@ public class PostgreSQLReader extends AbstractReader {
long dataSize = records.stream().mapToLong(c ->
c.value().length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis(), records.size(),
dataSize);
+
readerMetric.pluginReadSuccessCount.addAndGet(records.size());
readerMetric.pluginReadCount.addAndGet(records.size());
} catch (Exception e) {
readerMetric.pluginReadFailCount.addAndGet(records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
LOGGER.error("parse binlog message error", e);
}
})
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index 9edb14ba3..c4daa8560 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -112,11 +112,13 @@ public class SQLServerReader extends AbstractReader {
long dataSize = lineColumns.stream().mapToLong(column ->
column.length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis(), 1, dataSize);
+ readerMetric.pluginReadSuccessCount.incrementAndGet();
readerMetric.pluginReadCount.incrementAndGet();
return generateMessage(lineColumns);
} catch (Exception ex) {
LOGGER.error("error while reading data", ex);
readerMetric.pluginReadFailCount.incrementAndGet();
+ readerMetric.pluginReadCount.incrementAndGet();
throw new RuntimeException(ex);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index b1b0c2280..130326462 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -118,6 +118,7 @@ public class SqlReader extends AbstractReader {
long dataSize = lineColumns.stream().mapToLong(column ->
column.length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis(), 1, dataSize);
+ readerMetric.pluginReadSuccessCount.incrementAndGet();
readerMetric.pluginReadCount.incrementAndGet();
return generateMessage(lineColumns);
} else {
@@ -126,6 +127,7 @@ public class SqlReader extends AbstractReader {
} catch (Exception ex) {
LOGGER.error("error while reading data", ex);
readerMetric.pluginReadFailCount.incrementAndGet();
+ readerMetric.pluginReadCount.incrementAndGet();
throw new RuntimeException(ex);
}
return null;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 83d6e2a97..33e8816cc 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -93,6 +93,7 @@ public class FileReaderOperator extends AbstractReader {
if (validateMessage(message)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
inlongGroupId, inlongStreamId,
System.currentTimeMillis(), 1, message.length());
+ readerMetric.pluginReadSuccessCount.incrementAndGet();
readerMetric.pluginReadCount.incrementAndGet();
String proxyPartitionKey =
jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
index 6b100fd84..7b07378b6 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
@@ -93,6 +93,8 @@ public class TestSQLServerReader {
private AtomicLong atomicLong;
+ private AtomicLong atomicCountLong;
+
private String sql;
@Before
@@ -107,6 +109,7 @@ public class TestSQLServerReader {
final String groupId = "group01";
final String streamId = "stream01";
atomicLong = new AtomicLong(0L);
+ atomicCountLong = new AtomicLong(0L);
sql = "select * from dbo.test01";
@@ -143,6 +146,7 @@ public class TestSQLServerReader {
whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
field(AgentMetricItem.class, "pluginReadCount").set(agentMetricItem,
atomicLong);
+ field(AgentMetricItem.class,
"pluginReadSuccessCount").set(agentMetricItem, atomicCountLong);
//init method
(reader = new SQLServerReader(sql)).init(jobProfile);