emptyOVO commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1721832043
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##########
@@ -84,6 +181,278 @@ public boolean sourceFinish() {
@Override
public boolean sourceExist() {
- return false;
+ return true;
+ }
+
+ private String getRedisUri() {
+ StringBuffer sb = new StringBuffer("redis://");
+ sb.append(hostName).append(":").append(port);
+ sb.append("?");
+ if (!StringUtils.isEmpty(authPassword)) {
+ sb.append("authPassword=").append(authPassword).append("&");
+ }
+ if (!StringUtils.isEmpty(authUser)) {
+ sb.append("authUser=").append(authUser).append("&");
+ }
+ if (!StringUtils.isEmpty(readTimeout)) {
+ sb.append("readTimeout=").append(readTimeout).append("&");
+ }
+ if (ssl) {
+ sb.append("ssl=").append("yes").append("&");
+ }
+ if (!StringUtils.isEmpty(snapShot)) {
+ sb.append("replOffset=").append(snapShot).append("&");
+ }
+ if (!StringUtils.isEmpty(replId)) {
+ sb.append("replId=").append(replId).append("&");
+ }
+ if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) ==
'&') {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ private void initReplicator() {
+ DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+ redisReplicator.addCommandParser(CommandName.name("APPEND"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HMSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GETSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PSETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETRANGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LRem"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAME"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PERSIST"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SELECT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHALL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFCOUNT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFMERGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LINSERT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAMENX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RESTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVAL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVALSHA"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SCRIPT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITFIELD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETBIT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("UNLINK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SWAPDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MULTI"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXEC"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SORT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("REPLCONF"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XACK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XCLAIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XGROUP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XSETID"),
defaultCommandParser);
+ // since redis 6.2
+ redisReplicator.addCommandParser(CommandName.name("COPY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BLMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"),
defaultCommandParser);
+ // since redis 7.0
+ redisReplicator.addCommandParser(CommandName.name("SPUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FUNCTION"),
defaultCommandParser);
+ // add EventListener
+ redisReplicator.addEventListener((replicator, event) -> {
Review Comment:
Sorry, do you mean that the EventListener will only obtain Redis data that is
added after startup? When RedisReplicator connects to the Redis server, it
first synchronizes the RDB file; the PostRdbSyncEvent is then triggered, which
means that the historical data in Redis has been synchronized. After that, the
redisReplicator switches to incremental command synchronization mode, captures
every command that occurs in the Redis instance (such as SET, DEL, etc.), and
synchronizes it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]