This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 84f738c91 [INLONG-7404][Agent] Fix error of Redis connector (#7405)
84f738c91 is described below
commit 84f738c9132250e99496e4badcc44215089169f3
Author: haifxu <[email protected]>
AuthorDate: Wed Feb 22 16:41:54 2023 +0800
[INLONG-7404][Agent] Fix error of Redis connector (#7405)
---
.../apache/inlong/agent/pojo/JobProfileDto.java | 27 +++++++++++++
.../org/apache/inlong/agent/pojo/RedisJob.java | 46 ++++++++++++++++++++++
.../inlong/agent/plugin/sources/RedisSource.java | 3 +-
.../agent/plugin/sources/reader/RedisReader.java | 4 ++
.../apache/inlong/common/enums/TaskTypeEnum.java | 2 +
5 files changed, 81 insertions(+), 1 deletion(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 74e8377af..169392f02 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -64,6 +64,10 @@ public class JobProfileDto {
* oracle source
*/
public static final String ORACLE_SOURCE =
"org.apache.inlong.agent.plugin.sources.OracleSource";
+ /**
+ * redis source
+ */
+ public static final String REDIS_SOURCE =
"org.apache.inlong.agent.plugin.sources.RedisSource";
/**
* mqtt source
*/
@@ -215,6 +219,22 @@ public class JobProfileDto {
return postgreSQLJob;
}
+ private static RedisJob getRedisJob(DataConfig dataConfig) {
+ RedisJob.RedisJobConfig config =
GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class);
+ RedisJob redisJob = new RedisJob();
+
+ redisJob.setAuthUser(config.getUsername());
+ redisJob.setAuthPassword(config.getPassword());
+ redisJob.setHostname(config.getHostname());
+ redisJob.setPort(config.getPort());
+ redisJob.setSsl(config.getSsl());
+ redisJob.setReadTimeout(config.getTimeout());
+ redisJob.setQueueSize(config.getQueueSize());
+ redisJob.setReplId(config.getReplId());
+
+ return redisJob;
+ }
+
private static MongoJob getMongoJob(DataConfig dataConfigs) {
MongoJob.MongoJobTaskConfig config =
GSON.fromJson(dataConfigs.getExtParams(),
@@ -440,6 +460,12 @@ public class JobProfileDto {
job.setSource(MONGO_SOURCE);
profileDto.setJob(job);
break;
+ case REDIS:
+ RedisJob redisJob = getRedisJob(dataConfig);
+ job.setRedisJob(redisJob);
+ job.setSource(REDIS_SOURCE);
+ profileDto.setJob(job);
+ break;
case MQTT:
MqttJob mqttJob = getMqttJob(dataConfig);
job.setMqttJob(mqttJob);
@@ -481,6 +507,7 @@ public class JobProfileDto {
private PostgreSQLJob postgreSQLJob;
private OracleJob oracleJob;
private MongoJob mongoJob;
+ private RedisJob redisJob;
private MqttJob mqttJob;
private SqlServerJob sqlserverJob;
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java
new file mode 100644
index 000000000..5917f9efe
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class RedisJob {
+
+ private String authUser;
+ private String authPassword;
+ private String hostname;
+ private String port;
+ private Boolean ssl;
+ private String readTimeout;
+ private String queueSize;
+ private String replId;
+
+ @Data
+ public static class RedisJobConfig {
+
+ private String username;
+ private String password;
+ private String hostname;
+ private String port;
+ private Boolean ssl;
+ private String timeout;
+ private String queueSize;
+ private String replId;
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index 48003c8de..04d51b933 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -40,7 +40,8 @@ public class RedisSource extends AbstractSource {
@Override
public List<Reader> split(JobProfile conf) {
super.init(conf);
- Reader redisReader = new RedisReader();
+ RedisReader redisReader = new RedisReader();
+ redisReader.setReadSource(conf.getInstanceId());
List<Reader> readerList = new ArrayList<>();
readerList.add(redisReader);
sourceMetric.sourceSuccessCount.incrementAndGet();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
index 394ffc209..62203c459 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
@@ -202,6 +202,10 @@ public class RedisReader extends AbstractReader {
return instanceId;
}
+ public void setReadSource(String instanceId) {
+ this.instanceId = instanceId;
+ }
+
@Override
public void setReadTimeout(long mill) {
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 97dc71f6f..180fb0a88 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -72,6 +72,8 @@ public enum TaskTypeEnum {
return MONGODB;
case 10:
return TUBEMQ;
+ case 11:
+ return REDIS;
case 12:
return MQTT;
case 13: