odbozhou commented on code in PR #52:
URL: https://github.com/apache/rocketmq-connect/pull/52#discussion_r846989942
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt
startMQAdminTool(ConnectConfig connectConfig) th
return defaultMQAdminExt;
}
+ public static void createTopic(ConnectConfig connectConfig, TopicConfig
topicConfig) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+ Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt,
connectConfig.getClusterName());
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr,
topicConfig);
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException("create topic: " +
topicConfig.getTopicName() + " failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ }
+
+ public static boolean isAutoCreateTopic(ConnectConfig connectConfig) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ boolean autoCreateTopic = true;
+
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+ Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt,
connectConfig.getClusterName());
+ for (String addr : masterSet) {
+ Properties brokerConfig =
defaultMQAdminExt.getBrokerConfig(addr);
+ autoCreateTopic = autoCreateTopic &&
Boolean.parseBoolean(brokerConfig.getProperty("autoCreateTopicEnable"));
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException("get broker config
autoCreateTopicEnable failed", e);
Review Comment:
IllegalArgumentException may be inaccurate
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java:
##########
@@ -69,7 +71,15 @@ public RebalanceImpl(Worker worker, ConfigManagementService
configManagementServ
public void checkClusterStoreTopic() {
if (!clusterManagementService.hasClusterStoreTopic()) {
- log.error("cluster store topic not exist, apply first please!");
+ if
(ConnectUtil.isAutoCreateTopic(this.connectController.getConnectConfig())) {
+ TopicConfig topicConfig = new
TopicConfig(this.connectController.getConnectConfig().getClusterStoreTopic(),
1, 1, 6);
Review Comment:
Better to define constants instead of using magic values
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt
startMQAdminTool(ConnectConfig connectConfig) th
return defaultMQAdminExt;
}
+ public static void createTopic(ConnectConfig connectConfig, TopicConfig
topicConfig) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+ Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt,
connectConfig.getClusterName());
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr,
topicConfig);
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException("create topic: " +
topicConfig.getTopicName() + " failed", e);
Review Comment:
IllegalArgumentException may be inaccurate
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt
startMQAdminTool(ConnectConfig connectConfig) th
return defaultMQAdminExt;
}
+ public static void createTopic(ConnectConfig connectConfig, TopicConfig
topicConfig) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+ Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt,
connectConfig.getClusterName());
Review Comment:
When a topic is automatically created, is the clusterName required in the
connect config, and can it be obtained through the name server?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]