This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a3cc5011f8 [INLONG-10010][Agent] Adjust source encapsulation, keep public initialization in the base class init, and place specific source related initialization in the subclass's initSource (#10011)
a3cc5011f8 is described below

commit a3cc5011f8a985da60a6b89b60ecca680c9e39b2
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Apr 19 09:44:01 2024 +0800

    [INLONG-10010][Agent] Adjust source encapsulation, keep public initialization in the base class init, and place specific source related initialization in the subclass's initSource (#10011)
---
 .../apache/inlong/agent/plugin/sources/DatabaseSqlSource.java  |  6 ++++++
 .../org/apache/inlong/agent/plugin/sources/KafkaSource.java    |  4 +---
 .../org/apache/inlong/agent/plugin/sources/LogFileSource.java  |  3 +--
 .../org/apache/inlong/agent/plugin/sources/MongoDBSource.java  |  4 +---
 .../org/apache/inlong/agent/plugin/sources/MqttSource.java     |  6 ++++++
 .../org/apache/inlong/agent/plugin/sources/OracleSource.java   |  6 ++++++
 .../apache/inlong/agent/plugin/sources/PostgreSQLSource.java   |  6 ++++++
 .../org/apache/inlong/agent/plugin/sources/PulsarSource.java   |  3 +--
 .../org/apache/inlong/agent/plugin/sources/RedisSource.java    |  6 ++++++
 .../apache/inlong/agent/plugin/sources/SQLServerSource.java    |  6 ++++++
 .../inlong/agent/plugin/sources/file/AbstractSource.java       | 10 ++++++++++
 11 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 2df77b4f83..9be71ca32f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
@@ -93,6 +94,11 @@ public class DatabaseSqlSource extends AbstractSource {
         return null;
     }
 
+    @Override
+    protected void initSource(InstanceProfile profile) {
+
+    }
+
     @Override
     protected void printCurrentState() {
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 42e6d7c70f..0ca9b97def 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -69,11 +69,9 @@ public class KafkaSource extends AbstractSource {
     }
 
     @Override
-    public void init(InstanceProfile profile) {
+    protected void initSource(InstanceProfile profile) {
         try {
             LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
-            this.profile = profile;
-            super.init(profile);
             topic = profile.getInstanceId();
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
             props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 037470fb41..09d200a742 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -65,10 +65,9 @@ public class LogFileSource extends AbstractSource {
     }
 
     @Override
-    public void init(InstanceProfile profile) {
+    protected void initSource(InstanceProfile profile) {
         try {
             LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
-            super.init(profile);
             fileName = profile.getInstanceId();
             bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
             isIncrement = isIncrement(profile);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index 74fc50e111..d9cf63ee0e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -66,11 +66,9 @@ public class MongoDBSource extends AbstractSource {
     }
 
     @Override
-    public void init(InstanceProfile profile) {
+    protected void initSource(InstanceProfile profile) {
         try {
             LOGGER.info("MongoDBSource init: {}", profile.toJsonStr());
-            this.profile = profile;
-            super.init(profile);
             debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
             database = profile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
             collection = profile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index 5f14cf3027..a1c4af9be7 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.plugin.Message;
@@ -84,6 +85,11 @@ public class MqttSource extends AbstractSource {
         return null;
     }
 
+    @Override
+    protected void initSource(InstanceProfile profile) {
+
+    }
+
     @Override
     protected void printCurrentState() {
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
index 1d3088a0aa..c17dadeeff 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
@@ -53,6 +54,11 @@ public class OracleSource extends AbstractSource {
         return null;
     }
 
+    @Override
+    protected void initSource(InstanceProfile profile) {
+
+    }
+
     @Override
     protected void printCurrentState() {
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
index 7a65767c1b..400dc9ab49 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
@@ -54,6 +55,11 @@ public class PostgreSQLSource extends AbstractSource {
         return null;
     }
 
+    @Override
+    protected void initSource(InstanceProfile profile) {
+
+    }
+
     @Override
     protected void printCurrentState() {
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 64974d7994..34a81f7d22 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -63,10 +63,9 @@ public class PulsarSource extends AbstractSource {
     }
 
     @Override
-    public void init(InstanceProfile profile) {
+    protected void initSource(InstanceProfile profile) {
         try {
             LOGGER.info("PulsarSource init: {}", profile.toJsonStr());
-            super.init(profile);
             topic = profile.getInstanceId();
             serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
             subscription = profile.get(TASK_PULSAR_SUBSCRIPTION, PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index a844ba38cb..25cd7a3e01 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
@@ -54,6 +55,11 @@ public class RedisSource extends AbstractSource {
         return null;
     }
 
+    @Override
+    protected void initSource(InstanceProfile profile) {
+
+    }
+
     @Override
     protected void printCurrentState() {
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index 8fceb8f635..e067833c6b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
@@ -53,6 +54,11 @@ public class SQLServerSource extends AbstractSource {
         return null;
     }
 
+    @Override
+    protected void initSource(InstanceProfile profile) {
+
+    }
+
     @Override
     protected void printCurrentState() {
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index d9e8b9834a..192756abf8 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.sources.file;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.task.MemoryManager;
@@ -29,6 +30,7 @@ import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.file.Source;
 import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -128,8 +130,11 @@ public abstract class AbstractSource implements Source {
         initOffset();
         registerMetric();
         initExtendHandler();
+        initSource(profile);
     }
 
+    protected abstract void initSource(InstanceProfile profile);
+
     protected void initOffset() {
         offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId);
     }
@@ -396,4 +401,9 @@ public abstract class AbstractSource implements Source {
         }
         return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
     }
+
+    @Override
+    public List<Reader> split(TaskProfile conf) {
+        return null;
+    }
 }

Reply via email to