justinwwhuang commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1724194966
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##########
@@ -84,6 +181,278 @@ public boolean sourceFinish() {
@Override
public boolean sourceExist() {
- return false;
+ return true;
+ }
+
+ private String getRedisUri() {
+ StringBuffer sb = new StringBuffer("redis://");
+ sb.append(hostName).append(":").append(port);
+ sb.append("?");
+ if (!StringUtils.isEmpty(authPassword)) {
+ sb.append("authPassword=").append(authPassword).append("&");
+ }
+ if (!StringUtils.isEmpty(authUser)) {
+ sb.append("authUser=").append(authUser).append("&");
+ }
+ if (!StringUtils.isEmpty(readTimeout)) {
+ sb.append("readTimeout=").append(readTimeout).append("&");
+ }
+ if (ssl) {
+ sb.append("ssl=").append("yes").append("&");
+ }
+ if (!StringUtils.isEmpty(snapShot)) {
+ sb.append("replOffset=").append(snapShot).append("&");
+ }
+ if (!StringUtils.isEmpty(replId)) {
+ sb.append("replId=").append(replId).append("&");
+ }
+ if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) ==
'&') {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ private void initReplicator() {
+ DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+ redisReplicator.addCommandParser(CommandName.name("APPEND"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HMSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GETSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PSETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETRANGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LRem"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAME"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PERSIST"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SELECT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHALL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFCOUNT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFMERGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LINSERT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAMENX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RESTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVAL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVALSHA"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SCRIPT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITFIELD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETBIT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("UNLINK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SWAPDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MULTI"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXEC"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SORT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("REPLCONF"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XACK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XCLAIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XGROUP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XSETID"),
defaultCommandParser);
+ // since redis 6.2
+ redisReplicator.addCommandParser(CommandName.name("COPY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BLMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"),
defaultCommandParser);
+ // since redis 7.0
+ redisReplicator.addCommandParser(CommandName.name("SPUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FUNCTION"),
defaultCommandParser);
+ // add EventListener
+ redisReplicator.addEventListener((replicator, event) -> {
Review Comment:
I hope we can also specify Redis commands to collect the data we want,
rather than just synchronizing all Redis data.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]