This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 6de6c34089 [INLONG-937][Agent] Renew Update of development guidelines 
for new collection data source plugins (#941)
6de6c34089 is described below

commit 6de6c34089bd3d717b80735e8b3e86092e86c03f
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Apr 23 10:17:02 2024 +0800

    [INLONG-937][Agent] Renew Update of development guidelines for new 
collection data source plugins (#941)
    
    Co-authored-by: Charles Zhang <[email protected]>
---
 .../how_to_write_plugin_agent.md                   | 294 ++++++++-------------
 docs/design_and_concept/img/Agent_Flow.png         | Bin 34510 -> 0 bytes
 docs/design_and_concept/img/agent_audit.png        | Bin 0 -> 110061 bytes
 .../img/agent_basic_concepts.png                   | Bin 0 -> 139852 bytes
 .../how_to_write_plugin_agent.md                   | 292 ++++++++------------
 .../current/design_and_concept/img/Agent_Flow.png  | Bin 34510 -> 0 bytes
 .../current/design_and_concept/img/agent_audit.png | Bin 0 -> 110061 bytes
 .../img/agent_basic_concepts.png                   | Bin 0 -> 139852 bytes
 8 files changed, 231 insertions(+), 355 deletions(-)

diff --git a/docs/design_and_concept/how_to_write_plugin_agent.md 
b/docs/design_and_concept/how_to_write_plugin_agent.md
index 6d329f4c89..2bb4d1239e 100644
--- a/docs/design_and_concept/how_to_write_plugin_agent.md
+++ b/docs/design_and_concept/how_to_write_plugin_agent.md
@@ -2,218 +2,156 @@
 title: Agent Plugin
 sidebar_position: 3
 ---
+## Summary
+In Standard Architecture, we can collect various types of data sources through 
the InLong Agent. The InLong Agent supports the extension of new collection 
types through plugins. This article will guide developers on how to customize 
the new Agent collection data source plugin.
+
+## Concepts
+### Task and Instance
+Task and Instance are the two core concepts of Agent. Simple understanding: 
Task corresponds to a collection task configured on the management platform, 
while Instance is a specific collection instance generated by Task. For 
example, there is a collection task configuration on the management platform: 
`127.0.0.1 -> /data/log/YYMMDDhh.log._[0-9]+`, which means that the user needs 
to access the machine `127.0.0.1` collect data that conforms to the path rule 
`/data/log/YYMMDDhh.log._[0-9]+` [...]
+![](img/agent_basic_concepts.png)
+
+### Source and Sink
+Source and Sink are lower-level concepts of Instance. They can be simply 
understood as each Instance has a Source and a Sink. As the name suggests, 
Source is used to read data from the data source; Sink is used to write data to 
the target storage.
+
+## Development process (taking Pulsar as an example)
+### Process
+- Add Task: implement logic such as initialization, destruction, configuration 
verification, etc.
+- Add Instance: implements logic such as node information setting.
+- Add Source: implements logic such as initialization, destruction, data 
collection, and data provision.
+- Add Sink: implement logic such as initialization, destruction, data input, 
data output (this article only focuses on new data sources, Sink will not be 
introduced, the default Sink is ProxySink)
+
+### Add Task
+Here we need to add a PulsarTask class to org.apache.inlong.agent.plugin.task.
+```
+public class PulsarTask extends AbstractTask {
 
-## Overview
-
-In Standard Architecture, we can collect various types of data through InLong 
Agent. InLong Agent supports extending new collection types in the form of 
plug-ins. This article will guide developers on customizing the new Agent 
collection plug-ins.
-
-## Concepts and Models
-
-InLong Agent is a data collection framework, adopted `Job` + `Task` 
architectural model. And abstract data source reading and writing into 
Reader/Sink plugins.
-
-Developers need to be clear about the concepts of Job and Task:
-
-- `Job`: `Job` is used by Agent to describe the synchronization job from a 
source to a destination, and is the smallest business unit of Agent data 
synchronization. For example: read all files in a file directory
-- `Task`: `Task` is the smallest execution unit obtained by splitting `Job`. 
For example, if there are multiple files in the folder that need to be read, 
then a job will be split into multiple tasks, and each task will read the 
corresponding file
-
-A Task contains the following components:
-
-- Reader: a data collection module, which is responsible for collecting data 
from the data source and sending the data to the Channel.
-- Sink: a data writing module, responsible for continuously fetching data from 
the Channel and writing the data to the destination.
-- Channel: connect Reader and Sink, as a data transmission channel for both, 
and plays a role in monitoring data writing and reading.
-
-When extending an Agent plugin, you need to develop specific Source, Reader 
and Sink. If the data needs to be persisted to the local disk, use the 
persistent Channel, otherwise use the memory Channel
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        return false;
+    }
 
-## Demonstration
+    @Override
+    protected void initTask() {
 
-The Job/Task/Reader/Sink/Channel concept introduced above can be represented 
by the following figure:
+    }
 
-<div align="center">
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        return null;
+    }
+}
+```
+- initTask: initialization. For example, file collection can start folder 
monitoring during initialization.
+- isProfilevalid: determine whether the task configuration is legal. The 
configuration of each type of task will have different restrictions, which can 
be implemented here.
+- releaseTask: release task resources. For example, file collection can cancel 
folder monitoring here.
+- getNewInstanceList: get a new instance. For example, when collecting files, 
it is found that there are new files that can be collected.
 
-![](img/Agent_Flow.png)
-</div>
+### Add Instance
+Add the PulsarInstance class in `org.apache.inlong.agent.plugin.instance`. 
This class will be relatively idle because the main logic is in the 
CommonInstance base class. Its function is to create Source and Sink, read data 
from Source, and then write it to Sink. We only need to implement the 
setInodeInfo interface here. Except for FileInstance, which needs to set the 
Inode Info of the file, the other Instance classes only need to be set to empty 
strings.
+```
+public class PulsarInstance extends CommonInstance {
 
-- The user submits a Job (via the manager), and the Job defines the Source, 
Channel, and Sink that need to be used (defined by the fully qualified name of 
the class)
-- The framework starts the Job and creates the Source through the reflection 
mechanism
-- The framework starts the Source and calls the Split interface of the Source 
to generate one or more Tasks
-- When a Task is generated, a Reader (a type of Source will generate a 
corresponding reader), a User-configured Channel and a User-configured Sink are 
generated at the same time
-- Task starts to execute, Reader starts to read data to Channel, Sink fetches 
data from Channel and sends it
-- All the information needed for Job and Task execution is encapsulated in the 
JobProfile
+    @Override
+    public void setInodeInfo(InstanceProfile profile) {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
+```
 
-## Development Process
+### Add Source
+Add the PulsarSource class to `org.apache.inlong.agent.plugin.sources:
+```
+public class PulsarSource extends AbstractSource {
 
-- First develop Source, implement split logic, and return ReaderList
-- The developed Reader implements the logic of reading data and writing to 
Channel
-- The sink under development implements the logic of fetching data from the 
channel and writing it to the specified sink
+    @Override
+    public boolean sourceExist() {
+        return false;
+    }
 
-## Interface
+    @Override
+    protected void initSource(InstanceProfile profile) {
 
-The following will introduce the classes and interfaces you need to know to 
develop an Agent plug-in.
+    }
 
-### Reader
-```java
-private class ReaderImpl implements Reader {
+    @Override
+    protected void printCurrentState() {
 
-    private int count = 0;
+    }
 
     @Override
-    public Message read() {
-        count += 1;
-        return new DefaultMessage("".getBytes(StandardCharsets.UTF_8));
+    protected boolean doPrepareToRead() {
+        return false;
     }
 
     @Override
-    public boolean isFinished() {
-        return count > 99999;
+    protected List<SourceData> readFromSource() {
+        return null;
     }
 
     @Override
-    public String getReadSource() {
+    protected String getThreadName() {
         return null;
     }
 
     @Override
-    public void setReadTimeout(long mill) {
-
+    protected boolean isRunnable() {
+        return false;
     }
-}
-```
-
-The `Reader` interface functions as follows:
-- `read`: Called by a single Task, and returns a read message after the call, 
and the message inside the Agent is encapsulated by Message
-- `isFinished`: judge whether the reading is completed, for example: if it is 
an SQL task, judge whether all the contents in the ResultSet have been read; if 
it is a file task, judge whether there is still data written after the waiting 
time set by the user
-- `getReadSource`: Get the acquisition source, for example: if it is a file 
task, it will return the file name currently read
-- `setReadTimeout`: set read timeout
-
-### Sink
-
-```java
-public interface Sink extends Stage {
-
-    /**
-     * Write data into data center
-     *
-     * @param message - message
-     */
-    void write(Message message);
-
-    /**
-     * set source file name where the message is generated
-     * @param sourceName
-     */
-    void setSourceName(String sourceName);
-
-    /**
-     * every sink should include a message filter to filter out stream id
-     */
-    MessageFilter initMessageFilter(JobProfile jobConf);
-}
 
-```
+    @Override
+    protected void releaseSource() {
 
-The `Sink` interface functions as follows:
-- `write`: called by a single Task, reads a message from the Channel in the 
Task and writes it to a specific storage medium. Taking PulsarSink as an 
example, it needs to be sent to Pulsar through PulsarSender
-- `setSourceName`: set the data source name, if it is a file, the file name
-- `initMessageFilter`: Initialize MessageFilter , the user can create a 
message filter to filter each message by setting agent.message.filter.classname 
in the Job configuration file. For details, please refer to the MessageFilter 
interface
-
-### Source
-
-```java
-/**
- * Source can be split into multiple reader.
- */
-public interface Source {
-
-    /**
-     * Split source into a list of readers.
-     *
-     * @param conf job conf
-     * @return - list of reader
-     */
-    List<Reader> split(JobProfile conf);
+    }
 }
-
 ```
-
-The `Source` interface functions as follows:
-- `split`: Called by a single job to generate multiple Readers, for example: a 
read file task, matching multiple files in a folder, when the job starts, it 
will specify TextFileSource as the Source entry,
-  After calling the split function, TextFileSource will detect how many paths 
match the path matching expression in the folder set by the user, and generate 
TextFileReader to read
-
-### Job
-
-```java
-public class xxJob {
-    
-  private String username;
-  private String password;
-  private String hostname;
-  private String port;
-  private String database;
-  // ...
-  
-  public static class xxJobConfig {
-
-    private String username;
-    private String password;
-    private String hostname;
-    private String port;
-    private String database;
-    // ...
-  }
-}
-
+- initSource: initialize the basic resource of this data source.
+- sourceExist: returns whether the current data source exists, for example, 
whether the file was deleted during file collection.
+- printCurrentState: prints the current collection status and is called 
regularly.
+- doPrepareToRead: you can do some checks before reading data, such as whether 
the file is overwritten during file collection.
+- readFromSource: actually reads data from the data source, such as consuming 
data from Kafka SDK and Pulsar SDK.
+- getThreadName: get the worker thread name of the data source.
+- isRunnable: returns whether this data source should continue.
+- releaseSource: release the resources of the data source
+
+## Task configuration
+From the above, we can see that we have created new classes such as Task, 
Instance, Source, etc., and task configuration is to connect these classes 
together.
 ```
-
-- The fields in `config` come from the manager and need to be consistent with 
the manager fields. When submitting the task, convert it into the corresponding 
job
-
-
-## Job Definition
-
-The code is written, have you ever wondered how the framework finds the entry 
class of the plugin? How does the framework load plugins?
-
-When submitting a task, you will find information about the plugin defined in 
the task, including the entry class. For example:
-
-```json
 {
-"job": {
-"name": "fileAgentTest",
-"source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
-"sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
-"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel"
-}
+    "task.id": "74",
+    "task.groupId": "test_group_pulsar",
+    "task.streamId": "test_stream_pulsar",
+    "task.source": "org.apache.inlong.agent.plugin.sources.PulsarSource",
+    "task.sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
+    "task.taskClass": "org.apache.inlong.agent.plugin.task.PulsarTask"
 }
 ```
+- task.source: Source class specified
+- task.sink: Sink class specified
+- task.taskClass: specifies the Task class
 
-- `source`: The fully qualified name of the Source class, the instance of 
which the framework imports the plugin through reflection.
-- `sink`: The fully qualified name of the Sink class, the instance of which 
the framework imports through the reflection plugin.
-- `channel`: The name of the Channel class used by the framework, the instance 
of the entry class of the plugin through reflection.
-
-## Message
-
-Like the general `producer-consumer` model, the `Reader` plugin and the `Sink` 
plugin also use `channel` to achieve data transmission.
-`channel` can be in-memory or persistent, plugins don't have to care. Plugins 
write data to `channel` through `RecordSender` and read data from `channel` 
through `RecordReceiver`.
-
-A piece of data in `channel` is a `Message` object, `Message` contains a byte 
array and attribute data represented by a Map
-
-`Message` has the following methods:
-
-```java
-public interface Message {
-
-    /**
-     * Data content of message.
-     *
-     * @return bytes body
-     */
-    byte[] getBody();
+## Offset control
+```
+    protected class SourceData {
 
-    /**
-     * Data attribute of message
-     *
-     * @return map header
-     */
-    Map<String, String> getHeader();
-}
+        private byte[] data;
+        private Long offset;
+    }
+```
+```
+    protected List<SourceData> readFromSource() {
+        return null;
+    }
+```
+We can see that when the Source reads data, each piece of data will record its 
corresponding Offset. This Offset will be automatically recorded by the Agent 
after the Sink is successfully written.
+When Source is initialized, its corresponding Offset will be automatically 
read and stored in the member variable offsetProfile of AbstractSource. You can 
use offsetProfile.getOffset() to
+Get its Offset for initializing the data source.
+```
+       protected void initOffset() {
+        offsetProfile = OffsetManager.getInstance().getOffset(taskId, 
instanceId);
+    }
 ```
 
-Developers can expand customized Message according to this interface. For 
example, ProxyMessage contains InLongGroupId, InLongStreamId and other 
attributes
\ No newline at end of file
+## Test
+- **Audit Metrics Alignment**
+It is required that the three indicators of Agent collection, Agent sending, 
and DataProxy receiving are completely aligned.
+![](img/agent_audit.png)
\ No newline at end of file
diff --git a/docs/design_and_concept/img/Agent_Flow.png 
b/docs/design_and_concept/img/Agent_Flow.png
deleted file mode 100644
index 208e7298d9..0000000000
Binary files a/docs/design_and_concept/img/Agent_Flow.png and /dev/null differ
diff --git a/docs/design_and_concept/img/agent_audit.png 
b/docs/design_and_concept/img/agent_audit.png
new file mode 100644
index 0000000000..71c5d65732
Binary files /dev/null and b/docs/design_and_concept/img/agent_audit.png differ
diff --git a/docs/design_and_concept/img/agent_basic_concepts.png 
b/docs/design_and_concept/img/agent_basic_concepts.png
new file mode 100644
index 0000000000..b66098c8b3
Binary files /dev/null and 
b/docs/design_and_concept/img/agent_basic_concepts.png differ
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_agent.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_agent.md
index c714dcb3e2..c995ceead7 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_agent.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_agent.md
@@ -2,218 +2,156 @@
 title: Agent 插件
 sidebar_position: 3
 ---
+## 概述
+在 Standard Architecture 中,我们可以通过 InLong Agent 来采集各种类型的数据源。InLong Agent 
支持以插件的方式扩展新的采集类型,本文将指导开发者如何自定义新的 Agent 采集数据源插件。
+
+## 核心概念
+### Task 和 Instance
+Task 和 Instance 是 Agent 最核心的两个概念,简单理解:Task 对应管理平台上配置的一个采集任务,而 Instance 则是由 
Task 生成的一个具体的采集实例。举个例子,管理平台上有个采集任务的配置: `127.0.0.1 -> 
/data/log/YYMMDDhh.log._[0-9]+`,表示用户需要在  `127.0.0.1` 这台机器上采集符合 
`/data/log/YYMMDDhh.log._[0-9]+`,这个路径规则的数据,**这就是一个 Task**。这个 Task 
会根据这个路径规则去寻找满足条件的文件,**为每个符合条件的文件生成一个对应的 
Instance**,比如说有`/data/log/2024040221.log.0,/data/log/2024040221.log.1,/data/log/2024040221.log.3`
 3个文件,那么 Task 就会生成 3 个 Instance 分别采集这三个文件的数据。
+![](img/agent_basic_concepts.png)
+
+### Source 和 Sink
+Source 和 Sink 属于 Instance 下一级的概念,可以简单理解为每个 Instance 都有一个 Source 和 一个 
Sink。顾名思义,Source 用于从数据源读取数据;Sink 用于向目标存储写入数据。
+
+## 开发流程(以 Pulsar 为例)
+### 主流程
+- 新增 Task:实现初始化、销毁、配置校验等逻辑。
+- 新增 Instance:实现节点信息设置等逻辑。
+- 新增 Source:实现初始化、销毁、采集数据、提供数据等逻辑。
+- 新增 Sink:实现初始化、销毁、数据输入、数据输出等逻辑(本文只针对新增数据源,Sink 不做介绍,默认 Sink 是 ProxySink)
+
+### 新增 Task
+这里就是要在 org.apache.inlong.agent.plugin.task 新增一个 PulsarTask 类。
+```
+public class PulsarTask extends AbstractTask {
 
-## 总览
-
-在 Standard Architecture 中,我们可以通过 InLong Agent 来采集各种类型的数据源。InLong Agent 
支持以插件的方式扩展新的采集类型,本文将指导开发者如何自定义新的 Agent 采集插件。
-
-## 概念和模型
-
-InLong Agent 是一个数据采集框架,采用 `Job` + `Task` 架构模型,将数据源读取和写入抽象成为 Reader/Sink 插件。
-
-- `Job`: `Job`是 Agent 用以描述从一个源头到一个目的端的同步作业,是 Agent 
数据同步的最小业务单元。比如:读取一个文件目录下的所有文件
-- `Task`: `Task`是把`Job`拆分得到的最小执行单元。比如:文件夹下有多个文件需要被读取,那么一个 job 会被拆分成为多个 task 
,每个 task 读取对应的文件
-
-一个 Task 包含以下组件:
-
-- Reader:数据采集模块,负责采集数据源的数据,将数据发送给 Channel。
-- Sink: 数据写入模块,负责不断向 Channel 取数据,并将数据写入到目的端。
-- Channel:连接 Reader 和 Sink,作为两者的数据传输通道,并起到了数据的写入读取监控作用。
-
-当扩展一个 Agent 插件时,需要开发特定的 Source、Reader 以及 Sink,数据如果需要持久化到本地磁盘,使用持久化 Channel 
,如果否则使用内存 Channel
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        return false;
+    }
 
-## 流程图示
+    @Override
+    protected void initTask() {
 
-上述介绍的 Job/Task/Reader/Sink/Channel 概念可以用下图表示:
+    }
 
-<div align="center">
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        return null;
+    }
+}
+```
+- initTask:初始化,比如文件采集可以在初始化时进行文件夹监听。
+- isProfilevalid:判断任务配置是否合法,每种类型任务的配置会有不同的限制,可以在这里实现。
+- releaseTask:释放任务资源,比如文件采集可以在这里取消文件夹监听。
+- getNewInstanceList:获取新的实例,比如文件采集时发现有新的文件可以采集。
 
-![](img/Agent_Flow.png)
-</div>
+### 新增 Instance
+在 `org.apache.inlong.agent.plugin.instance` 增加 PulsarInstance 
类,这个类会比较空闲,主要逻辑都是在 CommonInstance 基类里。作用是创建 Source、Sink,从 Source 读数据,然后写入 
Sink。我们这里只要实现一下 setInodeInfo 接口即可。除了 FileInstance 比较特殊需要设置文件的 Inode Info,其余的 
Instance 类都只要设置成空字符串即可。
+```
+public class PulsarInstance extends CommonInstance {
 
-- 用户提交 Job(通过 manager),Job 中定义了需要使用的 Source, Channel, Sink(通过类的全限定名定义)
-- 框架启动 Job,通过反射机制创建出 Source
-- 框架启动 Source,并调用 Source 的 Split 接口,生成一个或者多个 Task
-- 生成一个 Task 时,同时生成 Reader(一种类型的 Source 会生成对应的 reader),用户配置的 Channel 以及用户配置的 
Sink
-- Task 开始执行,Reader 开始读取数据到 Channel,Sink 从 Channel 中取数进行发送
-- Job 和 Task 执行时所需要的所有信息都封装在 JobProfile 中
+    @Override
+    public void setInodeInfo(InstanceProfile profile) {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
+```
 
-## 开发流程
+### 新增 Source
+在 `org.apache.inlong.agent.plugin.sources 增加 PulsarSource` 类:
+```
+public class PulsarSource extends AbstractSource {
 
-- 首先开发 Source , 实现 Split 逻辑,返回 Reader 列表
-- 开发对应的 Reader ,实现读取数据并写入到 Channel 的逻辑
-- 开发对应的 Sink , 实现从 Channel 中取数并写入到指定 Sink 中的逻辑
+    @Override
+    public boolean sourceExist() {
+        return false;
+    }
 
-## 接口
+    @Override
+    protected void initSource(InstanceProfile profile) {
 
-下面将介绍开发一个 Agent 插件需要知道的类与接口。
+    }
 
-### Reader
-```java
-private class ReaderImpl implements Reader {
+    @Override
+    protected void printCurrentState() {
 
-    private int count = 0;
+    }
 
     @Override
-    public Message read() {
-        count += 1;
-        return new DefaultMessage("".getBytes(StandardCharsets.UTF_8));
+    protected boolean doPrepareToRead() {
+        return false;
     }
 
     @Override
-    public boolean isFinished() {
-        return count > 99999;
+    protected List<SourceData> readFromSource() {
+        return null;
     }
 
     @Override
-    public String getReadSource() {
+    protected String getThreadName() {
         return null;
     }
 
     @Override
-    public void setReadTimeout(long mill) {
-
+    protected boolean isRunnable() {
+        return false;
     }
-}
-```
-
-`Reader` 接口功能如下:
-- `read`: 被单个 Task 调用,调用后返回读取的一条消息,Agent 内部的消息使用 Message 封装
-- `isFinished`: 判断是否读取完成,举例:如果是 SQL 任务,则判断是否读取完了 ResultSet 
中的所有内容,如果是文件任务,则判断超过用户设置的等待时间后是否还有数据写入
-- `getReadSource`: 获取采集源,举例:如果是文件任务,则返回当前读取的文件名
-- `setReadTimeout`: 设置读取超时时间
-
-### Sink
-
-```java
-public interface Sink extends Stage {
-
-    /**
-     * Write data into data center
-     *
-     * @param message - message
-     */
-    void write(Message message);
-
-    /**
-     * set source file name where the message is generated
-     * @param sourceName
-     */
-    void setSourceName(String sourceName);
-
-    /**
-     * every sink should include a message filter to filter out stream id
-     */
-    MessageFilter initMessageFilter(JobProfile jobConf);
-}
 
-```
-
-`Sink` 接口功能如下:
-- `write`: 被单个 Task 调用,从 Task 中的 Channel 读取一条消息,并写入到特定的存储介质中,以 PulsarSink 
为例,则需要通过 PulsarSender 发送到 Pulsar
-- `setSourceName`: 设置数据源名称,如果是文件,则是文件名
-- `initMessageFilter`: 初始化 MessageFilter , 用户可以在Job配置文件中通过设置 
agent.message.filter.classname 来创建一个消息过滤器来过滤每一条消息,详情可以参考 MessageFilter 接口
-
-
-### Source
-
-```java
-/**
- * Source can be split into multiple reader.
- */
-public interface Source {
-
-    /**
-     * Split source into a list of readers.
-     *
-     * @param conf job conf
-     * @return - list of reader
-     */
-    List<Reader> split(JobProfile conf);
-}
-
-```
+    @Override
+    protected void releaseSource() {
 
-`Source`接口功能如下:
-- `split`: 被单个 Job 调用,产生多个 Reader,举例:一个读取文件任务,匹配文件夹内的多个文件,在 job 启动时,会指定 
TextFileSource 作为 Source 入口,
-  调用 split 函数后,TextFileSource 会检测用户设置的文件夹内有多少符合路径匹配表达式的路径,并生成 TextFileReader 读取
-  
-
-### Job
-
-```java
-public class xxJob {
-    
-  private String username;
-  private String password;
-  private String hostname;
-  private String port;
-  private String database;
-  // ...
-  
-  public static class xxJobConfig {
-
-    private String username;
-    private String password;
-    private String hostname;
-    private String port;
-    private String database;
-    // ...
-  }
+    }
 }
-
 ```
-
-- `config` 中的字段来自 manager,需要与 manager 字段保持一致,在任务配置时,将其转换成对应的 job
-
+- initSource:初始化该数据源的基本资源。
+- sourceExist:返回当前数据源是否存在,例如文件采集时文件是否被删除。
+- printCurrentState:打印当前采集状态,定时调用。
+- doPrepareToRead:在读数据之前可以做一些检查,例如文件采集时文件是否被覆盖。
+- readFromSource:真正从数据源读取数据,例如从 Kafka SDK、Pulsar SDK 消费数据。
+- getThreadName:获取该数据源的工作线程名。
+- isRunnable:返回该数据源是否应该继续。
+- releaseSource:释放该数据源的资源
 
 ## 任务配置
-
-代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?
-
-在提交任务时,会发现任务中定义了插件的相关信息,包括入口类。例如:
-
-```json
+从上面看我们新建了 Task、Instance、Source 等类,而任务配置就是将这些了类串联起来
+```
 {
-"job": {
-"name": "fileAgentTest",
-"source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
-"sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
-"channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel"
-}
+    "task.id": "74",
+    "task.groupId": "test_group_pulsar",
+    "task.streamId": "test_stream_pulsar",
+    "task.source": "org.apache.inlong.agent.plugin.sources.PulsarSource",
+    "task.sink": "org.apache.inlong.agent.plugin.sinks.ProxySink",
+    "task.taskClass": "org.apache.inlong.agent.plugin.task.PulsarTask"
 }
 ```
+- task.source:指定了 Source 类
+- task.sink:指定了 Sink 类
+- task.taskClass:指定了 Task 类
 
-- `source`: Source 类的全限定名称,框架通过反射插件入口类的实例。
-- `sink`: Sink 类的全限定名称,框架通过反射插件入口类的实例。
-- `channel`: 使用的 Channel 类名,框架通过反射插件入口类的实例。
-
-## Message
-
-跟一般的`生产者-消费者`模式一样,`Reader`插件和`Sink`插件之间也是通过`channel`来实现数据的传输的。
-`channel`可以是内存的,也可能是持久化的,插件不必关心。插件通过`RecordSender`往`channel`写入数据,通过`RecordReceiver`从`channel`读取数据。
-
-`channel`中的一条数据为一个`Message`的对象,`Message`中包含一个字节数组以及一个Map表示的属性数据
-
-`Message`有如下方法:
-
-```java
-public interface Message {
-
-    /**
-     * Data content of message.
-     *
-     * @return bytes body
-     */
-    byte[] getBody();
+## 位点控制
+```
+    protected class SourceData {
 
-    /**
-     * Data attribute of message
-     *
-     * @return map header
-     */
-    Map<String, String> getHeader();
-}
+        private byte[] data;
+        private Long offset;
+    }
+```
+```
+    protected List<SourceData> readFromSource() {
+        return null;
+    }
+```
+我们可以看到,Source 读取数据时每一条数据都会记录其对应的 Offset,这个 Offset 最终在 Sink 端写入成功后才会由 Agent 
自动记录。
+而在 Source 初始化时会自动读取其对应的 Offset,保存在 AbstractSource 的成员变量 offsetProfile,通过 
offsetProfile.getOffset() 可以
+获得其 Offset 用于初始化数据源。
+```
+       protected void initOffset() {
+        offsetProfile = OffsetManager.getInstance().getOffset(taskId, 
instanceId);
+    }
 ```
 
-开发人员可以根据该接口拓展定制化的 Message ,比如 ProxyMessage 中,就包含了 InLongGroupId, 
InLongStreamId 等属性
\ No newline at end of file
+## 测试
+- **审计指标对齐**
+要求 Agent 采集、Agent 发送、DataProxy 接收 三个指标完全对齐
+![](img/agent_audit.png)
\ No newline at end of file
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/Agent_Flow.png
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/Agent_Flow.png
deleted file mode 100644
index 208e7298d9..0000000000
Binary files 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/Agent_Flow.png
 and /dev/null differ
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/agent_audit.png
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/agent_audit.png
new file mode 100644
index 0000000000..71c5d65732
Binary files /dev/null and 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/agent_audit.png
 differ
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/agent_basic_concepts.png
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/agent_basic_concepts.png
new file mode 100644
index 0000000000..b66098c8b3
Binary files /dev/null and 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/agent_basic_concepts.png
 differ

Reply via email to