This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9636f16704 [INLONG-10006][Agent] Add MongoDB data source for Agent (#10007)
9636f16704 is described below
commit 9636f16704fab1d78770dc42b837226116d3335b
Author: haifxu <[email protected]>
AuthorDate: Thu Apr 18 17:33:38 2024 +0800
[INLONG-10006][Agent] Add MongoDB data source for Agent (#10007)
---
.../org/apache/inlong/agent/pojo/MongoTask.java | 39 +----
.../apache/inlong/agent/pojo/TaskProfileDto.java | 24 +--
.../agent/plugin/instance/MongoDBInstance.java | 29 ++++
.../inlong/agent/plugin/sources/MongoDBSource.java | 151 ++++++++++++++----
.../inlong/agent/plugin/task/MongoDBTask.java | 170 +++++++++++++++++++++
5 files changed, 332 insertions(+), 81 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java
index f0440feb8d..a8c183c62f 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java
@@ -29,24 +29,8 @@ public class MongoTask {
private String user;
private String password;
private String databaseIncludeList;
- private String databaseExcludeList;
private String collectionIncludeList;
- private String collectionExcludeList;
- private String fieldExcludeList;
- private String connectTimeoutInMs;
- private String queueSize;
- private String cursorMaxAwaitTimeInMs;
- private String socketTimeoutInMs;
- private String selectionTimeoutInMs;
- private String fieldRenames;
- private String membersAutoDiscover;
- private String connectMaxAttempts;
- private String connectBackoffMaxDelayInMs;
- private String connectBackoffInitialDelayInMs;
- private String initialSyncMaxThreads;
- private String sslInvalidHostnameAllowed;
- private String sslEnabled;
- private String pollIntervalInMs;
+ private String snapshotMode;
private Snapshot snapshot;
private Capture capture;
private Offset offset;
@@ -87,25 +71,8 @@ public class MongoTask {
private String username;
private String password;
- private String databaseIncludeList;
- private String databaseExcludeList;
- private String collectionIncludeList;
- private String collectionExcludeList;
- private String fieldExcludeList;
- private String connectTimeoutInMs;
- private String queueSize;
- private String cursorMaxAwaitTimeInMs;
- private String socketTimeoutInMs;
- private String selectionTimeoutInMs;
- private String fieldRenames;
- private String membersAutoDiscover;
- private String connectMaxAttempts;
- private String connectBackoffMaxDelayInMs;
- private String connectBackoffInitialDelayInMs;
- private String initialSyncMaxThreads;
- private String sslInvalidHostnameAllowed;
- private String sslEnabled;
- private String pollIntervalInMs;
+ private String database;
+ private String collection;
private String snapshotMode;
private String captureMode;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index baf9e0c64b..0b6d5bcc2a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -49,6 +49,7 @@ public class TaskProfileDto {
public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
+ public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -283,25 +284,9 @@ public class TaskProfileDto {
mongoTask.setHosts(config.getHosts());
mongoTask.setUser(config.getUsername());
mongoTask.setPassword(config.getPassword());
- mongoTask.setDatabaseIncludeList(config.getDatabaseIncludeList());
- mongoTask.setDatabaseExcludeList(config.getDatabaseExcludeList());
- mongoTask.setCollectionIncludeList(config.getCollectionIncludeList());
- mongoTask.setCollectionExcludeList(config.getCollectionExcludeList());
- mongoTask.setFieldExcludeList(config.getFieldExcludeList());
- mongoTask.setConnectTimeoutInMs(config.getConnectTimeoutInMs());
- mongoTask.setQueueSize(config.getQueueSize());
- mongoTask.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs());
- mongoTask.setSocketTimeoutInMs(config.getSocketTimeoutInMs());
- mongoTask.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs());
- mongoTask.setFieldRenames(config.getFieldRenames());
- mongoTask.setMembersAutoDiscover(config.getMembersAutoDiscover());
- mongoTask.setConnectMaxAttempts(config.getConnectMaxAttempts());
- mongoTask.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs());
- mongoTask.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs());
- mongoTask.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads());
- mongoTask.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed());
- mongoTask.setSslEnabled(config.getSslEnabled());
- mongoTask.setPollIntervalInMs(config.getPollIntervalInMs());
+ mongoTask.setDatabaseIncludeList(config.getDatabase());
+ mongoTask.setCollectionIncludeList(config.getCollection());
+ mongoTask.setSnapshotMode(config.getSnapshotMode());
MongoTask.Offset offset = new MongoTask.Offset();
offset.setFilename(config.getOffsetFilename());
@@ -511,6 +496,7 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case MONGODB:
+ task.setTaskClass(DEFAULT_MONGODB_TASK);
MongoTask mongoTask = getMongoTask(dataConfig);
task.setMongoTask(mongoTask);
task.setSource(MONGO_SOURCE);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java
new file mode 100644
index 0000000000..d393d1e0b7
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+public class MongoDBInstance extends CommonInstance {
+
+ @Override
+ public void setInodeInfo(InstanceProfile profile) {
+ profile.set(TaskConstants.INODE_INFO, "");
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index 555d87b412..74fc50e111 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -17,56 +17,159 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader;
+import io.debezium.connector.mongodb.MongoDbConnector;
+import io.debezium.connector.mongodb.MongoDbConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
-/**
- * MongoDBSource : mongo source, split mongo source job into multi readers
- */
public class MongoDBSource extends AbstractSource {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBSource.class);
+ private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+ private ExecutorService executor;
+ public InstanceProfile profile;
+ private BlockingQueue<SourceData> debeziumQueue;
+ private final Properties props = new Properties();
+ private String database;
+ private String collection;
+ private String snapshotMode;
+
+ private boolean isRestoreFromDB = false;
+
+ public MongoDBSource() {
+ }
+
+ @Override
+ public void init(InstanceProfile profile) {
+ try {
+ LOGGER.info("MongoDBSource init: {}", profile.toJsonStr());
+ this.profile = profile;
+ super.init(profile);
+ debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+ database = profile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
+ collection = profile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
+ snapshotMode = profile.get(TaskConstants.TASK_MONGO_SNAPSHOT_MODE, "initial");
+
+ props.setProperty("name", "MongoDB-" + instanceId);
+ props.setProperty("connector.class", MongoDbConnector.class.getName());
+ props.setProperty("offset.storage", FileOffsetBackingStore.class.getName());
+ String agentPath = AgentConfiguration.getAgentConf()
+ .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+ String offsetPath = agentPath + "/" + getThreadName() + "offset.dat";
+ props.setProperty("offset.storage.file.filename", offsetPath);
+
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.LOGICAL_NAME), "agent-mongoDB-" + instanceId);
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.HOSTS),
+ profile.get(TaskConstants.TASK_MONGO_HOSTS));
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.USER), profile.get(TaskConstants.TASK_MONGO_USER));
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.PASSWORD),
+ profile.get(TaskConstants.TASK_MONGO_PASSWORD));
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST), database);
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST), collection);
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.SNAPSHOT_MODE), snapshotMode);
+
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(startDebeziumEngine());
+
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + collection, ex);
+ }
+ }
+
+ private Runnable startDebeziumEngine() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName() + "debezium");
+ try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = DebeziumEngine.create(Json.class)
+ .using(props)
+ .using(OffsetCommitPolicy.always())
+ .notifying(this::handleConsumerEvent)
+ .build()) {
+
+ debeziumEngine.run();
+ } catch (Throwable e) {
+ LOGGER.error("do run error in mongoDB debezium: ", e);
+ }
+ };
+ }
+
+ private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+ RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
+ boolean offerSuc = false;
+ for (ChangeEvent<String, String> record : records) {
+ SourceData sourceData = new SourceData(record.value().getBytes(StandardCharsets.UTF_8), 0L);
+ while (isRunnable() && !offerSuc) {
+ offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS);
+ }
+ committer.markProcessed(record);
+ }
+ committer.markBatchFinished();
+ }
@Override
public List<Reader> split(TaskProfile conf) {
- MongoDBReader mongoDBReader = new MongoDBReader();
- List<Reader> readerList = Collections.singletonList(mongoDBReader);
- sourceMetric.sourceSuccessCount.incrementAndGet();
- return readerList;
+ return null;
}
@Override
protected String getThreadName() {
- return null;
+ return "mongo-source-" + taskId + "-" + instanceId;
}
@Override
protected void printCurrentState() {
-
+ LOGGER.info("mongo collection is {}", collection);
}
@Override
protected boolean doPrepareToRead() {
- return false;
+ return true;
}
@Override
protected List<SourceData> readFromSource() {
- return null;
- }
-
- @Override
- public Message read() {
- return null;
+ List<SourceData> dataList = new ArrayList<>();
+ try {
+ int size = 0;
+ while (size < BATCH_READ_LINE_TOTAL_LEN) {
+ SourceData sourceData = debeziumQueue.poll(1, TimeUnit.SECONDS);
+ if (sourceData != null) {
+ size += sourceData.getData().length;
+ dataList.add(sourceData);
+ } else {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("poll {} data from debezium queue interrupted.", instanceId);
+ }
+ return dataList;
}
@Override
@@ -76,16 +179,12 @@ public class MongoDBSource extends AbstractSource {
@Override
protected void releaseSource() {
-
- }
-
- @Override
- public boolean sourceFinish() {
- return false;
+ LOGGER.info("release mongo source");
+ executor.shutdownNow();
}
@Override
public boolean sourceExist() {
- return false;
+ return true;
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
new file mode 100644
index 0000000000..6081cc388c
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+
+public class MongoDBTask extends Task {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBTask.class);
+ public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance";
+ public static final int CORE_THREAD_SLEEP_TIME = 5000;
+ public static final int CORE_THREAD_PRINT_TIME = 10000;
+
+ private TaskProfile taskProfile;
+ private Db basicDb;
+ private TaskManager taskManager;
+ private InstanceManager instanceManager;
+ private long lastPrintTime = 0;
+ private boolean initOK = false;
+ private volatile boolean running = false;
+ private boolean isAdded = false;
+ private boolean isRestoreFromDB = false;
+
+ private String database;
+ private String collection;
+
+ @Override
+ public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException {
+ taskManager = (TaskManager) srcManager;
+ commonInit(taskProfile, basicDb);
+ initOK = true;
+ }
+
+ private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ LOGGER.info("mongoDB commonInit: {}", taskProfile.toJsonStr());
+ this.taskProfile = taskProfile;
+ this.basicDb = basicDb;
+ this.database = taskProfile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
+ this.collection = taskProfile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
+ this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
+ instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+ basicDb, taskManager.getTaskDb());
+ try {
+ instanceManager.start();
+ } catch (Exception e) {
+ LOGGER.error("start instance manager error: ", e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ doChangeState(State.SUCCEEDED);
+ if (instanceManager != null) {
+ instanceManager.stop();
+ }
+ }
+
+ @Override
+ public TaskProfile getProfile() {
+ return taskProfile;
+ }
+
+ @Override
+ public String getTaskId() {
+ if (taskProfile == null) {
+ return "";
+ }
+ return taskProfile.getTaskId();
+ }
+
+ @Override
+ public boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.error("task profile needs all required key");
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void addCallbacks() {
+
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("mongoDB-task-core-" + getTaskId());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error: ", e);
+ }
+ running = false;
+ }
+
+ private void doRun() {
+ while (!isFinished()) {
+ if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) {
+ LOGGER.info("mongoDB task running! taskId {}", getTaskId());
+ lastPrintTime = AgentUtils.getCurrentTime();
+ }
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ if (!initOK) {
+ continue;
+ }
+
+ // Add instance profile to instance manager
+ addInstanceProfile();
+
+ String inlongGroupId = taskProfile.getInlongGroupId();
+ String inlongStreamId = taskProfile.getInlongStreamId();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
+ }
+ }
+
+ private void addInstanceProfile() {
+ if (isAdded) {
+ return;
+ }
+ String dataTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+ InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection,
+ CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ LOGGER.info("taskProfile.createInstanceProfile: {}", instanceProfile.toJsonStr());
+ InstanceAction action = new InstanceAction(ActionType.ADD, instanceProfile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId());
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ }
+ this.isAdded = true;
+ }
+}