This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fbef9072 [INLONG-5788][DataProxy] Fix the incorrect readFailSize
metric value (#5792)
3fbef9072 is described below
commit 3fbef9072b1383f48022d3cabb3484f315d8c3da
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Sep 6 15:10:40 2022 +0800
[INLONG-5788][DataProxy] Fix the incorrect readFailSize metric value (#5792)
---
.../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 2 +-
.../java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java | 4 ++--
.../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java | 2 +-
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 7ba7724e8..71aceb9c5 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -418,7 +418,7 @@ public class PulsarSink extends AbstractSink implements
Configurable, SendMessag
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
event.getHeaders().getOrDefault(TOPIC, ""));
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
metricItem.readSuccessCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(event.getBody().length);
+
metricItem.readSuccessSize.addAndGet(event.getBody().length);
}
} else {
status = Status.BACKOFF;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
index def513ce5..a0e996029 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
@@ -597,8 +597,8 @@ public class SimpleMessageTubeSink extends AbstractSink
implements Configurable
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
"");
}
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
- metricItem.readFailCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(event.getBody().length);
+ metricItem.readSuccessCount.incrementAndGet();
+
metricItem.readSuccessSize.addAndGet(event.getBody().length);
}
} else {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 74bdf719f..0f96aa0b9 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -287,7 +287,7 @@ public class TubeSink extends AbstractSink implements
Configurable {
cachedMsgCnt.incrementAndGet();
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
metricItem.readSuccessCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(event.getBody().length);
+
metricItem.readSuccessSize.addAndGet(event.getBody().length);
} else {
tx.rollback();
//logger.info("[{}] Channel --> Queue(has no enough
space,current code point) "