This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 75c7648bc [INLONG-6586][Sort] Fix the wrong node duration time of Kafka sink (#6587)
75c7648bc is described below
commit 75c7648bc9bffda95807affb29f2d48f29b4edb4
Author: vernedeng <[email protected]>
AuthorDate: Thu Nov 24 10:49:15 2022 +0800
[INLONG-6586][Sort] Fix the wrong node duration time of Kafka sink (#6587)
---
.../sort/standalone/sink/kafka/KafkaFederationSinkContext.java | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index dc6f57001..5d6cfadb3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
@@ -29,7 +28,6 @@ import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
-import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
@@ -204,8 +202,8 @@ public class KafkaFederationSinkContext extends SinkContext {
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime -
NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
- long wholeDuration = currentTime - msgTime;
+ long nodeDuration = currentTime - currentRecord.getFetchTime();
+ long wholeDuration = currentTime -
currentRecord.getRawLogTime();
metricItem.sinkDuration.addAndGet(sinkDuration * count);
metricItem.nodeDuration.addAndGet(nodeDuration * count);
metricItem.wholeDuration.addAndGet(wholeDuration * count);