odbozhou commented on code in PR #117:
URL: https://github.com/apache/rocketmq-connect/pull/117#discussion_r857070808
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java:
##########
@@ -117,38 +128,19 @@ public void start(KeyValue config) {
}
for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable :
stats.getOffsetTable().entrySet()) {
-
MessageQueue mq = offsetTable.getKey();
long srcOffset = offsetTable.getValue().getConsumerOffset();
long targetOffset = this.store.convertTargetOffset(mq, group,
srcOffset);
+ List<Field> fields = new ArrayList<Field>();
+ Schema schema = new Schema(SchemaEnum.OFFSET.name(),
FieldType.STRING, fields);
+ schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(),
SchemaBuilder.string().build()));
Review Comment:
offset use INT64 may be better
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws
MQClientException {
}
@Override
- public String verifyAndSetConfig(KeyValue config) {
+ public void validate(KeyValue config) {
// Check the need key.
for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
- return "Request config key: " + requestKey;
+ return;
Review Comment:
Missing configuration should throw an exception instead of returning
directly. Returning directly indicates that the verification has passed
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
executor = Executors.newSingleThreadScheduledExecutor(new
BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
}
- @Override public String verifyAndSetConfig(KeyValue config) {
+ @Override public void validate(KeyValue config) {
log.info("verifyAndSetConfig...");
try {
replicatorConfig.validate(config);
+ this.configValid = true;
} catch (IllegalArgumentException e) {
- return e.getMessage();
+ return;
}
this.prepare();
- this.configValid = true;
- return "";
+ return;
+ }
+
+ @Override public void init(KeyValue config) {
+
Review Comment:
config should be set by init
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java:
##########
@@ -87,20 +88,25 @@ public RmqMetaReplicator() {
executor = Executors.newSingleThreadScheduledExecutor(new
BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
}
- @Override public String verifyAndSetConfig(KeyValue config) {
+ @Override public void validate(KeyValue config) {
log.info("verifyAndSetConfig...");
try {
replicatorConfig.validate(config);
+ this.configValid = true;
} catch (IllegalArgumentException e) {
Review Comment:
I think the exception can be thrown upward, because there is no return, the
upper layer cannot determine whether the verification is passed, otherwise the
exception will be replenished, and there is no exception log, it is difficult
to diagnose the problem
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -164,6 +166,11 @@ public boolean compare(Map<String, Set<TaskTopicInfo>>
origin, Map<String, Set<T
return true;
}
+
+ @Override public void init(KeyValue config) {
+
Review Comment:
config should be init by init
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -188,7 +195,7 @@ public Class<? extends Task> taskClass() {
}
@Override
- public List<KeyValue> taskConfigs() {
+ public List<KeyValue> taskConfigs(int maxTasks) {
if (!configValid) {
Review Comment:
maxTasks can specify the number of tasks. Can you consider whether it needs
to be divided into multiple tasks?
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -89,26 +90,27 @@ private synchronized void startMQAdminTools() throws
MQClientException {
}
@Override
- public String verifyAndSetConfig(KeyValue config) {
+ public void validate(KeyValue config) {
// Check the need key.
for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
- return "Request config key: " + requestKey;
+ return;
}
}
try {
this.replicatorConfig.validate(config);
} catch (IllegalArgumentException e) {
- return e.getMessage();
+ return;
Review Comment:
Exception should be throw
##########
connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java:
##########
@@ -117,10 +119,10 @@ public void start() {
}
buildRoute();
- startListner();
+ startListener();
Review Comment:
startListener may not be needed, because the runtime has already done the
load balancing logic, and put the load balancing queue in the context of each
task
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]