Oliverwqcwrw commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r971975775
##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java:
##########
@@ -94,11 +107,7 @@ public Config getConfig() {
this.config.load(keyValue);
LOGGER.info("task config msg: {}", this.config.toString());
- // get position info
- ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(
- this.config.getPositionPartitionKey()
- );
- Long position = RedisPositionConverter.jsonToLong(byteBuffer);
+ final Long position = this.sourceTaskContext.configs().getLong("offset");
Review Comment:
Hello @odbozhou,
I agree with your opinion, but there is no problem with the position here.
See `org.apache.rocketmq.connect.redis.processor.DefaultRedisEventProcessor#start`:

    // For LAST_OFFSET, set the offset to the latest offset currently reported by Redis.
    // For both LAST_OFFSET and CUSTOM_OFFSET, the position stored in the connector runtime takes precedence.
    if (SyncMod.LAST_OFFSET.equals(this.config.getSyncMod())) {
        if (this.config.getPosition() != null) {
            this.config.setOffset(this.config.getPosition());
        } else if (StringUtils.isNotBlank(offset)) {
            this.config.setOffset(Long.parseLong(offset));
        }
    } else if (SyncMod.CUSTOM_OFFSET.equals(this.config.getSyncMod())) {
        if (this.config.getPosition() != null) {
            this.config.setOffset(this.config.getPosition());
        }
    }
    startReplicatorAsync(this.config.getReplId(), this.config.getOffset());

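So in the end, synchronization still starts from the position: whenever `this.config.getPosition()` is non-null, it overrides the offset before `startReplicatorAsync` is called.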
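
To make the precedence easier to check, here is a minimal standalone sketch of the same decision. It is not the connector's code: `OffsetResolver`, `resolveOffset`, and the parameter names are hypothetical. `redisLatestOffset` stands in for the local `offset` string in the excerpt, which I am assuming holds the latest offset read from Redis. Unlike the excerpt, the sketch trims the string before parsing.

```java
public class OffsetResolver {

    /** Hypothetical stand-in for the two SyncMod values discussed above. */
    public enum SyncMod { LAST_OFFSET, CUSTOM_OFFSET }

    /**
     * Returns the offset that replication would start from.
     *
     * @param syncMod           sync mode from the task config
     * @param configuredOffset  offset already present in the config (may be null)
     * @param storedPosition    position restored from the connector runtime (may be null)
     * @param redisLatestOffset latest offset reported by Redis, as text (assumption; may be blank)
     */
    public static Long resolveOffset(SyncMod syncMod,
                                     Long configuredOffset,
                                     Long storedPosition,
                                     String redisLatestOffset) {
        if (syncMod == SyncMod.LAST_OFFSET) {
            if (storedPosition != null) {
                // The stored position wins over everything else.
                return storedPosition;
            }
            if (redisLatestOffset != null && !redisLatestOffset.trim().isEmpty()) {
                // No stored position: fall back to the latest Redis offset.
                return Long.parseLong(redisLatestOffset.trim());
            }
        } else if (syncMod == SyncMod.CUSTOM_OFFSET) {
            if (storedPosition != null) {
                // The stored position also overrides a user-supplied custom offset.
                return storedPosition;
            }
        }
        // Nothing overrides it: keep whatever offset the config already had.
        return configuredOffset;
    }
}
```

With a non-null `storedPosition`, both modes return it. `LAST_OFFSET` only falls back to the Redis value when no position was restored, and `CUSTOM_OFFSET` keeps the configured offset in that case.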
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.