This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d3abcf6f25 [INLONG-9741][Agent] Update Kafka source for Agent (#9749)
d3abcf6f25 is described below
commit d3abcf6f25677fc1007c04cff4a1f2f445023d9c
Author: haifxu <[email protected]>
AuthorDate: Fri Mar 1 15:53:16 2024 +0800
[INLONG-9741][Agent] Update Kafka source for Agent (#9749)
---
.../inlong/agent/constant/TaskConstants.java | 11 +-
.../org/apache/inlong/agent/pojo/KafkaJob.java | 2 +
.../apache/inlong/agent/pojo/TaskProfileDto.java | 4 +-
.../agent/core/instance/InstanceManager.java | 3 +
.../apache/inlong/agent/core/task/TaskManager.java | 2 +
inlong-agent/agent-docker/agent-docker.sh | 2 +-
.../agent/plugin/instance/KafkaInstance.java | 193 +++++++++++
.../inlong/agent/plugin/sources/KafkaSource.java | 358 ++++++++++++++++-----
.../agent/plugin/sources/reader/KafkaReader.java | 292 -----------------
.../apache/inlong/agent/plugin/task/KafkaTask.java | 168 ++++++++++
10 files changed, 656 insertions(+), 379 deletions(-)
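At a high level, this commit retires the split()/KafkaReader design, where a Kafka job was split into one reader per partition, and moves Kafka ingestion onto the task/instance model already used by the file source: the new KafkaTask submits one instance per topic, the new KafkaInstance drives the read/write loop, and the rewritten KafkaSource consumes every partition of the topic on a background thread, buffering records in a bounded queue that read() drains for the sink.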
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 0498358d2d..b1fea01366 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -34,6 +34,7 @@ public class TaskConstants extends CommonConstants {
public static final String JOB_UUID = "job.uuid";
public static final String TASK_GROUP_ID = "task.groupId";
public static final String TASK_STREAM_ID = "task.streamId";
+ public static final String RESTORE_FROM_DB = "task.restoreFromDB";
public static final String TASK_SOURCE = "task.source";
public static final String JOB_SOURCE_TYPE = "job.sourceType";
@@ -107,14 +108,14 @@ public class TaskConstants extends CommonConstants {
public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
// Kafka job
- public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
- public static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers";
- public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
+ public static final String TASK_KAFKA_TOPIC = "task.kafkaJob.topic";
+ public static final String TASK_KAFKA_BOOTSTRAP_SERVERS = "task.kafkaJob.bootstrap.servers";
+ public static final String TASK_KAFKA_GROUP_ID = "task.kafkaJob.group.id";
public static final String JOB_KAFKA_RECORD_SPEED_LIMIT = "job.kafkaJob.recordSpeed.limit";
public static final String JOB_KAFKA_BYTE_SPEED_LIMIT = "job.kafkaJob.byteSpeed.limit";
- public static final String JOB_KAFKA_OFFSET = "job.kafkaJob.partition.offset";
+ public static final String TASK_KAFKA_OFFSET = "task.kafkaJob.partition.offset";
public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout";
- public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
+ public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = "task.kafkaJob.autoOffsetReset";
public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
public static final String JOB_MONGO_USER = "job.mongoJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java
index db792fc202..c5ea65596e 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java
@@ -30,6 +30,7 @@ public class KafkaJob {
private RecordSpeed recordSpeed;
private ByteSpeed byteSpeed;
private String autoOffsetReset;
+ private String partitionOffsets;
@Data
public static class Group {
@@ -70,5 +71,6 @@ public class KafkaJob {
private String recordSpeedLimit;
private String byteSpeedLimit;
private String autoOffsetReset;
+ private String partitionOffsets;
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 5c02954f5d..d080ee734c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -38,6 +38,7 @@ import static org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_D
public class TaskProfileDto {
public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask";
+ public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -190,7 +191,7 @@ public class TaskProfileDto {
bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers());
kafkaJob.setBootstrap(bootstrap);
KafkaJob.Partition partition = new KafkaJob.Partition();
- partition.setOffset(dataConfigs.getSnapshot());
+ partition.setOffset(kafkaJobTaskConfig.getPartitionOffsets());
kafkaJob.setPartition(partition);
KafkaJob.Group group = new KafkaJob.Group();
group.setId(kafkaJobTaskConfig.getGroupId());
@@ -449,6 +450,7 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case KAFKA:
+ task.setTaskClass(DEFAULT_KAFKA_TASK);
KafkaJob kafkaJob = getKafkaJob(dataConfig);
task.setKafkaJob(kafkaJob);
task.setSource(KAFKA_SOURCE);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index e980354fe2..2c9bd721d8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -47,6 +47,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+
/**
* handle the instance created by task, including add, delete, update etc.
* the instance info is store in both db and memory.
@@ -339,6 +341,7 @@ public class InstanceManager extends AbstractDaemon {
if (state == InstanceStateEnum.DEFAULT) {
LOGGER.info("instance restoreFromDb addToMem state {} taskId {} instanceId {}", state, taskId,
profile.getInstanceId());
+ profile.setBoolean(RESTORE_FROM_DB, true);
addToMemory(profile);
} else {
LOGGER.info("instance restoreFromDb ignore state {} taskId {} instanceId {}", state, taskId,
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 12fab4d8c9..e3815f725e 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
/**
@@ -421,6 +422,7 @@ public class TaskManager extends AbstractDaemon {
taskProfileList.forEach((profile) -> {
if (profile.getState() == TaskStateEnum.RUNNING) {
LOGGER.info("restore from db taskId {}", profile.getTaskId());
+ profile.setBoolean(RESTORE_FROM_DB, true);
addToMemory(profile);
}
});
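A note on the RESTORE_FROM_DB flag introduced above: both InstanceManager and TaskManager stamp it onto profiles they re-add from the local DB, and KafkaSource.init() reads it back so that the explicit partition offsets from the task configuration are applied only on a fresh start; a restored task skips the seek and resumes from the offsets committed under its consumer group (auto-commit is disabled and the source calls commitSync() after each drained batch). A minimal sketch of that seek policy against the kafka-clients API, with illustrative names (SeekPolicySketch, applySeekPolicy, restoredFromDb) that are not part of this commit:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;
    import java.util.Map;

    public class SeekPolicySketch {

        // Fresh task: honor the configured per-partition offsets.
        // Restored task: skip seeking so poll() resumes from the group's committed offsets.
        static void applySeekPolicy(KafkaConsumer<String, byte[]> consumer, List<TopicPartition> partitions,
                Map<Integer, Long> configuredOffsets, boolean restoredFromDb) {
            consumer.assign(partitions);
            if (restoredFromDb || configuredOffsets.isEmpty()) {
                return;
            }
            for (TopicPartition tp : partitions) {
                Long offset = configuredOffsets.get(tp.partition());
                if (offset != null) {
                    consumer.seek(tp, offset);
                }
            }
        }
    }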
diff --git a/inlong-agent/agent-docker/agent-docker.sh b/inlong-agent/agent-docker/agent-docker.sh
index f6c9960cbb..06f27bed2d 100644
--- a/inlong-agent/agent-docker/agent-docker.sh
+++ b/inlong-agent/agent-docker/agent-docker.sh
@@ -23,7 +23,7 @@ local_ip=$(ifconfig $ETH_NETWORK | grep "inet" | grep -v "inet6" | awk '{print $
sed -i "s/agent.local.ip=.*$/agent.local.ip=$local_ip/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.fetcher.interval=.*$/agent.fetcher.interval=$AGENT_FETCH_INTERVAL/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.heartbeat.interval=.*$/agent.heartbeat.interval=$AGENT_HEARTBEAT_INTERVAL/g" "${file_path}/conf/agent.properties"
-sed -i "s/agent.manager.addr=.*$/agent.manager.addr=http://$MANAGER_OPENAPI_IP:$MANAGER_OPENAPI_PORT/g" "${file_path}/conf/agent.properties"
+sed -i "s/agent.manager.addr=.*$/agent.manager.addr=http:\/\/$MANAGER_OPENAPI_IP:$MANAGER_OPENAPI_PORT/g" "${file_path}/conf/agent.properties"
sed -i "s/audit.enable=.*$/audit.enable=$AUDIT_ENABLE/g" "${file_path}/conf/agent.properties"
sed -i "s/audit.proxys=.*$/audit.proxys=$AUDIT_PROXY_URL/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.cluster.tag=.*$/agent.cluster.tag=$CLUSTER_TAG/g" "${file_path}/conf/agent.properties"
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java
new file mode 100644
index 0000000000..245ab36d02
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.file.Sink;
+import org.apache.inlong.agent.plugin.file.Source;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP;
+
+public class KafkaInstance extends Instance {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaInstance.class);
+ private Source source;
+ private Sink sink;
+ private InstanceProfile profile;
+ private InstanceManager instanceManager;
+ private volatile boolean running = false;
+ private volatile boolean inited = false;
+ private int checkFinishCount = 0;
+
+ public static final int CORE_THREAD_SLEEP_TIME = 1;
+ private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+ private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
+ private final int WRITE_FAILED_WAIT_TIME_MS = 10;
+ private int heartBreakCheckCount = 0;
+ private long heartBeatStartTime = AgentUtils.getCurrentTime();
+
+ @Override
+ public boolean init(Object srcManager, InstanceProfile srcProfile) {
+ try {
+ instanceManager = (InstanceManager) srcManager;
+ profile = srcProfile;
+ profile.set(TaskConstants.INODE_INFO, "");
+ LOGGER.info("task id: {} submit new instance {} profile detail
{}.", profile.getTaskId(),
+ profile.getInstanceId(), profile.toJsonStr());
+ source = (Source) Class.forName(profile.getSourceClass()).newInstance();
+ source.init(profile);
+ sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
+ sink.init(profile);
+ inited = true;
+ return true;
+ } catch (Throwable e) {
+ handleSourceDeleted();
+ doChangeState(State.FATAL);
+ LOGGER.error("init instance {} for task {} failed",
profile.getInstanceId(), profile.getInstanceId(), e);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+ return false;
+ }
+ }
+
+ @Override
+ public void destroy() {
+ if (!inited) {
+ return;
+ }
+ doChangeState(State.SUCCEEDED);
+ while (running) {
+ AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
+ }
+ this.source.destroy();
+ this.sink.destroy();
+ }
+
+ @Override
+ public InstanceProfile getProfile() {
+ return profile;
+ }
+
+ @Override
+ public String getTaskId() {
+ return profile.getTaskId();
+ }
+
+ @Override
+ public String getInstanceId() {
+ return profile.getInstanceId();
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("kafka-instance-core-" + getTaskId() +
"-" + getInstanceId());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error: ", e);
+ }
+ running = false;
+ }
+
+ private void doRun() {
+ while (!isFinished()) {
+ if (!source.sourceExist()) {
+ handleSourceDeleted();
+ break;
+ }
+ Message msg = source.read();
+ if (msg == null) {
+ if (source.sourceFinish() && sink.sinkFinish()) {
+ checkFinishCount++;
+ if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
+ handleReadEnd();
+ break;
+ }
+ } else {
+ checkFinishCount = 0;
+ }
+ heartbeatStatic();
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ } else {
+ boolean suc = false;
+ while (!isFinished() && !suc) {
+ suc = sink.write(msg);
+ if (!suc) {
+ heartbeatStatic();
+ AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
+ }
+ }
+ heartBreakCheckCount++;
+ if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) {
+ heartbeatStatic();
+ }
+ }
+ }
+ }
+
+ private void heartbeatStatic() {
+ String inlongGroupId = profile.getInlongGroupId();
+ String inlongStreamId = profile.getInlongStreamId();
+ if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
+ heartBreakCheckCount = 0;
+ heartBeatStartTime = AgentUtils.getCurrentTime();
+ }
+ }
+
+ private void handleSourceDeleted() {
+ profile.setState(InstanceStateEnum.DELETE);
+ profile.setModifyTime(AgentUtils.getCurrentTime());
+ InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
+ AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ }
+ }
+
+ private void handleReadEnd() {
+ InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId {}",
+ instanceManager.getTaskId());
+ AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ }
+ }
+
+ @Override
+ public void addCallbacks() {
+
+ }
+}
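One detail worth noting in KafkaInstance.doRun(): end-of-data is debounced. A single empty read never finishes the instance; the loop requires source.sourceFinish() and sink.sinkFinish() to hold for more than CHECK_FINISH_AT_LEAST_COUNT consecutive checks, and as shown further below, KafkaSource only reports finish after a long run of empty polls and never for real-time cycles. A standalone sketch of the counter logic, with hypothetical names:

    public class FinishDebounceSketch {

        // Mirrors checkFinishCount/CHECK_FINISH_AT_LEAST_COUNT in the loop above.
        private int quietRounds = 0;
        private static final int REQUIRED_QUIET_ROUNDS = 5;

        // True only after both sides stay drained across consecutive checks.
        public boolean shouldFinish(boolean sourceDrained, boolean sinkDrained) {
            if (sourceDrained && sinkDrained) {
                quietRounds++;
            } else {
                quietRounds = 0;
            }
            return quietRounds > REQUIRED_QUIET_ROUNDS;
        }
    }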
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 06aa31db8c..45fd00f75a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -17,140 +17,338 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.KafkaReader;
+import org.apache.inlong.agent.utils.AgentUtils;
-import com.google.gson.Gson;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_LINE_FILTER;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_BOOTSTRAP_SERVERS;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_GROUP_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_OFFSET;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_TOPIC;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_LINE_FILTER_PATTERN;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_ID;
+import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET;
/**
* kafka source, split kafka source job into multi readers
*/
public class KafkaSource extends AbstractSource {
- public static final String JOB_KAFKA_AUTO_RESETE = "auto.offset.reset";
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ private class SourceData {
+
+ private byte[] data;
+ private Long offset;
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);
- private static final String JOB_KAFKAJOB_PARAM_PREFIX = "job.kafkaJob.";
- private static final String JOB_KAFKAJOB_WAIT_TIMEOUT = "job.kafkajob.wait.timeout";
- private static final String KAFKA_COMMIT_AUTO = "enable.auto.commit";
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("kafka-source"));
+ private BlockingQueue<KafkaSource.SourceData> queue;
+ public InstanceProfile profile;
+ private int maxPackSize;
+ private String taskId;
+ private String instanceId;
+ private String topic;
+ private Properties props = new Properties();
+ private String allPartitionOffsets;
+ Map<Integer, Long> partitionOffsets = new HashMap<>();
+ private volatile boolean running = false;
+ private volatile boolean runnable = true;
+ private volatile AtomicLong emptyCount = new AtomicLong(0);
+
+ private final Integer CACHE_QUEUE_SIZE = 100000;
+ private final Integer READ_WAIT_TIMEOUT_MS = 10;
+ private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
+ private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
+
private static final String KAFKA_DESERIALIZER_METHOD = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
- private static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
- private static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";
private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
- private static final Gson gson = new Gson();
- private static AtomicLong metricsIndex = new AtomicLong(0);
+ private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
+ private boolean isRealTime = false;
+ private boolean isRestoreFromDB = false;
public KafkaSource() {
}
@Override
- public List<Reader> split(TaskProfile conf) {
- List<Reader> result = new ArrayList<>();
- String filterPattern = conf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
-
- Properties props = new Properties();
- Map<String, String> map = gson.fromJson(conf.toJsonStr(), Map.class);
- props.put(JOB_KAFKA_BOOTSTRAP_SERVERS.replace(JOB_KAFKAJOB_PARAM_PREFIX, StringUtils.EMPTY),
- map.get(JOB_KAFKA_BOOTSTRAP_SERVERS));
-
- props.put(KAFKA_KEY_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
- props.put(KAFKA_VALUE_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
- // set offset
- props.put(KAFKA_COMMIT_AUTO, false);
- if (ObjectUtils.isNotEmpty(map.get(JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET))) {
- props.put(JOB_KAFKA_AUTO_RESETE, map.get(JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET));
- }
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- List<PartitionInfo> partitionInfoList = consumer.partitionsFor(conf.get(JOB_KAFKA_TOPIC));
- String allPartitionOffsets = map.get(JOB_KAFKA_OFFSET);
- Long offset = null;
- String[] partitionOffsets = null;
- if (StringUtils.isNotBlank(allPartitionOffsets)) {
- // example:0#110_1#666_2#222
- partitionOffsets = allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
+ public void init(InstanceProfile profile) {
+ try {
+ LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
+ this.profile = profile;
+ super.init(profile);
+ String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+ if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ isRealTime = true;
+ cycleUnit = CycleUnitType.HOUR;
+ }
+ queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+ maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+ taskId = profile.getTaskId();
+ instanceId = profile.getInstanceId();
+ topic = profile.getInstanceId();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_METHOD);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_METHOD);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, profile.get(TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET));
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ allPartitionOffsets = profile.get(TASK_KAFKA_OFFSET);
+ isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
+ if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) {
+ // example:0#110_1#666_2#222
+ String[] offsets = allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
+ for (String offset : offsets) {
+ partitionOffsets.put(Integer.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]),
+ Long.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]));
+ }
+ }
+
+ EXECUTOR_SERVICE.execute(run());
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + topic, ex);
}
- // set consumer session timeout
- props.put(KAFKA_SESSION_TIMEOUT, 30000);
- // spilt reader reduce to partition
- if (null != partitionInfoList) {
- for (PartitionInfo partitionInfo : partitionInfoList) {
- props.put(JOB_KAFKA_GROUP_ID.replace(JOB_KAFKAJOB_PARAM_PREFIX, StringUtils.EMPTY),
- map.getOrDefault(JOB_KAFKA_GROUP_ID,
- map.get(TASK_ID) + JOB_OFFSET_DELIMITER
- + "group" +
partitionInfo.partition()));
- KafkaConsumer<String, byte[]> partitonConsumer = new
KafkaConsumer<>(props);
- partitonConsumer.assign(Collections.singletonList(
- new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
- // if get offset,consume from offset; if not,consume from 0
- if (partitionOffsets != null && partitionOffsets.length > 0) {
- for (String partitionOffset : partitionOffsets) {
- if (partitionOffset.contains(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)
- && partitionOffset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]
- .equals(String.valueOf(partitionInfo.partition()))) {
- offset = Long.valueOf(partitionOffset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]);
+ }
+
+ private Runnable run() {
+ return () -> {
+ AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" +
instanceId);
+ running = true;
+ try {
+ List<PartitionInfo> partitionInfoList;
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+ partitionInfoList = consumer.partitionsFor(topic);
+ }
+
+ props.put(KAFKA_SESSION_TIMEOUT, 30000);
+
+ try (KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(props)) {
+ if (null != partitionInfoList) {
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(),
+ partitionInfo.partition());
+ topicPartitions.add(topicPartition);
+ }
+ kafkaConsumer.assign(topicPartitions);
+
+ if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) {
+ for (TopicPartition topicPartition : topicPartitions) {
+ Long offset = partitionOffsets.get(topicPartition.partition());
+ if (ObjectUtils.isNotEmpty(offset)) {
+ kafkaConsumer.seek(topicPartition, offset);
+ }
+ }
+ } else {
+ LOGGER.info("Skip to seek offset");
}
}
+ doRun(kafkaConsumer);
}
- LOGGER.info("kafka topic partition offset:{}", offset);
- if (offset != null) {
- // if offset not null,then consume from the offset
- partitonConsumer.seek(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), offset);
+ } catch (Throwable e) {
+ LOGGER.error("do run error maybe topic is configured
incorrectly: ", e);
+ }
+ running = false;
+ };
+ }
+
+ private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
+ long lastPrintTime = 0;
+ while (isRunnable()) {
+ boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
+ if (!suc) {
+ break;
+ }
+ ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(1000));
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
+ if (records.isEmpty()) {
+ if (queue.isEmpty()) {
+ emptyCount.incrementAndGet();
+ } else {
+ emptyCount.set(0);
}
- KafkaReader<String, byte[]> kafkaReader = new KafkaReader<>(partitonConsumer, map);
- addValidator(filterPattern, kafkaReader);
- result.add(kafkaReader);
+ AgentUtils.silenceSleepInSeconds(1);
+ continue;
}
- sourceMetric.sourceSuccessCount.incrementAndGet();
- } else {
- sourceMetric.sourceFailCount.incrementAndGet();
+ emptyCount.set(0);
+ long offset = 0L;
+ for (ConsumerRecord<String, byte[]> record : records) {
+ LOGGER.info("record: {}", record.value());
+ SourceData sourceData = new SourceData(record.value(), record.offset());
+ boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
+ if (!suc4Queue) {
+ break;
+ }
+ putIntoQueue(sourceData);
+ offset = record.offset();
+ }
+ kafkaConsumer.commitSync();
+
+ if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
+ lastPrintTime = AgentUtils.getCurrentTime();
+ LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
+ }
+ }
+ }
+
+ private boolean waitForPermit(String permitName, int permitLen) {
+ boolean suc = false;
+ while (!suc) {
+ suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen);
+ if (!suc) {
+ MemoryManager.getInstance().printDetail(permitName, "log file source");
+ if (!isRunnable()) {
+ return false;
+ }
+ AgentUtils.silenceSleepInSeconds(1);
+ }
+ }
+ return true;
+ }
+
+ private void putIntoQueue(SourceData sourceData) {
+ if (sourceData == null) {
+ return;
+ }
+ try {
+ boolean offerSuc = false;
+ if (queue.remainingCapacity() > 0) {
+ while (isRunnable() && !offerSuc) {
+ offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+ }
+ }
+
+ if (!offerSuc) {
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length);
+ }
+ LOGGER.debug("Read {} from kafka topic {}", sourceData.getData(),
topic);
+ } catch (InterruptedException e) {
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length);
+ LOGGER.error("fetchData offer failed {}", e.getMessage());
}
- return result;
+ }
+
+ public boolean isRunnable() {
+ return runnable;
+ }
+
+ /**
+ * Stop running threads.
+ */
+ public void stopRunning() {
+ runnable = false;
}
@Override
- public Message read() {
+ public List<Reader> split(TaskProfile conf) {
return null;
}
@Override
- public boolean sourceFinish() {
- return false;
+ public Message read() {
+ KafkaSource.SourceData sourceData = null;
+ try {
+ sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("poll {} data get interrupted.", topic, e);
+ }
+ if (sourceData == null) {
+ return null;
+ }
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length);
+ Message finalMsg = createMessage(sourceData);
+ return finalMsg;
+ }
+
+ private Message createMessage(SourceData sourceData) {
+ String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
+ Map<String, String> header = new HashMap<>();
+ header.put(PROXY_KEY_DATA, proxyPartitionKey);
+ header.put(OFFSET, sourceData.offset.toString());
+ header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+
+ long auditTime = 0;
+ if (isRealTime) {
+ auditTime = AgentUtils.getCurrentTime();
+ } else {
+ auditTime = profile.getSinkDataTime();
+ }
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+ auditTime, 1, sourceData.data.length);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+ AgentUtils.getCurrentTime(), 1, sourceData.data.length);
+ Message finalMsg = new DefaultMessage(sourceData.data, header);
+ if (finalMsg.getBody().length > maxPackSize) {
+ LOGGER.warn("message size is {}, greater than max pack size {},
drop it!",
+ finalMsg.getBody().length, maxPackSize);
+ return null;
+ }
+ return finalMsg;
}
@Override
- public boolean sourceExist() {
- return false;
+ public boolean sourceFinish() {
+ if (isRealTime) {
+ return false;
+ }
+ return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST;
}
- private void addValidator(String filterPattern, KafkaReader kafkaReader) {
- kafkaReader.addPatternValidator(filterPattern);
+ @Override
+ public boolean sourceExist() {
+ return true;
}
}
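The offset handling is the subtle part of the rewrite: configured offsets now come from the task configuration (the new partitionOffsets field on KafkaJob) rather than from a stored snapshot, and init() parses them into a partition-to-offset map. Going by the inline example 0#110_1#666_2#222, JOB_OFFSET_DELIMITER separates partition entries with "_" and JOB_KAFKA_PARTITION_OFFSET_DELIMITER separates partition from offset with "#"; the constant values themselves are not shown in this diff, so the delimiters below are assumptions. A self-contained sketch of the parse:

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionOffsetParseSketch {

        // Assumed delimiter values, inferred from the "0#110_1#666_2#222" example.
        private static final String OFFSET_DELIMITER = "_";
        private static final String PARTITION_OFFSET_DELIMITER = "#";

        // "0#110_1#666_2#222" -> {0=110, 1=666, 2=222}
        static Map<Integer, Long> parse(String allPartitionOffsets) {
            Map<Integer, Long> offsets = new HashMap<>();
            for (String pair : allPartitionOffsets.split(OFFSET_DELIMITER)) {
                String[] parts = pair.split(PARTITION_OFFSET_DELIMITER);
                offsets.put(Integer.valueOf(parts[0]), Long.valueOf(parts[1]));
            }
            return offsets;
        }

        public static void main(String[] args) {
            System.out.println(parse("0#110_1#666_2#222")); // {0=110, 1=666, 2=222}
        }
    }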
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
deleted file mode 100644
index 6a6947aaad..0000000000
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sources.reader;
-
-import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader;
-import org.apache.inlong.agent.plugin.validator.PatternValidator;
-import org.apache.inlong.agent.utils.AgentUtils;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_BYTE_SPEED_LIMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_OFFSET;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_RECORD_SPEED_LIMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_TOPIC;
-
-/**
- * read kafka data
- */
-public class KafkaReader<K, V> extends AbstractReader {
-
- public static final int NEVER_STOP_SIGN = -1;
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReader.class);
- /* metric */
- private static final String KAFKA_READER_TAG_NAME = "AgentKafkaMetric";
- private static final String KAFKA_SOURCE_READ_RECORD_SPEED = "job.kafkaJob.record.speed.limit";
- private static final String KAFKA_SOURCE_READ_BYTE_SPEED = "job.kafkaJob.byte.speed.limit";
- private static final String KAFKA_SOURCE_READ_MIN_INTERVAL = "kafka.min.interval.limit";
- private static final String JOB_KAFKAJOB_READ_TIMEOUT = "job.kafkaJob.read.timeout";
- /* total readRecords */
- private static AtomicLong currentTotalReadRecords = new AtomicLong(0);
- private static AtomicLong lastTotalReadRecords = new AtomicLong(0);
- /* total readBytes */
- private static AtomicLong currentTotalReadBytes = new AtomicLong(0);
- private static AtomicLong lastTotalReadBytes = new AtomicLong(0);
- KafkaConsumer<K, V> consumer;
- long lastTimestamp;
- /* bps: records/s */
- long recordSpeed;
- /* tps: bytes/s */
- long byteSpeed;
- /* sleepTime */
- long flowControlInterval;
- private Iterator<ConsumerRecord<K, V>> iterator;
- private List<Validator> validators = new ArrayList<>();
- private long timeout;
- private long waitTimeout = 1000;
- private long lastTime = 0;
- private String inlongGroupId;
- private String inlongStreamId;
- private String snapshot;
- private boolean isFinished = false;
- private boolean destroyed = false;
- private String topic;
-
- /**
- * init attribute
- */
- public KafkaReader(KafkaConsumer<K, V> consumer, Map<String, String> paraMap) {
- this.consumer = consumer;
- this.recordSpeed = Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_RECORD_SPEED_LIMIT, "10000"));
- this.byteSpeed = Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_BYTE_SPEED_LIMIT, String.valueOf(1024 * 1024)));
- this.flowControlInterval = Long.parseLong(paraMap.getOrDefault(KAFKA_SOURCE_READ_MIN_INTERVAL, "1000"));
- this.lastTimestamp = System.currentTimeMillis();
- this.topic = paraMap.get(JOB_KAFKA_TOPIC);
-
- LOGGER.info("KAFKA_SOURCE_READ_RECORD_SPEED = {}", this.recordSpeed);
- LOGGER.info("KAFKA_SOURCE_READ_BYTE_SPEED = {}", this.byteSpeed);
- }
-
- @Override
- public Message read() {
-
- if (iterator != null && iterator.hasNext()) {
- ConsumerRecord<K, V> record = iterator.next();
- // body
- byte[] recordValue = (byte[]) record.value();
- if (validateMessage(recordValue)) {
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
- inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, recordValue.length);
- // header
- Map<String, String> headerMap = new HashMap<>();
- headerMap.put("record.offset",
String.valueOf(record.offset()));
- headerMap.put("record.key", String.valueOf(record.key()));
- LOGGER.debug(
- "partition:" + record.partition()
- + ", value:" + new String(recordValue) + ",
offset:" + record.offset());
- // control speed
- readerMetric.pluginReadSuccessCount.incrementAndGet();
- readerMetric.pluginReadCount.incrementAndGet();
- // commit succeed,then record current offset
- snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
- DefaultMessage message = new DefaultMessage(recordValue, headerMap);
- recordReadLimit(1L, message.getBody().length);
- return message;
- }
- } else {
- // commit offset
- if (isSourceExist()) {
- consumer.commitAsync();
- }
- fetchData(5000);
- }
- AgentUtils.silenceSleepInMs(waitTimeout);
-
- return null;
- }
-
- @Override
- public boolean isFinished() {
- return isFinished;
- }
-
- @Override
- public String getReadSource() {
- Set<TopicPartition> assignment = consumer.assignment();
- // cousumer->topic->one partition
- Iterator<TopicPartition> iterator = assignment.iterator();
- while (iterator.hasNext()) {
- TopicPartition topicPartition = iterator.next();
- return topicPartition.topic() + "_" + topicPartition.partition();
- }
- return StringUtils.EMPTY;
- }
-
- @Override
- public void setReadTimeout(long millis) {
- timeout = millis;
- }
-
- @Override
- public void setWaitMillisecond(long millis) {
- waitTimeout = millis;
- }
-
- @Override
- public void init(InstanceProfile jobConf) {
- super.init(jobConf);
- // get offset from jobConf
- snapshot = jobConf.get(JOB_KAFKA_OFFSET, null);
- initReadTimeout(jobConf);
- // fetch data
- fetchData(5000);
- }
-
- @Override
- public void destroy() {
- synchronized (this) {
- if (!destroyed) {
- consumer.close();
- destroyed = true;
- }
- }
- }
-
- private void initReadTimeout(InstanceProfile jobConf) {
- int waitTime = jobConf.getInt(JOB_KAFKAJOB_READ_TIMEOUT, NEVER_STOP_SIGN);
- if (waitTime == NEVER_STOP_SIGN) {
- timeout = NEVER_STOP_SIGN;
- } else {
- timeout = TimeUnit.MINUTES.toMillis(waitTime);
- }
- }
-
- private boolean validateMessage(byte[] message) {
- if (validators.isEmpty()) {
- return true;
- }
- return validators.stream().allMatch(v -> v.validate(new String(message)));
- }
-
- /**
- * add specified pattern to validators
- *
- * @param pattern specified pattern
- */
- public void addPatternValidator(String pattern) {
- if (pattern.isEmpty()) {
- return;
- }
- validators.add(new PatternValidator(pattern));
- }
-
- @Override
- public String getSnapshot() {
- return snapshot;
- }
-
- @Override
- public void finishRead() {
- isFinished = true;
- }
-
- @Override
- public boolean isSourceExist() {
- return !CollectionUtils.isEmpty(consumer.partitionsFor(topic));
- }
-
- private boolean fetchData(long fetchDataTimeout) {
- // cosume data
- ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(fetchDataTimeout));
- iterator = records.iterator();
- return iterator != null ? true : false;
- }
-
- private void recordReadLimit(long recordSize, long byteSize) {
- boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
- boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
- if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
- return;
- }
- currentTotalReadRecords.accumulateAndGet(recordSize, (x, y) -> x + y);
- currentTotalReadBytes.accumulateAndGet(byteSize, (x, y) -> x + y);
-
- long nowTimestamp = System.currentTimeMillis();
- long interval = nowTimestamp - lastTimestamp;
-
- if (interval - this.flowControlInterval >= 0) {
- long byteLimitSleepTime = 0;
- long recordLimitSleepTime = 0;
- if (isChannelByteSpeedLimit) {
- long currentByteSpeed = (currentTotalReadBytes.get() - lastTotalReadBytes.get()) * 1000 / interval;
- LOGGER.info("current produce byte speed bytes/s:{}", currentByteSpeed);
- if (currentByteSpeed > this.byteSpeed) {
- // calculate byteLimitSleepTime
- byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed - interval;
- }
- }
-
- if (isChannelRecordSpeedLimit) {
- long currentRecordSpeed =
- (currentTotalReadRecords.get() - lastTotalReadRecords.get()) * 1000 / interval;
- LOGGER.info("current read speed records/s:{}", currentRecordSpeed);
- if (currentRecordSpeed > this.recordSpeed) {
- // calculate recordSleepTime reduce to recordLimit
- recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed - interval;
- }
- }
- // calculate sleep time
- long sleepTime = byteLimitSleepTime < recordLimitSleepTime
- ? recordLimitSleepTime
- : byteLimitSleepTime;
- if (sleepTime > 0) {
- LOGGER.info("sleep seconds:{}", sleepTime / 1000);
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- lastTimestamp = nowTimestamp;
- lastTotalReadRecords = currentTotalReadRecords;
- }
-}
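Worth noting about the deleted reader: its record/byte rate limiting has no direct equivalent in the new KafkaSource, which relies on MemoryManager permits and the bounded queue for backpressure instead. For reference, the removed recordReadLimit() slept for currentSpeed * interval / limit - interval whenever a window exceeded its limit; a small sketch reproducing that arithmetic (names are illustrative):

    public class ThrottleMathSketch {

        // Sleep time from the removed recordReadLimit(): proportional to how far
        // the observed speed overshoots the configured limit.
        static long sleepMillis(long bytesInWindow, long intervalMs, long byteSpeedLimit) {
            long currentByteSpeed = bytesInWindow * 1000 / intervalMs; // bytes per second
            if (currentByteSpeed <= byteSpeedLimit) {
                return 0;
            }
            return currentByteSpeed * intervalMs / byteSpeedLimit - intervalMs;
        }

        public static void main(String[] args) {
            // 4 MiB read in a 1 s window against a 1 MiB/s limit -> sleep 3000 ms.
            System.out.println(sleepMillis(4L << 20, 1000, 1L << 20));
        }
    }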
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
new file mode 100644
index 0000000000..78ad200db0
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
+
+public class KafkaTask extends Task {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTask.class);
+ public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance";
+ public static final int CORE_THREAD_SLEEP_TIME = 5000;
+ public static final int CORE_THREAD_PRINT_TIME = 10000;
+
+ private TaskProfile taskProfile;
+ private Db basicDb;
+ private TaskManager taskManager;
+ private InstanceManager instanceManager;
+ private long lastPrintTime = 0;
+ private boolean initOK = false;
+ private volatile boolean running = false;
+ private boolean isAdded = false;
+ private boolean isRestoreFromDB = false;
+
+ private String topic;
+
+ @Override
+ public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException {
+ taskManager = (TaskManager) srcManager;
+ commonInit(taskProfile, basicDb);
+ initOK = true;
+ }
+
+ private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
+ this.taskProfile = taskProfile;
+ this.basicDb = basicDb;
+ this.topic = taskProfile.get(TASK_KAFKA_TOPIC);
+ this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
+ instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+ basicDb, taskManager.getTaskDb());
+ try {
+ instanceManager.start();
+ } catch (Exception e) {
+ LOGGER.error("start instance manager error: ", e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ doChangeState(State.SUCCEEDED);
+ if (instanceManager != null) {
+ instanceManager.stop();
+ }
+ }
+
+ @Override
+ public TaskProfile getProfile() {
+ return taskProfile;
+ }
+
+ @Override
+ public String getTaskId() {
+ if (taskProfile == null) {
+ return "";
+ }
+ return taskProfile.getTaskId();
+ }
+
+ @Override
+ public boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.error("task profile needs all required key");
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void addCallbacks() {
+
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("kafka-task-core-" + getTaskId());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error: ", e);
+ }
+ running = false;
+ }
+
+ private void doRun() {
+ while (!isFinished()) {
+ if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
+ LOGGER.info("kafka task running! taskId {}", getTaskId());
+ lastPrintTime = AgentUtils.getCurrentTime();
+ }
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ if (!initOK) {
+ continue;
+ }
+
+ // Add instance profile to instance manager
+ addInstanceProfile();
+
+ String inlongGroupId = taskProfile.getInlongGroupId();
+ String inlongStreamId = taskProfile.getInlongStreamId();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
+ }
+ }
+
+ private void addInstanceProfile() {
+ if (isAdded) {
+ return;
+ }
+ String dataTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+ InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
+ CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
+ InstanceAction action = new InstanceAction(ActionType.ADD,
instanceProfile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ }
+ this.isAdded = true;
+ }
+}
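A final reading aid tying the new pieces together: KafkaTask.addInstanceProfile() passes the topic as the instance id to createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic, ...), which is why KafkaSource.init() recovers it with topic = profile.getInstanceId() and uses the task id as the consumer group id. Each Kafka task therefore maps to exactly one instance, one topic, and one consumer group.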