This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe9c63 [INLONG-2056] The metric of DataProxy append a
dimension(minute level) of event time, supporting audit reconciliation of
minute level. (#2057)
3fe9c63 is described below
commit 3fe9c634dc0f872f5b4ee5c314228e37b28bf0cd
Author: 卢春亮 <[email protected]>
AuthorDate: Thu Dec 23 18:13:48 2021 +0800
[INLONG-2056] The metric of DataProxy append a dimension(minute level) of
event time, supporting audit reconciliation of minute level. (#2057)
---
.../dataproxy/config/holder/CommonPropertiesHolder.java | 15 +++++++++++++++
.../inlong/dataproxy/metrics/DataProxyMetricItem.java | 3 +++
.../sink/pulsar/federation/PulsarFederationWorker.java | 6 ++++++
.../sink/pulsar/federation/PulsarProducerCluster.java | 6 ++++--
.../inlong/dataproxy/source/ServerMessageHandler.java | 4 ++++
5 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 6d9aa95..adbaba0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.flume.Context;
import
org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,8 @@ public class CommonPropertiesHolder {
private static Map<String, String> props;
+ private static long auditFormatInterval = 60000L;
+
/**
* init
*/
@@ -55,6 +58,8 @@ public class CommonPropertiesHolder {
CommonPropertiesLoader loader =
(CommonPropertiesLoader) loaderObject;
props.putAll(loader.load());
LOG.info("loaderClass:{},properties:{}",
loaderClassName, props);
+ auditFormatInterval = NumberUtils
+
.toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
}
} catch (Throwable t) {
LOG.error("Fail to init
CommonPropertiesLoader,loaderClass:{},error:{}",
@@ -113,4 +118,14 @@ public class CommonPropertiesHolder {
value = (value != null) ? value : props.getOrDefault(key,
defaultValue);
return value;
}
+
+ /**
+ * getAuditFormatInterval
+ *
+ * @return
+ */
+ public static long getAuditFormatInterval() {
+ return auditFormatInterval;
+ }
+
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
index fd75537..e11cbd4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
@@ -41,6 +41,7 @@ public class DataProxyMetricItem extends MetricItem {
public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
public static final String KEY_SINK_ID = "sinkId";
public static final String KEY_SINK_DATA_ID = "sinkDataId";
+ public static final String KEY_MESSAGE_TIME = "msgTime";
//
public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
@@ -71,6 +72,8 @@ public class DataProxyMetricItem extends MetricItem {
public String sinkId;
@Dimension
public String sinkDataId;
+ @Dimension
+ public String msgTime = String.valueOf(0);
@CountMetric
public AtomicLong readSuccessCount = new AtomicLong(0);
@CountMetric
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
index 914cff3..e3fed03 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
@@ -20,8 +20,10 @@ package org.apache.inlong.dataproxy.sink.pulsar.federation;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
@@ -101,6 +103,10 @@ public class PulsarFederationWorker extends Thread {
DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
currentRecord.getHeaders().get(Constants.TOPIC));
+ long msgTime =
NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
+ System.currentTimeMillis());
+ long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem =
this.context.getMetricItemSet().findMetricItem(dimensions);
metricItem.sendCount.incrementAndGet();
metricItem.sendSize.addAndGet(currentRecord.getBody().length);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
index 4c82144..6b8457f 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -29,6 +29,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
@@ -269,14 +270,15 @@ public class PulsarProducerCluster implements
LifecycleAware {
DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
+ long msgTime =
NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
sendTime);
+ long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem =
this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
if (result) {
metricItem.sendSuccessCount.incrementAndGet();
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
- long msgTime =
NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
- sendTime);
long sinkDuration = currentTime - sendTime;
long nodeDuration = currentTime -
NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
long wholeDuration = currentTime - msgTime;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 3e05d92..1f6d87d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -42,6 +42,7 @@ import org.apache.flume.source.AbstractSource;
import org.apache.inlong.commons.msg.TDMsg1;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
@@ -688,6 +689,9 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID,
source.getName());
dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
+ long msgTime = System.currentTimeMillis();
+ long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
if (result) {
metricItem.readSuccessCount.incrementAndGet();