This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 03a768fd2 [INLONG-4112][Agent] Support collect data from the specified
position for MySQL binlog (#4113)
03a768fd2 is described below
commit 03a768fd2e718c27f08f23e7b50bb8791caaf893
Author: Greedyu <[email protected]>
AuthorDate: Fri May 20 10:13:47 2022 +0800
[INLONG-4112][Agent] Support collect data from the specified position for
MySQL binlog (#4113)
---
.../agent/constant/SnapshotModeConstants.java | 30 +++
.../apache/inlong/agent/pojo/DebeziumOffset.java | 61 +++++
.../agent/utils/DebeziumOffsetSerializer.java | 43 ++++
inlong-agent/agent-plugins/pom.xml | 5 +
.../inlong/agent/plugin/message/SchemaRecord.java | 84 +++++++
.../agent/plugin/sources/reader/BinlogReader.java | 85 +++++--
.../agent/plugin/utils/DatabaseHistoryUtil.java | 65 ++++++
.../agent/plugin/utils/InLongDatabaseHistory.java | 247 +++++++++++++++++++++
.../plugin/utils/InLongFileOffsetBackingStore.java | 196 ++++++++++++++++
.../agent/plugin/sources/TestBinlogReader.java | 79 ++++---
licenses/inlong-agent/LICENSE | 1 +
pom.xml | 2 +
12 files changed, 849 insertions(+), 49 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SnapshotModeConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SnapshotModeConstants.java
new file mode 100644
index 000000000..f9c21ab6a
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/SnapshotModeConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Constants of job fetcher snapshot mode
+ */
+public class SnapshotModeConstants {
+
+ public static final String INITIAL = "initial";
+
+ public static final String EARLIEST_OFFSET = "never";
+
+ public static final String SPECIFIC_OFFSETS = "schema_only_recovery";
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumOffset.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumOffset.java
new file mode 100644
index 000000000..d2345cb3d
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumOffset.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import java.util.Map;
+
+/**
+ * The state that the InLong Debezium Consumer holds for each instance.
+ *
+ * <p>This class describes the most basic state that Debezium used for recovering based on Kafka
+ * Connect mechanism. It includes a sourcePartition and sourceOffset.
+ *
+ * <p>The sourcePartition represents a single input sourcePartition that the record came from (e.g.
+ * a filename, table name, or topic-partition). The sourceOffset represents a position in that
+ * sourcePartition which can be used to resume consumption of data.
+ *
+ * <p>These values can have arbitrary structure and should be represented using
+ * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
+ */
+public class DebeziumOffset implements java.io.Serializable {
+
+    // Only meaningful because the class implements Serializable.
+    private static final long serialVersionUID = 1L;
+
+    // NOTE(review): the fields stay public and there are no getters; presumably the JSON
+    // (de)serializer depends on this visibility - confirm before narrowing access.
+    public Map<String, ?> sourcePartition;
+    public Map<String, ?> sourceOffset;
+
+    /**
+     * Sets the partition this offset belongs to.
+     *
+     * @param sourcePartition partition description; stored as given, not copied
+     */
+    public void setSourcePartition(Map<String, ?> sourcePartition) {
+        this.sourcePartition = sourcePartition;
+    }
+
+    /**
+     * Sets the position inside the partition.
+     *
+     * @param sourceOffset offset description; stored as given, not copied
+     */
+    public void setSourceOffset(Map<String, ?> sourceOffset) {
+        this.sourceOffset = sourceOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "DebeziumOffset{"
+                + "sourcePartition="
+                + sourcePartition
+                + ", sourceOffset="
+                + sourceOffset
+                + '}';
+    }
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DebeziumOffsetSerializer.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DebeziumOffsetSerializer.java
new file mode 100644
index 000000000..5a6e03782
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DebeziumOffsetSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+
+import java.io.IOException;
+
+/**
+ * Serializer implementation for a {@link DebeziumOffset}.
+ *
+ * <p>Offsets are written and read as JSON. Use the shared {@link #INSTANCE}.
+ */
+public class DebeziumOffsetSerializer {
+
+    // we currently use JSON serialization for simplification, as the state is very small.
+    // we can improve this in the future if needed.
+    // A single mapper is reused by every call instead of building a new one each time.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static final DebeziumOffsetSerializer INSTANCE = new DebeziumOffsetSerializer();
+
+    /**
+     * Encodes the given offset as JSON bytes.
+     *
+     * @param debeziumOffset offset to encode
+     * @return JSON representation of the offset
+     * @throws IOException if the offset cannot be written as JSON
+     */
+    public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException {
+        return OBJECT_MAPPER.writeValueAsBytes(debeziumOffset);
+    }
+
+    /**
+     * Decodes an offset from JSON bytes.
+     *
+     * @param bytes JSON representation of an offset
+     * @return the decoded offset
+     * @throws IOException if the bytes cannot be parsed into a {@link DebeziumOffset}
+     */
+    public DebeziumOffset deserialize(byte[] bytes) throws IOException {
+        return OBJECT_MAPPER.readValue(bytes, DebeziumOffset.class);
+    }
+}
diff --git a/inlong-agent/agent-plugins/pom.xml
b/inlong-agent/agent-plugins/pom.xml
index 80b2546b0..17881c3ac 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -69,6 +69,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.clients.version}</version>
+ </dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SchemaRecord.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SchemaRecord.java
new file mode 100644
index 000000000..a4877f7db
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SchemaRecord.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.message;
+
+import io.debezium.document.Document;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges.TableChange;
+
+import java.io.IOException;
+
+/**
+ * The Record represents a schema change event, it contains either one {@link
HistoryRecord} or
+ * {@link TableChange}.
+ */
+public class SchemaRecord {
+
+ private final HistoryRecord historyRecord;
+
+ private final Document tableChangeDoc;
+
+ public SchemaRecord(HistoryRecord historyRecord) {
+ this.historyRecord = historyRecord;
+ this.tableChangeDoc = null;
+ }
+
+ public SchemaRecord(Document document) {
+ if (this.isHistoryRecordDocument(document)) {
+ this.historyRecord = new HistoryRecord(document);
+ this.tableChangeDoc = null;
+ } else {
+ this.tableChangeDoc = document;
+ this.historyRecord = null;
+ }
+
+ }
+
+ public HistoryRecord getHistoryRecord() {
+ return this.historyRecord;
+ }
+
+ public Document getTableChangeDoc() {
+ return this.tableChangeDoc;
+ }
+
+ public boolean isHistoryRecord() {
+ return this.historyRecord != null;
+ }
+
+ public boolean isTableChangeRecord() {
+ return this.tableChangeDoc != null;
+ }
+
+ public Document toDocument() {
+ return this.historyRecord != null ? this.historyRecord.document() :
this.tableChangeDoc;
+ }
+
+ public String toString() {
+ try {
+ return DocumentWriter.defaultWriter().write(this.toDocument());
+ } catch (IOException var2) {
+ return super.toString();
+ }
+ }
+
+ private boolean isHistoryRecordDocument(Document document) {
+ return (new HistoryRecord(document)).isValid();
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 42b4302c9..9ca78b5f7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources.reader;
+import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.engine.ChangeEvent;
@@ -27,18 +28,24 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
+import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
+import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -55,27 +62,24 @@ import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
*/
public class BinlogReader implements Reader {
+ public static final String COMPONENT_NAME = "BinlogReader";
+ public static final String JOB_DATABASE_USER = "job.binlogJob.user";
+ public static final String JOB_DATABASE_PASSWORD =
"job.binlogJob.password";
+ public static final String JOB_DATABASE_HOSTNAME =
"job.binlogJob.hostname";
+ public static final String JOB_TABLE_WHITELIST =
"job.binlogJob.tableWhiteList";
+ public static final String JOB_DATABASE_WHITELIST =
"job.binlogJob.databaseWhiteList";
+ public static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
+ public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE =
"job.binlogJob.offset.specificOffsetFile";
+ public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS =
"job.binlogJob.offset.specificOffsetPos";
+ public static final String JOB_DATABASE_SERVER_TIME_ZONE =
"job.binlogJob.serverTimezone";
+ public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS =
"job.binlogJob.offset.intervalMs";
+ public static final String JOB_DATABASE_STORE_HISTORY_FILENAME =
"job.binlogJob.history.filename";
+ public static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES =
"job.binlogJob.schema";
+ public static final String JOB_DATABASE_SNAPSHOT_MODE =
"job.binlogJob.snapshot.mode";
+ public static final String JOB_DATABASE_HISTORY_MONITOR_DDL =
"job.binlogJob.ddl";
+ public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
+ public static final String JOB_DATABASE_QUEUE_SIZE =
"job.binlogJob.queueSize";
private static final Logger LOGGER =
LoggerFactory.getLogger(BinlogReader.class);
-
- private static final String COMPONENT_NAME = "BinlogReader";
- private static final String JOB_DATABASE_USER = "job.binlogJob.user";
- private static final String JOB_DATABASE_PASSWORD =
"job.binlogJob.password";
- private static final String JOB_DATABASE_HOSTNAME =
"job.binlogJob.hostname";
- private static final String JOB_TABLE_WHITELIST =
"job.binlogJob.tableWhiteList";
- private static final String JOB_DATABASE_WHITELIST =
"job.binlogJob.databaseWhiteList";
-
- private static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
- private static final String JOB_DATABASE_OFFSET_FILENAME =
"job.binlogJob.offset.filename";
-
- private static final String JOB_DATABASE_SERVER_TIME_ZONE =
"job.binlogJob.serverTimezone";
- private static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS =
"job.binlogJob.offset.intervalMs";
-
- private static final String JOB_DATABASE_STORE_HISTORY_FILENAME =
"job.binlogJob.history.filename";
- private static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES =
"job.binlogJob.schema";
- private static final String JOB_DATABASE_SNAPSHOT_MODE =
"job.binlogJob.snapshot.mode";
- private static final String JOB_DATABASE_HISTORY_MONITOR_DDL =
"job.binlogJob.ddl";
- private static final String JOB_DATABASE_PORT = "job.binlogJob.port";
- private static final String JOB_DATABASE_QUEUE_SIZE =
"job.binlogJob.queueSize";
private static final Gson gson = new Gson();
private final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
/**
@@ -100,6 +104,8 @@ public class BinlogReader implements Reader {
private String instanceId;
private ExecutorService executor;
private String offset;
+ private String specificOffsetFile;
+ private String specificOffsetPos;
private BinlogSnapshotBase binlogSnapshot;
private JobProfile jobProfile;
private boolean destroyed = false;
@@ -152,6 +158,8 @@ public class BinlogReader implements Reader {
finished = false;
offset = jobConf.get(JOB_DATABASE_OFFSETS, "");
+ specificOffsetFile =
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "");
+ specificOffsetPos =
jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1");
binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
binlogSnapshot.save(offset);
@@ -193,6 +201,7 @@ public class BinlogReader implements Reader {
committer.markBatchFinished();
} catch (Exception e) {
LOGGER.error("parse binlog message error", e);
+
}
})
@@ -228,11 +237,7 @@ public class BinlogReader implements Reader {
props.setProperty("table.whitelist", tableWhiteList);
props.setProperty("database.whitelist", databaseWhiteList);
- props.setProperty("offset.storage",
FileOffsetBackingStore.class.getCanonicalName());
- props.setProperty("offset.storage.file.filename", offsetStoreFileName);
props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs);
- props.setProperty("database.history",
FileDatabaseHistory.class.getCanonicalName());
- props.setProperty("database.history.file.filename",
databaseStoreHistoryName);
props.setProperty("database.snapshot.mode", snapshotMode);
props.setProperty("database.history.store.only.monitored.tables.ddl",
historyMonitorDdl);
props.setProperty("database.allowPublicKeyRetrieval", "true");
@@ -240,6 +245,16 @@ public class BinlogReader implements Reader {
props.setProperty("value.converter.schemas.enable", "false");
props.setProperty("include.schema.changes", includeSchemaChanges);
props.setProperty("snapshot.mode", snapshotMode);
+ props.setProperty("offset.storage.file.filename", offsetStoreFileName);
+ props.setProperty("database.history.file.filename",
databaseStoreHistoryName);
+ if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) {
+ props.setProperty("offset.storage",
InLongFileOffsetBackingStore.class.getCanonicalName());
+ props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE,
serializeOffset());
+ props.setProperty("database.history",
InLongDatabaseHistory.class.getCanonicalName());
+ } else {
+ props.setProperty("offset.storage",
FileOffsetBackingStore.class.getCanonicalName());
+ props.setProperty("database.history",
FileDatabaseHistory.class.getCanonicalName());
+ }
props.setProperty("tombstones.on.delete", "false");
props.setProperty("converters", "datetime");
props.setProperty("datetime.type",
"org.apache.inlong.agent.plugin.utils.BinlogTimeConverter");
@@ -253,6 +268,28 @@ public class BinlogReader implements Reader {
return props;
}
+    /**
+     * Builds the JSON form of the configured start position: a {@link DebeziumOffset} whose
+     * source offset holds the {@code file} and {@code pos} values and whose source partition
+     * holds {@code server} = {@code instanceId}.
+     *
+     * @return the offset as a UTF-8 JSON string; an empty string if serialization fails (the
+     *         failure is logged)
+     * @throws NullPointerException if the configured offset file or position is {@code null}
+     */
+    private String serializeOffset() {
+        // Check the configured values; the property-key constants themselves are never null.
+        Preconditions.checkNotNull(specificOffsetFile,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " shouldn't be null");
+        Preconditions.checkNotNull(specificOffsetPos,
+                JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null");
+
+        Map<String, Object> sourceOffset = new HashMap<>();
+        sourceOffset.put("file", specificOffsetFile);
+        sourceOffset.put("pos", specificOffsetPos);
+
+        Map<String, String> sourcePartition = new HashMap<>();
+        sourcePartition.put("server", instanceId);
+
+        DebeziumOffset specificOffset = new DebeziumOffset();
+        specificOffset.setSourceOffset(sourceOffset);
+        specificOffset.setSourcePartition(sourcePartition);
+
+        byte[] serializedOffset = new byte[0];
+        try {
+            serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+        } catch (IOException e) {
+            LOGGER.error("serialize offset message error", e);
+        }
+        return new String(serializedOffset, StandardCharsets.UTF_8);
+    }
+
+
@Override
public void destroy() {
synchronized (this) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/DatabaseHistoryUtil.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/DatabaseHistoryUtil.java
new file mode 100644
index 000000000..619167e80
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/DatabaseHistoryUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.relational.history.DatabaseHistory;
+import org.apache.inlong.agent.plugin.message.SchemaRecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Util to safely visit schema history {@link DatabaseHistory}.
+ *
+ * <p>Keeps the schema records of each engine in a static map keyed by engine name, together with
+ * a flag recording whether that engine's records have been removed. All access is guarded by one
+ * lock.
+ */
+public class DatabaseHistoryUtil {
+
+    private static final Map<String, Collection<SchemaRecord>> HISTORY = new HashMap<>();
+    private static final Map<String, Boolean> HISTORY_CLEANUP_STATUS = new HashMap<>();
+
+    private DatabaseHistoryUtil() {
+    }
+
+    /**
+     * Registers the records of an engine and marks them as not removed.
+     *
+     * @param engineName    name of the engine
+     * @param engineHistory the engine's schema records; stored as given, not copied
+     */
+    public static void registerHistory(String engineName, Collection<SchemaRecord> engineHistory) {
+        synchronized (HISTORY) {
+            HISTORY.put(engineName, engineHistory);
+            HISTORY_CLEANUP_STATUS.put(engineName, false);
+        }
+    }
+
+    /**
+     * Drops the records of an engine and marks them as removed.
+     *
+     * @param engineName name of the engine
+     */
+    public static void removeHistory(String engineName) {
+        synchronized (HISTORY) {
+            HISTORY_CLEANUP_STATUS.put(engineName, true);
+            HISTORY.remove(engineName);
+        }
+    }
+
+    /**
+     * Returns the records registered for an engine.
+     *
+     * @param engineName name of the engine
+     * @return the registered records, or an empty list if none were ever registered
+     * @throws IllegalStateException if the engine's records have been removed
+     */
+    public static Collection<SchemaRecord> retrieveHistory(String engineName) {
+        synchronized (HISTORY) {
+            if (Boolean.TRUE.equals(HISTORY_CLEANUP_STATUS.get(engineName))) {
+                throw new IllegalStateException(String.format(
+                        "Retrieve schema history failed, the schema records for engine %s has been removed, "
+                                + "this might because the debezium engine has been shutdown due to other errors.",
+                        engineName));
+            }
+            return HISTORY.getOrDefault(engineName, Collections.emptyList());
+        }
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongDatabaseHistory.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongDatabaseHistory.java
new file mode 100644
index 000000000..29074bbe2
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongDatabaseHistory.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.util.Collect;
+import io.debezium.util.FunctionalReadWriteLock;
+import org.apache.inlong.agent.plugin.message.SchemaRecord;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * A {@link DatabaseHistory} implementation that stores the schema history in
a local file.
+ */
+public class InLongDatabaseHistory extends AbstractDatabaseHistory {
+
+ public static final String DATABASE_HISTORY_INSTANCE_NAME =
"database.history.instance.name";
+ public static final Field FILE_PATH =
Field.create("database.history.file.filename")
+ .withDescription("The path to the file that will be used to record
the database history").required();
+ private static final Charset UTF8;
+ public static Collection<Field> ALL_FIELDS;
+
+ static {
+ ALL_FIELDS = Collect.arrayListOf(FILE_PATH, new Field[0]);
+ UTF8 = StandardCharsets.UTF_8;
+ }
+
+ private final FunctionalReadWriteLock lock =
FunctionalReadWriteLock.reentrant();
+ private final DocumentWriter writer = DocumentWriter.defaultWriter();
+ private final DocumentReader reader = DocumentReader.defaultReader();
+ private final AtomicBoolean running = new AtomicBoolean();
+ private ConcurrentLinkedQueue<SchemaRecord> schemaRecords;
+ private String instanceName;
+ private Path path;
+
+ public InLongDatabaseHistory() {
+ }
+
+ public static boolean isCompatible(Collection<SchemaRecord> records) {
+ Iterator var1 = records.iterator();
+ if (var1.hasNext()) {
+ SchemaRecord record = (SchemaRecord) var1.next();
+ if (!record.isHistoryRecord()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private ConcurrentLinkedQueue<SchemaRecord>
getRegisteredHistoryRecord(String instanceName) {
+ Collection<SchemaRecord> historyRecords =
DatabaseHistoryUtil.retrieveHistory(instanceName);
+ return new ConcurrentLinkedQueue(historyRecords);
+ }
+
+ public void configure(Configuration config, HistoryRecordComparator
comparator, DatabaseHistoryListener listener,
+ boolean useCatalogBeforeSchema) {
+ Collection var10001 = ALL_FIELDS;
+ Logger var10002 = this.logger;
+ Objects.requireNonNull(var10002);
+ if (!config.validateAndRecord(var10001, var10002::error)) {
+ throw new ConnectException("Error configuring an instance of " +
this.getClass().getSimpleName()
+ + "; check the logs for details");
+ } else {
+ if (this.running.get()) {
+ throw new IllegalStateException("Database history file already
initialized to " + this.path);
+ } else {
+ super.configure(config, comparator, listener,
useCatalogBeforeSchema);
+ this.instanceName =
config.getString("database.history.instance.name");
+ this.schemaRecords =
this.getRegisteredHistoryRecord(this.instanceName);
+ DatabaseHistoryUtil.registerHistory(this.instanceName,
this.schemaRecords);
+ this.path = Paths.get(config.getString(FILE_PATH));
+ }
+ }
+ }
+
+ public void start() {
+ super.start();
+ this.lock.write(() -> {
+ if (this.running.compareAndSet(false, true)) {
+ Path path = this.path;
+ if (path == null) {
+ throw new IllegalStateException("FileDatabaseHistory must
be configured before it is started");
+ }
+
+ try {
+ if (!this.storageExists()) {
+ if (path.getParent() != null &&
!Files.exists(path.getParent(), new LinkOption[0])) {
+ Files.createDirectories(path.getParent());
+ }
+
+ try {
+ Files.createFile(path);
+ } catch (FileAlreadyExistsException var3) {
+ // do nothing
+ }
+ }
+ } catch (IOException var4) {
+ throw new DatabaseHistoryException(
+ "Unable to create history file at " + path + ": "
+ var4.getMessage(), var4);
+ }
+ }
+
+ });
+ }
+
+ public void stop() {
+ this.running.set(false);
+ super.stop();
+ DatabaseHistoryUtil.removeHistory(this.instanceName);
+ }
+
+ protected void storeRecord(HistoryRecord record) throws
DatabaseHistoryException {
+ if (record != null) {
+ this.lock.write(() -> {
+ if (!this.running.get()) {
+ throw new IllegalStateException("The history has been
stopped and will not accept more records");
+ } else {
+ try {
+ String line = this.writer.write(record.document());
+
+ try {
+ BufferedWriter historyWriter = Files
+ .newBufferedWriter(this.path,
StandardOpenOption.APPEND);
+
+ label58:
+ {
+ try {
+ try {
+ historyWriter.append(line);
+ historyWriter.newLine();
+ this.schemaRecords.add(new
SchemaRecord(record));
+ break label58;
+ } catch (IOException var7) {
+ this.logger.error("Failed to add
record to history at {}: {}",
+ new Object[]{this.path,
record, var7});
+ }
+ } catch (Throwable var8) {
+ if (historyWriter != null) {
+ try {
+ historyWriter.close();
+ } catch (Throwable var6) {
+ var8.addSuppressed(var6);
+ }
+ }
+
+ throw var8;
+ }
+
+ if (historyWriter != null) {
+ historyWriter.close();
+ }
+
+ return;
+ }
+
+ if (historyWriter != null) {
+ historyWriter.close();
+ }
+ } catch (IOException var9) {
+ throw new DatabaseHistoryException(
+ "Unable to create writer for history file
" + this.path + ": " + var9.getMessage(),
+ var9);
+ }
+ } catch (IOException var10) {
+ this.logger.error("Failed to convert record to string:
{}", record, var10);
+ }
+
+ }
+ });
+ }
+ }
+
+ protected synchronized void recoverRecords(Consumer<HistoryRecord>
records) {
+ this.lock.write(() -> {
+ try {
+ if (this.exists()) {
+ Iterator var2 = Files.readAllLines(this.path,
UTF8).iterator();
+
+ while (var2.hasNext()) {
+ String line = (String) var2.next();
+ if (line != null && !line.isEmpty()) {
+ records.accept(new
HistoryRecord(this.reader.read(line)));
+ }
+ }
+ }
+ } catch (IOException var4) {
+ this.logger.error("Failed to add recover records from history
at {}", this.path, var4);
+ }
+
+ });
+
this.schemaRecords.stream().map(SchemaRecord::getHistoryRecord).forEach(records);
+ }
+
+ public boolean exists() {
+ return !this.schemaRecords.isEmpty();
+ }
+
+ public boolean storageExists() {
+ return Files.exists(path);
+ }
+
+ public String toString() {
+ return "file " + (this.path != null ? this.path : "(unstarted)");
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongFileOffsetBackingStore.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongFileOffsetBackingStore.java
new file mode 100644
index 000000000..c9651ab93
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/InLongFileOffsetBackingStore.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.embedded.EmbeddedEngine;
+import org.apache.inlong.agent.pojo.DebeziumOffset;
+import org.apache.inlong.agent.utils.DebeziumOffsetSerializer;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.SafeObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
+ * similarly to a real backing store, operations are executed asynchronously on a background thread.
+ *
+ * <p>The offset position can be specified: when the worker configuration contains
+ * {@link #OFFSET_STATE_VALUE}, {@link #configure(WorkerConfig)} deserializes that JSON state and
+ * attempts to flush it into this store before returning.
+ */
+public class InLongFileOffsetBackingStore extends MemoryOffsetBackingStore {
+
+    /** Configuration key holding the JSON-serialized {@link DebeziumOffset} to start from. */
+    public static final String OFFSET_STATE_VALUE = "offset.storage.inlong.state.value";
+    /** Maximum time, in seconds, to wait for the initial offset flush. */
+    public static final int FLUSH_TIMEOUT_SECONDS = 10;
+    // Log under this class's own name so the output is attributed to the right store.
+    private static final Logger log = LoggerFactory.getLogger(InLongFileOffsetBackingStore.class);
+    private File file;
+    // Guards start() so that the eager start in configure() is not repeated by a later start().
+    private boolean started;
+
+    public InLongFileOffsetBackingStore() {
+
+    }
+
+    /**
+     * Resolves the offset file from the worker configuration, starts the store, and, if
+     * {@link #OFFSET_STATE_VALUE} is present, writes the specified offset into the store.
+     *
+     * @param config worker configuration; must define the standalone offset storage file name
+     * @throws RuntimeException if the configured offset state cannot be deserialized
+     */
+    @Override
+    public void configure(WorkerConfig config) {
+        super.configure(config);
+        file = new File(config.getString(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG));
+        // eagerly initialize the executor, because OffsetStorageWriter will use it later
+        start();
+
+        Map<String, ?> conf = config.originals();
+        if (!conf.containsKey(OFFSET_STATE_VALUE)) {
+            // a normal startup from clean state, not need to initialize the offset
+            return;
+        }
+
+        DebeziumOffset debeziumOffset = deserializeOffset((String) conf.get(OFFSET_STATE_VALUE));
+        OffsetStorageWriter offsetWriter = newOffsetWriter(conf);
+        flushInitialOffset(offsetWriter, debeziumOffset);
+    }
+
+    /** Parses the JSON offset state, wrapping a parse failure in a RuntimeException. */
+    private DebeziumOffset deserializeOffset(String stateJson) {
+        DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
+        try {
+            return serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            log.error("Can't deserialize debezium offset state from JSON: " + stateJson, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Builds an OffsetStorageWriter over this store using JSON key/value converters. */
+    private OffsetStorageWriter newOffsetWriter(Map<String, ?> conf) {
+        final String engineName = (String) conf.get(EmbeddedEngine.ENGINE_NAME.name());
+        Converter keyConverter = new JsonConverter();
+        Converter valueConverter = new JsonConverter();
+        keyConverter.configure(conf, true);
+        Map<String, Object> valueConfigs = new HashMap<>(conf);
+        valueConfigs.put("schemas.enable", false);
+        valueConverter.configure(valueConfigs, true);
+        return new OffsetStorageWriter(
+                this,
+                // must use engineName as namespace to align with Debezium Engine
+                // implementation
+                engineName,
+                keyConverter,
+                valueConverter);
+    }
+
+    /** Stages the given offset in the writer, flushes it, and waits up to the flush timeout. */
+    private void flushInitialOffset(OffsetStorageWriter offsetWriter, DebeziumOffset debeziumOffset) {
+        offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset);
+
+        // flush immediately
+        if (!offsetWriter.beginFlush()) {
+            // if nothing is needed to be flushed, there must be something wrong with the
+            // initialization
+            log.warn(
+                    "Initialize InLongFileOffsetBackingStore from empty offset state, this shouldn't happen.");
+            return;
+        }
+
+        // trigger flushing
+        Future<Void> flushFuture =
+                offsetWriter.doFlush(
+                        (error, result) -> {
+                            if (error != null) {
+                                log.error("Failed to flush initial offset.", error);
+                            } else {
+                                log.debug("Successfully flush initial offset.");
+                            }
+                        });
+
+        // wait until flushing finished
+        try {
+            flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            log.info(
+                    "Flush offsets successfully, partition: {}, offsets: {}",
+                    debeziumOffset.sourcePartition,
+                    debeziumOffset.sourceOffset);
+        } catch (InterruptedException e) {
+            log.warn("Flush offsets interrupted, cancelling.", e);
+            offsetWriter.cancelFlush();
+            // restore the interrupt flag so callers further up can still observe the interruption
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            log.error("Flush offsets threw an unexpected exception.", e);
+            offsetWriter.cancelFlush();
+        } catch (TimeoutException e) {
+            log.error("Timed out waiting to flush offsets to storage.", e);
+            offsetWriter.cancelFlush();
+        }
+    }
+
+    /**
+     * Starts the parent store and loads previously saved offsets from the file.
+     * Calling this again while already started is a no-op.
+     */
+    @Override
+    public synchronized void start() {
+        if (started) {
+            // configure() has already started this store. NOTE(review): presumably the embedding
+            // engine calls start() again afterwards, and a second super.start() would replace the
+            // parent's executor without shutting the first one down - confirm against the Kafka
+            // Connect version in use.
+            return;
+        }
+        super.start();
+        log.info("Starting FileOffsetBackingStore with file {}", file);
+        load();
+        started = true;
+    }
+
+    /** Stops the parent store and allows a subsequent {@link #start()} to run again. */
+    @Override
+    public synchronized void stop() {
+        super.stop();
+        started = false;
+        // Nothing to do since this doesn't maintain any outstanding connections/data
+        log.info("Stopped FileOffsetBackingStore");
+    }
+
+    /**
+     * Replaces the in-memory offsets with the map deserialized from the file.
+     * A missing or truncated file leaves the current offsets untouched.
+     *
+     * @throws ConnectException if the file cannot be read or does not contain a HashMap
+     */
+    @SuppressWarnings("unchecked")
+    private void load() {
+        try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
+            Object obj = is.readObject();
+            if (!(obj instanceof HashMap)) {
+                throw new ConnectException("Expected HashMap but found " + obj.getClass());
+            }
+            Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
+            data = new HashMap<>();
+            for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
+                ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
+                ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
+                data.put(key, value);
+            }
+        } catch (NoSuchFileException | EOFException e) {
+            // NoSuchFileException: Ignore, may be new.
+            // EOFException: Ignore, this means the file was missing or corrupt
+        } catch (IOException | ClassNotFoundException e) {
+            throw new ConnectException(e);
+        }
+    }
+
+    /**
+     * Serializes the in-memory offsets to the file as a map of byte arrays.
+     *
+     * @throws ConnectException if the file cannot be written
+     */
+    @Override
+    protected void save() {
+        try (ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(file.toPath()))) {
+            Map<byte[], byte[]> raw = new HashMap<>();
+            for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
+                byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
+                byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
+                raw.put(key, value);
+            }
+            os.writeObject(raw);
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
index 120b895bc..8c71a79a4 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogReader.java
@@ -18,10 +18,16 @@
package org.apache.inlong.agent.plugin.sources;
import com.google.gson.Gson;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.SnapshotModeConstants;
+import org.apache.inlong.agent.plugin.sources.reader.BinlogReader;
import org.apache.inlong.agent.pojo.DebeziumFormat;
import org.junit.Assert;
import org.junit.Test;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
public class TestBinlogReader {
private static Gson gson = new Gson();
@@ -29,32 +35,55 @@ public class TestBinlogReader {
@Test
public void testDebeziumFormat() {
String debeziumJson = "{\n"
- + " \"before\": null,\n"
- + " \"after\": {\n"
- + " \"id\": 1004,\n"
- + " \"first_name\": \"Anne\",\n"
- + " \"last_name\": \"Kretchmar\",\n"
- + " \"email\": \"[email protected]\"\n"
- + " },\n"
- + " \"source\": {\n"
- + " \"version\": \"1.8.1.Final\",\n"
- + " \"name\": \"dbserver1\",\n"
- + " \"server_id\": 0,\n"
- + " \"ts_sec\": 0,\n"
- + " \"gtid\": null,\n"
- + " \"file\": \"mysql-bin.000003\",\n"
- + " \"pos\": 154,\n"
- + " \"row\": 0,\n"
- + " \"snapshot\": true,\n"
- + " \"thread\": null,\n"
- + " \"db\": \"inventory\",\n"
- + " \"table\": \"customers\"\n"
- + " },\n"
- + " \"op\": \"r\",\n"
- + " \"ts_ms\": 1486500577691\n"
- + " }";
+ + " \"before\": null,\n"
+ + " \"after\": {\n"
+ + " \"id\": 1004,\n"
+ + " \"first_name\": \"Anne\",\n"
+ + " \"last_name\": \"Kretchmar\",\n"
+ + " \"email\": \"[email protected]\"\n"
+ + " },\n"
+ + " \"source\": {\n"
+ + " \"version\": \"1.8.1.Final\",\n"
+ + " \"name\": \"dbserver1\",\n"
+ + " \"server_id\": 0,\n"
+ + " \"ts_sec\": 0,\n"
+ + " \"gtid\": null,\n"
+ + " \"file\": \"mysql-bin.000003\",\n"
+ + " \"pos\": 154,\n"
+ + " \"row\": 0,\n"
+ + " \"snapshot\": true,\n"
+ + " \"thread\": null,\n"
+ + " \"db\": \"inventory\",\n"
+ + " \"table\": \"customers\"\n"
+ + " },\n"
+ + " \"op\": \"r\",\n"
+ + " \"ts_ms\": 1486500577691\n"
+ + " }";
DebeziumFormat debeziumFormat = gson
- .fromJson(debeziumJson, DebeziumFormat.class);
+ .fromJson(debeziumJson, DebeziumFormat.class);
Assert.assertEquals("customers",
debeziumFormat.getSource().getTable());
}
+
+ // Manual check that initializes a BinlogReader in SPECIFIC_OFFSETS snapshot mode with a fixed
+ // binlog file and position. The annotation below is left disabled; presumably the method needs
+ // a reachable MySQL instance (the hostname is empty here and must be filled in) - TODO confirm.
+ // The method never returns on its own; interrupt the thread or kill the process to stop it.
+ // @Test
+ public void binlogStartSpacialTest() throws Exception {
+     JobProfile jobProfile = new JobProfile();
+     jobProfile.set(BinlogReader.JOB_DATABASE_USER, "root");
+     jobProfile.set(BinlogReader.JOB_DATABASE_PASSWORD, "123456");
+     jobProfile.set(BinlogReader.JOB_DATABASE_HOSTNAME, "");
+     jobProfile.set(BinlogReader.JOB_DATABASE_PORT, "3307");
+     jobProfile.set(BinlogReader.JOB_DATABASE_WHITELIST, "etl");
+     jobProfile.set(BinlogReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "mysql-bin.000001");
+     jobProfile.set(BinlogReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "5641");
+     jobProfile.set(BinlogReader.JOB_DATABASE_SNAPSHOT_MODE, SnapshotModeConstants.SPECIFIC_OFFSETS);
+     // jobProfile.set(BinlogReader.JOB_DATABASE_STORE_HISTORY_FILENAME, "");
+     jobProfile.set(BinlogReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, String.valueOf(6000L));
+     jobProfile.set("job.instance.id", "_1");
+     jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
+     jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
+     BinlogReader binlogReader = new BinlogReader();
+     binlogReader.init(jobProfile);
+     // Block this thread indefinitely so the reader keeps running, without the CPU-burning
+     // empty spin loop: joining the current thread only returns via InterruptedException.
+     Thread.currentThread().join();
+ }
}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 96e9fb054..e5ec86824 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -614,6 +614,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
org.eclipse.jetty:jetty-util-ajax:9.4.44.v20210927 - Jetty :: Utilities ::
Ajax(JSON) (https://eclipse.org/jetty/jetty-util-ajax), (Apache Software
License - Version 2.0; Eclipse Public License - Version 1.0)
org.apache.kafka:kafka_2.11:2.4.1 - Apache Kafka (https://kafka.apache.org),
(The Apache Software License, Version 2.0)
org.apache.kafka:kafka-clients:2.4.1 - Apache Kafka
(https://kafka.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.kafka:kafka-clients:3.0.0 - Apache Kafka
(https://kafka.apache.org), (The Apache Software License, Version 2.0)
org.apache.thrift:libthrift:0.9.3 - Apache Thrift
(http://thrift.apache.org), (The Apache Software License, Version 2.0)
log4j:log4j:1.2.17 - Apache Log4j (http://logging.apache.org/log4j/1.2/),
(The Apache Software License, Version 2.0)
org.apache.logging.log4j:log4j-core:2.17.2 - Apache Log4j Core
(https://logging.apache.org/log4j/2.x/log4j-core/), (Apache License, Version
2.0)
diff --git a/pom.xml b/pom.xml
index cd5514fe1..778bff0a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,8 @@
<tencentcloud.cls.version>1.0.5</tencentcloud.cls.version>
<esri-geometry-api.version>2.0.0</esri-geometry-api.version>
<HikariCP.version>4.0.3</HikariCP.version>
+
+ <kafka.clients.version>3.0.0</kafka.clients.version>
</properties>
<dependencyManagement>