This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 8e82b26b8a0d6ac648530afe25ac13739be83a3e Author: Artisan <[email protected]> AuthorDate: Thu Dec 22 22:50:56 2022 +0800 updateSinkConnectorRequest --- .../adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java | 7 ++++--- .../adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java index 6edc0cc..6f5c468 100644 --- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java +++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java @@ -53,8 +53,9 @@ public class CreateSinkConnectorRequest extends BaseConnectorRequest { public Map<String, Object> getRequestObject() { Map<String, Object> config = Maps.newHashMap(); - config.put("connector-class", connectorClass); - config.put("connect-topicname", topicName); + config.put("connector.class", connectorClass); + config.put("connect.topicnames", topicName); + String sinPrefix = "."; config.put("transforms", String.join(",", transforms.stream() .map(TransformRequest::getName) .collect(Collectors.toList()))); @@ -62,7 +63,7 @@ public class CreateSinkConnectorRequest extends BaseConnectorRequest { transform.getConfig() .entrySet() .forEach(entry -> { - config.put("transforms" + "-" + transform.getName() + "-" + entry.getKey(), entry.getValue()); + config.put("transforms" + sinPrefix + transform.getName() + sinPrefix + entry.getKey(), entry.getValue()); }); }); config.putAll(connectorConfig); diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java index 40ffd38..65dd520 100644 --- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java +++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java @@ -53,8 +53,9 @@ public class CreateSourceConnectorRequest extends BaseConnectorRequest { public Map<String, Object> getRequestObject() { Map<String, Object> config = Maps.newHashMap(); - config.put("connector-class", connectorClass); - config.put("connect-topicname", topicName); + config.put("connector.class", connectorClass); + config.put("connect.topicname", topicName); + String sourcePrefix = "."; config.put("transforms", String.join(",", transforms.stream() .map(TransformRequest::getName) .collect(Collectors.toList()))); @@ -62,7 +63,7 @@ public class CreateSourceConnectorRequest extends BaseConnectorRequest { transform.getConfig() .entrySet() .forEach(entry -> { - config.put("transforms" + "-" + transform.getName() + "-" + entry.getKey(), entry.getValue()); + config.put("transforms" + sourcePrefix + transform.getName() + sourcePrefix + entry.getKey(), entry.getValue()); }); }); config.putAll(connectorConfig);
