This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cc82864873 [INLONG-10287][Agent] Update the Redis Source (#11084)
cc82864873 is described below
commit cc828648738e60e7796546a95bb95f94d2e90844
Author: emptyOVO <[email protected]>
AuthorDate: Tue Oct 8 18:58:53 2024 +0800
[INLONG-10287][Agent] Update the Redis Source (#11084)
---
.../inlong/agent/constant/TaskConstants.java | 16 +
.../org/apache/inlong/agent/pojo/RedisTask.java | 15 +-
.../apache/inlong/agent/pojo/TaskProfileDto.java | 10 +-
inlong-agent/agent-plugins/pom.xml | 4 +
.../agent/plugin/instance/RedisInstance.java} | 31 +-
.../inlong/agent/plugin/sources/RedisSource.java | 592 ++++++++++++++++++++-
.../apache/inlong/agent/plugin/task/RedisTask.java | 78 +++
.../agent/plugin/sources/TestRedisSource.java | 259 +++++++++
8 files changed, 967 insertions(+), 38 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 9398d70640..fbf0b5b705 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -178,6 +178,22 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_SQLSERVER_UNIX_TIMESTAMP_FORMAT_ENABLE =
"task.sqlserverTask.unixTimestampFormatEnable";
+ public static final String TASK_REDIS_PORT = "task.redisTask.port";
+ public static final String TASK_REDIS_HOSTNAME = "task.redisTask.hostname";
+ public static final String TASK_REDIS_SSL = "task.redisTask.ssl";
+ public static final String TASK_REDIS_AUTHUSER = "task.redisTask.authUser";
+ public static final String TASK_REDIS_AUTHPASSWORD =
"task.redisTask.authPassword";
+ public static final String TASK_REDIS_READTIMEOUT =
"task.redisTask.readTimeout";
+ public static final String TASK_REDIS_REPLID = "task.redisTask.replId";
+ public static final String TASK_REDIS_OFFSET = "task.redisTask.offset";
+ public static final String TASK_REDIS_DB_NUMBER =
"task.redisTask.dbNumber";
+ public static final String TASK_REDIS_COMMAND = "task.redisTask.command";
+ public static final String TASK_REDIS_KEYS = "task.redisTask.keys";
+ public static final String TASK_REDIS_FIELD_OR_MEMBER =
"task.redisTask.fieldOrMember";
+ public static final String TASK_REDIS_IS_SUBSCRIBE =
"task.redisTask.isSubscribe";
+ public static final String TASK_REDIS_SUBOPERATION =
"task.redisTask.subOperation";
+ public static final String TASK_REDIS_SYNC_FREQ =
"task.redisTask.syncFreq";
+
public static final String TASK_STATE = "task.state";
public static final String INSTANCE_STATE = "instance.state";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
index 701bea2b8e..2b8a5c4ca6 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
@@ -18,7 +18,6 @@
package org.apache.inlong.agent.pojo;
import lombok.Data;
-
@Data
public class RedisTask {
@@ -30,6 +29,13 @@ public class RedisTask {
private String readTimeout;
private String queueSize;
private String replId;
+ private String dbNumber;
+ private String command;
+ private String keys;
+ private String fieldOrMember;
+ private Boolean isSubscribe;
+ private String syncFreq;
+ private String subOperations;
@Data
public static class RedisTaskConfig {
@@ -42,5 +48,12 @@ public class RedisTask {
private String timeout;
private String queueSize;
private String replId;
+ private String dbNumber;
+ private String command;
+ private String keys;
+ private String fieldOrMember;
+ private Boolean isSubscribe;
+ private String syncFreq;
+ private String subOperations;
}
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index bb602e2c61..1558bc042f 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -53,6 +53,7 @@ public class TaskProfileDto {
public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK =
"org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_ORACLE_TASK =
"org.apache.inlong.agent.plugin.task.OracleTask";
+ public static final String DEFAULT_REDIS_TASK =
"org.apache.inlong.agent.plugin.task.RedisTask";
public static final String DEFAULT_POSTGRESQL_TASK =
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK =
"org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_SQLSERVER_TASK =
"org.apache.inlong.agent.plugin.task.SQLServerTask";
@@ -274,8 +275,14 @@ public class TaskProfileDto {
redisTask.setPort(config.getPort());
redisTask.setSsl(config.getSsl());
redisTask.setReadTimeout(config.getTimeout());
- redisTask.setQueueSize(config.getQueueSize());
redisTask.setReplId(config.getReplId());
+ redisTask.setCommand(config.getCommand());
+ redisTask.setDbNumber(config.getDbNumber());
+ redisTask.setKeys(config.getKeys());
+ redisTask.setFieldOrMember(config.getFieldOrMember());
+ redisTask.setIsSubscribe(config.getIsSubscribe());
+ redisTask.setSyncFreq(config.getSyncFreq());
+ redisTask.setSubOperations(config.getSubOperations());
return redisTask;
}
@@ -521,6 +528,7 @@ public class TaskProfileDto {
profileDto.setTask(task);
break;
case REDIS:
+ task.setTaskClass(DEFAULT_REDIS_TASK);
RedisTask redisTask = getRedisTask(dataConfig);
task.setRedisTask(redisTask);
task.setSource(REDIS_SOURCE);
diff --git a/inlong-agent/agent-plugins/pom.xml
b/inlong-agent/agent-plugins/pom.xml
index eb092bfed0..750141798b 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -42,6 +42,10 @@
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
</dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>agent-common</artifactId>
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java
similarity index 56%
copy from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
copy to
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java
index 701bea2b8e..f34c3db923 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java
@@ -15,32 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.pojo;
+package org.apache.inlong.agent.plugin.instance;
-import lombok.Data;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
-@Data
-public class RedisTask {
+/**
+ * Instance implementation used by the Redis task. It extends {@link CommonInstance} and only
+ * overrides {@code setInodeInfo}, where it stores an empty string under
+ * {@code TaskConstants.INODE_INFO}.
+ */
+public class RedisInstance extends CommonInstance {
- private String authUser;
- private String authPassword;
- private String hostname;
- private String port;
- private Boolean ssl;
- private String readTimeout;
- private String queueSize;
- private String replId;
-
- @Data
- public static class RedisTaskConfig {
-
- private String username;
- private String password;
- private String hostname;
- private String port;
- private Boolean ssl;
- private String timeout;
- private String queueSize;
- private String replId;
+ // Writes an empty INODE_INFO value into the given profile.
+ @Override
+ public void setInodeInfo(InstanceProfile profile) {
+ profile.set(TaskConstants.INODE_INFO, "");
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index b7fea66e4c..aada1bf504 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -17,14 +17,53 @@
package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Redis source
@@ -32,6 +71,50 @@ import java.util.List;
public class RedisSource extends AbstractSource {
private static final Logger LOGGER =
LoggerFactory.getLogger(RedisSource.class);
+ private static final long MAX_DATA_SIZE = 500 * 1024;
+ private static final int REDIS_QUEUE_SIZE = 10000;
+ private static final long DEFAULT_FREQ = 60 * 1000;
+ private static final String GET_COMMAND = "GET";
+ private static final String MGET_COMMAND = "MGET";
+ private static final String HGET_COMMAND = "HGET";
+ private static final String ZSCORE_COMMAND = "ZSCORE";
+ private static final String ZREVRANK_COMMAND = "ZREVRANK";
+ private static final String EXISTS_COMMAND = "EXISTS";
+ private Gson gson;
+
+ public InstanceProfile profile;
+ private String port;
+ private Jedis jedis;
+ private String hostName;
+ private boolean ssl;
+ private String authUser;
+ private String authPassword;
+ private String readTimeout;
+ private String replId;
+ private String snapShot;
+ private String dbNumber;
+ private String redisCommand;
+
+ private String fieldOrMember;
+ private boolean destroyed;
+ private boolean isSubscribe;
+ private Set<String> keys;
+ private Set<String> subOperations;
+ private Replicator redisReplicator;
+ private BlockingQueue<SourceData> redisQueue;
+ private ScheduledExecutorService executor;
+
+ // Command handler map
+ private static final Map<String, CommandHandler> commandHandlers =
Maps.newConcurrentMap();
+
+ static {
+ commandHandlers.put(GET_COMMAND, RedisSource::handleGet);
+ commandHandlers.put(MGET_COMMAND, RedisSource::handleMGet);
+ commandHandlers.put(HGET_COMMAND, RedisSource::handleHGet);
+ commandHandlers.put(ZSCORE_COMMAND, RedisSource::handleZScore);
+ commandHandlers.put(ZREVRANK_COMMAND, RedisSource::handleZRevRank);
+ commandHandlers.put(EXISTS_COMMAND, RedisSource::handleExists);
+ }
public RedisSource() {
@@ -39,32 +122,207 @@ public class RedisSource extends AbstractSource {
@Override
protected String getThreadName() {
- return null;
+ return "redis-source-" + taskId + "-" + instanceId;
}
+    /**
+     * Reads the Redis connection settings from the profile and starts one of two modes:
+     * subscribe mode (a redis-replicator listener) when {@code isSubscribe} is set, otherwise
+     * command mode (a Jedis poll scheduled with a fixed delay).
+     *
+     * @param profile instance profile carrying the {@code task.redisTask.*} settings
+     */
@Override
protected void initSource(InstanceProfile profile) {
+        // NOTE(review): the JSON dump presumably contains authPassword - confirm before keeping it at INFO.
+        LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+        this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+        this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+        this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+        this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+        this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, "");
+        this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, "");
+        this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+        this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+        this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+        this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+        this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+        this.instanceId = profile.getInstanceId();
+        this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+        initGson();
+        String uri = getRedisUri();
+        try {
+            if (isSubscribe) {
+                // use subscribe mode
+                // NOTE(review): the POJO field is named subOperations while this key ends in
+                // "subOperation" - confirm the profile really carries the value under this key.
+                // A missing value must yield an empty set so that initReplicator() can fall back to
+                // the default command list instead of failing with a NullPointerException here.
+                this.subOperations = new ConcurrentSkipListSet<>();
+                for (String operation : profile.get(TaskConstants.TASK_REDIS_SUBOPERATION, "").split(",")) {
+                    if (StringUtils.isNotBlank(operation)) {
+                        this.subOperations.add(operation.trim());
+                    }
+                }
+                // Executors.newSingleThreadExecutor() does not return a ScheduledExecutorService, so
+                // casting its result fails at runtime; request a scheduled single-thread executor.
+                this.executor = Executors.newSingleThreadScheduledExecutor();
+                this.redisReplicator = new RedisReplicator(uri);
+                initReplicator();
+                this.executor.execute(startReplicatorSync());
+            } else {
+                this.executor = Executors.newScheduledThreadPool(1);
+                // use command mode
+                this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, GET_COMMAND);
+                this.fieldOrMember = profile.get(TaskConstants.TASK_REDIS_FIELD_OR_MEMBER, null);
+                // default frequency 1min
+                long syncFreq = profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, DEFAULT_FREQ);
+                // NOTE(review): the URI passes credentials as query parameters (authUser/authPassword);
+                // confirm that Jedis honours them, otherwise an explicit auth call is needed here.
+                this.jedis = new Jedis(uri);
+                jedis.connect();
+                this.executor.scheduleWithFixedDelay(startJedisSync(), 0, syncFreq, TimeUnit.MILLISECONDS);
+            }
+        } catch (URISyntaxException | IOException | JedisConnectionException e) {
+            sourceMetric.pluginReadFailCount.addAndGet(1);
+            LOGGER.error("Connect to redis {}:{} failed.", hostName, port, e);
+        }
+    }
+    /**
+     * Builds the task that names the worker thread and opens the replicator.
+     *
+     * <p>The replicator is opened directly in the task. Re-submitting the work to the same
+     * single-thread executor (wrapped in a {@code Thread} that was only ever used as a
+     * {@code Runnable}) added a queue hop without changing which thread does the work.
+     *
+     * @return the runnable handed to the executor in subscribe mode
+     */
+    private Runnable startReplicatorSync() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "redis subscribe mode");
+            try {
+                this.redisReplicator.open();
+            } catch (IOException e) {
+                LOGGER.error("Redis source error, fail to start replicator", e);
+            }
+        };
}
- @Override
- protected void printCurrentState() {
+    /**
+     * Builds the polling task used in command mode: run the configured command for all keys and
+     * enqueue the JSON-encoded result.
+     *
+     * <p>The poll runs inside the scheduled task itself, so the fixed delay is measured from the
+     * end of a poll and slow polls cannot pile up in the executor queue. Runtime failures are
+     * caught here because an exception escaping a {@code scheduleWithFixedDelay} task suppresses
+     * all later executions.
+     *
+     * @return the runnable scheduled in command mode
+     */
+    private Runnable startJedisSync() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "redis command mode");
+            try {
+                Map<String, Object> dataMap =
+                        fetchDataByJedis(jedis, redisCommand, new ArrayList<>(keys), fieldOrMember);
+                synchronizeData(gson.toJson(dataMap));
+            } catch (RuntimeException e) {
+                sourceMetric.pluginReadFailCount.incrementAndGet();
+                LOGGER.error("Redis source error, fail to fetch data by command {}", redisCommand, e);
+            }
+        };
+    }
+
+    /**
+     * Runs the handler registered for {@code command} and collects its output.
+     *
+     * @param jedis connection the handler talks to
+     * @param command Redis command name, matched case-insensitively against the handler table
+     * @param keys keys to query
+     * @param fieldOrMember hash field or sorted-set member, passed through to the handler
+     * @return map from key to the value produced by the handler
+     * @throws UnsupportedOperationException if no handler is registered for {@code command}
+     */
+    private Map<String, Object> fetchDataByJedis(Jedis jedis, String command, List<String> keys, String fieldOrMember) {
+        Map<String, Object> result = new HashMap<>();
+        // Locale.ROOT keeps the lookup independent of the default locale (e.g. Turkish dotted I).
+        CommandHandler handler = commandHandlers.get(command.toUpperCase(java.util.Locale.ROOT));
+        if (handler == null) {
+            LOGGER.error("Unsupported command: {}", command);
+            throw new UnsupportedOperationException("Unsupported command: " + command);
+        }
+        handler.handle(jedis, keys, fieldOrMember, result);
+        return result;
+    }
+    /**
+     * GET handler: queues one GET per key on a pipeline, flushes it once and stores each reply
+     * under its key. {@code fieldOrMember} is not used.
+     */
+    private static void handleGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
+        Pipeline batch = jedis.pipelined();
+        keys.forEach(key -> batch.get(key));
+        List<Object> replies = batch.syncAndReturnAll();
+        // replies come back in the order the commands were queued
+        int position = 0;
+        for (String key : keys) {
+            result.put(key, replies.get(position++));
+        }
+    }
+
+    /**
+     * MGET handler: fetches all keys with a single MGET and stores each returned value under the
+     * key at the same position. {@code fieldOrMember} is not used.
+     */
+    private static void handleMGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
+        String[] keyArray = keys.toArray(new String[0]);
+        List<String> values = jedis.mget(keyArray);
+        int position = 0;
+        for (String key : keys) {
+            result.put(key, values.get(position++));
+        }
+    }
+
+    /**
+     * HGET handler: reads the hash field {@code fieldOrMember} of every key and stores the value
+     * under that key.
+     */
+    private static void handleHGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
+        keys.forEach(key -> result.put(key, jedis.hget(key, fieldOrMember)));
+    }
+
+    /**
+     * ZSCORE handler: stores the score of member {@code fieldOrMember} for every key. Nothing is
+     * queried or stored when {@code fieldOrMember} is empty.
+     */
+    private static void handleZScore(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
+        if (StringUtils.isEmpty(fieldOrMember)) {
+            return;
+        }
+        for (String key : keys) {
+            Double memberScore = jedis.zscore(key, fieldOrMember);
+            result.put(key, memberScore);
+        }
+    }
+
+    /**
+     * ZREVRANK handler: stores the reverse rank of member {@code fieldOrMember} for every key.
+     * Nothing is queried or stored when {@code fieldOrMember} is empty.
+     */
+    private static void handleZRevRank(Jedis jedis, List<String> keys, String fieldOrMember,
+            Map<String, Object> result) {
+        if (StringUtils.isEmpty(fieldOrMember)) {
+            return;
+        }
+        for (String key : keys) {
+            Long reverseRank = jedis.zrevrank(key, fieldOrMember);
+            result.put(key, reverseRank);
+        }
+    }
+
+    /**
+     * EXISTS handler: stores, for every key, whether the key exists. {@code fieldOrMember} is not
+     * used.
+     */
+    private static void handleExists(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
+        keys.forEach(key -> {
+            boolean present = jedis.exists(key);
+            result.put(key, present);
+        });
+    }
+
+ // Functional interface for handling commands.
+ // Implementations are registered in the static commandHandlers map under the upper-case
+ // command name and looked up by fetchDataByJedis.
+ @FunctionalInterface
+ private interface CommandHandler {
+
+ // Runs one command against jedis for the given keys and writes one entry per key into result.
+ // fieldOrMember is the hash field / sorted-set member; handlers that do not need it ignore it.
+ void handle(Jedis jedis, List<String> keys, String fieldOrMember,
Map<String, Object> result);
+ }
+
+    /**
+     * Encodes {@code data} as UTF-8 and places it on the internal queue for {@code readFromSource}.
+     *
+     * <p>Empty input is ignored. Payloads larger than {@code MAX_DATA_SIZE} bytes are dropped and
+     * counted as a read failure. The call retries the enqueue once per second while the source is
+     * runnable; the read is audited and counted only if the record was actually enqueued.
+     *
+     * @param data JSON text to hand over; may be null or empty
+     */
+    private void synchronizeData(String data) {
+        if (StringUtils.isEmpty(data)) {
+            return;
+        }
+        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
+        // limit data size
+        if (dataBytes.length > MAX_DATA_SIZE) {
+            sourceMetric.pluginReadFailCount.incrementAndGet();
+            LOGGER.warn("Read redis data warn, data overload, Automatically skip and discard");
+            return;
+        }
+        SourceData sourceData = new SourceData(dataBytes, "0L");
+        try {
+            boolean offerSuc = false;
+            while (isRunnable() && !offerSuc) {
+                offerSuc = redisQueue.offer(sourceData, 1, TimeUnit.SECONDS);
+            }
+            if (!offerSuc) {
+                // The source stopped before the record could be queued: it was not read
+                // successfully, so it must not show up in the success audit or counter.
+                return;
+            }
+            // NOTE(review): the size limit is checked in bytes while the audit size is the
+            // character count - confirm which unit the audit expects.
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                    System.currentTimeMillis(), 1, data.length());
+            sourceMetric.pluginReadCount.incrementAndGet();
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the worker thread that owns this call
+            Thread.currentThread().interrupt();
+            sourceMetric.pluginReadFailCount.incrementAndGet();
+            LOGGER.error("Read redis data error", e);
+        }
}
@Override
- protected boolean doPrepareToRead() {
- return false;
+    protected void printCurrentState() {
+        // Logs whether the active mode (subscribe or command) is still running and against which
+        // host:port; "running" means the executor has not been shut down (and, in subscribe mode,
+        // that a replicator exists).
+        String endpoint = hostName + ":" + port;
+        if (!isSubscribe) {
+            String commandState = executor.isShutdown() ? "stop" : "running";
+            LOGGER.info("redis command synchronization is {} on source {}", commandState, endpoint);
+            return;
+        }
+        boolean subscribeActive = redisReplicator != null && !executor.isShutdown();
+        LOGGER.info("redis subscribe synchronization is {} on source {}",
+                subscribeActive ? "running" : "stop", endpoint);
}
@Override
- protected List<SourceData> readFromSource() {
- return null;
+ protected boolean doPrepareToRead() {
+ return true;
}
@Override
- public Message read() {
- return null;
+    protected List<SourceData> readFromSource() {
+        // Drains the internal queue into one batch: stops once the accumulated payload size
+        // reaches BATCH_READ_LINE_TOTAL_LEN or when no record arrives within one second.
+        List<SourceData> batch = new ArrayList<>();
+        int collectedBytes = 0;
+        try {
+            while (collectedBytes < BATCH_READ_LINE_TOTAL_LEN) {
+                SourceData next = redisQueue.poll(1, TimeUnit.SECONDS);
+                if (next == null) {
+                    break;
+                }
+                collectedBytes += next.getData().length;
+                batch.add(next);
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("poll {} data from redis queue interrupted.", instanceId);
+        }
+        return batch;
}
@Override
@@ -74,7 +332,23 @@ public class RedisSource extends AbstractSource {
@Override
protected void releaseSource() {
-
+        // Releases whatever initSource managed to create. Each resource is checked for null
+        // because only one of replicator (subscribe mode) and jedis (command mode) exists, and
+        // each close is independent so one failure does not leak the remaining resources.
+        LOGGER.info("releasing redis source");
+        if (destroyed) {
+            return;
+        }
+        destroyed = true;
+        if (executor != null) {
+            executor.shutdown();
+        }
+        // subscribe mode then close replicator
+        if (redisReplicator != null) {
+            try {
+                redisReplicator.close();
+            } catch (IOException e) {
+                LOGGER.error("Redis reader close failed.", e);
+            }
+        }
+        // command mode then close jedis
+        if (jedis != null && jedis.isConnected()) {
+            jedis.close();
+        }
}
@Override
@@ -84,6 +358,300 @@ public class RedisSource extends AbstractSource {
@Override
public boolean sourceExist() {
- return false;
+ return true;
+ }
+
+    /**
+     * Builds the connection URI from the configured fields:
+     * {@code redis://host:port[/db][?name=value&...]}. Query parameters are emitted in the order
+     * authPassword, authUser, readTimeout, ssl, replOffset, replId, and only for values that are
+     * set.
+     *
+     * <p>NOTE(review): values are appended verbatim; a password containing {@code &} or {@code ?}
+     * would need URL-encoding - confirm how the consumers of this URI expect it.
+     *
+     * @return the URI string, without a trailing {@code ?} or {@code &}
+     */
+    private String getRedisUri() {
+        // StringBuilder: the buffer never leaves this method, so no synchronisation is needed
+        StringBuilder uri = new StringBuilder("redis://");
+        uri.append(hostName).append(":").append(port);
+        if (!StringUtils.isEmpty(dbNumber)) {
+            uri.append("/").append(dbNumber);
+        }
+        List<String> params = new ArrayList<>();
+        if (!StringUtils.isEmpty(authPassword)) {
+            params.add("authPassword=" + authPassword);
+        }
+        if (!StringUtils.isEmpty(authUser)) {
+            params.add("authUser=" + authUser);
+        }
+        if (!StringUtils.isEmpty(readTimeout)) {
+            params.add("readTimeout=" + readTimeout);
+        }
+        if (ssl) {
+            params.add("ssl=yes");
+        }
+        if (!StringUtils.isEmpty(snapShot)) {
+            params.add("replOffset=" + snapShot);
+        }
+        if (!StringUtils.isEmpty(replId)) {
+            params.add("replId=" + replId);
+        }
+        if (!params.isEmpty()) {
+            uri.append("?").append(String.join("&", params));
+        }
+        return uri.toString();
+    }
+
+    /**
+     * Configures the replicator for subscribe mode. When sub-operations are configured, only those
+     * commands are parsed and a command is forwarded if its first argument is one of the watched
+     * keys; otherwise the default command set is registered via {@code initDefaultReplicator()}.
+     * After the RDB sync event the current replication offset is remembered in {@code snapShot}.
+     */
+    private void initReplicator() {
+        if (subOperations.isEmpty()) {
+            // if SubOperation is not configured, subscribe all modification
+            initDefaultReplicator();
+            return;
+        }
+        DefaultCommandParser replicatorCommandParser = new DefaultCommandParser();
+        for (String subOperation : subOperations) {
+            this.redisReplicator.addCommandParser(CommandName.name(subOperation), replicatorCommandParser);
+        }
+        this.redisReplicator.addEventListener((replicator, event) -> {
+            if (event instanceof DefaultCommand) {
+                Object[] args = ((DefaultCommand) event).getArgs();
+                // commands without arguments carry no key, so there is nothing to match
+                if (args != null && args.length > 0 && args[0] instanceof byte[]) {
+                    String key = new String((byte[]) args[0], StandardCharsets.UTF_8);
+                    if (keys.contains(key)) {
+                        synchronizeData(gson.toJson(event));
+                    }
+                }
+            }
+            if (event instanceof PostRdbSyncEvent) {
+                this.snapShot = String.valueOf(replicator.getConfiguration().getReplOffset());
+                LOGGER.info("after rdb snapShot is: {}", snapShot);
+            }
+        });
+    }
+
+    /**
+     * Registers the default set of write commands with the replicator and installs the event
+     * listener used when no sub-operations are configured.
+     *
+     * <p>The listener forwards RDB key/value events and parsed commands whose key is one of the
+     * watched keys. The two event kinds are handled separately: a command is not a key/value
+     * pair, so it must not be cast to one, and a {@code byte[]} key has to be decoded rather
+     * than converted with {@code toString()}.
+     */
+    private void initDefaultReplicator() {
+        final String[] defaultCommands = {
+                "APPEND", "SET", "SETEX", "MSET", "DEL", "SADD", "HMSET", "HSET", "LSET", "EXPIRE",
+                "EXPIREAT", "GETSET", "HSETNX", "PSETEX", "SETNX", "SETRANGE", "HDEL", "LPOP", "LPUSH",
+                "LPUSHX", "LRem", "RPOP", "RPUSH", "RPUSHX", "ZREM", "RENAME", "INCR", "DECR", "INCRBY",
+                "DECRBY", "PERSIST", "SELECT", "FLUSHALL", "FLUSHDB", "HINCRBY", "ZINCRBY", "MOVE",
+                "SMOVE", "PFADD", "PFCOUNT", "PFMERGE", "SDIFFSTORE", "SINTERSTORE", "SUNIONSTORE",
+                "ZADD", "ZINTERSTORE", "ZUNIONSTORE", "BRPOPLPUSH", "LINSERT", "RENAMENX", "RESTORE",
+                "PEXPIRE", "PEXPIREAT", "GEOADD", "EVAL", "EVALSHA", "SCRIPT", "PUBLISH", "BITOP",
+                "BITFIELD", "SETBIT", "SREM", "UNLINK", "SWAPDB", "MULTI", "EXEC", "ZREMRANGEBYSCORE",
+                "ZREMRANGEBYRANK", "ZREMRANGEBYLEX", "LTRIM", "SORT", "RPOPLPUSH", "ZPOPMIN", "ZPOPMAX",
+                "REPLCONF", "XACK", "XADD", "XCLAIM", "XDEL", "XGROUP", "XTRIM", "XSETID",
+                // since redis 6.2
+                "COPY", "LMOVE", "BLMOVE", "ZDIFFSTORE", "GEOSEARCHSTORE",
+                // since redis 7.0
+                "SPUBLISH", "FUNCTION"
+        };
+        DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+        for (String commandName : defaultCommands) {
+            this.redisReplicator.addCommandParser(CommandName.name(commandName), defaultCommandParser);
+        }
+        // add EventListener
+        this.redisReplicator.addEventListener((replicator, event) -> {
+            String key = null;
+            if (event instanceof KeyValuePair<?, ?>) {
+                Object rawKey = ((KeyValuePair<?, ?>) event).getKey();
+                if (rawKey instanceof byte[]) {
+                    key = new String((byte[]) rawKey, StandardCharsets.UTF_8);
+                } else if (rawKey != null) {
+                    key = rawKey.toString();
+                }
+            } else if (event instanceof DefaultCommand) {
+                // for a parsed command the key is taken from the first argument
+                Object[] args = ((DefaultCommand) event).getArgs();
+                if (args != null && args.length > 0 && args[0] instanceof byte[]) {
+                    key = new String((byte[]) args[0], StandardCharsets.UTF_8);
+                }
+            }
+            if (key != null && keys.contains(key)) {
+                synchronizeData(gson.toJson(event));
+            }
+            if (event instanceof PostRdbSyncEvent) {
+                this.snapShot = String.valueOf(replicator.getConfiguration().getReplOffset());
+                LOGGER.info("after rdb snapShot is: {}", snapShot);
+            }
+        });
+    }
+
+    /**
+     * Initialises the GSON instance used to serialise replicator events.
+     *
+     * <p>Write-only adapters are registered for the hash, list, set, string and sorted-set
+     * key/value events and for {@code DefaultCommand}; their {@code read} methods return null
+     * because events are never deserialised here. All byte arrays are decoded as UTF-8 so the
+     * output does not depend on the platform default charset.
+     */
+    private void initGson() {
+        this.gson = new GsonBuilder()
+                .registerTypeAdapter(KeyStringValueHash.class, new TypeAdapter<KeyStringValueHash>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueHash kv) throws IOException {
+                        out.beginObject();
+                        out.name("DB").beginObject();
+                        out.name("dbNumber").value(kv.getDb().getDbNumber());
+                        out.name("dbSize").value(kv.getDb().getDbsize());
+                        out.name("expires").value(kv.getDb().getExpires());
+                        out.endObject();
+                        out.name("valueRdbType").value(kv.getValueRdbType());
+                        out.name("key").value(utf8(kv.getKey()));
+                        out.name("value").beginObject();
+                        for (Map.Entry<byte[], byte[]> field : kv.getValue().entrySet()) {
+                            out.name(utf8(field.getKey())).value(utf8(field.getValue()));
+                        }
+                        out.endObject();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueHash read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(DefaultCommand.class, new TypeAdapter<DefaultCommand>() {
+
+                    @Override
+                    public void write(JsonWriter out, DefaultCommand dc) throws IOException {
+                        out.beginObject();
+                        // the command name is written under "key", its arguments under "value"
+                        out.name("key").value(utf8(dc.getCommand()));
+                        out.name("value").beginArray();
+                        for (byte[] bytes : dc.getArgs()) {
+                            out.value(utf8(bytes));
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public DefaultCommand read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueList.class, new TypeAdapter<KeyStringValueList>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueList kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(utf8(kv.getKey()));
+                        out.name("value").beginArray();
+                        for (byte[] bytes : kv.getValue()) {
+                            out.value(utf8(bytes));
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueList read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueSet.class, new TypeAdapter<KeyStringValueSet>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueSet kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(utf8(kv.getKey()));
+                        out.name("value").beginArray();
+                        for (byte[] bytes : kv.getValue()) {
+                            out.value(utf8(bytes));
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueSet read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueString.class, new TypeAdapter<KeyStringValueString>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueString kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(utf8(kv.getKey()));
+                        out.name("value").value(utf8(kv.getValue()));
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueString read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueZSet.class, new TypeAdapter<KeyStringValueZSet>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueZSet kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(utf8(kv.getKey()));
+                        out.name("value").beginArray();
+                        for (ZSetEntry entry : kv.getValue()) {
+                            out.beginObject();
+                            out.name("element").value(utf8(entry.getElement()));
+                            out.name("score").value(entry.getScore());
+                            out.endObject();
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueZSet read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .create();
+    }
+
+    /** Decodes raw Redis bytes as UTF-8 text. */
+    private static String utf8(byte[] bytes) {
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
new file mode 100644
index 0000000000..b9f7449ecd
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Redis collection task.
+ *
+ * Hands out exactly one {@link InstanceProfile} (bound to {@link #DEFAULT_REDIS_INSTANCE})
+ * the first time {@link #getNewInstanceList()} is called; every later call yields an
+ * empty list.
+ */
+public class RedisTask extends AbstractTask {
+
+    /** Fully qualified class name of the instance implementation created by this task. */
+    public static final String DEFAULT_REDIS_INSTANCE = "org.apache.inlong.agent.plugin.instance.RedisInstance";
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisTask.class);
+
+    /** Formats the current time down to the hour, used as the instance data time. */
+    private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH");
+    /** Identifier passed to the instance profile; built in {@link #initTask()}. */
+    private String taskId;
+    /** Set once the single instance profile has been handed out. */
+    private boolean isAdded = false;
+
+    /**
+     * Checks that the profile carries every required key.
+     *
+     * @param profile the task profile to validate
+     * @return {@code true} when all required keys exist, {@code false} otherwise
+     */
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (profile.allRequiredKeyExist()) {
+            return true;
+        }
+        LOGGER.error("task profile needs all required key");
+        return false;
+    }
+
+    /**
+     * @return the default instance limit
+     */
+    @Override
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
+    }
+
+    /**
+     * Logs the task profile and derives the task id from the configured replication id
+     * and the subscribe flag, joined by a dash.
+     */
+    @Override
+    protected void initTask() {
+        LOGGER.info("Redis commonInit: {}", taskProfile.toJsonStr());
+        taskId = taskProfile.get(TaskConstants.TASK_REDIS_REPLID)
+                + "-"
+                + taskProfile.get(TaskConstants.TASK_REDIS_IS_SUBSCRIBE);
+    }
+
+    /**
+     * Produces the instance profiles that still have to be started.
+     *
+     * @return a list holding the single Redis instance profile on the first call,
+     *         an empty list on every later call
+     */
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> created = new ArrayList<>();
+        if (!isAdded) {
+            String currentHour = dateTimeFormatter.format(LocalDateTime.now());
+            InstanceProfile redisInstance = taskProfile.createInstanceProfile(
+                    DEFAULT_REDIS_INSTANCE, taskId, CycleUnitType.HOUR, currentHour, AgentUtils.getCurrentTime());
+            LOGGER.info("taskProfile.createInstanceProfile: {}", redisInstance.toJsonStr());
+            created.add(redisInstance);
+            // Mark only after the profile was built and logged successfully.
+            isAdded = true;
+        }
+        return created;
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
new file mode 100644
index 0000000000..061a74c092
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ *
+ * The static {@link Executors} factories used in {@link #setUp()} are mocked to return a
+ * mocked {@link ScheduledExecutorService}, and the Jedis client and pipeline are mocks,
+ * so the data-fetching tests do not talk to a Redis server.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class);
+
+    private static AgentBaseTestsHelper helper;
+
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
+
+    private static Store taskBasicStore;
+    private static Store instanceBasicStore;
+    private static Store offsetBasicStore;
+
+    @Mock
+    private InstanceProfile profile;
+
+    @Mock
+    private Jedis jedis;
+
+    @Mock
+    private Pipeline pipeline;
+
+    @Mock
+    private ScheduledExecutorService executor;
+
+    @InjectMocks
+    private RedisSource redisSource;
+
+    /**
+     * Prepares the agent home and stores, mocks the {@link Executors} factories so they
+     * return the mocked executor, and builds the instance profile used by the tests.
+     */
+    @Before
+    public void setUp() {
+        helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+        taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+        instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+        offsetBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+        OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+        when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+        initProfile();
+    }
+
+    /**
+     * Replaces {@link #profile} with a real instance profile carrying the Redis
+     * connection settings, command, keys and subscribe operations used by the tests.
+     */
+    private void initProfile() {
+        final String username = "";
+        final String password = "123456";
+        final String hostname = "127.0.0.1";
+        final String port = "6379";
+        final String groupId = "group01";
+        final String streamId = "stream01";
+        final String keys = "age,name,sex";
+        final String command = "zscore";
+        final String subOperation = "set,del";
+
+        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D",
+                "GMT+8:00", null);
+        profile = taskProfile.createInstanceProfile("",
+                "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime());
+        profile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
+        profile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
+        profile.set(TaskConstants.TASK_REDIS_AUTHUSER, username);
+        profile.set(TaskConstants.TASK_REDIS_AUTHPASSWORD, password);
+        profile.set(TaskConstants.TASK_REDIS_HOSTNAME, hostname);
+        profile.set(TaskConstants.TASK_REDIS_PORT, port);
+        profile.set(TaskConstants.TASK_REDIS_COMMAND, command);
+        profile.set(TaskConstants.TASK_REDIS_KEYS, keys);
+        profile.set(TaskConstants.TASK_AUDIT_VERSION, "0");
+        profile.set(TaskConstants.TASK_REDIS_SUBOPERATION, subOperation);
+        profile.setInstanceId(instanceId);
+    }
+
+    /**
+     * Invokes the private {@code RedisSource#fetchDataByJedis} method reflectively with
+     * the mocked Jedis client.
+     *
+     * @param command the Redis command name passed to the method
+     * @param keys the keys passed to the method
+     * @param field the fourth String argument of the method (the hash field in the HGET
+     *        test, {@code null} in the other tests)
+     * @return the map returned by {@code fetchDataByJedis}
+     */
+    @SuppressWarnings("unchecked") // the tests in this class compare the result against a Map<String, Object>
+    private Map<String, Object> invokeFetchDataByJedis(String command, List<String> keys, String field)
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Method method = RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, String.class,
+                List.class, String.class);
+        method.setAccessible(true);
+        return (Map<String, Object>) method.invoke(redisSource, jedis, command, keys, field);
+    }
+
+    /**
+     * Starts and releases the source with subscribe mode disabled. Failures are
+     * tolerated (best effort) but logged so they are visible in the test output.
+     */
+    @Test
+    public void testJedisStartup() {
+        try {
+            profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+            redisSource.initSource(profile);
+            redisSource.releaseSource();
+        } catch (Exception e) {
+            LOGGER.error("redis source init/release failed with subscribe disabled", e);
+        }
+    }
+
+    /**
+     * Starts and releases the source with subscribe mode enabled. Failures are
+     * tolerated (best effort) but logged with their stack trace.
+     */
+    @Test
+    public void testReplicatorStartup() {
+        try {
+            profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, true);
+            redisSource.initSource(profile);
+            redisSource.releaseSource();
+        } catch (Exception e) {
+            LOGGER.error("redis source init/release failed with subscribe enabled", e);
+        }
+    }
+
+    /**
+     * Verifies that, with subscribe mode disabled, initializing the source schedules
+     * exactly one fixed-delay job (initial delay 0, delay 60 seconds) on the executor.
+     * Exceptions are tolerated but logged; verification failures still fail the test
+     * because they are errors, not exceptions.
+     */
+    @Test
+    public void testScheduledExecutorStartup() {
+        try {
+            profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+            redisSource.initSource(profile);
+            verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), eq(0L), eq(60 * 1000L),
+                    eq(TimeUnit.MILLISECONDS));
+            redisSource.releaseSource();
+        } catch (Exception e) {
+            LOGGER.error("redis source scheduled startup failed with subscribe disabled", e);
+        }
+    }
+
+    /**
+     * GET: each key is queued on the pipeline and the pipeline results are mapped back
+     * to their keys.
+     */
+    @Test
+    public void testFetchDataByJedis_Get()
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        when(jedis.pipelined()).thenReturn(pipeline);
+        when(pipeline.syncAndReturnAll()).thenReturn(Arrays.asList("value1", "value2", "value3"));
+
+        List<String> keys = Arrays.asList("key1", "key2", "key3");
+
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", "value1");
+        expectedData.put("key2", "value2");
+        expectedData.put("key3", "value3");
+
+        Map<String, Object> result = invokeFetchDataByJedis("GET", keys, null);
+        assertEquals(expectedData, result);
+        verify(jedis).pipelined();
+        verify(pipeline, times(3)).get(anyString());
+        verify(pipeline).syncAndReturnAll();
+        executor.shutdown();
+    }
+
+    /**
+     * MGET: all keys are fetched with a single call and the values are mapped back to
+     * their keys.
+     */
+    @Test
+    public void testFetchDataByJedis_Mget()
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        when(jedis.mget(eq("key1"), eq("key2"), eq("key3"))).thenReturn(Arrays.asList("value1", "value2", "value3"));
+        List<String> keys = Arrays.asList("key1", "key2", "key3");
+
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", "value1");
+        expectedData.put("key2", "value2");
+        expectedData.put("key3", "value3");
+
+        Map<String, Object> result = invokeFetchDataByJedis("MGET", keys, null);
+        assertEquals(expectedData, result);
+        verify(jedis).mget(eq("key1"), eq("key2"), eq("key3"));
+        executor.shutdown();
+    }
+
+    /**
+     * HGET: the given field is read once from every key.
+     */
+    @Test
+    public void testFetchDataByJedis_Hget()
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        when(jedis.hget("key1", "field1")).thenReturn("hash_value1");
+        when(jedis.hget("key2", "field1")).thenReturn("hash_value2");
+        List<String> keys = Arrays.asList("key1", "key2");
+
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", "hash_value1");
+        expectedData.put("key2", "hash_value2");
+
+        Map<String, Object> result = invokeFetchDataByJedis("HGET", keys, "field1");
+        assertEquals(expectedData, result);
+
+        verify(jedis, times(1)).hget("key1", "field1");
+        verify(jedis, times(1)).hget("key2", "field1");
+        executor.shutdown();
+    }
+
+    /**
+     * EXISTS: every key is checked once and mapped to its boolean result.
+     */
+    @Test
+    public void testFetchDataByJedis_Exists()
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        when(jedis.exists("key1")).thenReturn(true);
+        when(jedis.exists("key2")).thenReturn(false);
+
+        List<String> keys = Arrays.asList("key1", "key2");
+
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", true);
+        expectedData.put("key2", false);
+
+        Map<String, Object> result = invokeFetchDataByJedis("EXISTS", keys, null);
+        assertEquals(expectedData, result);
+
+        verify(jedis, times(1)).exists("key1");
+        verify(jedis, times(1)).exists("key2");
+        executor.shutdown();
+    }
+}