emptyOVO commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1721832043
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##########
@@ -84,6 +181,278 @@ public boolean sourceFinish() {
@Override
public boolean sourceExist() {
- return false;
+ return true;
+ }
+
+ private String getRedisUri() {
+ StringBuffer sb = new StringBuffer("redis://");
+ sb.append(hostName).append(":").append(port);
+ sb.append("?");
+ if (!StringUtils.isEmpty(authPassword)) {
+ sb.append("authPassword=").append(authPassword).append("&");
+ }
+ if (!StringUtils.isEmpty(authUser)) {
+ sb.append("authUser=").append(authUser).append("&");
+ }
+ if (!StringUtils.isEmpty(readTimeout)) {
+ sb.append("readTimeout=").append(readTimeout).append("&");
+ }
+ if (ssl) {
+ sb.append("ssl=").append("yes").append("&");
+ }
+ if (!StringUtils.isEmpty(snapShot)) {
+ sb.append("replOffset=").append(snapShot).append("&");
+ }
+ if (!StringUtils.isEmpty(replId)) {
+ sb.append("replId=").append(replId).append("&");
+ }
+ if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) ==
'&') {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ private void initReplicator() {
+ DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+ redisReplicator.addCommandParser(CommandName.name("APPEND"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HMSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GETSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PSETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETRANGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LRem"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAME"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PERSIST"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SELECT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHALL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFCOUNT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFMERGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LINSERT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAMENX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RESTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVAL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVALSHA"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SCRIPT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITFIELD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETBIT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("UNLINK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SWAPDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MULTI"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXEC"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SORT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("REPLCONF"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XACK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XCLAIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XGROUP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XSETID"),
defaultCommandParser);
+ // since redis 6.2
+ redisReplicator.addCommandParser(CommandName.name("COPY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BLMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"),
defaultCommandParser);
+ // since redis 7.0
+ redisReplicator.addCommandParser(CommandName.name("SPUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FUNCTION"),
defaultCommandParser);
+ // add EventListener
+ redisReplicator.addEventListener((replicator, event) -> {
Review Comment:
Sorry, do you mean that the EventListener will only obtain Redis data that is
added after startup? When the RedisReplicator connects to the Redis server, it
first synchronizes the RDB file. The PostRdbSyncEvent event is then triggered,
which means that the historical data in Redis has been completely synchronized.
After that, the redisReplicator switches to incremental command synchronization
mode, captures every command that occurs in the Redis instance (such as SET,
DEL, etc.), and synchronizes it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]