This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
     new e6c1673  [TUBEMQ-536] broker - register special topic to master (#408)
e6c1673 is described below

commit e6c16731b0ac4487cae9af1a247ea4537696abe3
Author: EMsnap <[email protected]>
AuthorDate: Wed Feb 24 17:36:21 2021 +0800

    [TUBEMQ-536] broker - register special topic to master (#408)
    
    * [TUBEMQ-536] broker - register special topic to master
    
    * [TUBEMQ-536] broker - register special topic to master
---
 .../org/apache/tubemq/corebase/TBaseConstants.java |  2 ++
 .../bdbstore/bdbentitys/BdbBrokerConfEntity.java   |  8 ++++++
 .../web/handler/WebBrokerDefConfHandler.java       | 30 +++++++++++++++++++++-
 3 files changed, 39 insertions(+), 1 deletion(-)

diff --git 
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java 
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
index 5238323..7c515a6 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
@@ -64,6 +64,8 @@ public class TBaseConstants {
 
     public static final long CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL = 20000;
 
+    public static final String OFFSET_TOPIC = "offsetTopic";
+
     public static final int META_MB_UNIT_SIZE = (1024 * 1024);
     public static final int META_MESSAGE_SIZE_ADJUST = (512 * 1024);
     public static final int META_MAX_MESSAGE_HEADER_SIZE = (10 * 1024);
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index 318e2ca..9ef9c2d 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -371,6 +371,14 @@ public class BdbBrokerConfEntity implements Serializable {
         return numPartitions;
     }
 
+    public boolean getAcceptPublish() {
+        return acceptPublish;
+    }
+
+    public boolean getAcceptSubscribe() {
+        return acceptSubscribe;
+    }
+
     public void setDftNumPartitions(int numPartitions) {
         this.numPartitions = numPartitions;
     }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
index d967af3..59d39ca 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
@@ -18,6 +18,7 @@
 package org.apache.tubemq.server.master.web.handler;
 
 import static java.lang.Math.abs;
+import static org.apache.tubemq.corebase.TBaseConstants.OFFSET_TOPIC;
 
 import com.google.common.collect.Maps;
 import java.text.SimpleDateFormat;
@@ -310,7 +311,6 @@ public class WebBrokerDefConfHandler extends 
AbstractWebHandler {
                             
.append(TokenConstants.EQ).append(memCacheFlushIntvl)
                             
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_TLS_PORT)
                             
.append(TokenConstants.EQ).append(brokerTlsPort).toString();
-            strBuffer.delete(0, strBuffer.length());
             BdbBrokerConfEntity brokerConfEntity =
                     new BdbBrokerConfEntity(brokerId, brokerIp, brokerPort,
                             numPartitions, unflushThreshold, unflushInterval,
@@ -318,6 +318,10 @@ public class WebBrokerDefConfHandler extends 
AbstractWebHandler {
                             acceptSubscribe, attributes, true, false, 
createUser,
                             createDate, modifyUser, modifyDate);
             brokerConfManager.confAddBrokerDefaultConfig(brokerConfEntity);
+            strBuffer.delete(0, strBuffer.length());
+
+            // for every broker configured, create an offset topic to store 
offsets
+            inAddOffsetTopic(brokerConfEntity, brokerId, attributes, 
strBuffer);
             
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
         } catch (Exception e) {
             strBuffer.delete(0, strBuffer.length());
@@ -327,6 +331,30 @@ public class WebBrokerDefConfHandler extends 
AbstractWebHandler {
         return strBuffer;
     }
 
+
+    private void inAddOffsetTopic(BdbBrokerConfEntity brokerConf,
+        int brokerId, String attributes, StringBuilder topicNameStrBuilder) 
throws Exception {
+
+        // reuse stringBuilder to create topic name
+        String topicName = 
topicNameStrBuilder.append(OFFSET_TOPIC).append(brokerId).toString();
+
+        BdbTopicConfEntity bdbTopicConfEntity = new 
BdbTopicConfEntity(brokerConf.getBrokerId(),
+            brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName,
+            brokerConf.getDftNumPartitions(), 
brokerConf.getDftUnflushThreshold(),
+            brokerConf.getDftUnflushInterval(), brokerConf.getDftDeleteWhen(),
+            brokerConf.getDftDeletePolicy(), brokerConf.getAcceptPublish(),
+            brokerConf.getAcceptSubscribe(), brokerConf.getNumTopicStores(),
+            attributes, brokerConf.getRecordCreateUser(), 
brokerConf.getRecordCreateDate(),
+            brokerConf.getRecordModifyUser(), 
brokerConf.getRecordModifyDate());
+
+        boolean result = 
brokerConfManager.confAddTopicConfig(bdbTopicConfEntity);
+        if (result) {  // if it succeeds in adding topic config
+            brokerConfManager.updateBrokerConfChanged(brokerConf.getBrokerId(),
+                    true, false);
+        }
+        topicNameStrBuilder.delete(0, topicNameStrBuilder.length());
+    }
+
     /**
      * Add default config to brokers in batch
      *

Reply via email to