This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 75c7648bc [INLONG-6586][Sort] Fix the wrong node duration time of Kafka sink (#6587)
75c7648bc is described below

commit 75c7648bc9bffda95807affb29f2d48f29b4edb4
Author: vernedeng <[email protected]>
AuthorDate: Thu Nov 24 10:49:15 2022 +0800

    [INLONG-6586][Sort] Fix the wrong node duration time of Kafka sink (#6587)
---
 .../sort/standalone/sink/kafka/KafkaFederationSinkContext.java      | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index dc6f57001..5d6cfadb3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
@@ -29,7 +28,6 @@ import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
-import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.slf4j.Logger;
 
@@ -204,8 +202,8 @@ public class KafkaFederationSinkContext extends SinkContext {
             if (sendTime > 0) {
                 long currentTime = System.currentTimeMillis();
                 long sinkDuration = currentTime - sendTime;
-                long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
-                long wholeDuration = currentTime - msgTime;
+                long nodeDuration = currentTime - currentRecord.getFetchTime();
+                long wholeDuration = currentTime - currentRecord.getRawLogTime();
                 metricItem.sinkDuration.addAndGet(sinkDuration * count);
                 metricItem.nodeDuration.addAndGet(nodeDuration * count);
                 metricItem.wholeDuration.addAndGet(wholeDuration * count);

Reply via email to