This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 03a768fd2 [INLONG-4112][Agent] Support collect data from the specified 
position for MySQL binlog (#4113)
03a768fd2 is described below

commit 03a768fd2e718c27f08f23e7b50bb8791caaf893
Author: Greedyu <[email protected]>
AuthorDate: Fri May 20 10:13:47 2022 +0800

    [INLONG-4112][Agent] Support collect data from the specified position for 
MySQL binlog (#4113)
---
 .../agent/constant/SnapshotModeConstants.java      |  30 +++
 .../apache/inlong/agent/pojo/DebeziumOffset.java   |  61 +++++
 .../agent/utils/DebeziumOffsetSerializer.java      |  43 ++++
 inlong-agent/agent-plugins/pom.xml                 |   5 +
 .../inlong/agent/plugin/message/SchemaRecord.java  |  84 +++++++
 .../agent/plugin/sources/reader/BinlogReader.java  |  85 +++++--
 .../agent/plugin/utils/DatabaseHistoryUtil.java    |  65 ++++++
 .../agent/plugin/utils/InLongDatabaseHistory.java  | 247 +++++++++++++++++++++
 .../plugin/utils/InLongFileOffsetBackingStore.java | 196 ++++++++++++++++
 .../agent/plugin/sources/TestBinlogReader.java     |  79 ++++---
 licenses/inlong-agent/LICENSE                      |   1 +
 pom.xml                                            |   2 +
 12 files changed, 849 insertions(+), 49 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SnapshotModeConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SnapshotModeConstants.java
new file mode 100644
index 000000000..f9c21ab6a
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SnapshotModeConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Constants of job fetcher snapshot mode
+ */
+public class SnapshotModeConstants {
+
+    public static final String INITIAL = "initial";
+
+    public static final String EARLIEST_OFFSET = "never";
+
+    public static final String SPECIFIC_OFFSETS = "schema_only_recovery";
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumOffset.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumOffset.java
new file mode 100644
index 000000000..d2345cb3d
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumOffset.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import java.util.Map;
+
+/**
+ * The state that the InLong Debezium Consumer holds for each instance.
+ *
+ * <p>This class describes the most basic state that Debezium used for recovering based on Kafka
+ * Connect mechanism. It includes a sourcePartition and sourceOffset.
+ *
+ * <p>The sourcePartition represents a single input sourcePartition that the record came from (e.g.
+ * a filename, table name, or topic-partition). The sourceOffset represents a position in that
+ * sourcePartition which can be used to resume consumption of data.
+ *
+ * <p>These values can have arbitrary structure and should be represented using
+ * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
+ *
+ * <p>The class implements {@link java.io.Serializable} so that the declared
+ * {@code serialVersionUID} actually takes effect.
+ */
+public class DebeziumOffset implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // The fields stay public: there are no getters, so a bean-style JSON mapper has to read them directly.
+    /** Identifies the source the record came from. */
+    public Map<String, ?> sourcePartition;
+    /** Position inside {@link #sourcePartition} from which consumption can resume. */
+    public Map<String, ?> sourceOffset;
+
+    /**
+     * Sets the source partition.
+     *
+     * @param sourcePartition map identifying the source; stored by reference, not copied
+     */
+    public void setSourcePartition(Map<String, ?> sourcePartition) {
+        this.sourcePartition = sourcePartition;
+    }
+
+    /**
+     * Sets the source offset.
+     *
+     * @param sourceOffset map describing the position; stored by reference, not copied
+     */
+    public void setSourceOffset(Map<String, ?> sourceOffset) {
+        this.sourceOffset = sourceOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "DebeziumOffset{"
+                + "sourcePartition="
+                + sourcePartition
+                + ", sourceOffset="
+                + sourceOffset
+                + '}';
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DebeziumOffsetSerializer.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DebeziumOffsetSerializer.java
new file mode 100644
index 000000000..5a6e03782
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DebeziumOffsetSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+
+import java.io.IOException;
+
+/**
+ * Serializer implementation for a {@link DebeziumOffset}.
+ *
+ * <p>Offsets are written as JSON; the state is very small, so a more compact format has not
+ * been needed so far.
+ */
+public class DebeziumOffsetSerializer {
+
+    public static final DebeziumOffsetSerializer INSTANCE = new DebeziumOffsetSerializer();
+
+    // One shared mapper: ObjectMapper is costly to build and safe to share once configured.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Serializes the offset to JSON bytes.
+     *
+     * @param debeziumOffset offset to serialize
+     * @return JSON representation of the offset
+     * @throws IOException if the offset cannot be written as JSON
+     */
+    public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException {
+        return OBJECT_MAPPER.writeValueAsBytes(debeziumOffset);
+    }
+
+    /**
+     * Reads an offset back from its JSON bytes.
+     *
+     * @param bytes JSON produced by {@link #serialize(DebeziumOffset)}
+     * @return the decoded offset
+     * @throws IOException if the bytes are not a valid JSON offset
+     */
+    public DebeziumOffset deserialize(byte[] bytes) throws IOException {
+        return OBJECT_MAPPER.readValue(bytes, DebeziumOffset.class);
+    }
+}
diff --git a/inlong-agent/agent-plugins/pom.xml 
b/inlong-agent/agent-plugins/pom.xml
index 80b2546b0..17881c3ac 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -69,6 +69,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.clients.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-mysql</artifactId>
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SchemaRecord.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SchemaRecord.java
new file mode 100644
index 000000000..a4877f7db
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SchemaRecord.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.message;
+
+import io.debezium.document.Document;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges.TableChange;
+
+import java.io.IOException;
+
+/**
+ * The Record represents a schema change event, it contains either one {@link 
HistoryRecord} or
+ * {@link TableChange}.
+ */
+public class SchemaRecord {
+
+    private final HistoryRecord historyRecord;
+
+    private final Document tableChangeDoc;
+
+    public SchemaRecord(HistoryRecord historyRecord) {
+        this.historyRecord = historyRecord;
+        this.tableChangeDoc = null;
+    }
+
+    public SchemaRecord(Document document) {
+        if (this.isHistoryRecordDocument(document)) {
+            this.historyRecord = new HistoryRecord(document);
+            this.tableChangeDoc = null;
+        } else {
+            this.tableChangeDoc = document;
+            this.historyRecord = null;
+        }
+
+    }
+
+    public HistoryRecord getHistoryRecord() {
+        return this.historyRecord;
+    }
+
+    public Document getTableChangeDoc() {
+        return this.tableChangeDoc;
+    }
+
+    public boolean isHistoryRecord() {
+        return this.historyRecord != null;
+    }
+
+    public boolean isTableChangeRecord() {
+        return this.tableChangeDoc != null;
+    }
+
+    public Document toDocument() {
+        return this.historyRecord != null ? this.historyRecord.document() : 
this.tableChangeDoc;
+    }
+
+    public String toString() {
+        try {
+            return DocumentWriter.defaultWriter().write(this.toDocument());
+        } catch (IOException var2) {
+            return super.toString();
+        }
+    }
+
+    private boolean isHistoryRecordDocument(Document document) {
+        return (new HistoryRecord(document)).isValid();
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 42b4302c9..9ca78b5f7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources.reader;
 
+import com.google.common.base.Preconditions;
 import com.google.gson.Gson;
 import io.debezium.connector.mysql.MySqlConnector;
 import io.debezium.engine.ChangeEvent;
@@ -27,18 +28,24 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
 import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
 import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -55,27 +62,24 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
  */
 public class BinlogReader implements Reader {
 
+    public static final String COMPONENT_NAME = "BinlogReader";
+    public static final String JOB_DATABASE_USER = "job.binlogJob.user";
+    public static final String JOB_DATABASE_PASSWORD = 
"job.binlogJob.password";
+    public static final String JOB_DATABASE_HOSTNAME = 
"job.binlogJob.hostname";
+    public static final String JOB_TABLE_WHITELIST = 
"job.binlogJob.tableWhiteList";
+    public static final String JOB_DATABASE_WHITELIST = 
"job.binlogJob.databaseWhiteList";
+    public static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = 
"job.binlogJob.offset.specificOffsetFile";
+    public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = 
"job.binlogJob.offset.specificOffsetPos";
+    public static final String JOB_DATABASE_SERVER_TIME_ZONE = 
"job.binlogJob.serverTimezone";
+    public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = 
"job.binlogJob.offset.intervalMs";
+    public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = 
"job.binlogJob.history.filename";
+    public static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES = 
"job.binlogJob.schema";
+    public static final String JOB_DATABASE_SNAPSHOT_MODE = 
"job.binlogJob.snapshot.mode";
+    public static final String JOB_DATABASE_HISTORY_MONITOR_DDL = 
"job.binlogJob.ddl";
+    public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
+    public static final String JOB_DATABASE_QUEUE_SIZE = 
"job.binlogJob.queueSize";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(BinlogReader.class);
-
-    private static final String COMPONENT_NAME = "BinlogReader";
-    private static final String JOB_DATABASE_USER = "job.binlogJob.user";
-    private static final String JOB_DATABASE_PASSWORD = 
"job.binlogJob.password";
-    private static final String JOB_DATABASE_HOSTNAME = 
"job.binlogJob.hostname";
-    private static final String JOB_TABLE_WHITELIST = 
"job.binlogJob.tableWhiteList";
-    private static final String JOB_DATABASE_WHITELIST = 
"job.binlogJob.databaseWhiteList";
-
-    private static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
-    private static final String JOB_DATABASE_OFFSET_FILENAME = 
"job.binlogJob.offset.filename";
-
-    private static final String JOB_DATABASE_SERVER_TIME_ZONE = 
"job.binlogJob.serverTimezone";
-    private static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = 
"job.binlogJob.offset.intervalMs";
-
-    private static final String JOB_DATABASE_STORE_HISTORY_FILENAME = 
"job.binlogJob.history.filename";
-    private static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES = 
"job.binlogJob.schema";
-    private static final String JOB_DATABASE_SNAPSHOT_MODE = 
"job.binlogJob.snapshot.mode";
-    private static final String JOB_DATABASE_HISTORY_MONITOR_DDL = 
"job.binlogJob.ddl";
-    private static final String JOB_DATABASE_PORT = "job.binlogJob.port";
-    private static final String JOB_DATABASE_QUEUE_SIZE = 
"job.binlogJob.queueSize";
     private static final Gson gson = new Gson();
     private final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
     /**
@@ -100,6 +104,8 @@ public class BinlogReader implements Reader {
     private String instanceId;
     private ExecutorService executor;
     private String offset;
+    private String specificOffsetFile;
+    private String specificOffsetPos;
     private BinlogSnapshotBase binlogSnapshot;
     private JobProfile jobProfile;
     private boolean destroyed = false;
@@ -152,6 +158,8 @@ public class BinlogReader implements Reader {
         finished = false;
 
         offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+        specificOffsetFile = 
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+        specificOffsetPos = 
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
         binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
         binlogSnapshot.save(offset);
 
@@ -193,6 +201,7 @@ public class BinlogReader implements Reader {
                         committer.markBatchFinished();
                     } catch (Exception e) {
                         LOGGER.error("parse binlog message error", e);
+
                     }
 
                 })
@@ -228,11 +237,7 @@ public class BinlogReader implements Reader {
         props.setProperty("table.whitelist", tableWhiteList);
         props.setProperty("database.whitelist", databaseWhiteList);
 
-        props.setProperty("offset.storage", 
FileOffsetBackingStore.class.getCanonicalName());
-        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
         props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
-        props.setProperty("database.history", 
FileDatabaseHistory.class.getCanonicalName());
-        props.setProperty("database.history.file.filename", 
databaseStoreHistoryName);
         props.setProperty("database.snapshot.mode", snapshotMode);
         props.setProperty("database.history.store.only.monitored.tables.ddl", 
historyMonitorDdl);
         props.setProperty("database.allowPublicKeyRetrieval", "true");
@@ -240,6 +245,16 @@ public class BinlogReader implements Reader {
         props.setProperty("value.converter.schemas.enable", "false");
         props.setProperty("include.schema.changes", includeSchemaChanges);
         props.setProperty("snapshot.mode", snapshotMode);
+        props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+        props.setProperty("database.history.file.filename", 
databaseStoreHistoryName);
+        if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+            props.setProperty("offset.storage", 
InLongFileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, 
serializeOffset());
+            props.setProperty("database.history", 
InLongDatabaseHistory.class.getCanonicalName());
+        } else {
+            props.setProperty("offset.storage", 
FileOffsetBackingStore.class.getCanonicalName());
+            props.setProperty("database.history", 
FileDatabaseHistory.class.getCanonicalName());
+        }
         props.setProperty("tombstones.on.delete", "false");
         props.setProperty("converters", "datetime");
         props.setProperty("datetime.type", 
"org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
@@ -253,6 +268,28 @@ public class BinlogReader implements Reader {
         return props;
     }
 
+    /**
+     * Builds the JSON offset state that is passed to the engine under
+     * {@link InLongFileOffsetBackingStore#OFFSET_STATE_VALUE} when reading from a specific position.
+     *
+     * <p>The source offset carries the configured binlog file and position; the source partition
+     * carries this reader's instance id under the key {@code "server"}.
+     *
+     * @return the offset as a UTF-8 JSON string, or an empty string if serialization fails
+     * @throws NullPointerException if the configured offset file or position is {@code null}
+     */
+    private String serializeOffset() {
+        // Validate the configured values themselves; the config keys are constants and never null.
+        Preconditions.checkNotNull(specificOffsetFile,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " shouldn't be null");
+        Preconditions.checkNotNull(specificOffsetPos,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
+
+        Map<String, Object> sourceOffset = new HashMap<>();
+        sourceOffset.put("file", specificOffsetFile);
+        // NOTE(review): "pos" is stored as a String; confirm the offset store converts it to a number.
+        sourceOffset.put("pos", specificOffsetPos);
+
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", instanceId);
+
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        specificOffset.setSourcePartition(sourcePartition);
+
+        byte[] serializedOffset = new byte[0];
+        try {
+            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        } catch (IOException e) {
+            // Best effort: the failure is logged and an empty offset state is returned.
+            LOGGER.error("serialize offset message error", e);
+        }
+        return new String(serializedOffset, StandardCharsets.UTF_8);
+    }
+
     @Override
     public void destroy() {
         synchronized (this) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/DatabaseHistoryUtil.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/DatabaseHistoryUtil.java
new file mode 100644
index 000000000..619167e80
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/DatabaseHistoryUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.relational.history.DatabaseHistory;
+import org.apache.inlong.agent.plugin.message.SchemaRecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Util to safely visit schema history {@link DatabaseHistory}.
+ *
+ * <p>Schema records are kept per engine name in static maps; every access synchronizes on the
+ * history map. Once an engine's history has been removed, further retrieval for that engine
+ * name fails until the history is registered again.
+ */
+public class DatabaseHistoryUtil {
+
+    private static final Map<String, Collection<SchemaRecord>> HISTORY = new HashMap<>();
+    private static final Map<String, Boolean> HISTORY_CLEANUP_STATUS = new HashMap<>();
+
+    private DatabaseHistoryUtil() {
+    }
+
+    /**
+     * Registers the schema records of an engine and marks its history as not cleaned up.
+     *
+     * @param engineName name of the engine owning the records
+     * @param engineHistory collection of records; stored by reference
+     */
+    public static void registerHistory(String engineName, Collection<SchemaRecord> engineHistory) {
+        synchronized (HISTORY) {
+            HISTORY.put(engineName, engineHistory);
+            HISTORY_CLEANUP_STATUS.put(engineName, false);
+        }
+    }
+
+    /**
+     * Drops the schema records of an engine and marks its history as cleaned up.
+     *
+     * @param engineName name of the engine whose records are removed
+     */
+    public static void removeHistory(String engineName) {
+        synchronized (HISTORY) {
+            HISTORY_CLEANUP_STATUS.put(engineName, true);
+            HISTORY.remove(engineName);
+        }
+    }
+
+    /**
+     * Returns the schema records registered for an engine.
+     *
+     * @param engineName name of the engine
+     * @return the registered records, or an empty list if none were ever registered
+     * @throws IllegalStateException if the engine's history has already been removed
+     */
+    public static Collection<SchemaRecord> retrieveHistory(String engineName) {
+        synchronized (HISTORY) {
+            if (Boolean.TRUE.equals(HISTORY_CLEANUP_STATUS.get(engineName))) {
+                throw new IllegalStateException(String.format(
+                        "Retrieve schema history failed, the schema records for engine %s has been removed, "
+                                + "this might because the debezium engine has been shutdown due to other errors.",
+                        engineName));
+            }
+            return HISTORY.getOrDefault(engineName, Collections.emptyList());
+        }
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongDatabaseHistory.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongDatabaseHistory.java
new file mode 100644
index 000000000..29074bbe2
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongDatabaseHistory.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.util.Collect;
+import io.debezium.util.FunctionalReadWriteLock;
+import org.apache.inlong.agent.plugin.message.SchemaRecord;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * A {@link DatabaseHistory} implementation that stores the schema history in 
a local file.
+ */
+public class InLongDatabaseHistory extends AbstractDatabaseHistory {
+
+    public static final String DATABASE_HISTORY_INSTANCE_NAME = 
"database.history.instance.name";
+    public static final Field FILE_PATH = 
Field.create("database.history.file.filename")
+            .withDescription("The path to the file that will be used to record 
the database history").required();
+    private static final Charset UTF8;
+    public static Collection<Field> ALL_FIELDS;
+
+    static {
+        ALL_FIELDS = Collect.arrayListOf(FILE_PATH, new Field[0]);
+        UTF8 = StandardCharsets.UTF_8;
+    }
+
+    private final FunctionalReadWriteLock lock = 
FunctionalReadWriteLock.reentrant();
+    private final DocumentWriter writer = DocumentWriter.defaultWriter();
+    private final DocumentReader reader = DocumentReader.defaultReader();
+    private final AtomicBoolean running = new AtomicBoolean();
+    private ConcurrentLinkedQueue<SchemaRecord> schemaRecords;
+    private String instanceName;
+    private Path path;
+
+    public InLongDatabaseHistory() {
+    }
+
+    /**
+     * Checks whether the given records can be used by this history implementation.
+     *
+     * <p>Only the first record is inspected: the collection is compatible when it is empty or
+     * when its first element is a history record.
+     *
+     * @param records schema records to check
+     * @return {@code false} only if the first record is not a history record
+     */
+    public static boolean isCompatible(Collection<SchemaRecord> records) {
+        Iterator<SchemaRecord> cursor = records.iterator();
+        return !cursor.hasNext() || cursor.next().isHistoryRecord();
+    }
+
+    /**
+     * Copies the schema records currently registered for the given instance into a new queue.
+     *
+     * @param instanceName engine instance name used as the lookup key in {@link DatabaseHistoryUtil}
+     * @return a new queue holding the registered records (empty if none are registered)
+     */
+    private ConcurrentLinkedQueue<SchemaRecord> getRegisteredHistoryRecord(String instanceName) {
+        Collection<SchemaRecord> historyRecords = DatabaseHistoryUtil.retrieveHistory(instanceName);
+        return new ConcurrentLinkedQueue<>(historyRecords);
+    }
+
+    /**
+     * Validates the configuration and binds this history to its instance name and history file.
+     *
+     * <p>After validation the records already registered for the instance are copied into a new
+     * queue, which is then registered back under the same instance name.
+     *
+     * @throws ConnectException if the configuration fails validation
+     * @throws IllegalStateException if this history is already running
+     */
+    public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener,
+            boolean useCatalogBeforeSchema) {
+        if (!config.validateAndRecord(ALL_FIELDS, this.logger::error)) {
+            throw new ConnectException("Error configuring an instance of " + this.getClass().getSimpleName()
+                    + "; check the logs for details");
+        }
+        if (this.running.get()) {
+            throw new IllegalStateException("Database history file already initialized to " + this.path);
+        }
+
+        super.configure(config, comparator, listener, useCatalogBeforeSchema);
+        this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
+        this.schemaRecords = this.getRegisteredHistoryRecord(this.instanceName);
+        DatabaseHistoryUtil.registerHistory(this.instanceName, this.schemaRecords);
+        this.path = Paths.get(config.getString(FILE_PATH));
+    }
+
+    /**
+     * Starts the history and, on the first start, makes sure the history file exists.
+     *
+     * <p>Under the write lock the running flag is flipped from false to true; only the caller
+     * that wins this transition creates the missing parent directories and the file.
+     *
+     * @throws IllegalStateException if no file path has been configured
+     * @throws DatabaseHistoryException if the history file cannot be created
+     */
+    public void start() {
+        super.start();
+        this.lock.write(() -> {
+            if (!this.running.compareAndSet(false, true)) {
+                return;
+            }
+
+            Path historyFile = this.path;
+            if (historyFile == null) {
+                throw new IllegalStateException("FileDatabaseHistory must be configured before it is started");
+            }
+
+            try {
+                if (this.storageExists()) {
+                    return;
+                }
+                Path parentDir = historyFile.getParent();
+                if (parentDir != null && !Files.exists(parentDir)) {
+                    Files.createDirectories(parentDir);
+                }
+                try {
+                    Files.createFile(historyFile);
+                } catch (FileAlreadyExistsException ignored) {
+                    // do nothing
+                }
+            } catch (IOException e) {
+                throw new DatabaseHistoryException(
+                        "Unable to create history file at " + historyFile + ": " + e.getMessage(), e);
+            }
+        });
+    }
+
+    /**
+     * Stops this history: clears the running flag, delegates to the superclass, and then removes
+     * the schema records registered for this instance from {@link DatabaseHistoryUtil}.
+     */
+    public void stop() {
+        this.running.set(false);
+        super.stop();
+        DatabaseHistoryUtil.removeHistory(this.instanceName);
+    }
+
+    protected void storeRecord(HistoryRecord record) throws 
DatabaseHistoryException {
+        if (record != null) {
+            this.lock.write(() -> {
+                if (!this.running.get()) {
+                    throw new IllegalStateException("The history has been 
stopped and will not accept more records");
+                } else {
+                    try {
+                        String line = this.writer.write(record.document());
+
+                        try {
+                            BufferedWriter historyWriter = Files
+                                    .newBufferedWriter(this.path, 
StandardOpenOption.APPEND);
+
+                            label58:
+                            {
+                                try {
+                                    try {
+                                        historyWriter.append(line);
+                                        historyWriter.newLine();
+                                        this.schemaRecords.add(new 
SchemaRecord(record));
+                                        break label58;
+                                    } catch (IOException var7) {
+                                        this.logger.error("Failed to add 
record to history at {}: {}",
+                                                new Object[]{this.path, 
record, var7});
+                                    }
+                                } catch (Throwable var8) {
+                                    if (historyWriter != null) {
+                                        try {
+                                            historyWriter.close();
+                                        } catch (Throwable var6) {
+                                            var8.addSuppressed(var6);
+                                        }
+                                    }
+
+                                    throw var8;
+                                }
+
+                                if (historyWriter != null) {
+                                    historyWriter.close();
+                                }
+
+                                return;
+                            }
+
+                            if (historyWriter != null) {
+                                historyWriter.close();
+                            }
+                        } catch (IOException var9) {
+                            throw new DatabaseHistoryException(
+                                    "Unable to create writer for history file 
" + this.path + ": " + var9.getMessage(),
+                                    var9);
+                        }
+                    } catch (IOException var10) {
+                        this.logger.error("Failed to convert record to string: 
{}", record, var10);
+                    }
+
+                }
+            });
+        }
+    }
+
+    /**
+     * Replays stored history records into the given consumer.
+     *
+     * <p>First, under the write lock and only when {@link #exists()} is true, every non-empty line
+     * of the history file is parsed and passed to {@code records}; an {@link IOException} while
+     * reading is logged and not rethrown. Afterwards the in-memory schema records are passed to
+     * the same consumer.
+     *
+     * <p>NOTE(review): {@link #exists()} checks the in-memory records rather than the file, so the
+     * file is only read when in-memory records are present, and those are then replayed as well.
+     * Confirm this is intended.
+     *
+     * @param records the consumer receiving each recovered record
+     */
+    protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
+        this.lock.write(() -> {
+            try {
+                if (!this.exists()) {
+                    return;
+                }
+
+                for (String line : Files.readAllLines(this.path, UTF8)) {
+                    if (line == null || line.isEmpty()) {
+                        continue;
+                    }
+                    records.accept(new HistoryRecord(this.reader.read(line)));
+                }
+            } catch (IOException e) {
+                this.logger.error("Failed to add recover records from history at {}", this.path, e);
+            }
+        });
+
+        this.schemaRecords.stream()
+                .map(SchemaRecord::getHistoryRecord)
+                .forEach(records);
+    }
+
+    /**
+     * Reports whether any schema record is currently held in memory.
+     *
+     * @return {@code true} if the in-memory schema record collection is not empty
+     */
+    public boolean exists() {
+        final boolean noRecords = this.schemaRecords.isEmpty();
+        return !noRecords;
+    }
+
+    /**
+     * Reports whether the configured history file is present on disk.
+     *
+     * @return {@code true} if a file exists at the configured path
+     */
+    public boolean storageExists() {
+        final boolean present = Files.exists(this.path);
+        return present;
+    }
+
+    /**
+     * Describes this history by its file path.
+     *
+     * @return {@code "file "} followed by the path, or by {@code "(unstarted)"} when no path is set
+     */
+    public String toString() {
+        if (this.path == null) {
+            return "file (unstarted)";
+        }
+        return "file " + this.path;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongFileOffsetBackingStore.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongFileOffsetBackingStore.java
new file mode 100644
index 000000000..c9651ab93
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongFileOffsetBackingStore.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.embedded.EmbeddedEngine;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.SafeObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implementation of OffsetBackingStore that saves data locally to a file. To 
ensure this behaves
+ * similarly to a real backing store, operations are executed asynchronously 
on a background thread.
+ * The offset position can be specified
+ */
+public class InLongFileOffsetBackingStore extends MemoryOffsetBackingStore {
+
+    public static final String OFFSET_STATE_VALUE = 
"offset.storage.inlong.state.value";
+    public static final int FLUSH_TIMEOUT_SECONDS = 10;
+    private static final Logger log = 
LoggerFactory.getLogger(FileOffsetBackingStore.class);
+    private File file;
+
+    public InLongFileOffsetBackingStore() {
+
+    }
+
+    @Override
+    public void configure(WorkerConfig config) {
+        super.configure(config);
+        file = new 
File(config.getString(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG));
+        // eagerly initialize the executor, because OffsetStorageWriter will 
use it later
+        start();
+
+        Map<String, ?> conf = config.originals();
+        if (!conf.containsKey(OFFSET_STATE_VALUE)) {
+            // a normal startup from clean state, not need to initialize the 
offset
+            return;
+        }
+
+        String stateJson = (String) conf.get(OFFSET_STATE_VALUE);
+        DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
+        DebeziumOffset debeziumOffset;
+        try {
+            debeziumOffset = 
serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            log.error("Can't deserialize debezium offset state from JSON: " + 
stateJson, e);
+            throw new RuntimeException(e);
+        }
+
+        final String engineName = (String) 
conf.get(EmbeddedEngine.ENGINE_NAME.name());
+        Converter keyConverter = new JsonConverter();
+        Converter valueConverter = new JsonConverter();
+        keyConverter.configure(config.originals(), true);
+        Map<String, Object> valueConfigs = new HashMap<>(conf);
+        valueConfigs.put("schemas.enable", false);
+        valueConverter.configure(valueConfigs, true);
+        OffsetStorageWriter offsetWriter =
+                new OffsetStorageWriter(
+                        this,
+                        // must use engineName as namespace to align with 
Debezium Engine
+                        // implementation
+                        engineName,
+                        keyConverter,
+                        valueConverter);
+
+        offsetWriter.offset(debeziumOffset.sourcePartition, 
debeziumOffset.sourceOffset);
+
+        // flush immediately
+        if (!offsetWriter.beginFlush()) {
+            // if nothing is needed to be flushed, there must be something 
wrong with the
+            // initialization
+            log.warn(
+                    "Initialize InLongFileOffsetBackingStore from empty offset 
state, this shouldn't happen.");
+            return;
+        }
+
+        // trigger flushing
+        Future<Void> flushFuture =
+                offsetWriter.doFlush(
+                        (error, result) -> {
+                            if (error != null) {
+                                log.error("Failed to flush initial offset.", 
error);
+                            } else {
+                                log.debug("Successfully flush initial 
offset.");
+                            }
+                        });
+
+        // wait until flushing finished
+        try {
+            flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            log.info(
+                    "Flush offsets successfully, partition: {}, offsets: {}",
+                    debeziumOffset.sourcePartition,
+                    debeziumOffset.sourceOffset);
+        } catch (InterruptedException e) {
+            log.warn("Flush offsets interrupted, cancelling.", e);
+            offsetWriter.cancelFlush();
+        } catch (ExecutionException e) {
+            log.error("Flush offsets threw an unexpected exception.", e);
+            offsetWriter.cancelFlush();
+        } catch (TimeoutException e) {
+            log.error("Timed out waiting to flush offsets to storage.", e);
+            offsetWriter.cancelFlush();
+        }
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        log.info("Starting FileOffsetBackingStore with file {}", file);
+        load();
+    }
+
+    @Override
+    public synchronized void stop() {
+        super.stop();
+        // Nothing to do since this doesn't maintain any outstanding 
connections/data
+        log.info("Stopped FileOffsetBackingStore");
+    }
+
+    @SuppressWarnings("unchecked")
+    private void load() {
+        try (SafeObjectInputStream is = new 
SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
+            Object obj = is.readObject();
+            if (!(obj instanceof HashMap)) {
+                throw new ConnectException("Expected HashMap but found " + 
obj.getClass());
+            }
+            Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
+            data = new HashMap<>();
+            for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
+                ByteBuffer key = (mapEntry.getKey() != null) ? 
ByteBuffer.wrap(mapEntry.getKey()) : null;
+                ByteBuffer value = (mapEntry.getValue() != null) ? 
ByteBuffer.wrap(mapEntry.getValue()) : null;
+                data.put(key, value);
+            }
+        } catch (NoSuchFileException | EOFException e) {
+            // NoSuchFileException: Ignore, may be new.
+            // EOFException: Ignore, this means the file was missing or corrupt
+        } catch (IOException | ClassNotFoundException e) {
+            throw new ConnectException(e);
+        }
+    }
+
+    @Override
+    protected void save() {
+        try (ObjectOutputStream os = new 
ObjectOutputStream(Files.newOutputStream(file.toPath()))) {
+            Map<byte[], byte[]> raw = new HashMap<>();
+            for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) 
{
+                byte[] key = (mapEntry.getKey() != null) ? 
mapEntry.getKey().array() : null;
+                byte[] value = (mapEntry.getValue() != null) ? 
mapEntry.getValue().array() : null;
+                raw.put(key, value);
+            }
+            os.writeObject(raw);
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
index 120b895bc..8c71a79a4 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
@@ -18,10 +18,16 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import com.google.gson.Gson;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.plugin.sources.reader.BinlogReader;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
 public class TestBinlogReader {
 
     private static Gson gson = new Gson();
@@ -29,32 +35,55 @@ public class TestBinlogReader {
     @Test
     public void testDebeziumFormat() {
         String debeziumJson = "{\n"
-            + "    \"before\": null,\n"
-            + "    \"after\": {\n"
-            + "      \"id\": 1004,\n"
-            + "      \"first_name\": \"Anne\",\n"
-            + "      \"last_name\": \"Kretchmar\",\n"
-            + "      \"email\": \"[email protected]\"\n"
-            + "    },\n"
-            + "    \"source\": {\n"
-            + "      \"version\": \"1.8.1.Final\",\n"
-            + "      \"name\": \"dbserver1\",\n"
-            + "      \"server_id\": 0,\n"
-            + "      \"ts_sec\": 0,\n"
-            + "      \"gtid\": null,\n"
-            + "      \"file\": \"mysql-bin.000003\",\n"
-            + "      \"pos\": 154,\n"
-            + "      \"row\": 0,\n"
-            + "      \"snapshot\": true,\n"
-            + "      \"thread\": null,\n"
-            + "      \"db\": \"inventory\",\n"
-            + "      \"table\": \"customers\"\n"
-            + "    },\n"
-            + "    \"op\": \"r\",\n"
-            + "    \"ts_ms\": 1486500577691\n"
-            + "  }";
+                + "    \"before\": null,\n"
+                + "    \"after\": {\n"
+                + "      \"id\": 1004,\n"
+                + "      \"first_name\": \"Anne\",\n"
+                + "      \"last_name\": \"Kretchmar\",\n"
+                + "      \"email\": \"[email protected]\"\n"
+                + "    },\n"
+                + "    \"source\": {\n"
+                + "      \"version\": \"1.8.1.Final\",\n"
+                + "      \"name\": \"dbserver1\",\n"
+                + "      \"server_id\": 0,\n"
+                + "      \"ts_sec\": 0,\n"
+                + "      \"gtid\": null,\n"
+                + "      \"file\": \"mysql-bin.000003\",\n"
+                + "      \"pos\": 154,\n"
+                + "      \"row\": 0,\n"
+                + "      \"snapshot\": true,\n"
+                + "      \"thread\": null,\n"
+                + "      \"db\": \"inventory\",\n"
+                + "      \"table\": \"customers\"\n"
+                + "    },\n"
+                + "    \"op\": \"r\",\n"
+                + "    \"ts_ms\": 1486500577691\n"
+                + "  }";
         DebeziumFormat debeziumFormat = gson
-            .fromJson(debeziumJson, DebeziumFormat.class);
+                .fromJson(debeziumJson, DebeziumFormat.class);
         Assert.assertEquals("customers", 
debeziumFormat.getSource().getTable());
     }
+
+    /**
+     * Manual check that a {@link BinlogReader} can be initialized to start from a specific binlog
+     * file and position.
+     *
+     * <p>The {@code @Test} annotation is commented out, so this method is not run by JUnit;
+     * presumably it needs a reachable MySQL server with the credentials below (TODO confirm).
+     * After initialization the method never returns, keeping the calling thread alive.
+     *
+     * @throws Exception if initialization fails or the waiting thread is interrupted
+     */
+    // @Test
+    public void binlogStartSpacialTest() throws Exception {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set(BinlogReader.JOB_DATABASE_USER, "root");
+        jobProfile.set(BinlogReader.JOB_DATABASE_PASSWORD, "123456");
+        jobProfile.set(BinlogReader.JOB_DATABASE_HOSTNAME, "");
+        jobProfile.set(BinlogReader.JOB_DATABASE_PORT, "3307");
+        jobProfile.set(BinlogReader.JOB_DATABASE_WHITELIST, "etl");
+        jobProfile.set(BinlogReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "mysql-bin.000001");
+        jobProfile.set(BinlogReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "5641");
+        jobProfile.set(BinlogReader.JOB_DATABASE_SNAPSHOT_MODE, SnapshotModeConstants.SPECIFIC_OFFSETS);
+        // jobProfile.set(BinlogReader.JOB_DATABASE_STORE_HISTORY_FILENAME, "");
+        jobProfile.set(BinlogReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, String.valueOf(6000L));
+        jobProfile.set("job.instance.id", "_1");
+        jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
+        BinlogReader binlogReader = new BinlogReader();
+        binlogReader.init(jobProfile);
+        // block forever without spinning: sleeping keeps the thread alive but leaves the CPU idle
+        while (true) {
+            Thread.sleep(1000L);
+        }
+    }
 }
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 96e9fb054..e5ec86824 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -614,6 +614,7 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
   org.eclipse.jetty:jetty-util-ajax:9.4.44.v20210927 - Jetty :: Utilities :: 
Ajax(JSON) (https://eclipse.org/jetty/jetty-util-ajax), (Apache Software 
License - Version 2.0;  Eclipse Public License - Version 1.0)
   org.apache.kafka:kafka_2.11:2.4.1 - Apache Kafka (https://kafka.apache.org), 
(The Apache Software License, Version 2.0)
   org.apache.kafka:kafka-clients:2.4.1 - Apache Kafka 
(https://kafka.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.kafka:kafka-clients:3.0.0 - Apache Kafka 
(https://kafka.apache.org), (The Apache Software License, Version 2.0)
   org.apache.thrift:libthrift:0.9.3 - Apache Thrift 
(http://thrift.apache.org),  (The Apache Software License, Version 2.0)
   log4j:log4j:1.2.17 - Apache Log4j (http://logging.apache.org/log4j/1.2/), 
(The Apache Software License, Version 2.0)
   org.apache.logging.log4j:log4j-core:2.17.2 - Apache Log4j Core 
(https://logging.apache.org/log4j/2.x/log4j-core/), (Apache License, Version 
2.0)
diff --git a/pom.xml b/pom.xml
index cd5514fe1..778bff0a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,8 @@
         <tencentcloud.cls.version>1.0.5</tencentcloud.cls.version>
         <esri-geometry-api.version>2.0.0</esri-geometry-api.version>
         <HikariCP.version>4.0.3</HikariCP.version>
+
+        <kafka.clients.version>3.0.0</kafka.clients.version>
     </properties>
 
     <dependencyManagement>

Reply via email to