This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4e631e2cd [INLONG-5238][DataProxy] Error metric params in
addSendFailMetric (#5239)
4e631e2cd is described below
commit 4e631e2cda992e7806f115a755a593121195d39c
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Jul 29 17:07:15 2022 +0800
[INLONG-5238][DataProxy] Error metric params in addSendFailMetric (#5239)
---
.../org/apache/inlong/dataproxy/sink/SinkContext.java | 8 ++++----
.../sink/kafkazone/KafkaZoneSinkContext.java | 19 +++----------------
.../sink/pulsarzone/PulsarZoneSinkContext.java | 13 -------------
.../dataproxy/sink/tubezone/TubeZoneSinkContext.java | 19 +++----------------
4 files changed, 10 insertions(+), 49 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
index dbf823bd8..03c9c6100 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java
@@ -40,7 +40,7 @@ public class SinkContext {
public static final String KEY_PROCESS_INTERVAL = "processInterval";
public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
- protected final String clusterId;
+ protected final String proxyClusterId;
protected final String sinkName;
protected final Context sinkContext;
@@ -60,7 +60,7 @@ public class SinkContext {
this.sinkName = sinkName;
this.sinkContext = context;
this.channel = channel;
- this.clusterId =
context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
+ this.proxyClusterId =
context.getString(CommonPropertiesHolder.KEY_PROXY_CLUSTER_NAME);
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
this.processInterval = sinkContext.getInteger(KEY_PROCESS_INTERVAL,
100);
this.reloadInterval = sinkContext.getLong(KEY_RELOAD_INTERVAL, 60000L);
@@ -117,8 +117,8 @@ public class SinkContext {
*
* @return the clusterId
*/
- public String getClusterId() {
- return clusterId;
+ public String getProxyClusterId() {
+ return proxyClusterId;
}
/**
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
index f3985e8dd..f94bfd6aa 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.kafkazone;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class KafkaZoneSinkContext extends SinkContext {
private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
- private final String proxyClusterId;
private final String nodeId;
private final Context producerContext;
//
@@ -63,8 +61,6 @@ public class KafkaZoneSinkContext extends SinkContext {
LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
this.dispatchQueue = dispatchQueue;
- // proxyClusterId
- this.proxyClusterId =
CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
// nodeId
this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID,
"127.0.0.1");
// compressionType
@@ -101,15 +97,6 @@ public class KafkaZoneSinkContext extends SinkContext {
this.cacheHolder.close();
}
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
/**
* get dispatchQueue
*
@@ -172,7 +159,7 @@ public class KafkaZoneSinkContext extends SinkContext {
*/
public void addSendingMetric(DispatchProfile currentRecord, String bid) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
@@ -192,7 +179,7 @@ public class KafkaZoneSinkContext extends SinkContext {
*/
public void addSendFailMetric() {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getProxyClusterId());
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
long msgTime = System.currentTimeMillis();
long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
@@ -241,7 +228,7 @@ public class KafkaZoneSinkContext extends SinkContext {
*/
public void addSendResultMetric(DispatchProfile currentRecord, String bid,
boolean result, long sendTime) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
index 8f5b22401..51cb7437a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.pulsarzone;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class PulsarZoneSinkContext extends SinkContext {
private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
- private final String proxyClusterId;
private final String nodeId;
private final Context producerContext;
//
@@ -63,8 +61,6 @@ public class PulsarZoneSinkContext extends SinkContext {
LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
this.dispatchQueue = dispatchQueue;
- // proxyClusterId
- this.proxyClusterId =
CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
// nodeId
this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID,
"127.0.0.1");
// compressionType
@@ -101,15 +97,6 @@ public class PulsarZoneSinkContext extends SinkContext {
this.cacheHolder.close();
}
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
/**
* get dispatchQueue
*
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
index f56cf3c81..9e94eae59 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.dataproxy.sink.tubezone;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
@@ -46,7 +45,6 @@ public class TubeZoneSinkContext extends SinkContext {
private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
- private final String proxyClusterId;
private final String nodeId;
private final Context producerContext;
//
@@ -63,8 +61,6 @@ public class TubeZoneSinkContext extends SinkContext {
LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
super(sinkName, context, channel);
this.dispatchQueue = dispatchQueue;
- // proxyClusterId
- this.proxyClusterId =
CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
// nodeId
this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID,
"127.0.0.1");
// compressionType
@@ -101,15 +97,6 @@ public class TubeZoneSinkContext extends SinkContext {
this.cacheHolder.close();
}
- /**
- * get proxyClusterId
- *
- * @return the proxyClusterId
- */
- public String getProxyClusterId() {
- return proxyClusterId;
- }
-
/**
* get dispatchQueue
*
@@ -172,7 +159,7 @@ public class TubeZoneSinkContext extends SinkContext {
*/
public void addSendMetric(DispatchProfile currentRecord, String bid) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
@@ -192,7 +179,7 @@ public class TubeZoneSinkContext extends SinkContext {
*/
public void addSendFailMetric() {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getProxyClusterId());
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
long msgTime = System.currentTimeMillis();
long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
@@ -241,7 +228,7 @@ public class TubeZoneSinkContext extends SinkContext {
*/
public void addSendResultMetric(DispatchProfile currentRecord, String bid,
boolean result, long sendTime) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getClusterId());
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.getProxyClusterId());
// metric
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());