This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 99a9979  [INLONG-2540][Agent] Create db sql collect task by config from manager (#2543)
99a9979 is described below

commit 99a99795b507f70d98d1024a262e1a1367be520b
Author: justinwwhuang <[email protected]>
AuthorDate: Sat Feb 19 18:28:59 2022 +0800

    [INLONG-2540][Agent] Create db sql collect task by config from manager (#2543)
---
 .../java/org/apache/inlong/agent/core/job/Job.java |  2 +-
 .../apache/inlong/agent/core/task/TaskWrapper.java | 16 ++++++-----
 .../agent/plugin/fetcher/dtos/DbCollectorTask.java |  2 +-
 .../plugin/fetcher/dtos/SqlJobProfileDto.java      | 31 +++++++++++++---------
 ...{DataBaseSource.java => DatabaseSqlSource.java} |  7 +++--
 5 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 4c9ac92..0a4f876 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -92,7 +92,7 @@ public class Job {
                 taskList.add(new Task(taskId, reader, writer, channel, getJobConf()));
             }
         } catch (Exception ex) {
-            LOGGER.error("create taks fail", ex);
+            LOGGER.error("create task failed", ex);
             throw new RuntimeException(ex);
         }
         return taskList;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index e394d37..6247b1e 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -23,6 +23,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.constants.AgentConstants;
@@ -58,16 +59,16 @@ public class TaskWrapper extends AbstractStateWrapper {
         this.task = task;
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
         maxRetryTime = conf.getInt(
-            AgentConstants.TASK_MAX_RETRY_TIME, AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
+                AgentConstants.TASK_MAX_RETRY_TIME, AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
         pushMaxWaitTime = conf.getInt(
-            AgentConstants.TASK_PUSH_MAX_SECOND, AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
+                AgentConstants.TASK_PUSH_MAX_SECOND, AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
         pullMaxWaitTime = conf.getInt(
-            AgentConstants.TASK_PULL_MAX_SECOND, AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
+                AgentConstants.TASK_PULL_MAX_SECOND, AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
         if (executorService == null) {
             executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
-                60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>(),
-                new AgentThreadFactory("task-reader-writer"));
+                    60L, TimeUnit.SECONDS,
+                    new SynchronousQueue<Runnable>(),
+                    new AgentThreadFactory("task-reader-writer"));
         }
         doChangeState(State.ACCEPTED);
     }
@@ -84,10 +85,11 @@ public class TaskWrapper extends AbstractStateWrapper {
                 if (message == null || task.getChannel()
                         .push(message, pushMaxWaitTime, TimeUnit.SECONDS)) {
                     message = task.getReader().read();
+                    LOGGER.info("submitReadThread message {}", message);
                 }
             }
             LOGGER.info("read end, task exception status is {}, read finish status is {}",
-                isException(), task.isReadFinished());
+                    isException(), task.isReadFinished());
             // write end message
             task.getChannel().push(new EndMessage());
         }, executorService);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java
index 89f24e7..0a5da64 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DbCollectorTask.java
@@ -26,7 +26,7 @@ public class DbCollectorTask {
     private Integer type;
     private Integer dbType;
     private String ip;
-    private Integer port;
+    private Integer dbport;
     private String dbName;
     private String user;
     private String password;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java
index 9b72436..87e049f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/SqlJobProfileDto.java
@@ -35,7 +35,7 @@ public class SqlJobProfileDto {
     public static final String SQL_JOB = "SQL_JOB";
     public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
-    public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.TextFileSource";
+    public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.DatabaseSqlSource";
 
     @Data
     public static class Running {
@@ -50,6 +50,16 @@ public class SqlJobProfileDto {
     }
 
     @Data
+    public static class Sql {
+
+        private String user;
+        private String password;
+        private String hostname;
+        private Integer port;
+        private String command;
+    }
+
+    @Data
     public static class Job {
 
         private int id;
@@ -57,16 +67,12 @@ public class SqlJobProfileDto {
         private String source;
         private String sink;
         private String channel;
-        private String ip;
-        private Integer port;
         private String dbName;
-        private String user;
-        private String password;
-        private String sqlStatement;
         private Integer totalLimit;
         private Integer onceLimit;
         private Integer timeLimit;
         private Integer retryTimes;
+        private Sql sql;
     }
 
     @Data
@@ -85,23 +91,24 @@ public class SqlJobProfileDto {
     }
 
     private static Job getJob(DbCollectorTask task) {
+        Sql sql = new Sql();
+        sql.setHostname(task.getIp());
+        sql.setPort(task.getDbport());
+        sql.setUser(task.getUser());
+        sql.setPassword(task.getPassword());
+        sql.setCommand(task.getSqlStatement());
         Job job = new Job();
         job.setId(Integer.parseInt(task.getId()));
         job.setName(SQL_JOB);
         job.setSource(DEFAULT_SOURCE);
         job.setSink(DEFAULT_DATAPROXY_SINK);
         job.setChannel(DEFAULT_CHANNEL);
-        job.setIp(task.getIp());
-        job.setPort(task.getPort());
         job.setDbName(task.getDbName());
-        job.setUser(task.getUser());
-        job.setPassword(task.getPassword());
-        job.setSqlStatement(task.getSqlStatement());
         job.setTotalLimit(task.getTotalLimit());
         job.setOnceLimit(task.getOnceLimit());
         job.setTimeLimit(task.getTimeLimit());
         job.setRetryTimes(task.getRetryTimes());
-
+        job.setSql(sql);
         return job;
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
similarity index 93%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 8b9ad8c..1a0bcbd 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DataBaseSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -30,11 +30,14 @@ import org.apache.inlong.agent.plugin.sources.reader.SqlReader;
 import org.apache.inlong.agent.utils.AgentDbUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Make database as Source
  */
-public class DataBaseSource implements Source {
+public class DatabaseSqlSource  implements Source {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseSqlSource.class);
 
     private static final String JOB_DATABASE_SQL = "job.sql.command";
 
@@ -43,7 +46,7 @@ public class DataBaseSource implements Source {
     private final SourceMetrics sourceMetrics;
     private static AtomicLong metricsIndex = new AtomicLong(0);
 
-    public DataBaseSource() {
+    public DatabaseSqlSource() {
         if (ConfigUtil.isPrometheusEnabled()) {
             this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
                 DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));

Reply via email to