odbozhou commented on code in PR #52:
URL: https://github.com/apache/rocketmq-connect/pull/52#discussion_r846989942


##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt 
startMQAdminTool(ConnectConfig connectConfig) th
         return defaultMQAdminExt;
     }
 
+    public static void createTopic(ConnectConfig connectConfig, TopicConfig 
topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, 
connectConfig.getClusterName());
+            for (String addr : masterSet) {
+                defaultMQAdminExt.createAndUpdateTopicConfig(addr, 
topicConfig);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("create topic: " + 
topicConfig.getTopicName() + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+    }
+
+    public static boolean isAutoCreateTopic(ConnectConfig connectConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        boolean autoCreateTopic = true;
+
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, 
connectConfig.getClusterName());
+            for (String addr : masterSet) {
+                Properties brokerConfig = 
defaultMQAdminExt.getBrokerConfig(addr);
+                autoCreateTopic = autoCreateTopic && 
Boolean.parseBoolean(brokerConfig.getProperty("autoCreateTopicEnable"));
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("get broker config 
autoCreateTopicEnable failed", e);

Review Comment:
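   IllegalArgumentException may be inaccurate here: the caught exception comes from the admin/broker calls, not necessarily from an invalid argument.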



##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java:
##########
@@ -69,7 +71,15 @@ public RebalanceImpl(Worker worker, ConfigManagementService 
configManagementServ
 
     public void checkClusterStoreTopic() {
         if (!clusterManagementService.hasClusterStoreTopic()) {
-            log.error("cluster store topic not exist, apply first please!");
+            if 
(ConnectUtil.isAutoCreateTopic(this.connectController.getConnectConfig())) {
+                TopicConfig topicConfig = new 
TopicConfig(this.connectController.getConnectConfig().getClusterStoreTopic(), 
1, 1, 6);

Review Comment:
   It would be better to define named constants instead of using magic values (1, 1, 6).



##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt 
startMQAdminTool(ConnectConfig connectConfig) th
         return defaultMQAdminExt;
     }
 
+    public static void createTopic(ConnectConfig connectConfig, TopicConfig 
topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, 
connectConfig.getClusterName());
+            for (String addr : masterSet) {
+                defaultMQAdminExt.createAndUpdateTopicConfig(addr, 
topicConfig);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("create topic: " + 
topicConfig.getTopicName() + " failed", e);

Review Comment:
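   IllegalArgumentException may be inaccurate here: the caught exception comes from the admin/broker calls made while creating the topic, not necessarily from an invalid argument.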



##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt 
startMQAdminTool(ConnectConfig connectConfig) th
         return defaultMQAdminExt;
     }
 
+    public static void createTopic(ConnectConfig connectConfig, TopicConfig 
topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, 
connectConfig.getClusterName());

Review Comment:
   When a topic is created automatically, does clusterName have to be set in the 
connect config, or can it be obtained from the name server?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to