emptyOVO commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1721832043
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##########
@@ -84,6 +181,278 @@ public boolean sourceFinish() {
@Override
public boolean sourceExist() {
- return false;
+ return true;
+ }
+
+ private String getRedisUri() {
+ StringBuffer sb = new StringBuffer("redis://");
+ sb.append(hostName).append(":").append(port);
+ sb.append("?");
+ if (!StringUtils.isEmpty(authPassword)) {
+ sb.append("authPassword=").append(authPassword).append("&");
+ }
+ if (!StringUtils.isEmpty(authUser)) {
+ sb.append("authUser=").append(authUser).append("&");
+ }
+ if (!StringUtils.isEmpty(readTimeout)) {
+ sb.append("readTimeout=").append(readTimeout).append("&");
+ }
+ if (ssl) {
+ sb.append("ssl=").append("yes").append("&");
+ }
+ if (!StringUtils.isEmpty(snapShot)) {
+ sb.append("replOffset=").append(snapShot).append("&");
+ }
+ if (!StringUtils.isEmpty(replId)) {
+ sb.append("replId=").append(replId).append("&");
+ }
+ if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) ==
'&') {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ private void initReplicator() {
+ DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+ redisReplicator.addCommandParser(CommandName.name("APPEND"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HMSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GETSET"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MSETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PSETEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETNX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETRANGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LRem"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPUSHX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAME"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECR"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("INCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("DECRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PERSIST"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SELECT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHALL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FLUSHDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("HINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINCRBY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFCOUNT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PFMERGE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LINSERT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RENAMENX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RESTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIRE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVAL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EVALSHA"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SCRIPT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("PUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITOP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BITFIELD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SETBIT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SREM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("UNLINK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SWAPDB"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("MULTI"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("EXEC"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("SORT"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("REPLCONF"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XACK"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XADD"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XCLAIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XDEL"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XGROUP"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XTRIM"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("XSETID"),
defaultCommandParser);
+ // since redis 6.2
+ redisReplicator.addCommandParser(CommandName.name("COPY"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("LMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("BLMOVE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"),
defaultCommandParser);
+ // since redis 7.0
+ redisReplicator.addCommandParser(CommandName.name("SPUBLISH"),
defaultCommandParser);
+ redisReplicator.addCommandParser(CommandName.name("FUNCTION"),
defaultCommandParser);
+ // add EventListener
+ redisReplicator.addEventListener((replicator, event) -> {
Review Comment:
Sorry, maybe I didn't make that clear in the description. Do you mean that the
EventListener will only obtain Redis data that is added after startup? When
RedisReplicator connects to the Redis server, it first synchronizes the RDB
file. The PostRdbSyncEvent event is then triggered, which means the historical
data of Redis has been completely synchronized. After that, the redisReplicator
switches to incremental command synchronization mode, captures every command
that occurs in the Redis instance (such as SET, DEL, etc.), and synchronizes it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]