This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5d062bc93 [INLONG-6738][DataProxy] New sink architecture integration
(#6739)
5d062bc93 is described below
commit 5d062bc93ba62a0d4e09268d5b328fa6663dff55
Author: woofyzhao <[email protected]>
AuthorDate: Wed Dec 7 17:27:19 2022 +0800
[INLONG-6738][DataProxy] New sink architecture integration (#6739)
---
.../inlong/common/heartbeat/ComponentHeartbeat.java | 12 ++++++++----
.../apache/inlong/common/heartbeat/HeartbeatMsg.java | 7 ++++++-
inlong-dataproxy/conf/common.properties | 5 +++++
inlong-dataproxy/conf/dataproxy-pulsar.conf | 15 ++++++++++-----
.../inlong/dataproxy/config/RemoteConfigManager.java | 11 ++++++-----
.../apache/inlong/dataproxy/consts/ConfigConstants.java | 1 +
.../inlong/dataproxy/heartbeat/HeartbeatManager.java | 1 +
.../dataproxy/sink/mq/MessageQueueZoneProducer.java | 4 ++++
.../inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java | 17 +++++++++++++----
.../inlong/dataproxy/source/ServerMessageHandler.java | 6 ++++--
.../apache/inlong/dataproxy/utils/ConfStringUtils.java | 3 +++
.../manager/service/heartbeat/HeartbeatManager.java | 2 ++
.../service/repository/DataProxyConfigRepository.java | 3 +++
13 files changed, 66 insertions(+), 21 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
index 567992918..96c173672 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
@@ -31,6 +31,8 @@ public class ComponentHeartbeat {
private String clusterTag;
+ private String extTag;
+
private String clusterName;
private String componentType;
@@ -49,11 +51,12 @@ public class ComponentHeartbeat {
public ComponentHeartbeat() {
}
- public ComponentHeartbeat(String clusterTag, String clusterName,
- String componentType, String ip, String port,
- String inCharges, String protocolType) {
+ public ComponentHeartbeat(String clusterTag, String extTag,
+ String clusterName, String componentType, String ip,
+ String port, String inCharges, String protocolType) {
this.nodeSrvStatus = NodeSrvStatus.OK;
this.clusterTag = clusterTag;
+ this.extTag = extTag;
this.clusterName = clusterName;
this.componentType = componentType;
this.ip = ip;
@@ -64,11 +67,12 @@ public class ComponentHeartbeat {
}
public ComponentHeartbeat(NodeSrvStatus nodeSrvStatus,
- String clusterTag, String clusterName,
+ String clusterTag, String extTag, String clusterName,
String componentType, String ip, String port,
String inCharges, String protocolType, int loadValue) {
this.nodeSrvStatus = nodeSrvStatus;
this.clusterTag = clusterTag;
+ this.extTag = extTag;
this.clusterName = clusterName;
this.componentType = componentType;
this.ip = ip;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index c386c9b60..cbfddf470 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -75,6 +75,11 @@ public class HeartbeatMsg {
*/
private String clusterTag;
+ /**
+ * Ext tag of cluster, key=value pairs separated by &
+ */
+ private String extTag;
+
/**
* Name of responsible person, separated by commas(,)
*/
@@ -96,7 +101,7 @@ public class HeartbeatMsg {
private Integer load = 0xffff;
public ComponentHeartbeat componentHeartbeat() {
- return new ComponentHeartbeat(nodeSrvStatus, clusterTag, clusterName,
+ return new ComponentHeartbeat(nodeSrvStatus, clusterTag, extTag,
clusterName,
componentType, ip, port, inCharges, protocolType, load);
}
}
diff --git a/inlong-dataproxy/conf/common.properties
b/inlong-dataproxy/conf/common.properties
index 557a10cb2..a13d63ac6 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -24,6 +24,7 @@ manager.auth.secretKey=
# proxy cluster name
proxy.cluster.name=default_dataproxy
proxy.cluster.tag=default_cluster
+proxy.cluster.extTag=default=true
proxy.cluster.inCharges=admin
# check interval of local config (millisecond)
configCheckInterval=10000
@@ -38,3 +39,7 @@ prometheusHttpPort=9081
audit.enable=true
# audit proxy address
audit.proxys=127.0.0.1:10081
+
+# remote config loader
+idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ManagerIdTopicConfigLoader
+cacheClusterConfig.type=org.apache.inlong.dataproxy.config.loader.ManagerCacheClusterConfigLoader
diff --git a/inlong-dataproxy/conf/dataproxy-pulsar.conf
b/inlong-dataproxy/conf/dataproxy-pulsar.conf
index 7cd457392..628d71ec4 100644
--- a/inlong-dataproxy/conf/dataproxy-pulsar.conf
+++ b/inlong-dataproxy/conf/dataproxy-pulsar.conf
@@ -116,17 +116,22 @@ agent1.channels.ch-msg6.fsyncPerTransaction = false
agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
-agent1.sinks.pulsar-sink-msg1.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg1.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg1.maxThreads=1
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
-agent1.sinks.pulsar-sink-msg2.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg2.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg2.maxThreads=1
# For order message
agent1.sinks.pulsar-sink-msg3.channel = ch-msg3
-agent1.sinks.pulsar-sink-msg3.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg3.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg3.maxThreads=1
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
-agent1.sinks.pulsar-sink-msg5.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg5.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg5.maxThreads=1
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
-agent1.sinks.pulsar-sink-msg6.type =
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg6.type =
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg6.maxThreads=1
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index f2f33ce85..dc2548edb 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -108,18 +108,19 @@ public class RemoteConfigManager implements IRepository {
try {
String strReloadInterval =
CommonPropertiesHolder.getString(KEY_CONFIG_CHECK_INTERVAL);
instance.reloadInterval =
NumberUtils.toLong(strReloadInterval, DEFAULT_HEARTBEAT_INTERVAL_MS);
- //
- String ipListParserType =
CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE);
+
+ String ipListParserType =
CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE,
+ DefaultManagerIpListParser.class.getName());
Class<? extends IManagerIpListParser> ipListParserClass;
ipListParserClass = (Class<? extends
IManagerIpListParser>) Class
.forName(ipListParserType);
instance.ipListParser =
ipListParserClass.getDeclaredConstructor().newInstance();
- //
+
SecureRandom random = new
SecureRandom(String.valueOf(System.currentTimeMillis()).getBytes());
instance.managerIpListIndex.set(random.nextInt());
- //
+
instance.httpClient = constructHttpClient();
- //
+
instance.reload();
instance.setReloadTimer();
isInit = true;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 55f4e74ed..32dbfbb46 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -98,6 +98,7 @@ public class ConfigConstants {
public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";
public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+ public static final String PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
public static final String PROXY_CLUSTER_INCHARGES =
"proxy.cluster.inCharges";
public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";
public static final String SOURCE_NO_TOPIC_ACCEPT =
"source.topic.notfound.accept";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 208f46bab..5ae3001e3 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -144,6 +144,7 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
ConfigConstants.PROXY_CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
heartbeatMsg.setInCharges(commonProperties.getOrDefault(
ConfigConstants.PROXY_CLUSTER_INCHARGES,
DEFAULT_CLUSTER_INCHARGES));
+
heartbeatMsg.setExtTag(commonProperties.get(ConfigConstants.PROXY_CLUSTER_EXT_TAG));
Map<String, String> groupIdMappings =
configManager.getGroupIdMappingProperties();
Map<String, Map<String, String>> streamIdMappings =
configManager.getStreamIdMappingProperties();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index bc85b6097..5c221f892 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.sink.mq;
+import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,6 +141,9 @@ public class MessageQueueZoneProducer {
}
}
this.clusterList = newClusterList;
+ if (!ConfigManager.getInstance().isMqClusterReady()) {
+ ConfigManager.getInstance().updMqClusterStatus(true);
+ }
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 23333a0ac..49eccce4f 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.sink.mq.pulsar;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -27,6 +28,7 @@ import
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.OrderBatchPackProfileV0;
import org.apache.inlong.dataproxy.sink.mq.SimpleBatchPackProfileV0;
import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -113,9 +115,12 @@ public class PulsarHandler implements MessageQueueHandler {
String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
String authentication = config.getParams().get(KEY_AUTHENTICATION);
Context context = sinkContext.getProducerContext();
- this.client = PulsarClient.builder()
+ ClientBuilder builder = PulsarClient.builder();
+ if (StringUtils.isNotEmpty(authentication)) {
+
builder.authentication(AuthenticationFactory.token(authentication));
+ }
+ this.client = builder
.serviceUrl(serviceUrl)
-
.authentication(AuthenticationFactory.token(authentication))
.ioThreads(context.getInteger(KEY_IOTHREADS, 1))
.memoryLimit(context.getLong(KEY_MEMORYLIMIT,
1073741824L), SizeUnit.BYTES)
.connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
@@ -188,7 +193,7 @@ public class PulsarHandler implements MessageQueueHandler {
return false;
}
// topic
- String producerTopic = this.getProducerTopic(baseTopic);
+ String producerTopic = this.getProducerTopic(baseTopic, idConfig);
if (producerTopic == null) {
sinkContext.addSendResultMetric(event, event.getUid(), false,
0);
sinkContext.getDispatchQueue().release(event.getSize());
@@ -243,11 +248,15 @@ public class PulsarHandler implements MessageQueueHandler
{
/**
* getProducerTopic
*/
- private String getProducerTopic(String baseTopic) {
+ private String getProducerTopic(String baseTopic, IdTopicConfig config) {
StringBuilder builder = new StringBuilder();
if (tenant != null) {
builder.append(tenant).append("/");
}
+ String namespace = this.namespace;
+ if (namespace == null) {
+ namespace = config.getParams().get(PulsarHandler.KEY_NAMESPACE);
+ }
if (namespace != null) {
builder.append(namespace).append("/");
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 442e70699..31c0b4ee8 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -40,7 +40,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.event.EventBuilder;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.AttributeConstants;
@@ -57,6 +56,7 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -556,7 +556,9 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
strBuff.delete(0, strBuff.length());
}
final byte[] data = inLongMsg.buildArray();
- Event event = EventBuilder.withBody(data, headers);
+ Event event = new ProxyEvent(groupId, streamIdEntry.getKey(),
data,
+ Long.parseLong(strDataTime), strRemoteIP);
+ event.getHeaders().putAll(headers);
inLongMsg.reset();
Pair<Boolean, String> evenProcType =
MessageUtils.getEventProcType(syncSend, proxySend);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
index 9f5cb34bc..22daf77d0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
@@ -26,6 +26,9 @@ public class ConfStringUtils {
if (ip == null || ip.trim().isEmpty()) {
return false;
}
+ if (ip.equals("localhost")) {
+ ip = "127.0.0.1";
+ }
boolean b = false;
ip = ip.trim();
if (ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 8401c76a3..cc536f038 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -220,6 +220,7 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
final String clusterName = componentHeartbeat.getClusterName();
final String type = componentHeartbeat.getComponentType();
final String clusterTag = componentHeartbeat.getClusterTag();
+ final String extTag = componentHeartbeat.getExtTag();
Preconditions.checkNotNull(clusterTag, "cluster tag cannot be null");
Preconditions.checkNotNull(type, "cluster type cannot be null");
Preconditions.checkNotNull(clusterName, "cluster name cannot be null");
@@ -234,6 +235,7 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
cluster.setName(clusterName);
cluster.setType(type);
cluster.setClusterTags(clusterTag);
+ cluster.setExtTag(extTag);
String inCharges = componentHeartbeat.getInCharges();
if (StringUtils.isBlank(inCharges)) {
inCharges = InlongConstants.ADMIN_USER;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index adaa7c1f3..2a62f81f7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -262,6 +262,9 @@ public class DataProxyConfigRepository implements
IRepository {
// cache
String clusterTag = proxyObj.getSetName();
String extTag = proxyObj.getZone();
+ if (StringUtils.isEmpty(extTag)) {
+ continue;
+ }
Map<String, List<CacheCluster>> cacheClusterZoneMap =
cacheClusterMap.get(clusterTag);
if (cacheClusterZoneMap != null) {
Map<String, String> subTagMap =
tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));