This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c4f820bf30 [INLONG-9225][Audit] Automatically create audit topic after
service startup (#9226)
c4f820bf30 is described below
commit c4f820bf3092726cb6905086d7f15570118bc1e7
Author: LiJie20190102 <[email protected]>
AuthorDate: Tue Nov 28 11:19:41 2023 +0800
[INLONG-9225][Audit] Automatically create audit topic after service startup
(#9226)
Co-authored-by: lijie0203 <[email protected]>
---
.../org/apache/inlong/audit/sink/KafkaSink.java | 65 ++++++++++++++++++++
.../inlong/audit/config/MessageQueueConfig.java | 6 ++
.../inlong/audit/service/consume/KafkaConsume.java | 70 +++++++++++++++++++++-
inlong-audit/conf/application.properties | 3 +
inlong-audit/conf/audit-proxy-kafka.conf | 6 ++
5 files changed, 148 insertions(+), 2 deletions(-)
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
index 82a3d7fb8b..dc2c7c154d 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
@@ -34,19 +34,30 @@ import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -62,6 +73,10 @@ public class KafkaSink extends AbstractSink implements
Configurable {
private String kafkaServerUrl;
private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
private static final String TOPIC = "topic";
+
+ private static final String TOPIC_REPLICATIONS = "topic_replications";
+
+ private static final String TOPIC_PARTITIONS = "topic_partitions";
private static final String RETRIES = "retries";
private static final String BATCH_SIZE = "batch_size";
private static final String LINGER_MS = "linger_ms";
@@ -81,6 +96,12 @@ public class KafkaSink extends AbstractSink implements
Configurable {
public Map<String, KafkaProducer<String, byte[]>> producerMap;
private SinkCounter sinkCounter;
private String topic;
+
+ private int topicReplications;
+ private int topicPartitions;
+ private static final int DEFAULT_TOPIC_REPLICATIONS = 2;
+ private static final int DEFAULT_TOPIC_PARTITIONS = 3;
+
private volatile boolean canSend = false;
private volatile boolean canTake = false;
private int threadNum;
@@ -227,6 +248,10 @@ public class KafkaSink extends AbstractSink implements
Configurable {
topic = context.getString(TOPIC);
Preconditions.checkState(StringUtils.isNotEmpty(topic), "No topic
specified");
+ // topic config
+ topicPartitions = context.getInteger(TOPIC_PARTITIONS,
DEFAULT_TOPIC_PARTITIONS);
+ topicReplications = context.getInteger(TOPIC_REPLICATIONS,
DEFAULT_TOPIC_REPLICATIONS);
+
producerMap = new HashMap<>();
logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS,
DEFAULT_LOG_EVERY_N_EVENTS);
@@ -274,6 +299,9 @@ public class KafkaSink extends AbstractSink implements
Configurable {
logger.error("topic is empty");
}
+ // create topic if need
+ createTopic();
+
if (producer == null) {
producer = new KafkaProducer<>(properties, new StringSerializer(),
new ByteArraySerializer());
}
@@ -282,6 +310,43 @@ public class KafkaSink extends AbstractSink implements
Configurable {
logger.info(getName() + " success create producer");
}
+    /**
+     * Create the audit topic on the Kafka cluster if it does not exist yet.
+     *
+     * <p>The partition count and replication factor come from the {@code topic_partitions} and
+     * {@code topic_replications} sink settings; both are capped at the number of brokers the
+     * cluster reports. If another process (for example a sibling sink configured with the same
+     * topic, or the audit store) creates the topic between the existence check and the create
+     * request, the resulting {@code TopicExistsException} is treated as success rather than
+     * failing sink startup.
+     *
+     * @throws IllegalArgumentException if the cluster reports no brokers
+     * @throws RuntimeException if listing, describing or creating the topic fails or is
+     *         interrupted; the underlying exception is attached as the cause
+     */
+    private void createTopic() {
+        try (AdminClient adminClient = AdminClient.create(properties)) {
+            ListTopicsResult topicList = adminClient.listTopics();
+            KafkaFuture<Set<String>> kafkaFuture = topicList.names();
+            Set<String> topicSet = kafkaFuture.get();
+
+            if (topicSet.contains(topic)) {
+                // nothing to create
+                logger.info("The audit topic:{} already exists.", topic);
+                return;
+            }
+
+            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
+            Collection<Node> nodes = describeClusterResult.nodes().get();
+            if (nodes.isEmpty()) {
+                throw new IllegalArgumentException("kafka server not find");
+            }
+
+            // A replication factor above the broker count cannot be satisfied, so cap it.
+            // NOTE(review): the partition count is capped too, which Kafka does not require;
+            // on a single-broker cluster this silently yields one partition - confirm intent.
+            int partition = Math.min(topicPartitions, nodes.size());
+            int factor = Math.min(topicReplications, nodes.size());
+
+            NewTopic needCreateTopic = new NewTopic(topic, partition, (short) factor);
+            CreateTopicsResult createTopicsResult =
+                    adminClient.createTopics(Collections.singletonList(needCreateTopic));
+            createTopicsResult.all().get();
+            logger.info("Created audit topic:{} with partitions:{} and replication factor:{}",
+                    topic, partition, factor);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so the interruption is not lost for callers
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, properties), e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
+                // another creator won the race; the topic now exists, which is all we need
+                logger.info("The audit topic:{} was created concurrently by another process.", topic);
+                return;
+            }
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, properties), e);
+        }
+    }
+
private KafkaProducer<String, byte[]> getProducer(String topic) {
if (!producerMap.containsKey(topic)) {
synchronized (this) {
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
index 05036ff552..2cfa4b1d34 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
@@ -72,6 +72,12 @@ public class MessageQueueConfig {
@Value("${audit.kafka.topic:}")
private String kafkaTopic;
+ @Value("${audit.kafka.topic.numPartitions:3}")
+ private String numPartitions;
+
+ @Value("${audit.kafka.topic.replicationFactor:2}")
+ private String replicationFactor;
+
@Value("${audit.kafka.consumer.name:}")
private String kafkaConsumerName;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
index 421147ef73..cc0d8cac21 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
@@ -23,10 +23,17 @@ import org.apache.inlong.audit.service.InsertData;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
@@ -34,9 +41,12 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
public class KafkaConsume extends BaseConsume {
@@ -45,6 +55,9 @@ public class KafkaConsume extends BaseConsume {
private String serverUrl;
private String topic;
+ private static final int DEFAULT_NUM_PARTITIONS = 3;
+ private static final int DEFAULT_REPLICATION_FACTOR = 2;
+
/**
* Constructor
*
@@ -67,6 +80,9 @@ public class KafkaConsume extends BaseConsume {
Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getKafkaConsumerName()),
"no kafka consume name specified");
+ // create topic if need
+ createTopic();
+
initConsumer(mqConfig);
Thread thread = new Thread(new Fetcher(consumer, topic, isAutoCommit,
mqConfig.getFetchWaitMs()),
@@ -74,9 +90,60 @@ public class KafkaConsume extends BaseConsume {
thread.start();
}
+    /**
+     * Create the audit topic on the Kafka cluster if it does not exist yet.
+     *
+     * <p>The partition count and replication factor are read from {@code mqConfig}; null or blank
+     * values fall back to {@code DEFAULT_NUM_PARTITIONS} and {@code DEFAULT_REPLICATION_FACTOR}.
+     * Both are capped at the number of brokers the cluster reports. If another process (for
+     * example the audit proxy, which creates the same topic) creates it between the existence
+     * check and the create request, the resulting {@code TopicExistsException} is treated as
+     * success rather than failing startup.
+     *
+     * @throws NumberFormatException if a configured value is not a valid integer
+     * @throws IllegalArgumentException if the cluster reports no brokers
+     * @throws RuntimeException if listing, describing or creating the topic fails or is
+     *         interrupted; the underlying exception is attached as the cause
+     */
+    private void createTopic() {
+        int numPartitions = parseIntOrDefault(mqConfig.getNumPartitions(), DEFAULT_NUM_PARTITIONS);
+        int replicationFactor = parseIntOrDefault(mqConfig.getReplicationFactor(), DEFAULT_REPLICATION_FACTOR);
+        // built once: used both for the AdminClient and for the error message below
+        Properties properties = getProperties(mqConfig);
+
+        try (AdminClient adminClient = AdminClient.create(properties)) {
+            ListTopicsResult topicList = adminClient.listTopics();
+            KafkaFuture<Set<String>> kafkaFuture = topicList.names();
+            Set<String> topicSet = kafkaFuture.get();
+
+            if (topicSet.contains(topic)) {
+                // nothing to create
+                LOG.info("The audit topic:{} already exists.", topic);
+                return;
+            }
+
+            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
+            Collection<Node> nodes = describeClusterResult.nodes().get();
+            if (nodes.isEmpty()) {
+                throw new IllegalArgumentException("kafka server not find");
+            }
+
+            // A replication factor above the broker count cannot be satisfied, so cap it.
+            // NOTE(review): the partition count is capped too, which Kafka does not require - confirm intent.
+            int partition = Math.min(numPartitions, nodes.size());
+            int factor = Math.min(replicationFactor, nodes.size());
+
+            NewTopic needCreateTopic = new NewTopic(topic, partition, (short) factor);
+            CreateTopicsResult createTopicsResult =
+                    adminClient.createTopics(Collections.singletonList(needCreateTopic));
+            createTopicsResult.all().get();
+            LOG.info("Created audit topic:{} with partitions:{} and replication factor:{}",
+                    topic, partition, factor);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so the interruption is not lost for callers
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, properties), e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
+                // another creator won the race; the topic now exists, which is all we need
+                LOG.info("The audit topic:{} was created concurrently by another process.", topic);
+                return;
+            }
+            throw new RuntimeException(
+                    String.format("create audit topic:%s error with config:%s", topic, properties), e);
+        }
+    }
+
+    /**
+     * Parse an integer setting after trimming surrounding whitespace.
+     *
+     * @param value raw configured text, may be null or blank
+     * @param defaultValue value returned when {@code value} is null or blank
+     * @return the parsed integer, or {@code defaultValue}
+     * @throws NumberFormatException if {@code value} is non-blank but not an integer
+     */
+    private static int parseIntOrDefault(String value, int defaultValue) {
+        return StringUtils.isBlank(value) ? defaultValue : Integer.parseInt(value.trim());
+    }
+
protected void initConsumer(MessageQueueConfig mqConfig) {
LOG.info("init kafka consumer, topic:{}, serverUrl:{}", topic,
serverUrl);
+ Properties properties = getProperties(mqConfig); // same settings createTopic() passes to AdminClient.create
+ consumer = new KafkaConsumer<>(properties); // assigned to the field later handed to the Fetcher thread
+ consumer.subscribe(Collections.singleton(topic)); // subscribe to the single configured audit topic
+ }
+ private Properties getProperties(MessageQueueConfig mqConfig) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,
mqConfig.getKafkaGroupId());
@@ -85,8 +152,7 @@ public class KafkaConsume extends BaseConsume {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
mqConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
- consumer = new KafkaConsumer<>(properties);
- consumer.subscribe(Collections.singleton(topic));
+ return properties;
}
public class Fetcher implements Runnable {
diff --git a/inlong-audit/conf/application.properties
b/inlong-audit/conf/application.properties
index 41fd4e1e28..c76942da90 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -64,6 +64,9 @@ audit.tube.consumer.group.name=inlong-audit-consumer
# kafka config
audit.kafka.topic=inlong-audit
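+# Partition count and replication factor used only when the topic does not exist at startup
+# and is created automatically; both values are capped at the number of available brokers.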
+audit.kafka.topic.numPartitions=3
+audit.kafka.topic.replicationFactor=2
audit.kafka.consumer.name=inlong-audit-consumer
audit.kafka.group.id=audit-consumer-group
diff --git a/inlong-audit/conf/audit-proxy-kafka.conf
b/inlong-audit/conf/audit-proxy-kafka.conf
index df14a6e6d1..8e700c31bf 100644
--- a/inlong-audit/conf/audit-proxy-kafka.conf
+++ b/inlong-audit/conf/audit-proxy-kafka.conf
@@ -59,6 +59,9 @@ agent1.channels.ch-msg2.fsyncInterval = 10
agent1.sinks.kafka-sink-msg1.channel = ch-msg1
agent1.sinks.kafka-sink-msg1.type = org.apache.inlong.audit.sink.KafkaSink
agent1.sinks.kafka-sink-msg1.topic = inlong-audit
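+# Replication factor and partition count used only when the topic does not exist at startup
+# and is created automatically; both values are capped at the number of available brokers.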
+agent1.sinks.kafka-sink-msg1.topic_replications = 2
+agent1.sinks.kafka-sink-msg1.topic_partitions = 3
agent1.sinks.kafka-sink-msg1.retries = 0
agent1.sinks.kafka-sink-msg1.batch_size = 16384
agent1.sinks.kafka-sink-msg1.linger_ms = 0
@@ -67,6 +70,9 @@ agent1.sinks.kafka-sink-msg1.buffer_memory = 33554432
agent1.sinks.kafka-sink-msg2.channel = ch-msg1
agent1.sinks.kafka-sink-msg2.type = org.apache.inlong.audit.sink.KafkaSink
agent1.sinks.kafka-sink-msg2.topic = inlong-audit
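+# Replication factor and partition count used only when the topic does not exist at startup
+# and is created automatically; both values are capped at the number of available brokers.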
+agent1.sinks.kafka-sink-msg2.topic_replications = 2
+agent1.sinks.kafka-sink-msg2.topic_partitions = 3
agent1.sinks.kafka-sink-msg2.retries = 0
agent1.sinks.kafka-sink-msg2.batch_size = 16384
agent1.sinks.kafka-sink-msg2.linger_ms = 0