This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d3abcf6f25 [INLONG-9741][Agent] Update Kafka source for Agent (#9749)
d3abcf6f25 is described below

commit d3abcf6f25677fc1007c04cff4a1f2f445023d9c
Author: haifxu <[email protected]>
AuthorDate: Fri Mar 1 15:53:16 2024 +0800

    [INLONG-9741][Agent] Update Kafka source for Agent (#9749)
---
 .../inlong/agent/constant/TaskConstants.java       |  11 +-
 .../org/apache/inlong/agent/pojo/KafkaJob.java     |   2 +
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |   4 +-
 .../agent/core/instance/InstanceManager.java       |   3 +
 .../apache/inlong/agent/core/task/TaskManager.java |   2 +
 inlong-agent/agent-docker/agent-docker.sh          |   2 +-
 .../agent/plugin/instance/KafkaInstance.java       | 193 +++++++++++
 .../inlong/agent/plugin/sources/KafkaSource.java   | 358 ++++++++++++++++-----
 .../agent/plugin/sources/reader/KafkaReader.java   | 292 -----------------
 .../apache/inlong/agent/plugin/task/KafkaTask.java | 168 ++++++++++
 10 files changed, 656 insertions(+), 379 deletions(-)
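
Note: at a high level, this change moves the agent's Kafka collection off the old reader-splitting model (the deleted KafkaReader) and onto the task/instance model: a new KafkaTask submits a single KafkaInstance for the topic, and the rewritten KafkaSource polls Kafka on its own thread and buffers records for the instance to read.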

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 0498358d2d..b1fea01366 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -34,6 +34,7 @@ public class TaskConstants extends CommonConstants {
     public static final String JOB_UUID = "job.uuid";
     public static final String TASK_GROUP_ID = "task.groupId";
     public static final String TASK_STREAM_ID = "task.streamId";
+    public static final String RESTORE_FROM_DB = "task.restoreFromDB";
 
     public static final String TASK_SOURCE = "task.source";
     public static final String JOB_SOURCE_TYPE = "job.sourceType";
@@ -107,14 +108,14 @@ public class TaskConstants extends CommonConstants {
     public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
 
     // Kafka job
-    public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
-    public static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers";
-    public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
+    public static final String TASK_KAFKA_TOPIC = "task.kafkaJob.topic";
+    public static final String TASK_KAFKA_BOOTSTRAP_SERVERS = "task.kafkaJob.bootstrap.servers";
+    public static final String TASK_KAFKA_GROUP_ID = "task.kafkaJob.group.id";
     public static final String JOB_KAFKA_RECORD_SPEED_LIMIT = "job.kafkaJob.recordSpeed.limit";
     public static final String JOB_KAFKA_BYTE_SPEED_LIMIT = "job.kafkaJob.byteSpeed.limit";
-    public static final String JOB_KAFKA_OFFSET = "job.kafkaJob.partition.offset";
+    public static final String TASK_KAFKA_OFFSET = "task.kafkaJob.partition.offset";
     public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout";
-    public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
+    public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = "task.kafkaJob.autoOffsetReset";
 
     public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
     public static final String JOB_MONGO_USER = "job.mongoJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java
index db792fc202..c5ea65596e 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/KafkaJob.java
@@ -30,6 +30,7 @@ public class KafkaJob {
     private RecordSpeed recordSpeed;
     private ByteSpeed byteSpeed;
     private String autoOffsetReset;
+    private String partitionOffsets;
 
     @Data
     public static class Group {
@@ -70,5 +71,6 @@ public class KafkaJob {
         private String recordSpeedLimit;
         private String byteSpeedLimit;
         private String autoOffsetReset;
+        private String partitionOffsets;
     }
 }
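
A note on the new partitionOffsets field: it carries every partition's start offset in one string. Below is a minimal decoding sketch, assuming "_" separates partitions and "#" separates a partition from its offset, as in the example "0#110_1#666_2#222" used by KafkaSource later in this diff (class and method names here are illustrative, not part of the patch):

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionOffsetParser {

        // "0#110_1#666_2#222" -> {0=110, 1=666, 2=222}
        public static Map<Integer, Long> parse(String allPartitionOffsets) {
            Map<Integer, Long> result = new HashMap<>();
            for (String entry : allPartitionOffsets.split("_")) {
                String[] parts = entry.split("#");
                result.put(Integer.valueOf(parts[0]), Long.valueOf(parts[1]));
            }
            return result;
        }
    }
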
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 5c02954f5d..d080ee734c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -38,6 +38,7 @@ import static org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_D
 public class TaskProfileDto {
 
     public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask";
+    public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
     public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -190,7 +191,7 @@ public class TaskProfileDto {
         bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers());
         kafkaJob.setBootstrap(bootstrap);
         KafkaJob.Partition partition = new KafkaJob.Partition();
-        partition.setOffset(dataConfigs.getSnapshot());
+        partition.setOffset(kafkaJobTaskConfig.getPartitionOffsets());
         kafkaJob.setPartition(partition);
         KafkaJob.Group group = new KafkaJob.Group();
         group.setId(kafkaJobTaskConfig.getGroupId());
@@ -449,6 +450,7 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case KAFKA:
+                task.setTaskClass(DEFAULT_KAFKA_TASK);
                 KafkaJob kafkaJob = getKafkaJob(dataConfig);
                 task.setKafkaJob(kafkaJob);
                 task.setSource(KAFKA_SOURCE);
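
Note: the partition offset for a Kafka task now comes from the task's own configuration (partitionOffsets) instead of the stored snapshot, in line with the restore handling added below.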
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index e980354fe2..2c9bd721d8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -47,6 +47,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+
 /**
  * handle the instance created by task, including add, delete, update etc.
  * the instance info is store in both db and memory.
@@ -339,6 +341,7 @@ public class InstanceManager extends AbstractDaemon {
             if (state == InstanceStateEnum.DEFAULT) {
                 LOGGER.info("instance restoreFromDb addToMem state {} taskId 
{} instanceId {}", state, taskId,
                         profile.getInstanceId());
+                profile.setBoolean(RESTORE_FROM_DB, true);
                 addToMemory(profile);
             } else {
                 LOGGER.info("instance restoreFromDb ignore state {} taskId {} 
instanceId {}", state, taskId,
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 12fab4d8c9..e3815f725e 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
 import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
 
 /**
@@ -421,6 +422,7 @@ public class TaskManager extends AbstractDaemon {
         taskProfileList.forEach((profile) -> {
             if (profile.getState() == TaskStateEnum.RUNNING) {
                 LOGGER.info("restore from db taskId {}", profile.getTaskId());
+                profile.setBoolean(RESTORE_FROM_DB, true);
                 addToMemory(profile);
             }
         });
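
Note: both TaskManager and InstanceManager tag the profiles they restore from the local DB, which lets a source distinguish a fresh deployment (seek to the configured offsets) from an agent restart (resume from the consumer group's committed offsets).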
diff --git a/inlong-agent/agent-docker/agent-docker.sh b/inlong-agent/agent-docker/agent-docker.sh
index f6c9960cbb..06f27bed2d 100644
--- a/inlong-agent/agent-docker/agent-docker.sh
+++ b/inlong-agent/agent-docker/agent-docker.sh
@@ -23,7 +23,7 @@ local_ip=$(ifconfig $ETH_NETWORK | grep "inet" | grep -v "inet6" | awk '{print $
 sed -i "s/agent.local.ip=.*$/agent.local.ip=$local_ip/g" "${file_path}/conf/agent.properties"
 sed -i "s/agent.fetcher.interval=.*$/agent.fetcher.interval=$AGENT_FETCH_INTERVAL/g" "${file_path}/conf/agent.properties"
 sed -i "s/agent.heartbeat.interval=.*$/agent.heartbeat.interval=$AGENT_HEARTBEAT_INTERVAL/g" "${file_path}/conf/agent.properties"
-sed -i "s/agent.manager.addr=.*$/agent.manager.addr=http://$MANAGER_OPENAPI_IP:$MANAGER_OPENAPI_PORT/g" "${file_path}/conf/agent.properties"
+sed -i "s/agent.manager.addr=.*$/agent.manager.addr=http:\/\/$MANAGER_OPENAPI_IP:$MANAGER_OPENAPI_PORT/g" "${file_path}/conf/agent.properties"
 sed -i "s/audit.enable=.*$/audit.enable=$AUDIT_ENABLE/g" "${file_path}/conf/agent.properties"
 sed -i "s/audit.proxys=.*$/audit.proxys=$AUDIT_PROXY_URL/g" "${file_path}/conf/agent.properties"
 sed -i "s/agent.cluster.tag=.*$/agent.cluster.tag=$CLUSTER_TAG/g" "${file_path}/conf/agent.properties"
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java
new file mode 100644
index 0000000000..245ab36d02
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.file.Sink;
+import org.apache.inlong.agent.plugin.file.Source;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP;
+
+public class KafkaInstance extends Instance {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaInstance.class);
+    private Source source;
+    private Sink sink;
+    private InstanceProfile profile;
+    private InstanceManager instanceManager;
+    private volatile boolean running = false;
+    private volatile boolean inited = false;
+    private int checkFinishCount = 0;
+
+    public static final int CORE_THREAD_SLEEP_TIME = 1;
+    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
+    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
+    private int heartBreakCheckCount = 0;
+    private long heartBeatStartTime = AgentUtils.getCurrentTime();
+
+    @Override
+    public boolean init(Object srcManager, InstanceProfile srcProfile) {
+        try {
+            instanceManager = (InstanceManager) srcManager;
+            profile = srcProfile;
+            profile.set(TaskConstants.INODE_INFO, "");
+            LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
+                    profile.getInstanceId(), profile.toJsonStr());
+            source = (Source) Class.forName(profile.getSourceClass()).newInstance();
+            source.init(profile);
+            sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
+            sink.init(profile);
+            inited = true;
+            return true;
+        } catch (Throwable e) {
+            handleSourceDeleted();
+            doChangeState(State.FATAL);
+            LOGGER.error("init instance {} for task {} failed", 
profile.getInstanceId(), profile.getInstanceId(), e);
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+            return false;
+        }
+    }
+
+    @Override
+    public void destroy() {
+        if (!inited) {
+            return;
+        }
+        doChangeState(State.SUCCEEDED);
+        while (running) {
+            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
+        }
+        this.source.destroy();
+        this.sink.destroy();
+    }
+
+    @Override
+    public InstanceProfile getProfile() {
+        return profile;
+    }
+
+    @Override
+    public String getTaskId() {
+        return profile.getTaskId();
+    }
+
+    @Override
+    public String getInstanceId() {
+        return profile.getInstanceId();
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName("kafka-instance-core-" + getTaskId() + 
"-" + getInstanceId());
+        running = true;
+        try {
+            doRun();
+        } catch (Throwable e) {
+            LOGGER.error("do run error: ", e);
+        }
+        running = false;
+    }
+
+    private void doRun() {
+        while (!isFinished()) {
+            if (!source.sourceExist()) {
+                handleSourceDeleted();
+                break;
+            }
+            Message msg = source.read();
+            if (msg == null) {
+                if (source.sourceFinish() && sink.sinkFinish()) {
+                    checkFinishCount++;
+                    if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
+                        handleReadEnd();
+                        break;
+                    }
+                } else {
+                    checkFinishCount = 0;
+                }
+                heartbeatStatic();
+                AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+            } else {
+                boolean suc = false;
+                while (!isFinished() && !suc) {
+                    suc = sink.write(msg);
+                    if (!suc) {
+                        heartbeatStatic();
+                        AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
+                    }
+                }
+                heartBreakCheckCount++;
+                if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) {
+                    heartbeatStatic();
+                }
+            }
+        }
+    }
+
+    private void heartbeatStatic() {
+        String inlongGroupId = profile.getInlongGroupId();
+        String inlongStreamId = profile.getInlongStreamId();
+        if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId,
+                    AgentUtils.getCurrentTime(), 1, 1);
+            heartBreakCheckCount = 0;
+            heartBeatStartTime = AgentUtils.getCurrentTime();
+        }
+    }
+
+    private void handleSourceDeleted() {
+        profile.setState(InstanceStateEnum.DELETE);
+        profile.setModifyTime(AgentUtils.getCurrentTime());
+        InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
+        while (!isFinished() && !instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
+            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+        }
+    }
+
+    private void handleReadEnd() {
+        InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
+        while (!isFinished() && !instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}",
+                    instanceManager.getTaskId());
+            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+        }
+    }
+
+    @Override
+    public void addCallbacks() {
+
+    }
+}
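
A note on the read loop above: the instance only finishes after CHECK_FINISH_AT_LEAST_COUNT consecutive checks in which both source and sink report completion, so one transient empty read cannot end the instance; any new data resets the counter.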
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 06aa31db8c..45fd00f75a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -17,140 +17,338 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.KafkaReader;
+import org.apache.inlong.agent.utils.AgentUtils;
 
-import com.google.gson.Gson;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_LINE_FILTER;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_BOOTSTRAP_SERVERS;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_GROUP_ID;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_OFFSET;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
 import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_TOPIC;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_LINE_FILTER_PATTERN;
 import static org.apache.inlong.agent.constant.TaskConstants.JOB_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_ID;
+import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET;
 
 /**
  * kafka source, split kafka source job into multi readers
  */
 public class KafkaSource extends AbstractSource {
 
-    public static final String JOB_KAFKA_AUTO_RESETE = "auto.offset.reset";
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private class SourceData {
+
+        private byte[] data;
+        private Long offset;
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);
-    private static final String JOB_KAFKAJOB_PARAM_PREFIX = "job.kafkaJob.";
-    private static final String JOB_KAFKAJOB_WAIT_TIMEOUT = "job.kafkajob.wait.timeout";
-    private static final String KAFKA_COMMIT_AUTO = "enable.auto.commit";
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("kafka-source"));
+    private BlockingQueue<KafkaSource.SourceData> queue;
+    public InstanceProfile profile;
+    private int maxPackSize;
+    private String taskId;
+    private String instanceId;
+    private String topic;
+    private Properties props = new Properties();
+    private String allPartitionOffsets;
+    Map<Integer, Long> partitionOffsets = new HashMap<>();
+    private volatile boolean running = false;
+    private volatile boolean runnable = true;
+    private volatile AtomicLong emptyCount = new AtomicLong(0);
+
+    private final Integer CACHE_QUEUE_SIZE = 100000;
+    private final Integer READ_WAIT_TIMEOUT_MS = 10;
+    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
+    private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
+
     private static final String KAFKA_DESERIALIZER_METHOD =
             "org.apache.kafka.common.serialization.ByteArrayDeserializer";
-    private static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
-    private static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";
     private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
-    private static final Gson gson = new Gson();
-    private static AtomicLong metricsIndex = new AtomicLong(0);
+    private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
+    private boolean isRealTime = false;
+    private boolean isRestoreFromDB = false;
 
     public KafkaSource() {
     }
 
     @Override
-    public List<Reader> split(TaskProfile conf) {
-        List<Reader> result = new ArrayList<>();
-        String filterPattern = conf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
-
-        Properties props = new Properties();
-        Map<String, String> map = gson.fromJson(conf.toJsonStr(), Map.class);
-        props.put(JOB_KAFKA_BOOTSTRAP_SERVERS.replace(JOB_KAFKAJOB_PARAM_PREFIX, StringUtils.EMPTY),
-                map.get(JOB_KAFKA_BOOTSTRAP_SERVERS));
-
-        props.put(KAFKA_KEY_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
-        props.put(KAFKA_VALUE_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
-        // set offset
-        props.put(KAFKA_COMMIT_AUTO, false);
-        if (ObjectUtils.isNotEmpty(map.get(JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET))) {
-            props.put(JOB_KAFKA_AUTO_RESETE, map.get(JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET));
-        }
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(conf.get(JOB_KAFKA_TOPIC));
-        String allPartitionOffsets = map.get(JOB_KAFKA_OFFSET);
-        Long offset = null;
-        String[] partitionOffsets = null;
-        if (StringUtils.isNotBlank(allPartitionOffsets)) {
-            // example:0#110_1#666_2#222
-            partitionOffsets = allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
+    public void init(InstanceProfile profile) {
+        try {
+            LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
+            this.profile = profile;
+            super.init(profile);
+            String cycleUnit = profile.get(TASK_CYCLE_UNIT);
+            if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+                isRealTime = true;
+                cycleUnit = CycleUnitType.HOUR;
+            }
+            queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+            maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+            taskId = profile.getTaskId();
+            instanceId = profile.getInstanceId();
+            topic = profile.getInstanceId();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
+            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_METHOD);
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_METHOD);
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, profile.get(TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET));
+            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+            allPartitionOffsets = profile.get(TASK_KAFKA_OFFSET);
+            isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
+            if (!isRestoreFromDB && StringUtils.isNotBlank(allPartitionOffsets)) {
+                // example:0#110_1#666_2#222
+                String[] offsets = allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
+                for (String offset : offsets) {
+                    partitionOffsets.put(Integer.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]),
+                            Long.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]));
+                }
+            }
+
+            EXECUTOR_SERVICE.execute(run());
+        } catch (Exception ex) {
+            stopRunning();
+            throw new FileException("error init stream for " + topic, ex);
         }
-        // set consumer session timeout
-        props.put(KAFKA_SESSION_TIMEOUT, 30000);
-        // spilt reader reduce to partition
-        if (null != partitionInfoList) {
-            for (PartitionInfo partitionInfo : partitionInfoList) {
-                props.put(JOB_KAFKA_GROUP_ID.replace(JOB_KAFKAJOB_PARAM_PREFIX, StringUtils.EMPTY),
-                        map.getOrDefault(JOB_KAFKA_GROUP_ID,
-                                map.get(TASK_ID) + JOB_OFFSET_DELIMITER
-                                        + "group" + partitionInfo.partition()));
-                KafkaConsumer<String, byte[]> partitonConsumer = new KafkaConsumer<>(props);
-                partitonConsumer.assign(Collections.singletonList(
-                        new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
-                // if get offset,consume from offset; if not,consume from 0
-                if (partitionOffsets != null && partitionOffsets.length > 0) {
-                    for (String partitionOffset : partitionOffsets) {
-                        if (partitionOffset.contains(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)
-                                && partitionOffset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]
-                                        .equals(String.valueOf(partitionInfo.partition()))) {
-                            offset = Long.valueOf(partitionOffset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]);
+    }
+
+    private Runnable run() {
+        return () -> {
+            AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" + instanceId);
+            running = true;
+            try {
+                List<PartitionInfo> partitionInfoList;
+                try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props)) {
+                    partitionInfoList = consumer.partitionsFor(topic);
+                }
+
+                props.put(KAFKA_SESSION_TIMEOUT, 30000);
+
+                try (KafkaConsumer<String, byte[]> kafkaConsumer = new 
KafkaConsumer<>(props)) {
+                    if (null != partitionInfoList) {
+                        List<TopicPartition> topicPartitions = new 
ArrayList<>();
+                        for (PartitionInfo partitionInfo : partitionInfoList) {
+                            TopicPartition topicPartition = new 
TopicPartition(partitionInfo.topic(),
+                                    partitionInfo.partition());
+                            topicPartitions.add(topicPartition);
+                        }
+                        kafkaConsumer.assign(topicPartitions);
+
+                        if (!isRestoreFromDB && 
StringUtils.isNotBlank(allPartitionOffsets)) {
+                            for (TopicPartition topicPartition : 
topicPartitions) {
+                                Long offset = 
partitionOffsets.get(topicPartition.partition());
+                                if (ObjectUtils.isNotEmpty(offset)) {
+                                    kafkaConsumer.seek(topicPartition, offset);
+                                }
+                            }
+                        } else {
+                            LOGGER.info("Skip to seek offset");
                         }
                     }
+                    doRun(kafkaConsumer);
                 }
-                LOGGER.info("kafka topic partition offset:{}", offset);
-                if (offset != null) {
-                    // if offset not null,then consume from the offset
-                    partitonConsumer.seek(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), offset);
+            } catch (Throwable e) {
+                LOGGER.error("do run error maybe topic is configured incorrectly: ", e);
+            }
+            running = false;
+        };
+    }
+
+    private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
+        long lastPrintTime = 0;
+        while (isRunnable()) {
+            boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
+            if (!suc) {
+                break;
+            }
+            ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(1000));
+            MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
+            if (records.isEmpty()) {
+                if (queue.isEmpty()) {
+                    emptyCount.incrementAndGet();
+                } else {
+                    emptyCount.set(0);
                 }
-                KafkaReader<String, byte[]> kafkaReader = new 
KafkaReader<>(partitonConsumer, map);
-                addValidator(filterPattern, kafkaReader);
-                result.add(kafkaReader);
+                AgentUtils.silenceSleepInSeconds(1);
+                continue;
             }
-            sourceMetric.sourceSuccessCount.incrementAndGet();
-        } else {
-            sourceMetric.sourceFailCount.incrementAndGet();
+            emptyCount.set(0);
+            long offset = 0L;
+            for (ConsumerRecord<String, byte[]> record : records) {
+                LOGGER.info("record: {}", record.value());
+                SourceData sourceData = new SourceData(record.value(), record.offset());
+                boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
+                if (!suc4Queue) {
+                    break;
+                }
+                putIntoQueue(sourceData);
+                offset = record.offset();
+            }
+            kafkaConsumer.commitSync();
+
+            if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) {
+                lastPrintTime = AgentUtils.getCurrentTime();
+                LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
+            }
+        }
+    }
+
+    private boolean waitForPermit(String permitName, int permitLen) {
+        boolean suc = false;
+        while (!suc) {
+            suc = MemoryManager.getInstance().tryAcquire(permitName, permitLen);
+            if (!suc) {
+                MemoryManager.getInstance().printDetail(permitName, "log file source");
+                if (!isRunnable()) {
+                    return false;
+                }
+                AgentUtils.silenceSleepInSeconds(1);
+            }
+        }
+        return true;
+    }
+
+    private void putIntoQueue(SourceData sourceData) {
+        if (sourceData == null) {
+            return;
+        }
+        try {
+            boolean offerSuc = false;
+            if (queue.remainingCapacity() > 0) {
+                while (isRunnable() && !offerSuc) {
+                    offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
+                }
+            }
+
+            if (!offerSuc) {
+                MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length);
+            }
+            LOGGER.debug("Read {} from kafka topic {}", sourceData.getData(), topic);
+        } catch (InterruptedException e) {
+            MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length);
+            LOGGER.error("fetchData offer failed {}", e.getMessage());
         }
-        return result;
+    }
+
+    public boolean isRunnable() {
+        return runnable;
+    }
+
+    /**
+     * Stop running threads.
+     */
+    public void stopRunning() {
+        runnable = false;
     }
 
     @Override
-    public Message read() {
+    public List<Reader> split(TaskProfile conf) {
         return null;
     }
 
     @Override
-    public boolean sourceFinish() {
-        return false;
+    public Message read() {
+        KafkaSource.SourceData sourceData = null;
+        try {
+            sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.warn("poll {} data get interrupted.", topic, e);
+        }
+        if (sourceData == null) {
+            return null;
+        }
+        MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.data.length);
+        Message finalMsg = createMessage(sourceData);
+        return finalMsg;
+    }
+
+    private Message createMessage(SourceData sourceData) {
+        String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
+        Map<String, String> header = new HashMap<>();
+        header.put(PROXY_KEY_DATA, proxyPartitionKey);
+        header.put(OFFSET, sourceData.offset.toString());
+        header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+
+        long auditTime = 0;
+        if (isRealTime) {
+            auditTime = AgentUtils.getCurrentTime();
+        } else {
+            auditTime = profile.getSinkDataTime();
+        }
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+                auditTime, 1, sourceData.data.length);
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+                AgentUtils.getCurrentTime(), 1, sourceData.data.length);
+        Message finalMsg = new DefaultMessage(sourceData.data, header);
+        if (finalMsg.getBody().length > maxPackSize) {
+            LOGGER.warn("message size is {}, greater than max pack size {}, 
drop it!",
+                    finalMsg.getBody().length, maxPackSize);
+            return null;
+        }
+        return finalMsg;
     }
 
     @Override
-    public boolean sourceExist() {
-        return false;
+    public boolean sourceFinish() {
+        if (isRealTime) {
+            return false;
+        }
+        return emptyCount.get() > EMPTY_CHECK_COUNT_AT_LEAST;
     }
 
-    private void addValidator(String filterPattern, KafkaReader kafkaReader) {
-        kafkaReader.addPatternValidator(filterPattern);
+    @Override
+    public boolean sourceExist() {
+        return true;
     }
 }
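
The rewritten source decouples Kafka polling from downstream consumption through a bounded queue gated by the agent's MemoryManager permits: the poll thread acquires a byte-sized permit before buffering a record, and read() releases it once the record is handed off. A self-contained sketch of that pattern, with a plain java.util.concurrent.Semaphore standing in for MemoryManager (all names below are illustrative, not the agent's API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class PermitGatedBuffer {

        // Global byte budget shared by sources (16 MB, purely illustrative).
        private final Semaphore queuePermit = new Semaphore(16 * 1024 * 1024);
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(100000);

        // Poll-thread side: block until memory is available, then buffer the record.
        public void put(byte[] record) throws InterruptedException {
            queuePermit.acquire(record.length);
            if (!queue.offer(record, 1, TimeUnit.SECONDS)) {
                queuePermit.release(record.length); // do not leak permits when the queue is full
            }
        }

        // Instance side, mirroring KafkaSource.read(): release memory once consumed.
        public byte[] poll() throws InterruptedException {
            byte[] record = queue.poll(10, TimeUnit.MILLISECONDS);
            if (record != null) {
                queuePermit.release(record.length);
            }
            return record;
        }
    }
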
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
deleted file mode 100644
index 6a6947aaad..0000000000
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sources.reader;
-
-import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.message.DefaultMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader;
-import org.apache.inlong.agent.plugin.validator.PatternValidator;
-import org.apache.inlong.agent.utils.AgentUtils;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_BYTE_SPEED_LIMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_OFFSET;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_RECORD_SPEED_LIMIT;
-import static org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_TOPIC;
-
-/**
- * read kafka data
- */
-public class KafkaReader<K, V> extends AbstractReader {
-
-    public static final int NEVER_STOP_SIGN = -1;
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReader.class);
-    /* metric */
-    private static final String KAFKA_READER_TAG_NAME = "AgentKafkaMetric";
-    private static final String KAFKA_SOURCE_READ_RECORD_SPEED = "job.kafkaJob.record.speed.limit";
-    private static final String KAFKA_SOURCE_READ_BYTE_SPEED = "job.kafkaJob.byte.speed.limit";
-    private static final String KAFKA_SOURCE_READ_MIN_INTERVAL = "kafka.min.interval.limit";
-    private static final String JOB_KAFKAJOB_READ_TIMEOUT = "job.kafkaJob.read.timeout";
-    /* total readRecords */
-    private static AtomicLong currentTotalReadRecords = new AtomicLong(0);
-    private static AtomicLong lastTotalReadRecords = new AtomicLong(0);
-    /* total readBytes */
-    private static AtomicLong currentTotalReadBytes = new AtomicLong(0);
-    private static AtomicLong lastTotalReadBytes = new AtomicLong(0);
-    KafkaConsumer<K, V> consumer;
-    long lastTimestamp;
-    /* bps: records/s */
-    long recordSpeed;
-    /* tps: bytes/s */
-    long byteSpeed;
-    /* sleepTime */
-    long flowControlInterval;
-    private Iterator<ConsumerRecord<K, V>> iterator;
-    private List<Validator> validators = new ArrayList<>();
-    private long timeout;
-    private long waitTimeout = 1000;
-    private long lastTime = 0;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private String snapshot;
-    private boolean isFinished = false;
-    private boolean destroyed = false;
-    private String topic;
-
-    /**
-     * init attribute
-     */
-    public KafkaReader(KafkaConsumer<K, V> consumer, Map<String, String> 
paraMap) {
-        this.consumer = consumer;
-        this.recordSpeed = 
Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_RECORD_SPEED_LIMIT, "10000"));
-        this.byteSpeed = 
Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_BYTE_SPEED_LIMIT, 
String.valueOf(1024 * 1024)));
-        this.flowControlInterval = 
Long.parseLong(paraMap.getOrDefault(KAFKA_SOURCE_READ_MIN_INTERVAL, "1000"));
-        this.lastTimestamp = System.currentTimeMillis();
-        this.topic = paraMap.get(JOB_KAFKA_TOPIC);
-
-        LOGGER.info("KAFKA_SOURCE_READ_RECORD_SPEED = {}", this.recordSpeed);
-        LOGGER.info("KAFKA_SOURCE_READ_BYTE_SPEED = {}", this.byteSpeed);
-    }
-
-    @Override
-    public Message read() {
-
-        if (iterator != null && iterator.hasNext()) {
-            ConsumerRecord<K, V> record = iterator.next();
-            // body
-            byte[] recordValue = (byte[]) record.value();
-            if (validateMessage(recordValue)) {
-                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
-                        inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, recordValue.length);
-                // header
-                Map<String, String> headerMap = new HashMap<>();
-                headerMap.put("record.offset", 
String.valueOf(record.offset()));
-                headerMap.put("record.key", String.valueOf(record.key()));
-                LOGGER.debug(
-                        "partition:" + record.partition()
-                                + ", value:" + new String(recordValue) + ", 
offset:" + record.offset());
-                // control speed
-                readerMetric.pluginReadSuccessCount.incrementAndGet();
-                readerMetric.pluginReadCount.incrementAndGet();
-                // commit succeed,then record current offset
-                snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
-                DefaultMessage message = new DefaultMessage(recordValue, headerMap);
-                recordReadLimit(1L, message.getBody().length);
-                return message;
-            }
-        } else {
-            // commit offset
-            if (isSourceExist()) {
-                consumer.commitAsync();
-            }
-            fetchData(5000);
-        }
-        AgentUtils.silenceSleepInMs(waitTimeout);
-
-        return null;
-    }
-
-    @Override
-    public boolean isFinished() {
-        return isFinished;
-    }
-
-    @Override
-    public String getReadSource() {
-        Set<TopicPartition> assignment = consumer.assignment();
-        // cousumer->topic->one partition
-        Iterator<TopicPartition> iterator = assignment.iterator();
-        while (iterator.hasNext()) {
-            TopicPartition topicPartition = iterator.next();
-            return topicPartition.topic() + "_" + topicPartition.partition();
-        }
-        return StringUtils.EMPTY;
-    }
-
-    @Override
-    public void setReadTimeout(long millis) {
-        timeout = millis;
-    }
-
-    @Override
-    public void setWaitMillisecond(long millis) {
-        waitTimeout = millis;
-    }
-
-    @Override
-    public void init(InstanceProfile jobConf) {
-        super.init(jobConf);
-        // get offset from jobConf
-        snapshot = jobConf.get(JOB_KAFKA_OFFSET, null);
-        initReadTimeout(jobConf);
-        // fetch data
-        fetchData(5000);
-    }
-
-    @Override
-    public void destroy() {
-        synchronized (this) {
-            if (!destroyed) {
-                consumer.close();
-                destroyed = true;
-            }
-        }
-    }
-
-    private void initReadTimeout(InstanceProfile jobConf) {
-        int waitTime = jobConf.getInt(JOB_KAFKAJOB_READ_TIMEOUT, NEVER_STOP_SIGN);
-        if (waitTime == NEVER_STOP_SIGN) {
-            timeout = NEVER_STOP_SIGN;
-        } else {
-            timeout = TimeUnit.MINUTES.toMillis(waitTime);
-        }
-    }
-
-    private boolean validateMessage(byte[] message) {
-        if (validators.isEmpty()) {
-            return true;
-        }
-        return validators.stream().allMatch(v -> v.validate(new String(message)));
-    }
-
-    /**
-     * add specified pattern to validators
-     *
-     * @param pattern specified pattern
-     */
-    public void addPatternValidator(String pattern) {
-        if (pattern.isEmpty()) {
-            return;
-        }
-        validators.add(new PatternValidator(pattern));
-    }
-
-    @Override
-    public String getSnapshot() {
-        return snapshot;
-    }
-
-    @Override
-    public void finishRead() {
-        isFinished = true;
-    }
-
-    @Override
-    public boolean isSourceExist() {
-        return !CollectionUtils.isEmpty(consumer.partitionsFor(topic));
-    }
-
-    private boolean fetchData(long fetchDataTimeout) {
-        // cosume data
-        ConsumerRecords<K, V> records = 
consumer.poll(Duration.ofMillis(fetchDataTimeout));
-        iterator = records.iterator();
-        return iterator != null ? true : false;
-    }
-
-    private void recordReadLimit(long recordSize, long byteSize) {
-        boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
-        boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
-        if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
-            return;
-        }
-        currentTotalReadRecords.accumulateAndGet(recordSize, (x, y) -> x + y);
-        currentTotalReadBytes.accumulateAndGet(byteSize, (x, y) -> x + y);
-
-        long nowTimestamp = System.currentTimeMillis();
-        long interval = nowTimestamp - lastTimestamp;
-
-        if (interval - this.flowControlInterval >= 0) {
-            long byteLimitSleepTime = 0;
-            long recordLimitSleepTime = 0;
-            if (isChannelByteSpeedLimit) {
-                long currentByteSpeed = (currentTotalReadBytes.get() - lastTotalReadBytes.get()) * 1000 / interval;
-                LOGGER.info("current produce byte speed bytes/s:{}", currentByteSpeed);
-                if (currentByteSpeed > this.byteSpeed) {
-                    // calculate byteLimitSleepTime
-                    byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed - interval;
-                }
-            }
-
-            if (isChannelRecordSpeedLimit) {
-                long currentRecordSpeed =
-                        (currentTotalReadRecords.get() - lastTotalReadRecords.get()) * 1000 / interval;
-                LOGGER.info("current read speed records/s:{}", currentRecordSpeed);
-                if (currentRecordSpeed > this.recordSpeed) {
-                    // calculate recordSleepTime reduce to recordLimit
-                    recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed - interval;
-                }
-            }
-            // calculate sleep time
-            long sleepTime = byteLimitSleepTime < recordLimitSleepTime
-                    ? recordLimitSleepTime
-                    : byteLimitSleepTime;
-            if (sleepTime > 0) {
-                LOGGER.info("sleep seconds:{}", sleepTime / 1000);
-                try {
-                    Thread.sleep(sleepTime);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-        lastTimestamp = nowTimestamp;
-        lastTotalReadRecords = currentTotalReadRecords;
-    }
-}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
new file mode 100644
index 0000000000..78ad200db0
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_TOPIC;
+
+public class KafkaTask extends Task {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTask.class);
+    public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance";
+    public static final int CORE_THREAD_SLEEP_TIME = 5000;
+    public static final int CORE_THREAD_PRINT_TIME = 10000;
+
+    private TaskProfile taskProfile;
+    private Db basicDb;
+    private TaskManager taskManager;
+    private InstanceManager instanceManager;
+    private long lastPrintTime = 0;
+    private boolean initOK = false;
+    private volatile boolean running = false;
+    private boolean isAdded = false;
+    private boolean isRestoreFromDB = false;
+
+    private String topic;
+
+    @Override
+    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException {
+        taskManager = (TaskManager) srcManager;
+        commonInit(taskProfile, basicDb);
+        initOK = true;
+    }
+
+    private void commonInit(TaskProfile taskProfile, Db basicDb) {
+        LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
+        this.taskProfile = taskProfile;
+        this.basicDb = basicDb;
+        this.topic = taskProfile.get(TASK_KAFKA_TOPIC);
+        this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
+        instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+                basicDb, taskManager.getTaskDb());
+        try {
+            instanceManager.start();
+        } catch (Exception e) {
+            LOGGER.error("start instance manager error: ", e);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        doChangeState(State.SUCCEEDED);
+        if (instanceManager != null) {
+            instanceManager.stop();
+        }
+    }
+
+    @Override
+    public TaskProfile getProfile() {
+        return taskProfile;
+    }
+
+    @Override
+    public String getTaskId() {
+        if (taskProfile == null) {
+            return "";
+        }
+        return taskProfile.getTaskId();
+    }
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public void addCallbacks() {
+
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName("kafka-task-core-" + getTaskId());
+        running = true;
+        try {
+            doRun();
+        } catch (Throwable e) {
+            LOGGER.error("do run error: ", e);
+        }
+        running = false;
+    }
+
+    private void doRun() {
+        while (!isFinished()) {
+            if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
+                LOGGER.info("kafka task running! taskId {}", getTaskId());
+                lastPrintTime = AgentUtils.getCurrentTime();
+            }
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+            if (!initOK) {
+                continue;
+            }
+
+            // Add instance profile to instance manager
+            addInstanceProfile();
+
+            String inlongGroupId = taskProfile.getInlongGroupId();
+            String inlongStreamId = taskProfile.getInlongStreamId();
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, inlongGroupId, inlongStreamId,
+                    AgentUtils.getCurrentTime(), 1, 1);
+        }
+    }
+
+    private void addInstanceProfile() {
+        if (isAdded) {
+            return;
+        }
+        String dataTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+        InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        LOGGER.info("taskProfile.createInstanceProfile: {}", instanceProfile.toJsonStr());
+        InstanceAction action = new InstanceAction(ActionType.ADD, instanceProfile);
+        while (!isFinished() && !instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId());
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+        }
+        this.isAdded = true;
+    }
+}
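
Note: unlike the file task, which fans out one instance per matched file, KafkaTask submits exactly one instance whose instanceId is the topic (guarded by isAdded) and retries until the instance manager accepts it. Together with the RESTORE_FROM_DB flag, a restored Kafka instance skips the explicit seek in KafkaSource and resumes from the offsets the consumer group last committed via commitSync().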
