This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e1c3c5ea64 [INLONG-10288][Agent] Update the Oracle Source (#10746)
e1c3c5ea64 is described below
commit e1c3c5ea64e760169f9bc65c1ed4c42a4f945cc5
Author: Zkplo <[email protected]>
AuthorDate: Mon Aug 5 10:35:52 2024 +0800
[INLONG-10288][Agent] Update the Oracle Source (#10746)
Co-authored-by: ZKpLo <[email protected]>
---
.../inlong/agent/constant/TaskConstants.java | 11 ++
.../org/apache/inlong/agent/pojo/OracleTask.java | 12 +-
.../apache/inlong/agent/pojo/TaskProfileDto.java | 14 ++-
inlong-agent/agent-plugins/pom.xml | 6 +
.../agent/plugin/instance/OracleInstance.java | 31 +++++
.../inlong/agent/plugin/sources/OracleSource.java | 138 +++++++++++++++++++--
.../task/{PostgreSQLTask.java => OracleTask.java} | 69 ++++++-----
.../inlong/agent/plugin/task/PostgreSQLTask.java | 1 -
.../agent/plugin/sources/TestOracleConnect.java | 13 +-
9 files changed, 236 insertions(+), 59 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 1f142f839e..297c163709 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -127,6 +127,17 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_MONGO_SSL_ENABLE =
"task.mongoTask.sslEnabled";
public static final String TASK_MONGO_POLL_INTERVAL =
"task.mongoTask.pollIntervalInMs";
+ // Oracle task
+ public static final String TASK_ORACLE_HOSTNAME =
"task.oracleTask.hostname";
+ public static final String TASK_ORACLE_PORT = "task.oracleTask.port";
+ public static final String TASK_ORACLE_USER = "task.oracleTask.user";
+ public static final String TASK_ORACLE_PASSWORD =
"task.oracleTask.password";
+ public static final String TASK_ORACLE_DBNAME = "task.oracleTask.dbname";
+ public static final String TASK_ORACLE_SERVERNAME =
"task.oracleTask.serverName";
+ public static final String TASK_ORACLE_SCHEMA_INCLUDE_LIST =
"task.oracleTask.schemaIncludeList";
+ public static final String TASK_ORACLE_TABLE_INCLUDE_LIST =
"task.oracleTask.tableIncludeList";
+ public static final String TASK_ORACLE_SNAPSHOT_MODE =
"task.oracleTask.snapshotMode";
+
// PostgreSQL task
public static final String TASK_POSTGRES_HOSTNAME =
"task.postgreSQLTask.hostname";
public static final String TASK_POSTGRES_PORT = "task.postgreSQLTask.port";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java
index 7810c0a807..69ee8f6064 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java
@@ -28,6 +28,8 @@ public class OracleTask {
private String port;
private String serverName;
private String dbname;
+ private String tableIncludeList;
+ private String schemaIncludeList;
private OracleTask.Snapshot snapshot;
private OracleTask.Offset offset;
@@ -58,13 +60,15 @@ public class OracleTask {
public static class OracleTaskConfig {
private String hostname;
- private String user;
+ private String username;
private String password;
private String port;
- private String dbname;
- private String serverName;
+ private String database;
+ private String schemaName;
+ private String tableName;
+ private String primaryKey;
- private String snapshotMode;
+ private String scanStartupMode;
private String intervalMs;
private String offsetFilename;
private String historyFilename;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 039acea32d..6fa58ff14b 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -52,6 +52,7 @@ public class TaskProfileDto {
public static final String DEFAULT_KAFKA_TASK =
"org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
+ public static final String DEFAULT_ORACLE_TASK =
"org.apache.inlong.agent.plugin.task.OracleTask";
public static final String DEFAULT_POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_SQLSERVER_TASK =
"org.apache.inlong.agent.plugin.task.SQLServerTask";
@@ -310,12 +311,14 @@ public class TaskProfileDto {
OracleTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(),
OracleTaskConfig.class);
OracleTask oracleTask = new OracleTask();
- oracleTask.setUser(config.getUser());
+
oracleTask.setHostname(config.getHostname());
- oracleTask.setPassword(config.getPassword());
oracleTask.setPort(config.getPort());
- oracleTask.setServerName(config.getServerName());
- oracleTask.setDbname(config.getDbname());
+ oracleTask.setUser(config.getUsername());
+ oracleTask.setPassword(config.getPassword());
+ oracleTask.setSchemaIncludeList(config.getSchemaName());
+ oracleTask.setDbname(config.getDatabase());
+ oracleTask.setTableIncludeList(config.getTableName());
OracleTask.Offset offset = new OracleTask.Offset();
offset.setFilename(config.getOffsetFilename());
@@ -324,7 +327,7 @@ public class TaskProfileDto {
oracleTask.setOffset(offset);
OracleTask.Snapshot snapshot = new OracleTask.Snapshot();
- snapshot.setMode(config.getSnapshotMode());
+ snapshot.setMode(config.getScanStartupMode());
oracleTask.setSnapshot(snapshot);
OracleTask.History history = new OracleTask.History();
@@ -493,6 +496,7 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case ORACLE:
+ task.setTaskClass(DEFAULT_ORACLE_TASK);
OracleTask oracleTask = getOracleTask(dataConfig);
task.setOracleTask(oracleTask);
task.setSource(ORACLE_SOURCE);
diff --git a/inlong-agent/agent-plugins/pom.xml
b/inlong-agent/agent-plugins/pom.xml
index 86cfe91a45..eb092bfed0 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -32,10 +32,16 @@
<properties>
<inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
<debezium.version>1.8.0.Final</debezium.version>
+ <ojdbc.version>19.3.0.0</ojdbc.version>
<darwinsys.version>1.5.1</darwinsys.version>
</properties>
<dependencies>
+ <dependency>
+ <groupId>com.oracle.ojdbc</groupId>
+ <artifactId>ojdbc8</artifactId>
+ <version>${ojdbc.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>agent-common</artifactId>
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java
new file mode 100644
index 0000000000..43cf2fd4a8
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+public class OracleInstance extends CommonInstance {
+
+ @Override
+ public void setInodeInfo(InstanceProfile profile) throws IOException {
+ profile.set(TaskConstants.INODE_INFO, "");
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
index c17dadeeff..fe7fe796ce 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -17,25 +17,56 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import io.debezium.connector.oracle.OracleConnector;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.TaskConstants.*;
/**
* Oracle SQL source
*/
public class OracleSource extends AbstractSource {
- private static final Logger logger =
LoggerFactory.getLogger(OracleSource.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OracleSource.class);
+ private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+ private ExecutorService executor;
+ public InstanceProfile profile;
+ private BlockingQueue<SourceData> debeziumQueue;
+ private Properties props = new Properties();
+
+ private String snapshotMode;
+ private String dbName;
+ private String tableName;
+ private String schema;
public OracleSource() {
}
@@ -51,32 +82,120 @@ public class OracleSource extends AbstractSource {
@Override
protected String getThreadName() {
- return null;
+ return "oracle-source-" + taskId + "-" + instanceId;
}
@Override
protected void initSource(InstanceProfile profile) {
+ try {
+ LOGGER.info("OracleSource init: {}", profile.toJsonStr());
+ debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+
+ dbName = profile.get(TASK_ORACLE_DBNAME);
+ tableName = profile.get(TASK_ORACLE_TABLE_INCLUDE_LIST);
+ schema = profile.get(TASK_ORACLE_SCHEMA_INCLUDE_LIST);
+ snapshotMode = profile.get(TASK_ORACLE_SNAPSHOT_MODE, "initial");
+
+ props.setProperty("name", "Oracle-" + instanceId);
+ props.setProperty("connector.class",
OracleConnector.class.getName());
+
+ // Unified storage in "[agentPath]/data/"
+ String agentPath =
+
AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_HOME,
AgentConstants.DEFAULT_AGENT_HOME);
+ String offsetPath = agentPath + "/data/" + getThreadName() + "/" +
"offset.dat";
+ String historyPath = agentPath + "/data/" + getThreadName() + "/"
+ "history.dat";
+ props.setProperty("offset.storage",
FileOffsetBackingStore.class.getName());
+ props.setProperty("offset.storage.file.filename", offsetPath);
+ props.setProperty("database.history",
FileDatabaseHistory.class.getCanonicalName());
+ props.setProperty("database.history.file.filename", historyPath);
+
+ props.setProperty(String.valueOf(OracleConnectorConfig.HOSTNAME),
profile.get(TASK_ORACLE_HOSTNAME));
+ props.setProperty(String.valueOf(OracleConnectorConfig.PORT),
profile.get(TASK_ORACLE_PORT));
+ props.setProperty(String.valueOf(OracleConnectorConfig.USER),
profile.get(TASK_ORACLE_USER));
+ props.setProperty(String.valueOf(OracleConnectorConfig.PASSWORD),
profile.get(TASK_ORACLE_PASSWORD));
+
props.setProperty(String.valueOf(OracleConnectorConfig.TABLE_INCLUDE_LIST),
schema + "." + tableName);
+
props.setProperty(String.valueOf(OracleConnectorConfig.SERVER_NAME),
getThreadName());
+
props.setProperty(String.valueOf(OracleConnectorConfig.DATABASE_NAME),
profile.get(TASK_ORACLE_DBNAME));
+
props.setProperty(String.valueOf(OracleConnectorConfig.SCHEMA_INCLUDE_LIST),
schema);
+
props.setProperty(String.valueOf(OracleConnectorConfig.SNAPSHOT_MODE),
snapshotMode);
+
+ // Prevent Base64 encoding of Oracle NUMBER type fields
+
props.setProperty(String.valueOf(OracleConnectorConfig.DECIMAL_HANDLING_MODE),
"string");
+
+ props.setProperty("key.converter.schemas.enable", "false");
+ props.setProperty("value.converter.schemas.enable", "false");
+
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(startDebeziumEngine());
+
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + instanceId, ex);
+ }
+ }
+ private Runnable startDebeziumEngine() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName() + "debezium");
+ try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine =
DebeziumEngine.create(Json.class)
+ .using(props)
+ .using(OffsetCommitPolicy.always())
+ .notifying(this::handleConsumerEvent)
+ .build()) {
+ debeziumEngine.run();
+ } catch (Throwable e) {
+ LOGGER.error("do run error in postgres debezium: ", e);
+ }
+ };
+ }
+
+ private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+ DebeziumEngine.RecordCommitter<ChangeEvent<String, String>>
committer) throws InterruptedException {
+ for (ChangeEvent<String, String> record : records) {
+ boolean offerSuc = false;
+ SourceData sourceData = new
SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0L");
+ while (isRunnable() && !offerSuc) {
+ offerSuc = debeziumQueue.offer(sourceData, 1,
TimeUnit.SECONDS);
+ }
+ committer.markProcessed(record);
+ }
+ committer.markBatchFinished();
}
@Override
protected void printCurrentState() {
-
+ LOGGER.info("oracle table is {}", tableName);
}
@Override
protected boolean doPrepareToRead() {
- return false;
+ return true;
}
@Override
protected List<SourceData> readFromSource() {
- return null;
+ List<SourceData> dataList = new ArrayList<>();
+ try {
+ int size = 0;
+ while (size < BATCH_READ_LINE_TOTAL_LEN) {
+ SourceData sourceData = debeziumQueue.poll(1,
TimeUnit.SECONDS);
+ if (sourceData != null) {
+ LOGGER.info("readFromSource: {}", sourceData.getData());
+ size += sourceData.getData().length;
+ dataList.add(sourceData);
+ } else {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("poll {} data from debezium queue interrupted.",
instanceId);
+ }
+ return dataList;
}
@Override
public Message read() {
- return null;
+ return super.read();
}
@Override
@@ -86,16 +205,17 @@ public class OracleSource extends AbstractSource {
@Override
protected void releaseSource() {
-
+ LOGGER.info("release oracle source");
+ executor.shutdownNow();
}
@Override
public boolean sourceFinish() {
- return false;
+ return super.sourceFinish();
}
@Override
public boolean sourceExist() {
- return false;
+ return true;
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
similarity index 59%
copy from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
copy to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
index 554764911d..34b064d48d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.task;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
@@ -29,34 +30,25 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_DBNAME;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_HOSTNAME;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PASSWORD;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PLUGIN_NAME;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PORT;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_TABLE_INCLUDE_LIST;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER;
+import static org.apache.inlong.agent.constant.TaskConstants.*;
-public class PostgreSQLTask extends AbstractTask {
+public class OracleTask extends AbstractTask {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PostgreSQLTask.class);
- public static final String DEFAULT_KAFKA_INSTANCE =
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OracleTask.class);
+
+ public static final String DEFAULT_ORACLE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.OracleInstance";
+ private AtomicBoolean isAdded = new AtomicBoolean(false);
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHH");
- private boolean isAdded = false;
- public static final int DEFAULT_INSTANCE_LIMIT = 1;
private String dbName;
private String tableName;
private String instanceId;
@Override
- protected void initTask() {
- LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr());
- taskProfile.get(TASK_POSTGRES_DBNAME);
- dbName = taskProfile.get(TASK_POSTGRES_DBNAME);
- tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST);
- instanceId = dbName + "-" + tableName;
+ protected int getInstanceLimit() {
+ return DEFAULT_INSTANCE_LIMIT;
}
@Override
@@ -65,49 +57,58 @@ public class PostgreSQLTask extends AbstractTask {
LOGGER.error("task profile needs all required key");
return false;
}
- if (!profile.hasKey(profile.get(TASK_POSTGRES_HOSTNAME))) {
+ if (!profile.hasKey(TaskConstants.TASK_ORACLE_HOSTNAME)) {
LOGGER.error("task profile needs hostname");
return false;
}
- if (!profile.hasKey(profile.get(TASK_POSTGRES_PORT))) {
+ if (!profile.hasKey(TaskConstants.TASK_ORACLE_PORT)) {
LOGGER.error("task profile needs port");
return false;
}
- if (!profile.hasKey(profile.get(TASK_POSTGRES_USER))) {
+ if (!profile.hasKey(TaskConstants.TASK_ORACLE_USER)) {
LOGGER.error("task profile needs username");
return false;
}
- if (!profile.hasKey(profile.get(TASK_POSTGRES_PASSWORD))) {
+ if (!profile.hasKey(TaskConstants.TASK_ORACLE_PASSWORD)) {
LOGGER.error("task profile needs password");
return false;
}
- if (!profile.hasKey(profile.get(TASK_POSTGRES_DBNAME))) {
+ if (!profile.hasKey(TaskConstants.TASK_ORACLE_DBNAME)) {
LOGGER.error("task profile needs DB name");
return false;
}
- if (!profile.hasKey(profile.get(TASK_POSTGRES_PLUGIN_NAME))) {
- LOGGER.error("task profile needs plugin name");
+ if (!profile.hasKey(TaskConstants.TASK_ORACLE_SCHEMA_INCLUDE_LIST)) {
+ LOGGER.error("task profile needs schema name");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_ORACLE_TABLE_INCLUDE_LIST)) {
+ LOGGER.error("task profile needs table list");
return false;
}
return true;
}
+ @Override
+ protected void initTask() {
+ LOGGER.info("oracle commonInit: {}", taskProfile.toJsonStr());
+ dbName = taskProfile.get(TASK_ORACLE_DBNAME);
+ tableName = taskProfile.get(TASK_ORACLE_TABLE_INCLUDE_LIST);
+ instanceId = dbName + "-" + tableName;
+ }
+
@Override
protected List<InstanceProfile> getNewInstanceList() {
List<InstanceProfile> list = new ArrayList<>();
- if (isAdded) {
+ if (isAdded.get()) {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
- InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, instanceId,
- CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ InstanceProfile instanceProfile =
+ taskProfile.createInstanceProfile(DEFAULT_ORACLE_INSTANCE,
instanceId,
+ CycleUnitType.HOUR, dataTime,
AgentUtils.getCurrentTime());
+ LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
list.add(instanceProfile);
- this.isAdded = true;
+ this.isAdded.set(true);
return list;
}
-
- @Override
- protected int getInstanceLimit() {
- return DEFAULT_INSTANCE_LIMIT;
- }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
index 554764911d..7cf382fbd6 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
@@ -53,7 +53,6 @@ public class PostgreSQLTask extends AbstractTask {
@Override
protected void initTask() {
LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr());
- taskProfile.get(TASK_POSTGRES_DBNAME);
dbName = taskProfile.get(TASK_POSTGRES_DBNAME);
tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST);
instanceId = dbName + "-" + tableName;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
index ea1d9289cb..7f2e9e3c81 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
@@ -41,13 +41,14 @@ public class TestOracleConnect {
@Ignore
public void testOracle() {
InstanceProfile jobProfile = new InstanceProfile();
- jobProfile.set("job.oracleJob.hostname", "localhost");
- jobProfile.set("job.oracleJob.port", "1521");
- jobProfile.set("job.oracleJob.user", "c##dbzuser");
- jobProfile.set("job.oracleJob.password", "dbz");
- jobProfile.set("job.oracleJob.sid", "ORCLCDB");
- jobProfile.set("job.oracleJob.dbname", "ORCLCDB");
+ jobProfile.set("job.oracleJob.hostname", "192.168.101.11");
+ jobProfile.set("job.oracleJob.port", "49161");
+ jobProfile.set("job.oracleJob.user", "c##admin");
+ jobProfile.set("job.oracleJob.password", "inlong");
+ jobProfile.set("job.oracleJob.sid", "xe");
+ jobProfile.set("job.oracleJob.dbname", "xe");
jobProfile.set("job.oracleJob.serverName", "server1");
+ jobProfile.set("instance.id", "instance_test");
jobProfile.set(TaskConstants.JOB_INSTANCE_ID,
UUID.randomUUID().toString());
jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());