This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9636f16704 [INLONG-10006][Agent] Add MongoDB data source for Agent 
(#10007)
9636f16704 is described below

commit 9636f16704fab1d78770dc42b837226116d3335b
Author: haifxu <[email protected]>
AuthorDate: Thu Apr 18 17:33:38 2024 +0800

    [INLONG-10006][Agent] Add MongoDB data source for Agent (#10007)
---
 .../org/apache/inlong/agent/pojo/MongoTask.java    |  39 +----
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  24 +--
 .../agent/plugin/instance/MongoDBInstance.java     |  29 ++++
 .../inlong/agent/plugin/sources/MongoDBSource.java | 151 ++++++++++++++----
 .../inlong/agent/plugin/task/MongoDBTask.java      | 170 +++++++++++++++++++++
 5 files changed, 332 insertions(+), 81 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java
index f0440feb8d..a8c183c62f 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java
@@ -29,24 +29,8 @@ public class MongoTask {
     private String user;
     private String password;
     private String databaseIncludeList;
-    private String databaseExcludeList;
     private String collectionIncludeList;
-    private String collectionExcludeList;
-    private String fieldExcludeList;
-    private String connectTimeoutInMs;
-    private String queueSize;
-    private String cursorMaxAwaitTimeInMs;
-    private String socketTimeoutInMs;
-    private String selectionTimeoutInMs;
-    private String fieldRenames;
-    private String membersAutoDiscover;
-    private String connectMaxAttempts;
-    private String connectBackoffMaxDelayInMs;
-    private String connectBackoffInitialDelayInMs;
-    private String initialSyncMaxThreads;
-    private String sslInvalidHostnameAllowed;
-    private String sslEnabled;
-    private String pollIntervalInMs;
+    private String snapshotMode;
     private Snapshot snapshot;
     private Capture capture;
     private Offset offset;
@@ -87,25 +71,8 @@ public class MongoTask {
         private String username;
         private String password;
 
-        private String databaseIncludeList;
-        private String databaseExcludeList;
-        private String collectionIncludeList;
-        private String collectionExcludeList;
-        private String fieldExcludeList;
-        private String connectTimeoutInMs;
-        private String queueSize;
-        private String cursorMaxAwaitTimeInMs;
-        private String socketTimeoutInMs;
-        private String selectionTimeoutInMs;
-        private String fieldRenames;
-        private String membersAutoDiscover;
-        private String connectMaxAttempts;
-        private String connectBackoffMaxDelayInMs;
-        private String connectBackoffInitialDelayInMs;
-        private String initialSyncMaxThreads;
-        private String sslInvalidHostnameAllowed;
-        private String sslEnabled;
-        private String pollIntervalInMs;
+        private String database;
+        private String collection;
 
         private String snapshotMode;
         private String captureMode;
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index baf9e0c64b..0b6d5bcc2a 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -49,6 +49,7 @@ public class TaskProfileDto {
     public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.file.LogFileTask";
     public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
     public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
+    public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
     public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -283,25 +284,9 @@ public class TaskProfileDto {
         mongoTask.setHosts(config.getHosts());
         mongoTask.setUser(config.getUsername());
         mongoTask.setPassword(config.getPassword());
-        mongoTask.setDatabaseIncludeList(config.getDatabaseIncludeList());
-        mongoTask.setDatabaseExcludeList(config.getDatabaseExcludeList());
-        mongoTask.setCollectionIncludeList(config.getCollectionIncludeList());
-        mongoTask.setCollectionExcludeList(config.getCollectionExcludeList());
-        mongoTask.setFieldExcludeList(config.getFieldExcludeList());
-        mongoTask.setConnectTimeoutInMs(config.getConnectTimeoutInMs());
-        mongoTask.setQueueSize(config.getQueueSize());
-        
mongoTask.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs());
-        mongoTask.setSocketTimeoutInMs(config.getSocketTimeoutInMs());
-        mongoTask.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs());
-        mongoTask.setFieldRenames(config.getFieldRenames());
-        mongoTask.setMembersAutoDiscover(config.getMembersAutoDiscover());
-        mongoTask.setConnectMaxAttempts(config.getConnectMaxAttempts());
-        
mongoTask.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs());
-        
mongoTask.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs());
-        mongoTask.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads());
-        
mongoTask.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed());
-        mongoTask.setSslEnabled(config.getSslEnabled());
-        mongoTask.setPollIntervalInMs(config.getPollIntervalInMs());
+        mongoTask.setDatabaseIncludeList(config.getDatabase());
+        mongoTask.setCollectionIncludeList(config.getCollection());
+        mongoTask.setSnapshotMode(config.getSnapshotMode());
 
         MongoTask.Offset offset = new MongoTask.Offset();
         offset.setFilename(config.getOffsetFilename());
@@ -511,6 +496,7 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case MONGODB:
+                task.setTaskClass(DEFAULT_MONGODB_TASK);
                 MongoTask mongoTask = getMongoTask(dataConfig);
                 task.setMongoTask(mongoTask);
                 task.setSource(MONGO_SOURCE);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java
new file mode 100644
index 0000000000..d393d1e0b7
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+/**
+ * Instance type created for MongoDB tasks; it inherits all behaviour from
+ * {@link CommonInstance} and only supplies the inode information hook.
+ */
+public class MongoDBInstance extends CommonInstance {
+
+    /**
+     * Stores an empty string under {@link TaskConstants#INODE_INFO} in the given profile.
+     *
+     * @param profile instance profile that receives the (empty) inode info value
+     */
+    @Override
+    public void setInodeInfo(InstanceProfile profile) {
+        final String noInodeInfo = "";
+        profile.set(TaskConstants.INODE_INFO, noInodeInfo);
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index 555d87b412..74fc50e111 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -17,56 +17,159 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader;
 
+import io.debezium.connector.mongodb.MongoDbConnector;
+import io.debezium.connector.mongodb.MongoDbConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
-/**
- * MongoDBSource : mongo source, split mongo source job into multi readers
- */
 public class MongoDBSource extends AbstractSource {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MongoDBSource.class);
+    private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+    private ExecutorService executor;
+    public InstanceProfile profile;
+    private BlockingQueue<SourceData> debeziumQueue;
+    private final Properties props = new Properties();
+    private String database;
+    private String collection;
+    private String snapshotMode;
+
+    private boolean isRestoreFromDB = false;
+
+    public MongoDBSource() {
+    }
+
+    /**
+     * Initialises the source for one instance: reads the MongoDB settings from the
+     * profile, assembles the Debezium engine properties and starts the engine on a
+     * dedicated single-thread executor.
+     *
+     * @param profile instance profile supplying hosts, credentials, database,
+     *        collection and snapshot mode
+     * @throws FileException wrapping any exception raised during initialisation;
+     *         {@code stopRunning()} is invoked before it is thrown
+     */
+    @Override
+    public void init(InstanceProfile profile) {
+        try {
+            // NOTE(review): the profile also carries TASK_MONGO_PASSWORD (read below); confirm
+            // that toJsonStr() masks it before keeping this at INFO level.
+            LOGGER.info("MongoDBSource init: {}", profile.toJsonStr());
+            this.profile = profile;
+            super.init(profile);
+            // Bounded hand-over buffer between the Debezium callback and readFromSource().
+            debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+            database = 
profile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
+            collection = 
profile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
+            snapshotMode = profile.get(TaskConstants.TASK_MONGO_SNAPSHOT_MODE, 
"initial");
+
+            // Engine identity and file-based offset storage under the agent home directory.
+            props.setProperty("name", "MongoDB-" + instanceId);
+            props.setProperty("connector.class", 
MongoDbConnector.class.getName());
+            props.setProperty("offset.storage", 
FileOffsetBackingStore.class.getName());
+            String agentPath = AgentConfiguration.getAgentConf()
+                    .get(AgentConstants.AGENT_HOME, 
AgentConstants.DEFAULT_AGENT_HOME);
+            String offsetPath = agentPath + "/" + getThreadName() + 
"offset.dat";
+            props.setProperty("offset.storage.file.filename", offsetPath);
+
+            // Connector settings. Properties.setProperty rejects null values, so a missing
+            // profile entry ends up in the catch block below.
+            // NOTE(review): the keys rely on String.valueOf(Field) yielding the property
+            // name - confirm against the Debezium Field class.
+            
props.setProperty(String.valueOf(MongoDbConnectorConfig.LOGICAL_NAME), 
"agent-mongoDB-" + instanceId);
+            props.setProperty(String.valueOf(MongoDbConnectorConfig.HOSTS),
+                    profile.get(TaskConstants.TASK_MONGO_HOSTS));
+            props.setProperty(String.valueOf(MongoDbConnectorConfig.USER), 
profile.get(TaskConstants.TASK_MONGO_USER));
+            props.setProperty(String.valueOf(MongoDbConnectorConfig.PASSWORD),
+                    profile.get(TaskConstants.TASK_MONGO_PASSWORD));
+            
props.setProperty(String.valueOf(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST), 
database);
+            
props.setProperty(String.valueOf(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST),
 collection);
+            
props.setProperty(String.valueOf(MongoDbConnectorConfig.SNAPSHOT_MODE), 
snapshotMode);
+
+            // Run the engine on its own thread; releaseSource() shuts this executor down.
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(startDebeziumEngine());
+
+        } catch (Exception ex) {
+            // collection may still be null here if the failure happened before it was read.
+            stopRunning();
+            throw new FileException("error init stream for " + collection, ex);
+        }
+    }
+
+    private Runnable startDebeziumEngine() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "debezium");
+            try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = 
DebeziumEngine.create(Json.class)
+                    .using(props)
+                    .using(OffsetCommitPolicy.always())
+                    .notifying(this::handleConsumerEvent)
+                    .build()) {
+
+                debeziumEngine.run();
+            } catch (Throwable e) {
+                LOGGER.error("do run error in mongoDB debezium: ", e);
+            }
+        };
+    }
+
+    /**
+     * Debezium batch callback: copies every change event into the bounded
+     * {@code debeziumQueue} and acknowledges it to the committer.
+     *
+     * <p>The "offered" flag is tracked per record. A single flag shared by the whole
+     * batch stays {@code true} after the first successful offer, so every later record
+     * would skip the offer loop and be acknowledged without ever reaching the queue.
+     *
+     * @param records change events delivered by the engine in this batch
+     * @param committer used to mark each record, and finally the batch, as processed
+     * @throws InterruptedException if interrupted while waiting for queue space
+     */
+    private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+            RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
+        for (ChangeEvent<String, String> record : records) {
+            String value = record.value();
+            if (value == null) {
+                // Nothing to forward (presumably a delete tombstone); acknowledge it so the
+                // engine does not fail with a NullPointerException on getBytes().
+                committer.markProcessed(record);
+                continue;
+            }
+            SourceData sourceData = new SourceData(value.getBytes(StandardCharsets.UTF_8), 0L);
+            // Reset for every record so each one is really offered to the queue.
+            boolean offerSuc = false;
+            // Retry in one-second slices so a stopped source is noticed while the queue is full.
+            while (isRunnable() && !offerSuc) {
+                offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS);
+            }
+            committer.markProcessed(record);
+        }
+        committer.markBatchFinished();
+    }
 
     @Override
     public List<Reader> split(TaskProfile conf) {
-        MongoDBReader mongoDBReader = new MongoDBReader();
-        List<Reader> readerList = Collections.singletonList(mongoDBReader);
-        sourceMetric.sourceSuccessCount.incrementAndGet();
-        return readerList;
+        return null;
     }
 
     @Override
     protected String getThreadName() {
-        return null;
+        return "mongo-source-" + taskId + "-" + instanceId;
     }
 
     @Override
     protected void printCurrentState() {
-
+        LOGGER.info("mongo collection is {}", collection);
     }
 
     @Override
     protected boolean doPrepareToRead() {
-        return false;
+        return true;
     }
 
     @Override
     protected List<SourceData> readFromSource() {
-        return null;
-    }
-
-    @Override
-    public Message read() {
-        return null;
+        List<SourceData> dataList = new ArrayList<>();
+        try {
+            int size = 0;
+            while (size < BATCH_READ_LINE_TOTAL_LEN) {
+                SourceData sourceData = debeziumQueue.poll(1, 
TimeUnit.SECONDS);
+                if (sourceData != null) {
+                    size += sourceData.getData().length;
+                    dataList.add(sourceData);
+                } else {
+                    break;
+                }
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("poll {} data from debezium queue interrupted.", 
instanceId);
+        }
+        return dataList;
     }
 
     @Override
@@ -76,16 +179,12 @@ public class MongoDBSource extends AbstractSource {
 
     @Override
     protected void releaseSource() {
-
-    }
-
-    @Override
-    public boolean sourceFinish() {
-        return false;
+        LOGGER.info("release mongo source");
+        executor.shutdownNow();
     }
 
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
new file mode 100644
index 0000000000..6081cc388c
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+
+public class MongoDBTask extends Task {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MongoDBTask.class);
+    public static final String DEFAULT_MONGODB_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
+    public static final int CORE_THREAD_SLEEP_TIME = 5000;
+    public static final int CORE_THREAD_PRINT_TIME = 10000;
+
+    private TaskProfile taskProfile;
+    private Db basicDb;
+    private TaskManager taskManager;
+    private InstanceManager instanceManager;
+    private long lastPrintTime = 0;
+    private boolean initOK = false;
+    private volatile boolean running = false;
+    private boolean isAdded = false;
+    private boolean isRestoreFromDB = false;
+
+    private String database;
+    private String collection;
+
+    @Override
+    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
+        taskManager = (TaskManager) srcManager;
+        commonInit(taskProfile, basicDb);
+        initOK = true;
+    }
+
+    private void commonInit(TaskProfile taskProfile, Db basicDb) {
+        LOGGER.info("mongoDB commonInit: {}", taskProfile.toJsonStr());
+        this.taskProfile = taskProfile;
+        this.basicDb = basicDb;
+        this.database = 
taskProfile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
+        this.collection = 
taskProfile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
+        this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
+        instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+                basicDb, taskManager.getTaskDb());
+        try {
+            instanceManager.start();
+        } catch (Exception e) {
+            LOGGER.error("start instance manager error: ", e);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        doChangeState(State.SUCCEEDED);
+        if (instanceManager != null) {
+            instanceManager.stop();
+        }
+    }
+
+    @Override
+    public TaskProfile getProfile() {
+        return taskProfile;
+    }
+
+    @Override
+    public String getTaskId() {
+        if (taskProfile == null) {
+            return "";
+        }
+        return taskProfile.getTaskId();
+    }
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public void addCallbacks() {
+
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName("mongoDB-task-core-" + getTaskId());
+        running = true;
+        try {
+            doRun();
+        } catch (Throwable e) {
+            LOGGER.error("do run error: ", e);
+        }
+        running = false;
+    }
+
+    private void doRun() {
+        while (!isFinished()) {
+            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
+                LOGGER.info("mongoDB task running! taskId {}", getTaskId());
+                lastPrintTime = AgentUtils.getCurrentTime();
+            }
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+            if (!initOK) {
+                continue;
+            }
+
+            // Add instance profile to instance manager
+            addInstanceProfile();
+
+            String inlongGroupId = taskProfile.getInlongGroupId();
+            String inlongStreamId = taskProfile.getInlongStreamId();
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
+                    AgentUtils.getCurrentTime(), 1, 1);
+        }
+    }
+
+    private void addInstanceProfile() {
+        if (isAdded) {
+            return;
+        }
+        String dataTime = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
+        InstanceAction action = new InstanceAction(ActionType.ADD, 
instanceProfile);
+        while (!isFinished() && !instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+        }
+        this.isAdded = true;
+    }
+}

Reply via email to