Slideee commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857080985
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java:
##########
@@ -117,38 +128,19 @@ public void start(KeyValue config) {
}
for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable :
stats.getOffsetTable().entrySet()) {
-
MessageQueue mq = offsetTable.getKey();
long srcOffset = offsetTable.getValue().getConsumerOffset();
long targetOffset = this.store.convertTargetOffset(mq, group,
srcOffset);
+ List<Field> fields = new ArrayList<Field>();
+ Schema schema = new Schema(SchemaEnum.OFFSET.name(),
FieldType.STRING, fields);
+ schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(),
SchemaBuilder.string().build()));
Review Comment:
> offset use INT64 may be better
done
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
executor = Executors.newSingleThreadScheduledExecutor(new
BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
}
- @Override public String verifyAndSetConfig(KeyValue config) {
+ @Override public void validate(KeyValue config) {
log.info("verifyAndSetConfig...");
try {
replicatorConfig.validate(config);
+ this.configValid = true;
} catch (IllegalArgumentException e) {
Review Comment:
> I think the exception can be thrown upward, because there is no return,
the upper layer cannot determine whether the verification is passed, otherwise
the exception will be replenished, and there is no exception log, it is
difficult to diagnose the problem
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]