justinwwhuang commented on code in PR #10007:
URL: https://github.com/apache/inlong/pull/10007#discussion_r1570289584
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java:
##########
@@ -17,56 +17,160 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader;
+import io.debezium.connector.mongodb.MongoDbConnector;
+import io.debezium.connector.mongodb.MongoDbConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
-/**
- * MongoDBSource : mongo source, split mongo source job into multi readers
- */
public class MongoDBSource extends AbstractSource {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBSource.class);
/** Capacity of the queue that hands change events from the Debezium thread to readFromSource(). */
private static final int DEBEZIUM_QUEUE_SIZE = 100;
/** Single-thread executor that runs the embedded Debezium engine; created in init(). */
private ExecutorService executor;
public InstanceProfile profile;
/** Bounded hand-off queue between the Debezium consumer callback and the reader. */
private BlockingQueue<SourceData> debeziumQueue;
/** Debezium engine configuration, populated in init(). */
private final Properties props = new Properties();
/** MongoDB database include-list taken from the instance profile. */
private String database;
/** MongoDB collection include-list taken from the instance profile. */
private String collection;
/** Debezium snapshot mode; init() falls back to "initial" when the profile has none. */
private String snapshotMode;

// NOTE(review): not referenced in the visible code — confirm it is used elsewhere or remove it.
private boolean isRestoreFromDB = false;

public MongoDBSource() {
}
+
+ @Override
+ public void init(InstanceProfile profile) {
+ try {
+ LOGGER.info("MongoDBSource init: {}", profile.toJsonStr());
+ this.profile = profile;
+ super.init(profile);
+ debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+ database =
profile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
+ collection =
profile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
+ snapshotMode = profile.get(TaskConstants.TASK_MONGO_SNAPSHOT_MODE,
"initial");
+
+ props.setProperty("name", "MongoDB-" + instanceId);
+ props.setProperty("connector.class",
MongoDbConnector.class.getName());
+ props.setProperty("offset.storage",
FileOffsetBackingStore.class.getName());
+ String agentPath = AgentConfiguration.getAgentConf()
+ .get(AgentConstants.AGENT_HOME,
AgentConstants.DEFAULT_AGENT_HOME);
+ String offsetPath = agentPath + "/" + getThreadName() +
"offset.dat";
+ props.setProperty("offset.storage.file.filename", offsetPath);
+
+
props.setProperty(String.valueOf(MongoDbConnectorConfig.LOGICAL_NAME),
"agent-mongoDB-" + instanceId);
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.HOSTS),
+ profile.get(TaskConstants.TASK_MONGO_HOSTS));
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.USER),
profile.get(TaskConstants.TASK_MONGO_USER));
+ props.setProperty(String.valueOf(MongoDbConnectorConfig.PASSWORD),
+ profile.get(TaskConstants.TASK_MONGO_PASSWORD));
+
props.setProperty(String.valueOf(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST),
database);
+
props.setProperty(String.valueOf(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST),
collection);
+
props.setProperty(String.valueOf(MongoDbConnectorConfig.SNAPSHOT_MODE),
snapshotMode);
+
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(startDebeziumEngine());
+
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + collection, ex);
+ }
+ }
+
+ private Runnable startDebeziumEngine() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName() + "debezium");
+ try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine =
DebeziumEngine.create(Json.class)
+ .using(props)
+ .using(OffsetCommitPolicy.always())
+ .notifying(this::handleConsumerEvent)
+ .build()) {
+
+ debeziumEngine.run();
+ } catch (Throwable e) {
+ LOGGER.error("do run error in mongoDB debezium: ", e);
+ }
+ };
+ }
+
+ private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+ RecordCommitter<ChangeEvent<String, String>> committer) throws
InterruptedException {
+ boolean offerSuc = false;
+ for (ChangeEvent<String, String> record : records) {
+ SourceData sourceData = new
SourceData(record.value().getBytes(StandardCharsets.UTF_8), 0L);
+ while (isRunnable() && !offerSuc) {
+ offerSuc = debeziumQueue.offer(sourceData, 1,
TimeUnit.SECONDS);
+ }
+ committer.markProcessed(record);
+ }
+ committer.markBatchFinished();
+ }
@Override
public List<Reader> split(TaskProfile conf) {
- MongoDBReader mongoDBReader = new MongoDBReader();
- List<Reader> readerList = Collections.singletonList(mongoDBReader);
- sourceMetric.sourceSuccessCount.incrementAndGet();
- return readerList;
+ return null;
}
@Override
protected String getThreadName() {
- return null;
+ return "mongo-source-" + taskId + "-" + instanceId;
}
@Override
protected void printCurrentState() {
-
+ LOGGER.info("mongo collection is {}", collection);
}
@Override
protected boolean doPrepareToRead() {
- return false;
+ return true;
}
@Override
protected List<SourceData> readFromSource() {
- return null;
- }
-
- @Override
- public Message read() {
- return null;
+ List<SourceData> dataList = new ArrayList<>();
+ try {
+ int size = 0;
+ while (size < BATCH_READ_LINE_TOTAL_LEN) {
+ SourceData sourceData = debeziumQueue.poll(1,
TimeUnit.SECONDS);
+ if (sourceData != null) {
+ size += sourceData.getData().length;
+ dataList.add(sourceData);
+ } else {
+ break;
+ }
+ }
+ LOGGER.info("read {} messages from debezium queue",
dataList.size());
Review Comment:
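Log level should be DEBUG here, not INFO: this message is emitted on every readFromSource() call, including calls that read nothing, so at INFO it would flood the agent log.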
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]