This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a3cc5011f8 [INLONG-10010][Agent] Adjust source encapsulation, keep
public initialization in the base class init, and place specific source related
initialization in the subclass's initSource (#10011)
a3cc5011f8 is described below
commit a3cc5011f8a985da60a6b89b60ecca680c9e39b2
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Apr 19 09:44:01 2024 +0800
[INLONG-10010][Agent] Adjust source encapsulation, keep public
initialization in the base class init, and place specific source related
initialization in the subclass's initSource (#10011)
---
.../apache/inlong/agent/plugin/sources/DatabaseSqlSource.java | 6 ++++++
.../org/apache/inlong/agent/plugin/sources/KafkaSource.java | 4 +---
.../org/apache/inlong/agent/plugin/sources/LogFileSource.java | 3 +--
.../org/apache/inlong/agent/plugin/sources/MongoDBSource.java | 4 +---
.../org/apache/inlong/agent/plugin/sources/MqttSource.java | 6 ++++++
.../org/apache/inlong/agent/plugin/sources/OracleSource.java | 6 ++++++
.../apache/inlong/agent/plugin/sources/PostgreSQLSource.java | 6 ++++++
.../org/apache/inlong/agent/plugin/sources/PulsarSource.java | 3 +--
.../org/apache/inlong/agent/plugin/sources/RedisSource.java | 6 ++++++
.../apache/inlong/agent/plugin/sources/SQLServerSource.java | 6 ++++++
.../inlong/agent/plugin/sources/file/AbstractSource.java | 10 ++++++++++
11 files changed, 50 insertions(+), 10 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 2df77b4f83..9be71ca32f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
@@ -93,6 +94,11 @@ public class DatabaseSqlSource extends AbstractSource {
return null;
}
+ @Override
+ protected void initSource(InstanceProfile profile) {
+
+ }
+
@Override
protected void printCurrentState() {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 42e6d7c70f..0ca9b97def 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -69,11 +69,9 @@ public class KafkaSource extends AbstractSource {
}
@Override
- public void init(InstanceProfile profile) {
+ protected void initSource(InstanceProfile profile) {
try {
LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
- this.profile = profile;
- super.init(profile);
topic = profile.getInstanceId();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 037470fb41..09d200a742 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -65,10 +65,9 @@ public class LogFileSource extends AbstractSource {
}
@Override
- public void init(InstanceProfile profile) {
+ protected void initSource(InstanceProfile profile) {
try {
LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
- super.init(profile);
fileName = profile.getInstanceId();
bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
isIncrement = isIncrement(profile);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index 74fc50e111..d9cf63ee0e 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -66,11 +66,9 @@ public class MongoDBSource extends AbstractSource {
}
@Override
- public void init(InstanceProfile profile) {
+ protected void initSource(InstanceProfile profile) {
try {
LOGGER.info("MongoDBSource init: {}", profile.toJsonStr());
- this.profile = profile;
- super.init(profile);
debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
database =
profile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST);
collection =
profile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index 5f14cf3027..a1c4af9be7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.plugin.Message;
@@ -84,6 +85,11 @@ public class MqttSource extends AbstractSource {
return null;
}
+ @Override
+ protected void initSource(InstanceProfile profile) {
+
+ }
+
@Override
protected void printCurrentState() {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
index 1d3088a0aa..c17dadeeff 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
@@ -53,6 +54,11 @@ public class OracleSource extends AbstractSource {
return null;
}
+ @Override
+ protected void initSource(InstanceProfile profile) {
+
+ }
+
@Override
protected void printCurrentState() {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
index 7a65767c1b..400dc9ab49 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
@@ -54,6 +55,11 @@ public class PostgreSQLSource extends AbstractSource {
return null;
}
+ @Override
+ protected void initSource(InstanceProfile profile) {
+
+ }
+
@Override
protected void printCurrentState() {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 64974d7994..34a81f7d22 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -63,10 +63,9 @@ public class PulsarSource extends AbstractSource {
}
@Override
- public void init(InstanceProfile profile) {
+ protected void initSource(InstanceProfile profile) {
try {
LOGGER.info("PulsarSource init: {}", profile.toJsonStr());
- super.init(profile);
topic = profile.getInstanceId();
serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
subscription = profile.get(TASK_PULSAR_SUBSCRIPTION,
PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index a844ba38cb..25cd7a3e01 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
@@ -54,6 +55,11 @@ public class RedisSource extends AbstractSource {
return null;
}
+ @Override
+ protected void initSource(InstanceProfile profile) {
+
+ }
+
@Override
protected void printCurrentState() {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index 8fceb8f635..e067833c6b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
@@ -53,6 +54,11 @@ public class SQLServerSource extends AbstractSource {
return null;
}
+ @Override
+ protected void initSource(InstanceProfile profile) {
+
+ }
+
@Override
protected void printCurrentState() {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index d9e8b9834a..192756abf8 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.sources.file;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.task.MemoryManager;
@@ -29,6 +30,7 @@ import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.file.Source;
import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -128,8 +130,11 @@ public abstract class AbstractSource implements Source {
initOffset();
registerMetric();
initExtendHandler();
+ initSource(profile);
}
+ protected abstract void initSource(InstanceProfile profile);
+
protected void initOffset() {
offsetProfile = OffsetManager.getInstance().getOffset(taskId,
instanceId);
}
@@ -396,4 +401,9 @@ public abstract class AbstractSource implements Source {
}
return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
}
+
+ @Override
+ public List<Reader> split(TaskProfile conf) {
+ return null;
+ }
}