This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 99a9979 [INLONG-2540][Agent] Create db sql collect task by config from manager (#2543)
99a9979 is described below
commit 99a99795b507f70d98d1024a262e1a1367be520b
Author: justinwwhuang <[email protected]>
AuthorDate: Sat Feb 19 18:28:59 2022 +0800
[INLONG-2540][Agent] Create db sql collect task by config from manager (#2543)
---
.../java/org/apache/inlong/agent/core/job/Job.java | 2 +-
.../apache/inlong/agent/core/task/TaskWrapper.java | 16 ++++++-----
.../agent/plugin/fetcher/dtos/DbCollectorTask.java | 2 +-
.../plugin/fetcher/dtos/SqlJobProfileDto.java | 31 +++++++++++++---------
...{DataBaseSource.java => DatabaseSqlSource.java} | 7 +++--
5 files changed, 35 insertions(+), 23 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 4c9ac92..0a4f876 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -92,7 +92,7 @@ public class Job {
taskList.add(new Task(taskId, reader, writer, channel,
getJobConf()));
}
} catch (Exception ex) {
- LOGGER.error("create taks fail", ex);
+ LOGGER.error("create task failed", ex);
throw new RuntimeException(ex);
}
return taskList;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index e394d37..6247b1e 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -23,6 +23,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constants.AgentConstants;
@@ -58,16 +59,16 @@ public class TaskWrapper extends AbstractStateWrapper {
this.task = task;
AgentConfiguration conf = AgentConfiguration.getAgentConf();
maxRetryTime = conf.getInt(
- AgentConstants.TASK_MAX_RETRY_TIME,
AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
+ AgentConstants.TASK_MAX_RETRY_TIME,
AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
pushMaxWaitTime = conf.getInt(
- AgentConstants.TASK_PUSH_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
+ AgentConstants.TASK_PUSH_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
pullMaxWaitTime = conf.getInt(
- AgentConstants.TASK_PULL_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
+ AgentConstants.TASK_PULL_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new AgentThreadFactory("task-reader-writer"));
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new AgentThreadFactory("task-reader-writer"));
}
doChangeState(State.ACCEPTED);
}
@@ -84,10 +85,11 @@ public class TaskWrapper extends AbstractStateWrapper {
if (message == null || task.getChannel()
.push(message, pushMaxWaitTime, TimeUnit.SECONDS)) {
message = task.getReader().read();
+ LOGGER.info("submitReadThread message {}", message);
}
}
LOGGER.info("read end, task exception status is {}, read finish status is {}",
- isException(), task.isReadFinished());
+ isException(), task.isReadFinished());
// write end message
task.getChannel().push(new EndMessage());
}, executorService);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java
index 89f24e7..0a5da64 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java
@@ -26,7 +26,7 @@ public class DbCollectorTask {
private Integer type;
private Integer dbType;
private String ip;
- private Integer port;
+ private Integer dbport;
private String dbName;
private String user;
private String password;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java
index 9b72436..87e049f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java
@@ -35,7 +35,7 @@ public class SqlJobProfileDto {
public static final String SQL_JOB = "SQL_JOB";
public static final String DEFAULT_CHANNEL =
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String DEFAULT_DATAPROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
- public static final String DEFAULT_SOURCE =
"org.apache.inlong.agent.plugin.sources.TextFileSource";
+ public static final String DEFAULT_SOURCE =
"org.apache.inlong.agent.plugin.sources.DatabaseSqlSource";
@Data
public static class Running {
@@ -50,6 +50,16 @@ public class SqlJobProfileDto {
}
@Data
+ public static class Sql {
+
+ private String user;
+ private String password;
+ private String hostname;
+ private Integer port;
+ private String command;
+ }
+
+ @Data
public static class Job {
private int id;
@@ -57,16 +67,12 @@ public class SqlJobProfileDto {
private String source;
private String sink;
private String channel;
- private String ip;
- private Integer port;
private String dbName;
- private String user;
- private String password;
- private String sqlStatement;
private Integer totalLimit;
private Integer onceLimit;
private Integer timeLimit;
private Integer retryTimes;
+ private Sql sql;
}
@Data
@@ -85,23 +91,24 @@ public class SqlJobProfileDto {
}
private static Job getJob(DbCollectorTask task) {
+ Sql sql = new Sql();
+ sql.setHostname(task.getIp());
+ sql.setPort(task.getDbport());
+ sql.setUser(task.getUser());
+ sql.setPassword(task.getPassword());
+ sql.setCommand(task.getSqlStatement());
Job job = new Job();
job.setId(Integer.parseInt(task.getId()));
job.setName(SQL_JOB);
job.setSource(DEFAULT_SOURCE);
job.setSink(DEFAULT_DATAPROXY_SINK);
job.setChannel(DEFAULT_CHANNEL);
- job.setIp(task.getIp());
- job.setPort(task.getPort());
job.setDbName(task.getDbName());
- job.setUser(task.getUser());
- job.setPassword(task.getPassword());
- job.setSqlStatement(task.getSqlStatement());
job.setTotalLimit(task.getTotalLimit());
job.setOnceLimit(task.getOnceLimit());
job.setTimeLimit(task.getTimeLimit());
job.setRetryTimes(task.getRetryTimes());
-
+ job.setSql(sql);
return job;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
similarity index 93%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
rename to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 8b9ad8c..1a0bcbd 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -30,11 +30,14 @@ import
org.apache.inlong.agent.plugin.sources.reader.SqlReader;
import org.apache.inlong.agent.utils.AgentDbUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Make database as Source
*/
-public class DataBaseSource implements Source {
+public class DatabaseSqlSource implements Source {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DatabaseSqlSource.class);
private static final String JOB_DATABASE_SQL = "job.sql.command";
@@ -43,7 +46,7 @@ public class DataBaseSource implements Source {
private final SourceMetrics sourceMetrics;
private static AtomicLong metricsIndex = new AtomicLong(0);
- public DataBaseSource() {
+ public DatabaseSqlSource() {
if (ConfigUtil.isPrometheusEnabled()) {
this.sourceMetrics = new
SourcePrometheusMetrics(AgentUtils.getUniqId(
DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));