morningman closed pull request #409: Refactor heartbeat logic
URL: https://github.com/apache/incubator-doris/pull/409
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub hides its diff once the
pull request is merged, so the complete diff is reproduced below:

diff --git a/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 4daad37f..b73eb36c 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -17,9 +17,9 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
@@ -145,8 +145,13 @@ public void analyze(Analyzer analyzer) throws 
AnalysisException, UserException {
         if (brokerDesc == null) {
             throw new AnalysisException("broker is not provided");
         }
-        BrokerMgr.BrokerAddress address = 
analyzer.getCatalog().getBrokerMgr().getAnyBroker(brokerDesc.getName());
-        if (address == null) {
+
+        if 
(!analyzer.getCatalog().getBrokerMgr().contaisnBroker(brokerDesc.getName())) {
+            throw new AnalysisException("broker " + brokerDesc.getName() + " 
does not exist");
+        }
+
+        FsBroker broker = 
analyzer.getCatalog().getBrokerMgr().getAnyBroker(brokerDesc.getName());
+        if (broker == null) {
             throw new AnalysisException("broker is not exist");
         }
 
diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java 
b/fe/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java
index 610a212a..bff91911 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ScalarType;
@@ -27,14 +28,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
 
-// Show
 public class ShowBrokerStmt extends ShowStmt {
-    private static final ShowResultSetMetaData META_DATA =
-            ShowResultSetMetaData.builder()
-                    .addColumn(new Column("Broker", 
ScalarType.createVarchar(20)))
-                    .addColumn(new Column("Instances", 
ScalarType.createVarchar(200)))
-                    .build();
-
     public ShowBrokerStmt() {
     }
 
@@ -49,6 +43,10 @@ public void analyze(Analyzer analyzer) throws 
AnalysisException {
 
     @Override
     public ShowResultSetMetaData getMetaData() {
-        return META_DATA;
+        ShowResultSetMetaData.Builder builder = 
ShowResultSetMetaData.builder();
+        for (String title : BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
+        }
+        return builder.build();
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/src/main/java/org/apache/doris/backup/BackupJob.java
index e0586930..0d3707b8 100644
--- a/fe/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -17,21 +17,11 @@
 
 package org.apache.doris.backup;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.base.Strings;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.backup.Status.ErrCode;
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
@@ -50,6 +40,17 @@
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -471,13 +472,13 @@ private void uploadSnapshot() {
             int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
             LOG.debug("backend {} has {} batch, total {} tasks, {}", beId, 
batchNum, totalNum, this);
 
-            List<BrokerAddress> brokerAddrs = Lists.newArrayList();
-            Status st = repo.getBrokerAddress(beId, catalog, brokerAddrs);
+            List<FsBroker> brokers = Lists.newArrayList();
+            Status st = repo.getBrokerAddress(beId, catalog, brokers);
             if (!st.ok()) {
                 status = st;
                 return;
             }
-            Preconditions.checkState(brokerAddrs.size() == 1);
+            Preconditions.checkState(brokers.size() == 1);
             
             // allot tasks
             int index = 0;
@@ -492,7 +493,7 @@ private void uploadSnapshot() {
                 }
                 long signature = catalog.getNextId();
                 UploadTask task = new UploadTask(null, beId, signature, jobId, 
dbId, srcToDest,
-                        brokerAddrs.get(0), repo.getStorage().getProperties());
+                        brokers.get(0), repo.getStorage().getProperties());
                 batchTask.addTask(task);
                 unfinishedTaskIds.add(signature);
             }
diff --git a/fe/src/main/java/org/apache/doris/backup/BlobStorage.java 
b/fe/src/main/java/org/apache/doris/backup/BlobStorage.java
index 3c40e2d7..214678f3 100644
--- a/fe/src/main/java/org/apache/doris/backup/BlobStorage.java
+++ b/fe/src/main/java/org/apache/doris/backup/BlobStorage.java
@@ -18,8 +18,8 @@
 package org.apache.doris.backup;
 
 import org.apache.doris.backup.Status.ErrCode;
-import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
@@ -583,14 +583,14 @@ public static String clientId() {
     }
 
     private Status getBroker(Pair<TPaloBrokerService.Client, TNetworkAddress> 
result) {
-        BrokerMgr.BrokerAddress brokerAddress = null;
+        FsBroker broker = null;
         try {
             String localIP = FrontendOptions.getLocalHostAddress();
-            brokerAddress = 
Catalog.getInstance().getBrokerMgr().getBroker(brokerName, localIP);
+            broker = 
Catalog.getInstance().getBrokerMgr().getBroker(brokerName, localIP);
         } catch (AnalysisException e) {
             return new Status(ErrCode.COMMON_ERROR, "failed to get a broker 
address: " + e.getMessage());
         }
-        TNetworkAddress address = new TNetworkAddress(brokerAddress.ip, 
brokerAddress.port);
+        TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
         TPaloBrokerService.Client client = null;
         try {
             client = ClientPool.brokerPool.borrowObject(address);
diff --git a/fe/src/main/java/org/apache/doris/backup/Repository.java 
b/fe/src/main/java/org/apache/doris/backup/Repository.java
index 25acd0c1..0b46470b 100644
--- a/fe/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/src/main/java/org/apache/doris/backup/Repository.java
@@ -17,21 +17,22 @@
 
 package org.apache.doris.backup;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.doris.backup.Status.ErrCode;
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.system.Backend;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.json.JSONObject;
@@ -486,7 +487,7 @@ public static String 
replaceFileNameWithChecksumFileName(String origPath, String
         return origPath.substring(0, origPath.lastIndexOf(PATH_DELIMITER) + 1) 
+ fileNameWithChecksum;
     }
 
-    public Status getBrokerAddress(Long beId, Catalog catalog, 
List<BrokerAddress> brokerAddrs) {
+    public Status getBrokerAddress(Long beId, Catalog catalog, List<FsBroker> 
brokerAddrs) {
         // get backend
         Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
         if (be == null) {
@@ -495,7 +496,7 @@ public Status getBrokerAddress(Long beId, Catalog catalog, 
List<BrokerAddress> b
         }
 
         // get proper broker for this backend
-        BrokerAddress brokerAddr = null;
+        FsBroker brokerAddr = null;
         try {
             brokerAddr = 
catalog.getBrokerMgr().getBroker(storage.getBrokerName(), be.getHost());
         } catch (AnalysisException e) {
diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
index f59a49f6..64227675 100644
--- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -23,11 +23,11 @@
 import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
 import org.apache.doris.backup.RestoreFileMapping.IdChain;
 import org.apache.doris.backup.Status.ErrCode;
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
@@ -953,7 +953,7 @@ private void downloadSnapshots() {
                     LOG.debug("backend {} has {} batch, total {} tasks, {}",
                               beId, batchNum, totalNum, this);
 
-                    List<BrokerAddress> brokerAddrs = Lists.newArrayList();
+                    List<FsBroker> brokerAddrs = Lists.newArrayList();
                     Status st = repo.getBrokerAddress(beId, catalog, 
brokerAddrs);
                     if (!st.ok()) {
                         status = st;
diff --git a/fe/src/main/java/org/apache/doris/catalog/BrokerMgr.java 
b/fe/src/main/java/org/apache/doris/catalog/BrokerMgr.java
index 10e63b70..7cb57ec4 100644
--- a/fe/src/main/java/org/apache/doris/catalog/BrokerMgr.java
+++ b/fe/src/main/java/org/apache/doris/catalog/BrokerMgr.java
@@ -26,6 +26,7 @@
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.common.proc.ProcNodeInterface;
 import org.apache.doris.common.proc.ProcResult;
+import org.apache.doris.common.util.TimeUtils;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
@@ -36,6 +37,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -46,92 +48,25 @@
  */
 public class BrokerMgr {
     public static final ImmutableList<String> BROKER_PROC_NODE_TITLE_NAMES = 
new ImmutableList.Builder<String>()
-            .add("BrokerName").add("BrokerAddress").build();
+            .add("Name").add("IP").add("Port").add("IsAlive")
+            .add("LstStartTime").add("LstUpdateTime").add("ErrMsg")
+            .build();
 
     private final Random random = new Random(System.currentTimeMillis());
 
-    public static class BrokerAddress implements Writable, 
Comparable<BrokerAddress> {
-        public String ip;
-        public int port;
-
-        public BrokerAddress() {
-        }
-
-        public BrokerAddress(String ip, int port) {
-            this.ip = ip;
-            this.port = port;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (!(o instanceof BrokerAddress)) {
-                return false;
-            }
-
-            BrokerAddress that = (BrokerAddress) o;
-
-            if (port != that.port) {
-                return false;
-            }
-            return ip.equals(that.ip);
-
-        }
-
-        @Override
-        public int hashCode() {
-            int result = ip.hashCode();
-            result = 31 * result + port;
-            return result;
-        }
-
-        @Override
-        public int compareTo(BrokerAddress o) {
-            int ret = ip.compareTo(o.ip);
-            if (ret != 0) {
-                return ret;
-            }
-            return port - o.port;
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            Text.writeString(out, ip);
-            out.writeInt(port);
-        }
-
-        @Override
-        public void readFields(DataInput in) throws IOException {
-            ip = Text.readString(in);
-            port = in.readInt();
-        }
-
-        @Override
-        public String toString() {
-            return ip + ":" + port;
-        }
-
-        public static BrokerAddress readIn(DataInput in) throws IOException {
-            BrokerAddress address = new BrokerAddress();
-            address.readFields(in);
-            return address;
-        }
-    }
-
     // we need IP to find the co-location broker.
-    // { BrokerName -> { IP -> [BrokerAddress] } }
-    private final Map<String, ArrayListMultimap<String, BrokerAddress>> 
brokersMap = Maps.newHashMap();
-    private final Map<String, List<BrokerAddress>> addressListMap = 
Maps.newHashMap();
+    // { BrokerName -> { IP -> [FsBroker] } }
+    private final Map<String, ArrayListMultimap<String, FsBroker>> brokersMap 
= Maps.newHashMap();
+    // { BrokerName -> { list of FsBroker }
+    private final Map<String, List<FsBroker>> brokerListMap = 
Maps.newHashMap();
     private final ReentrantLock lock = new ReentrantLock();
     private BrokerProcNode procNode = null;
 
     public BrokerMgr() {
     }
 
-    public Map<String, List<BrokerAddress>> getAddressListMap() {
-        return addressListMap;
+    public Map<String, List<FsBroker>> getBrokerListMap() {
+        return brokerListMap;
     }
 
     public void execute(ModifyBrokerClause clause) throws DdlException {
@@ -150,16 +85,6 @@ public void execute(ModifyBrokerClause clause) throws 
DdlException {
         }
     }
 
-    // Get all brokers' name
-    public Collection<String> getBrokerNames() {
-        lock.lock();
-        try {
-            return brokersMap.keySet();
-        } finally {
-            lock.unlock();
-        }
-    }
-
     public boolean contaisnBroker(String brokerName) {
         lock.lock();
         try {
@@ -169,52 +94,72 @@ public boolean contaisnBroker(String brokerName) {
         }
     }
 
-    public BrokerAddress getAnyBroker(String name) {
+    public FsBroker getAnyBroker(String brokerName) {
         lock.lock();
         try {
-            List<BrokerAddress> addressList = addressListMap.get(name);
-            if (addressList == null || addressList.isEmpty()) {
+            List<FsBroker> brokerList = brokerListMap.get(brokerName);
+            if (brokerList == null || brokerList.isEmpty()) {
                 return null;
             }
-            return addressList.get(random.nextInt(addressList.size()));
+
+            Collections.shuffle(brokerList);
+            for (FsBroker fsBroker : brokerList) {
+                if (fsBroker.isAlive) {
+                    return fsBroker;
+                }
+            }
+            return null;
         } finally {
             lock.unlock();
         }
     }
 
-    public BrokerAddress getBroker(String name, String host) throws 
AnalysisException {
+    public FsBroker getBroker(String brokerName, String host) throws 
AnalysisException {
         lock.lock();
         try {
-            ArrayListMultimap<String, BrokerAddress> brokerAddsMap = 
brokersMap.get(name);
+            ArrayListMultimap<String, FsBroker> brokerAddsMap = 
brokersMap.get(brokerName);
             if (brokerAddsMap == null || brokerAddsMap.size() == 0) {
-                throw new AnalysisException("Unknown broker name(" + name + 
")");
+                throw new AnalysisException("Unknown broker name(" + 
brokerName + ")");
             }
-            List<BrokerAddress> addressList = brokerAddsMap.get(host);
-            if (addressList.isEmpty()) {
-                addressList = addressListMap.get(name);
+            List<FsBroker> brokers = brokerAddsMap.get(host);
+            if (brokers.isEmpty()) {
+                brokers = brokerListMap.get(brokerName);
             }
-            return addressList.get(random.nextInt(addressList.size()));
+
+            Collections.shuffle(brokers);
+            for (FsBroker fsBroker : brokers) {
+                if (fsBroker.isAlive) {
+                    return fsBroker;
+                }
+            }
+
+            throw new AnalysisException("failed to find alive broker");
         } finally {
             lock.unlock();
         }
     }
 
-    public List<BrokerAddress> getBrokers(String name, List<String> hosts) 
throws AnalysisException {
+    // find broker which is exactly matching name, host and port. return null 
if not found
+    public FsBroker getBroker(String name, String host, int port) {
         lock.lock();
         try {
-            ArrayListMultimap<String, BrokerAddress> brokerAddsMap = 
brokersMap.get(name);
+            ArrayListMultimap<String, FsBroker> brokerAddsMap = 
brokersMap.get(name);
             if (brokerAddsMap == null || brokerAddsMap.size() == 0) {
-                throw new AnalysisException("Unknown broker name(" + name + 
")");
+                return null;
             }
-            List<BrokerAddress> brokerList = Lists.newArrayList();
-            for (String host : hosts) {
-                List<BrokerAddress> addressList = brokerAddsMap.get(host);
-                if (addressList.isEmpty()) {
-                    addressList = addressListMap.get(name);
+
+            List<FsBroker> addressList = brokerAddsMap.get(host);
+            if (addressList.isEmpty()) {
+                return null;
+            }
+
+            for (FsBroker fsBroker : addressList) {
+                if (fsBroker.port == port) {
+                    return fsBroker;
                 }
-                
brokerList.add(addressList.get(random.nextInt(addressList.size())));
             }
-            return brokerList;
+            return null;
+
         } finally {
             lock.unlock();
         }
@@ -223,46 +168,46 @@ public BrokerAddress getBroker(String name, String host) 
throws AnalysisExceptio
     public void addBrokers(String name, Collection<Pair<String, Integer>> 
addresses) throws DdlException {
         lock.lock();
         try {
-            ArrayListMultimap<String, BrokerAddress> brokerAddrsMap = 
brokersMap.get(name);
+            ArrayListMultimap<String, FsBroker> brokerAddrsMap = 
brokersMap.get(name);
             if (brokerAddrsMap == null) {
                 brokerAddrsMap = ArrayListMultimap.create();
             }
 
-            List<BrokerAddress> addedBrokerAddress = Lists.newArrayList();
+            List<FsBroker> addedBrokerAddress = Lists.newArrayList();
             for (Pair<String, Integer> pair : addresses) {
-                List<BrokerAddress> addressList = 
brokerAddrsMap.get(pair.first);
-                for (BrokerAddress addr : addressList) {
+                List<FsBroker> addressList = brokerAddrsMap.get(pair.first);
+                for (FsBroker addr : addressList) {
                     if (addr.port == pair.second) {
                         throw new DdlException("Broker(" + pair.first + ":" + 
pair.second
                                 + ") has already in brokers.");
                     }
                 }
-                addedBrokerAddress.add(new BrokerAddress(pair.first, 
pair.second));
+                addedBrokerAddress.add(new FsBroker(pair.first, pair.second));
             }
             Catalog.getInstance().getEditLog().logAddBroker(new 
ModifyBrokerInfo(name, addedBrokerAddress));
-            for (BrokerAddress address : addedBrokerAddress) {
+            for (FsBroker address : addedBrokerAddress) {
                 brokerAddrsMap.put(address.ip, address);
             }
             brokersMap.put(name, brokerAddrsMap);
-            addressListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
+            brokerListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
         } finally {
             lock.unlock();
         }
     }
 
-    public void replayAddBrokers(String name, List<BrokerAddress> addresses) {
+    public void replayAddBrokers(String name, List<FsBroker> addresses) {
         lock.lock();
         try {
-            ArrayListMultimap<String, BrokerAddress> brokerAddrsMap = 
brokersMap.get(name);
+            ArrayListMultimap<String, FsBroker> brokerAddrsMap = 
brokersMap.get(name);
             if (brokerAddrsMap == null) {
                 brokerAddrsMap = ArrayListMultimap.create();
                 brokersMap.put(name, brokerAddrsMap);
             }
-            for (BrokerAddress address : addresses) {
+            for (FsBroker address : addresses) {
                 brokerAddrsMap.put(address.ip, address);
             }
 
-            addressListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
+            brokerListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
         } finally {
             lock.unlock();
         }
@@ -271,16 +216,16 @@ public void replayAddBrokers(String name, 
List<BrokerAddress> addresses) {
     public void dropBrokers(String name, Collection<Pair<String, Integer>> 
addresses) throws DdlException {
         lock.lock();
         try {
-            ArrayListMultimap<String, BrokerAddress> brokerAddrsMap = 
brokersMap.get(name);
+            ArrayListMultimap<String, FsBroker> brokerAddrsMap = 
brokersMap.get(name);
             if (brokerAddrsMap == null) {
                 throw new DdlException("Unknown broker name(" + name + ")");
             }
 
-            List<BrokerAddress> dropedAddressList = Lists.newArrayList();
+            List<FsBroker> dropedAddressList = Lists.newArrayList();
             for (Pair<String, Integer> pair : addresses) {
-                List<BrokerAddress> addressList = 
brokerAddrsMap.get(pair.first);
+                List<FsBroker> addressList = brokerAddrsMap.get(pair.first);
                 boolean found = false;
-                for (BrokerAddress addr : addressList) {
+                for (FsBroker addr : addressList) {
                     if (addr.port == pair.second) {
                         dropedAddressList.add(addr);
                         found = true;
@@ -292,25 +237,25 @@ public void dropBrokers(String name, 
Collection<Pair<String, Integer>> addresses
                 }
             }
             Catalog.getInstance().getEditLog().logDropBroker(new 
ModifyBrokerInfo(name, dropedAddressList));
-            for (BrokerAddress address : dropedAddressList) {
+            for (FsBroker address : dropedAddressList) {
                 brokerAddrsMap.remove(address.ip, address);
             }
 
-            addressListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
+            brokerListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
         } finally {
             lock.unlock();
         }
     }
 
-    public void replayDropBrokers(String name, List<BrokerAddress> addresses) {
+    public void replayDropBrokers(String name, List<FsBroker> addresses) {
         lock.lock();
         try {
-            ArrayListMultimap<String, BrokerAddress> brokerAddrsMap = 
brokersMap.get(name);
-            for (BrokerAddress addr : addresses) {
+            ArrayListMultimap<String, FsBroker> brokerAddrsMap = 
brokersMap.get(name);
+            for (FsBroker addr : addresses) {
                 brokerAddrsMap.remove(addr.ip, addr);
             }
 
-            addressListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
+            brokerListMap.put(name, 
Lists.newArrayList(brokerAddrsMap.values()));
         } finally {
             lock.unlock();
         }
@@ -324,7 +269,7 @@ public void dropAllBroker(String name) throws DdlException {
             }
             Catalog.getInstance().getEditLog().logDropAllBroker(name);
             brokersMap.remove(name);
-            addressListMap.remove(name);
+            brokerListMap.remove(name);
         } finally {
             lock.unlock();
         }
@@ -334,7 +279,7 @@ public void replayDropAllBroker(String name) {
         lock.lock();
         try {
             brokersMap.remove(name);
-            addressListMap.remove(name);
+            brokerListMap.remove(name);
         } finally {
             lock.unlock();
         }
@@ -372,12 +317,17 @@ public ProcResult fetchResult() {
 
             lock.lock();
             try {
-                for (Map.Entry<String, ArrayListMultimap<String, 
BrokerAddress>> entry : brokersMap.entrySet()) {
+                for (Map.Entry<String, ArrayListMultimap<String, FsBroker>> 
entry : brokersMap.entrySet()) {
                     String brokerName = entry.getKey();
-                    for (BrokerAddress address : entry.getValue().values()) {
+                    for (FsBroker broker : entry.getValue().values()) {
                         List<String> row = Lists.newArrayList();
                         row.add(brokerName);
-                        row.add(address.toString());
+                        row.add(broker.ip);
+                        row.add(String.valueOf(broker.port));
+                        row.add(String.valueOf(broker.isAlive));
+                        
row.add(TimeUtils.longToTimeString(broker.lastStartTime));
+                        
row.add(TimeUtils.longToTimeString(broker.lastUpdateTime));
+                        row.add(broker.msg);
                         result.addRow(row);
                     }
                 }
@@ -390,12 +340,12 @@ public ProcResult fetchResult() {
 
     public static class ModifyBrokerInfo implements Writable {
         public String brokerName;
-        public List<BrokerAddress> brokerAddresses;
+        public List<FsBroker> brokerAddresses;
 
         public ModifyBrokerInfo() {
         }
 
-        public ModifyBrokerInfo(String brokerName, List<BrokerAddress> 
brokerAddresses) {
+        public ModifyBrokerInfo(String brokerName, List<FsBroker> 
brokerAddresses) {
             this.brokerName = brokerName;
             this.brokerAddresses = brokerAddresses;
         }
@@ -404,7 +354,7 @@ public ModifyBrokerInfo(String brokerName, 
List<BrokerAddress> brokerAddresses)
         public void write(DataOutput out) throws IOException {
             Text.writeString(out, brokerName);
             out.writeInt(brokerAddresses.size());
-            for (BrokerAddress address : brokerAddresses) {
+            for (FsBroker address : brokerAddresses) {
                 address.write(out);
             }
         }
@@ -415,7 +365,7 @@ public void readFields(DataInput in) throws IOException {
             int size = in.readInt();
             brokerAddresses = Lists.newArrayList();
             for (int i = 0; i < size; ++i) {
-                brokerAddresses.add(BrokerAddress.readIn(in));
+                brokerAddresses.add(FsBroker.readIn(in));
             }
         }
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 4415e24f..bc1975f6 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -71,7 +71,6 @@
 import org.apache.doris.backup.BackupHandler;
 import org.apache.doris.backup.BackupJob_D;
 import org.apache.doris.backup.RestoreJob_D;
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
 import org.apache.doris.catalog.Database.DbState;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.KuduPartition.KuduRange;
@@ -147,8 +146,8 @@
 import org.apache.doris.persist.Storage;
 import org.apache.doris.persist.StorageInfo;
 import org.apache.doris.persist.TableInfo;
-import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.persist.TablePropertyInfo;
+import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.SessionVariable;
@@ -157,6 +156,7 @@
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Backend.BackendState;
 import org.apache.doris.system.Frontend;
+import org.apache.doris.system.HeartbeatMgr;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
@@ -312,6 +312,7 @@
     private JournalObservable journalObservable;
 
     private SystemInfoService systemInfo;
+    private HeartbeatMgr heartbeatMgr;
     private TabletInvertedIndex tabletInvertedIndex;
     private ColocateTableIndex colocateTableIndex;
 
@@ -361,6 +362,10 @@ private SystemInfoService getClusterInfo() {
         return this.systemInfo;
     }
 
+    private HeartbeatMgr getHeartbeatMgr() {
+        return this.heartbeatMgr;
+    }
+
     public TabletInvertedIndex getTabletInvertedIndex() {
         return this.tabletInvertedIndex;
     }
@@ -415,6 +420,7 @@ private Catalog() {
         this.masterIp = "";
 
         this.systemInfo = new SystemInfoService();
+        this.heartbeatMgr = new HeartbeatMgr(systemInfo);
         this.tabletInvertedIndex = new TabletInvertedIndex();
         this.colocateTableIndex = new ColocateTableIndex();
         this.recycleBin = new CatalogRecycleBin();
@@ -492,6 +498,10 @@ public static SystemInfoService getCurrentSystemInfo() {
         return getCurrentCatalog().getClusterInfo();
     }
 
+    public static HeartbeatMgr getCurrentHeartbeatMgr() {
+        return getCurrentCatalog().getHeartbeatMgr();
+    }
+
     // use this to get correct TabletInvertedIndex instance
     public static TabletInvertedIndex getCurrentInvertedIndex() {
         return getCurrentCatalog().getTabletInvertedIndex();
@@ -1007,10 +1017,10 @@ private void transferToMaster() throws IOException {
         checkpointThreadId = checkpointer.getId();
         LOG.info("checkpointer thread started. thread id is {}", 
checkpointThreadId);
 
-        // ClusterInfoService
-        Catalog.getCurrentSystemInfo().setMaster(
+        // heartbeat mgr
+        heartbeatMgr.setMaster(
                 FrontendOptions.getLocalHostAddress(), Config.rpc_port, 
clusterId, token, epoch);
-        Catalog.getCurrentSystemInfo().start();
+        heartbeatMgr.start();
 
         pullLoadJobMgr.start();
 
@@ -2267,6 +2277,15 @@ public Frontend getFeByHost(String host) {
         return null;
     }
 
+    public Frontend getFeByName(String name) {
+        for (Frontend fe : frontends.values()) {
+            if (fe.getNodeName().equals(name)) {
+                return fe;
+            }
+        }
+        return null;
+    }
+
 
     // The interface which DdlExecutor needs.
     public void createDb(CreateDbStmt stmt) throws DdlException {
@@ -4230,6 +4249,10 @@ public int getClusterId() {
         return this.clusterId;
     }
 
+    public String getToken() {
+        return token;
+    }
+
     public Database getDb(String name) {
         if (fullNameToDb.containsKey(name)) {
             return fullNameToDb.get(name);
@@ -5599,18 +5622,18 @@ public long saveCluster(DataOutputStream dos, long 
checksum) throws IOException
     }
 
     public long saveBrokers(DataOutputStream dos, long checksum) throws 
IOException {
-        Map<String, List<BrokerAddress>> addressListMap = 
brokerMgr.getAddressListMap();
+        Map<String, List<FsBroker>> addressListMap = 
brokerMgr.getBrokerListMap();
         int size = addressListMap.size();
         checksum ^= size;
         dos.writeInt(size);
 
-        for (Map.Entry<String, List<BrokerAddress>> entry : 
addressListMap.entrySet()) {
+        for (Map.Entry<String, List<FsBroker>> entry : 
addressListMap.entrySet()) {
             Text.writeString(dos, entry.getKey());
-            final List<BrokerAddress> addrs = entry.getValue();
+            final List<FsBroker> addrs = entry.getValue();
             size = addrs.size();
             checksum ^= size;
             dos.writeInt(size);
-            for (BrokerAddress addr : addrs) {
+            for (FsBroker addr : addrs) {
                 addr.write(dos);
             }
         }
@@ -5626,9 +5649,9 @@ public long loadBrokers(DataInputStream dis, long 
checksum) throws IOException,
                 String brokerName = Text.readString(dis);
                 int size = dis.readInt();
                 checksum ^= size;
-                List<BrokerAddress> addrs = Lists.newArrayList();
+                List<FsBroker> addrs = Lists.newArrayList();
                 for (int j = 0; j < size; j++) {
-                    BrokerAddress addr = BrokerAddress.readIn(dis);
+                    FsBroker addr = FsBroker.readIn(dis);
                     addrs.add(addr);
                 }
                 brokerMgr.replayAddBrokers(brokerName, addrs);
diff --git a/fe/src/main/java/org/apache/doris/catalog/FsBroker.java 
b/fe/src/main/java/org/apache/doris/catalog/FsBroker.java
new file mode 100644
index 00000000..967cabcf
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/FsBroker.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.system.BrokerHbResponse;
+import org.apache.doris.system.HeartbeatResponse.HbStatus;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class FsBroker implements Writable, Comparable<FsBroker> {
+    public String ip;
+    public int port;
+    // msg for ping result
+    public String msg;
+    public long lastUpdateTime;
+    public long lastStartTime;
+
+    public boolean isAlive;
+
+    public FsBroker() {
+    }
+
+    public FsBroker(String ip, int port) {
+        this.ip = ip;
+        this.port = port;
+    }
+
+    /*
+     * handle Broker's heartbeat response.
+     * return true if alive state is changed.
+     */
+    public boolean handleHbResponse(BrokerHbResponse hbResponse) {
+        boolean isChanged = false;
+        if (hbResponse.getStatus() == HbStatus.OK) {
+            if (!isAlive) {
+                isAlive = true;
+                isChanged = true;
+                lastStartTime = hbResponse.getHbTime();
+            }
+            lastUpdateTime = hbResponse.getHbTime();
+            msg = "";
+        } else {
+            if (isAlive) {
+                isAlive = false;
+                isChanged = true;
+            }
+            msg = hbResponse.getMsg();
+        }
+
+        return isChanged;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FsBroker)) {
+            return false;
+        }
+
+        FsBroker that = (FsBroker) o;
+
+        if (port != that.port) {
+            return false;
+        }
+        return ip.equals(that.ip);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = ip.hashCode();
+        result = 31 * result + port;
+        return result;
+    }
+
+    @Override
+    public int compareTo(FsBroker o) {
+        // Order by ip first, then by port.
+        // Integer.compare() is used instead of "port - o.port" because the
+        // subtraction can overflow and flip the sign of the result.
+        int ret = ip.compareTo(o.ip);
+        return ret != 0 ? ret : Integer.compare(port, o.port);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, ip);
+        out.writeInt(port);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        ip = Text.readString(in);
+        port = in.readInt();
+    }
+
+    @Override
+    public String toString() {
+        return ip + ":" + port;
+    }
+
+    public static FsBroker readIn(DataInput in) throws IOException {
+        FsBroker broker = new FsBroker();
+        broker.readFields(in);
+        return broker;
+    }
+}
+
diff --git a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java 
b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java
deleted file mode 100644
index a6d784db..00000000
--- a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.clone;
-
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.util.Daemon;
-import org.apache.doris.system.SystemInfoService;
-
-import java.util.List;
-
-/*
- * LoadBalancer run at a fix interval.
- * Each run will re-calculate the load score of all backends
- */
-public class LoadBalancer extends Daemon {
-
-    private ClusterLoadStatistic clusterLoadStatistic;
-
-    public LoadBalancer() {
-        super("load balancer", 60 * 1000);
-        clusterLoadStatistic = new ClusterLoadStatistic(Catalog.getInstance(),
-                Catalog.getCurrentSystemInfo(),
-                Catalog.getCurrentInvertedIndex());
-    }
-
-    @Override
-    protected void runOneCycle() {
-        clusterLoadStatistic.init(SystemInfoService.DEFAULT_CLUSTER);
-    }
-
-    public BackendLoadStatistic getBackendStatistic(long beId) {
-        return clusterLoadStatistic.getBackendLoadStatistic(beId);
-    }
-
-    public List<List<String>> getClusterStatisticInfo() {
-        return clusterLoadStatistic.getCLusterStatistic();
-    }
-
-    public List<List<String>> getBackendStatisticInfo(long beId) {
-        return clusterLoadStatistic.getBackendStatistic(beId);
-    }
-}
diff --git 
a/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java 
b/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
index 369bcd84..d33ccff3 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
@@ -18,7 +18,9 @@
 package org.apache.doris.common.proc;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.system.Frontend;
 
 import com.google.common.collect.ImmutableList;
@@ -34,7 +36,9 @@
  */
 public class FrontendsProcNode implements ProcNodeInterface {
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            
.add("name").add("Host").add("EditLogPort").add("Role").add("IsMaster").add("ClusterId").add("Join")
+            
.add("name").add("Host").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort")
+            
.add("Role").add("IsMaster").add("ClusterId").add("Join").add("IsAlive")
+            .add("ReplayedJournalId").add("LstUpdateTime")
             .build();
     
     private Catalog catalog;
@@ -70,23 +74,35 @@ public static void getFrontendsInfo(Catalog catalog, 
List<List<String>> infos) {
         List<Pair<String, Integer>> allFeHosts = convertToHostPortPair(allFe);
         
         for (Frontend fe : catalog.getFrontends(null /* all */)) {
+
             List<String> info = new ArrayList<String>();
             info.add(fe.getNodeName());
             info.add(fe.getHost());
             info.add(Integer.toString(fe.getEditLogPort()));
-            info.add(fe.getRole().name());
-            if (fe.getHost().equals(masterIp) && fe.getEditLogPort() == 
masterPort) {
-                info.add("true");
+            info.add(Integer.toString(Config.http_port));
+
+            if (fe.getHost().equals(catalog.getSelfNode().first)) {
+                info.add(Integer.toString(Config.query_port));
+                info.add(Integer.toString(Config.rpc_port));
             } else {
-                info.add("false");
+                info.add(Integer.toString(fe.getQueryPort()));
+                info.add(Integer.toString(fe.getRpcPort()));
             }
+
+            info.add(fe.getRole().name());
+            info.add(String.valueOf(fe.getHost().equals(masterIp) && 
fe.getEditLogPort() == masterPort));
+
             info.add(Integer.toString(catalog.getClusterId()));
+            info.add(String.valueOf(isJoin(allFeHosts, fe)));
             
-            if (!isJoin(allFeHosts, fe)) {
-                info.add("false");
-            } else {
+            if (fe.getHost().equals(catalog.getSelfNode().first)) {
                 info.add("true");
+                
info.add(Long.toString(catalog.getEditLog().getMaxJournalId()));
+            } else {
+                info.add(String.valueOf(fe.isAlive()));
+                info.add(Long.toString(fe.getReplayedJournalId()));
             }
+            info.add(TimeUtils.longToTimeString(fe.getLastUpdateTime()));
             
             infos.add(info);
         }
diff --git a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 9a792e87..356644f9 100644
--- a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -18,8 +18,8 @@
 package org.apache.doris.common.util;
 
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.UserException;
@@ -43,14 +43,14 @@
 
     public static void parseBrokerFile(String path, BrokerDesc brokerDesc, 
List<TBrokerFileStatus> fileStatuses)
             throws UserException {
-        BrokerMgr.BrokerAddress brokerAddress = null;
+        FsBroker broker = null;
         try {
             String localIP = FrontendOptions.getLocalHostAddress();
-            brokerAddress = 
Catalog.getInstance().getBrokerMgr().getBroker(brokerDesc.getName(), localIP);
+            broker = 
Catalog.getInstance().getBrokerMgr().getBroker(brokerDesc.getName(), localIP);
         } catch (AnalysisException e) {
             throw new UserException(e.getMessage());
         }
-        TNetworkAddress address = new TNetworkAddress(brokerAddress.ip, 
brokerAddress.port);
+        TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
         TPaloBrokerService.Client client = null;
         try {
             client = ClientPool.brokerPool.borrowObject(address);
diff --git a/fe/src/main/java/org/apache/doris/common/util/Util.java 
b/fe/src/main/java/org/apache/doris/common/util/Util.java
index 20807318..9eeb20be 100644
--- a/fe/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/src/main/java/org/apache/doris/common/util/Util.java
@@ -17,18 +17,22 @@
 
 package org.apache.doris.common.util;
 
-import com.google.common.collect.Lists;
-
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
+
+import com.google.common.collect.Lists;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -321,5 +325,42 @@ public static String dumpThread(Thread t, int lineNum) {
         }
         return sb.toString();
     }
+
+    /**
+     * Fetch the content of the given url as one string (line breaks removed).
+     *
+     * @param urlStr url to fetch
+     * @param encodedAuthInfo sent as "Basic" Authorization header; skipped if null
+     * @param connectTimeoutMs connect timeout in milliseconds
+     * @param readTimeoutMs read timeout in milliseconds
+     * @return the content read from the url, or null if anything failed
+     */
+    public static String getResultForUrl(String urlStr, String encodedAuthInfo,
+            int connectTimeoutMs, int readTimeoutMs) {
+        StringBuilder sb = new StringBuilder();
+        try {
+            URLConnection conn = new URL(urlStr).openConnection();
+            if (encodedAuthInfo != null) {
+                conn.setRequestProperty("Authorization", "Basic " + encodedAuthInfo);
+            }
+            conn.setConnectTimeout(connectTimeoutMs);
+            conn.setReadTimeout(readTimeoutMs);
+
+            // decode explicitly as UTF-8 instead of the platform default charset;
+            // closing the reader also closes the underlying stream
+            try (BufferedReader br = new BufferedReader(
+                    new InputStreamReader((InputStream) conn.getContent(), "UTF-8"))) {
+                String line;
+                while ((line = br.readLine()) != null) {
+                    sb.append(line);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to get result from url: {}", urlStr, e);
+            return null;
+        }
+        LOG.debug("get result from url {}: {}", urlStr, sb.toString());
+        return sb.toString();
+    }
 }
 
diff --git a/fe/src/main/java/org/apache/doris/deploy/DeployManager.java 
b/fe/src/main/java/org/apache/doris/deploy/DeployManager.java
index 3640af36..d2b999ba 100644
--- a/fe/src/main/java/org/apache/doris/deploy/DeployManager.java
+++ b/fe/src/main/java/org/apache/doris/deploy/DeployManager.java
@@ -17,8 +17,8 @@
 
 package org.apache.doris.deploy;
 
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
@@ -408,17 +408,17 @@ protected void runOneCycle() {
                     break BROKER_BLOCK;
                 }
 
-                Map<String, List<BrokerAddress>> localBrokers = 
catalog.getBrokerMgr().getAddressListMap();
+                Map<String, List<FsBroker>> localBrokers = 
catalog.getBrokerMgr().getBrokerListMap();
 
                 // 1. find missing brokers
-                for (Map.Entry<String, List<BrokerAddress>> entry : 
localBrokers.entrySet()) {
+                for (Map.Entry<String, List<FsBroker>> entry : 
localBrokers.entrySet()) {
                     String brokerName = entry.getKey();
                     if (remoteBrokerHosts.containsKey(brokerName)) {
-                        List<BrokerAddress> localList = entry.getValue();
+                        List<FsBroker> localList = entry.getValue();
                         List<Pair<String, Integer>> remoteList = 
remoteBrokerHosts.get(brokerName);
 
                         // 1.1 found missing broker host
-                        for (BrokerAddress addr : localList) {
+                        for (FsBroker addr : localList) {
                             Pair<String, Integer> foundHost = 
getHostFromPairList(remoteList, addr.ip, addr.port);
                             if (foundHost == null) {
                                 List<Pair<String, Integer>> list = 
Lists.newArrayList();
@@ -437,7 +437,7 @@ protected void runOneCycle() {
 
                         // 1.2 add new broker host
                         for (Pair<String, Integer> pair : remoteList) {
-                            BrokerAddress foundAddr = 
getHostFromBrokerAddrs(localList, pair.first, pair.second);
+                            FsBroker foundAddr = 
getHostFromBrokerAddrs(localList, pair.first, pair.second);
                             if (foundAddr == null) {
                                 // add new broker
                                 List<Pair<String, Integer>> list = 
Lists.newArrayList();
@@ -484,9 +484,9 @@ protected void runOneCycle() {
         }
     }
 
-    private BrokerAddress getHostFromBrokerAddrs(List<BrokerAddress> addrList,
+    private FsBroker getHostFromBrokerAddrs(List<FsBroker> addrList,
             String ip, Integer port) {
-        for (BrokerAddress brokerAddress : addrList) {
+        for (FsBroker brokerAddress : addrList) {
             if (brokerAddress.ip.equals(ip) && brokerAddress.port == port) {
                 return brokerAddress;
             }
diff --git 
a/fe/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java 
b/fe/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java
index be256fc4..65bc3789 100644
--- a/fe/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java
+++ b/fe/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java
@@ -20,6 +20,7 @@
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.deploy.DeployManager;
 import org.apache.doris.system.SystemInfoService;
 
@@ -34,12 +35,6 @@
 import org.json.JSONArray;
 import org.json.JSONObject;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.net.URLConnection;
 import java.util.List;
 import java.util.Map;
 
@@ -191,7 +186,7 @@ protected boolean init() {
         super.init();
 
         // get blueprint once.
-        blueprintJson = getResultForUrl(blueprintUrl);
+        blueprintJson = Util.getResultForUrl(blueprintUrl, encodedAuthInfo, 
2000, 2000);
         if (blueprintJson == null) {
             return false;
         }
@@ -294,7 +289,7 @@ private Integer getPort(String configNodeName, String 
portName) {
 
     private List<String> getHostnamesFromComponentsJson(String componentName) {
         String urlStr = String.format(URL_COMPONENTS, ambariUrl, clusterName, 
serviceName, componentName);
-        String componentsJson = getResultForUrl(urlStr);
+        String componentsJson = Util.getResultForUrl(urlStr, encodedAuthInfo, 
2000, 2000);
         if (componentsJson == null) {
             return null;
         }
@@ -316,40 +311,6 @@ private Integer getPort(String configNodeName, String 
portName) {
         return hostnames;
     }
 
-    private String getResultForUrl(String urlStr) {
-        StringBuilder sb = new StringBuilder();
-        InputStream stream = null;
-        try {
-            URL url = new URL(urlStr);
-            URLConnection conn = url.openConnection();
-            conn.setRequestProperty("Authorization", "Basic " + 
encodedAuthInfo);
-            conn.setConnectTimeout(2 * 1000);
-            conn.setReadTimeout(2 * 1000);
-
-            stream = (InputStream) conn.getContent();
-            BufferedReader br = new BufferedReader(new 
InputStreamReader(stream));
-
-            String line;
-            while ((line = br.readLine()) != null) {
-                sb.append(line);
-            }
-        } catch (Exception e) {
-            LOG.warn("failed to get result from url: {}", urlStr, e);
-            return null;
-        } finally {
-            if (stream != null) {
-                try {
-                    stream.close();
-                } catch (IOException e) {
-                    LOG.warn("failed to close stream when get result from url: 
{}", urlStr, e);
-                    return null;
-                }
-            }
-        }
-        LOG.debug("get result from url {}: {}", urlStr, sb.toString());
-        return sb.toString();
-    }
-
     private String getPropertyFromBlueprint(String configNodeName, String 
propName) {
         Preconditions.checkNotNull(blueprintJson);
         String resProp = null;
diff --git 
a/fe/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java 
b/fe/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java
index 95cdb733..61fa6ce4 100644
--- a/fe/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java
+++ b/fe/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java
@@ -18,22 +18,31 @@
 package org.apache.doris.http.rest;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.http.ActionController;
 import org.apache.doris.http.BaseRequest;
 import org.apache.doris.http.BaseResponse;
 import org.apache.doris.http.IllegalArgException;
 
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+
 import io.netty.handler.codec.http.HttpMethod;
 
 /*
  * fe_host:fe_http_port/api/bootstrap
  * return:
- * {"status":"OK","msg":"Success"}
+ * {"status":"OK","msg":"Success","replayedJournalId":123456,"queryPort":9000,"rpcPort":9001}
  * {"status":"FAILED","msg":"err info..."}
  */
 public class BootstrapFinishAction extends RestBaseAction {
-    public static final String HOST_PORTS = "host_ports";
+    private static final String CLUSTER_ID = "cluster_id";
+    private static final String TOKEN = "token";
+
+    public static final String REPLAYED_JOURNAL_ID = "replayedJournalId";
+    public static final String QUERY_PORT = "queryPort";
+    public static final String RPC_PORT = "rpcPort";
 
     public BootstrapFinishAction(ActionController controller) {
         super(controller);
@@ -48,11 +57,45 @@ public void execute(BaseRequest request, BaseResponse 
response) throws DdlExcept
         boolean canRead = Catalog.getInstance().canRead();
 
         // to json response
-        RestBaseResult result = null;
+        BootstrapResult result = null;
         if (canRead) {
-            result = RestBaseResult.getOk();
+            result = new BootstrapResult();
+            String clusterIdStr = request.getSingleParameter(CLUSTER_ID);
+            String token = request.getSingleParameter(TOKEN);
+            if (!Strings.isNullOrEmpty(clusterIdStr) && 
!Strings.isNullOrEmpty(token)) {
+                // cluster id or token is provided, return more info
+                int clusterId = 0;
+                try {
+                    clusterId = Integer.valueOf(clusterIdStr);
+                } catch (NumberFormatException e) {
+                    result.status = ActionStatus.FAILED;
+                    result.msg = "invalid cluster id format: " + clusterIdStr;
+                }
+
+                if (result.status == ActionStatus.OK) {
+                    if (clusterId != Catalog.getInstance().getClusterId()) {
+                        result.status = ActionStatus.FAILED;
+                        result.msg = "invalid cluster id: " + clusterId;
+                    }
+                }
+
+                // Verify the token only if the cluster id checks above passed,
+                // and report a mismatch as a token error.
+                if (result.status == ActionStatus.OK && !token.equals(Catalog.getInstance().getToken())) {
+                    result.status = ActionStatus.FAILED;
+                    result.msg = "invalid token: " + token;
+                }
+
+                if (result.status == ActionStatus.OK) {
+                    // cluster id and token are valid, return replayed journal 
id
+                    long replayedJournalId = 
Catalog.getInstance().getReplayedJournalId();
+                    result.setMaxReplayedJournal(replayedJournalId);
+                    result.setQueryPort(Config.query_port);
+                    result.setRpcPort(Config.rpc_port);
+                }
+            }
         } else {
-            result = new RestBaseResult("unfinished");
+            result = new BootstrapResult("unfinished");
         }
 
         // send result
@@ -60,4 +103,48 @@ public void execute(BaseRequest request, BaseResponse 
response) throws DdlExcept
         response.getContent().append(result.toJson());
         sendResult(request, response);
     }
+
+    public static class BootstrapResult extends RestBaseResult {
+        private long replayedJournalId = 0;
+        private int queryPort = 0;
+        private int rpcPort = 0;
+
+        public BootstrapResult() {
+            super();
+        }
+
+        public BootstrapResult(String msg) {
+            super(msg);
+        }
+
+        public void setMaxReplayedJournal(long replayedJournalId) {
+            this.replayedJournalId = replayedJournalId;
+        }
+
+        public long getMaxReplayedJournal() {
+            return replayedJournalId;
+        }
+
+        public void setQueryPort(int queryPort) {
+            this.queryPort = queryPort;
+        }
+
+        public int getQueryPort() {
+            return queryPort;
+        }
+
+        public void setRpcPort(int rpcPort) {
+            this.rpcPort = rpcPort;
+        }
+
+        public int getRpcPort() {
+            return rpcPort;
+        }
+
+        @Override
+        public String toJson() {
+            Gson gson = new Gson();
+            return gson.toJson(this);
+        }
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index 1a304cd9..79782e95 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -50,6 +50,7 @@
 import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.HbPackage;
 import org.apache.doris.persist.ModifyPartitionInfo;
 import org.apache.doris.persist.OperationType;
 import org.apache.doris.persist.PartitionPersistInfo;
@@ -57,8 +58,8 @@
 import org.apache.doris.persist.RecoverInfo;
 import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.persist.TableInfo;
-import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.persist.TablePropertyInfo;
+import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -388,6 +389,11 @@ public void readFields(DataInput in) throws IOException {
                 data = new TablePropertyInfo();
                 break;
             }
+            case OperationType.OP_HEARTBEAT: {
+                data = HbPackage.read(in);
+                needRead = false;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 42dadf9e..d9aab0c2 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -601,13 +601,11 @@ public static void loadJournal(Catalog catalog, 
JournalEntity journal) {
                     
catalog.getBackupHandler().getRepoMgr().removeRepo(repoName, true);
                     break;
                 }
-
                 case OperationType.OP_TRUNCATE_TABLE: {
                     TruncateTableInfo info = (TruncateTableInfo) 
journal.getData();
                     catalog.replayTruncateTable(info);
                     break;
                 }
-
                 case OperationType.OP_COLOCATE_ADD_TABLE: {
                     final ColocatePersistInfo info = (ColocatePersistInfo) 
journal.getData();
                     
catalog.getColocateTableIndex().replayAddTableToGroup(info);
@@ -636,8 +634,13 @@ public static void loadJournal(Catalog catalog, 
JournalEntity journal) {
                 case OperationType.OP_MODIFY_TABLE_COLOCATE: {
                     final TablePropertyInfo info = (TablePropertyInfo) 
journal.getData();
                     catalog.replayModifyTableColocate(info);
+                    break;
+                }
+                case OperationType.OP_HEARTBEAT: {
+                    final HbPackage hbPackage = (HbPackage) journal.getData();
+                    Catalog.getCurrentHeartbeatMgr().replayHearbeat(hbPackage);
+                    break;
                 }
-
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1120,4 +1123,8 @@ public void logColocateMarkStable(ColocatePersistInfo 
info) {
     public void logModifyTableColocate(TablePropertyInfo info) {
         logEdit(OperationType.OP_MODIFY_TABLE_COLOCATE, info);
     }
+
+    public void logHeartbeat(HbPackage hbPackage) {
+        logEdit(OperationType.OP_HEARTBEAT, hbPackage);
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/persist/HbPackage.java 
b/fe/src/main/java/org/apache/doris/persist/HbPackage.java
new file mode 100644
index 00000000..a608f95f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/HbPackage.java
@@ -0,0 +1,69 @@
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.system.HeartbeatResponse;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/*
+ * A batch of heartbeat responses that is written and read as a single Writable.
+ */
+public class HbPackage implements Writable {
+
+    // responses added so far, kept in insertion order
+    private List<HeartbeatResponse> hbResults = Lists.newArrayList();
+
+    public HbPackage() {
+
+    }
+
+    // append one heartbeat response to this package
+    public void addHbResponse(HeartbeatResponse result) {
+        hbResults.add(result);
+    }
+
+    // returns the internal list itself, not a copy
+    public List<HeartbeatResponse> getHbResults() {
+        return hbResults;
+    }
+
+    // create an empty package and fill it from the given input
+    public static HbPackage read(DataInput in) throws IOException {
+        HbPackage hbPackage = new HbPackage();
+        hbPackage.readFields(in);
+        return hbPackage;
+    }
+
+    // layout: an int count, followed by every response written through its own write()
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(hbResults.size());
+        for (HeartbeatResponse heartbeatResult : hbResults) {
+            heartbeatResult.write(out);
+        }
+    }
+
+    // mirror of write(): read the int count, then that many responses via HeartbeatResponse.read().
+    // The responses are appended to hbResults; the list is not cleared first.
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            HeartbeatResponse result = HeartbeatResponse.read(in);
+            hbResults.add(result);
+        }
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index 6924b6dc..b7637542 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -94,7 +94,7 @@
     public static final short OP_ADD_FIRST_FRONTEND = 56;
     public static final short OP_REMOVE_FRONTEND = 57;
     public static final short OP_SET_LOAD_ERROR_URL = 58;
-
+    public static final short OP_HEARTBEAT = 59;
     public static final short OP_ALTER_ACCESS_RESOURCE = 60;
     @Deprecated
     public static final short OP_DROP_USER = 61;
diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java 
b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 6c1a47b9..02fa6c2b 100644
--- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -20,7 +20,6 @@
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
@@ -31,10 +30,10 @@
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
@@ -430,14 +429,14 @@ private TScanRangeLocations 
newLocations(TBrokerScanRangeParams params, String b
         // TODO(zc):
         int numBroker = Math.min(3, numBe);
         for (int i = 0; i < numBroker; ++i) {
-            BrokerMgr.BrokerAddress brokerAddress = null;
+            FsBroker broker = null;
             try {
-                brokerAddress = Catalog.getInstance().getBrokerMgr().getBroker(
+                broker = Catalog.getInstance().getBrokerMgr().getBroker(
                         brokerName, candidateBes.get(i).getHost());
             } catch (AnalysisException e) {
                 throw new UserException(e.getMessage());
             }
-            brokerScanRange.addToBroker_addresses(new 
TNetworkAddress(brokerAddress.ip, brokerAddress.port));
+            brokerScanRange.addToBroker_addresses(new 
TNetworkAddress(broker.ip, broker.port));
         }
 
         // Scan range
diff --git a/fe/src/main/java/org/apache/doris/planner/ExportSink.java 
b/fe/src/main/java/org/apache/doris/planner/ExportSink.java
index 1914bb41..41c0f65b 100644
--- a/fe/src/main/java/org/apache/doris/planner/ExportSink.java
+++ b/fe/src/main/java/org/apache/doris/planner/ExportSink.java
@@ -18,8 +18,8 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
@@ -66,9 +66,9 @@ protected TDataSink toThrift() {
         TDataSink result = new TDataSink(TDataSinkType.EXPORT_SINK);
         TExportSink tExportSink = new TExportSink(TFileType.FILE_BROKER, 
exportPath, columnSeparator, lineDelimiter);
 
-        BrokerMgr.BrokerAddress brokerAddress = 
Catalog.getInstance().getBrokerMgr().getAnyBroker(brokerDesc.getName());
-        if (brokerAddress != null) {
-            tExportSink.addToBroker_addresses(new 
TNetworkAddress(brokerAddress.ip, brokerAddress.port));
+        FsBroker broker = 
Catalog.getInstance().getBrokerMgr().getAnyBroker(brokerDesc.getName());
+        if (broker != null) {
+            tExportSink.addToBroker_addresses(new TNetworkAddress(broker.ip, 
broker.port));
         }
         tExportSink.setProperties(brokerDesc.getProperties());
 
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 47b4d708..97771f34 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -759,6 +759,5 @@ private TExecPlanFragmentParams 
streamLoadPutImpl(TStreamLoadPutRequest request)
             db.readUnlock();
         }
     }
-
 }
 
diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java 
b/fe/src/main/java/org/apache/doris/system/Backend.java
index 547de81f..b7b26e64 100644
--- a/fe/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/src/main/java/org/apache/doris/system/Backend.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.system;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
-
 import org.apache.doris.alter.DecommissionBackendJob.DecommissionType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.DiskInfo;
@@ -29,8 +25,12 @@
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.metric.MetricRepo;
-import org.apache.doris.system.BackendEvent.BackendEventType;
+import org.apache.doris.system.HeartbeatResponse.HbStatus;
 import org.apache.doris.thrift.TDisk;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -153,12 +153,8 @@ public String getHeartbeatErrMsg() {
         return heartbeatErrMsg;
     }
 
-    // back compatible with unit test
+    // for test only
     public void updateOnce(int bePort, int httpPort, int beRpcPort) {
-        updateOnce(bePort, httpPort, beRpcPort, -1);
-    }
-
-    public void updateOnce(int bePort, int httpPort, int beRpcPort, int 
brpcPort) {
         boolean isChanged = false;
         if (this.bePort.get() != bePort) {
             isChanged = true;
@@ -175,11 +171,6 @@ public void updateOnce(int bePort, int httpPort, int 
beRpcPort, int brpcPort) {
             this.beRpcPort.set(beRpcPort);
         }
 
-        if (this.brpcPort.get() != brpcPort) {
-            isChanged = true;
-            this.brpcPort.set(brpcPort);
-        }
-
         long currentTime = System.currentTimeMillis();
         this.lastUpdateMs.set(currentTime);
         if (!isAlive.get()) {
@@ -204,20 +195,6 @@ public boolean setDecommissioned(boolean isDecommissioned) 
{
         return false;
     }
 
-    public void setBad(EventBus eventBus, String errMsg) {
-        if (isAlive.compareAndSet(true, false)) {
-            Catalog.getInstance().getEditLog().logBackendStateChange(this);
-            LOG.warn("{} is dead", this.toString());
-        }
-
-        eventBus.post(new BackendEvent(BackendEventType.BACKEND_DOWN, "missing 
heartbeat", Long.valueOf(id)));
-        // In some case, errMsg is null when catched Exception have no 
message, which can make
-        // `SHOW BACKENDS` return ERROR. We check errMsg here to avoid.
-        if (errMsg != null) {
-            heartbeatErrMsg = errMsg;
-        }
-    }
-
     public void setBackendState(BackendState state) {
         this.backendState.set(state.ordinal());
     }
@@ -549,5 +526,48 @@ public DecommissionType getDecommissionType() {
         return DecommissionType.SystemDecommission;
     }
 
+    /*
+     * handle Backend's heartbeat response.
+     * If the status is OK: update be port, http port and brpc port from the response,
+     * refresh the last update time, mark the backend alive and clear the error message.
+     * Otherwise: mark the backend dead and remember the error message.
+     * return true if any port changed, or alive state is changed.
+     */
+    public boolean handleHbResponse(BackendHbResponse hbResponse) {
+        boolean isChanged = false;
+        if (hbResponse.getStatus() == HbStatus.OK) {
+            if (this.bePort.get() != hbResponse.getBePort()) {
+                isChanged = true;
+                this.bePort.set(hbResponse.getBePort());
+            }
+
+            if (this.httpPort.get() != hbResponse.getHttpPort()) {
+                isChanged = true;
+                this.httpPort.set(hbResponse.getHttpPort());
+            }
+
+            if (this.brpcPort.get() != hbResponse.getBrpcPort()) {
+                isChanged = true;
+                this.brpcPort.set(hbResponse.getBrpcPort());
+            }
+
+            this.lastUpdateMs.set(hbResponse.getHbTime());
+            if (!isAlive.get()) {
+                // transition from dead to alive: the heartbeat time becomes the new start time
+                isChanged = true;
+                this.lastStartTime.set(hbResponse.getHbTime());
+                LOG.info("{} is alive,", this.toString());
+                this.isAlive.set(true);
+            }
+
+            heartbeatErrMsg = "";
+        } else {
+            // only the first alive -> dead transition counts as a change
+            if (isAlive.compareAndSet(true, false)) {
+                isChanged = true;
+                LOG.info("{} is dead,", this.toString());
+            }
+
+            // In some cases the error message is null (the caught exception had no message),
+            // which can make `SHOW BACKENDS` return an error. Keep the previous message
+            // in that case so heartbeatErrMsg never becomes null.
+            if (hbResponse.getMsg() != null) {
+                heartbeatErrMsg = hbResponse.getMsg();
+            }
+        }
+
+        return isChanged;
+    }
+
 }
 
diff --git a/fe/src/main/java/org/apache/doris/system/BackendHbResponse.java 
b/fe/src/main/java/org/apache/doris/system/BackendHbResponse.java
new file mode 100644
index 00000000..43b2185d
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/system/BackendHbResponse.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/*
+ * Backend heartbeat response contains the backend id and the Backend's
+ * be port, http port and brpc port.
+ */
+public class BackendHbResponse extends HeartbeatResponse implements Writable {
+    private long beId;
+    private int bePort;
+    private int httpPort;
+    private int brpcPort;
+
+    // empty response; read() uses it before readFields() fills in the fields
+    public BackendHbResponse() {
+        super(HeartbeatResponse.Type.BACKEND);
+    }
+
+    // response with status OK, carrying the reported ports and the heartbeat time
+    public BackendHbResponse(long beId, int bePort, int httpPort, int 
brpcPort, long hbTime) {
+        super(HeartbeatResponse.Type.BACKEND);
+        this.beId = beId;
+        this.status = HbStatus.OK;
+        this.bePort = bePort;
+        this.httpPort = httpPort;
+        this.brpcPort = brpcPort;
+        this.hbTime = hbTime;
+    }
+
+    // response with status BAD; ports and hbTime are not assigned here
+    public BackendHbResponse(long beId, String errMsg) {
+        super(HeartbeatResponse.Type.BACKEND);
+        this.status = HbStatus.BAD;
+        this.beId = beId;
+        this.msg = errMsg;
+    }
+
+    public long getBeId() {
+        return beId;
+    }
+
+    public int getBePort() {
+        return bePort;
+    }
+
+    public int getHttpPort() {
+        return httpPort;
+    }
+
+    public int getBrpcPort() {
+        return brpcPort;
+    }
+
+    // create an empty response and fill it from the given input
+    public static BackendHbResponse read(DataInput in) throws IOException {
+        BackendHbResponse result = new BackendHbResponse();
+        result.readFields(in);
+        return result;
+    }
+
+    // layout: the parent's fields first, then beId, bePort, httpPort, brpcPort
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeLong(beId);
+        out.writeInt(bePort);
+        out.writeInt(httpPort);
+        out.writeInt(brpcPort);
+    }
+
+    // must read the fields in exactly the order write() emits them
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        beId = in.readLong();
+        bePort = in.readInt();
+        httpPort = in.readInt();
+        brpcPort = in.readInt();
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/system/BrokerHbResponse.java 
b/fe/src/main/java/org/apache/doris/system/BrokerHbResponse.java
new file mode 100644
index 00000000..86ddc0d4
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/system/BrokerHbResponse.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/*
+ * Broker heartbeat response carries the broker's name, host and port
+ * in addition to the heartbeat status.
+ */
+public class BrokerHbResponse extends HeartbeatResponse implements Writable {
+
+    private String name;
+    private String host;
+    private int port;
+
+    // empty response; read() uses it before readFields() fills in the fields
+    public BrokerHbResponse() {
+        super(HeartbeatResponse.Type.BROKER);
+    }
+
+    // response with status OK, carrying the heartbeat time
+    public BrokerHbResponse(String name, String host, int port, long hbTime) {
+        super(HeartbeatResponse.Type.BROKER);
+        this.status = HbStatus.OK;
+        this.name = name;
+        this.host = host;
+        this.port = port;
+        this.hbTime = hbTime;
+    }
+
+    // response for a failed heartbeat: the status must be BAD, as in the
+    // error constructors of BackendHbResponse and FrontendHbResponse,
+    // otherwise the failure would be reported as a healthy heartbeat.
+    public BrokerHbResponse(String name, String host, int port, String errMsg) {
+        super(HeartbeatResponse.Type.BROKER);
+        this.status = HbStatus.BAD;
+        this.name = name;
+        this.host = host;
+        this.port = port;
+        this.msg = errMsg;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    // create an empty response and fill it from the given input
+    public static BrokerHbResponse read(DataInput in) throws IOException {
+        BrokerHbResponse result = new BrokerHbResponse();
+        result.readFields(in);
+        return result;
+    }
+
+    // layout: the parent's fields first, then name, host, port
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Text.writeString(out, name);
+        Text.writeString(out, host);
+        out.writeInt(port);
+    }
+
+    // must read the fields in exactly the order write() emits them
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        name = Text.readString(in);
+        host = Text.readString(in);
+        port = in.readInt();
+    }
+
+    // the parent's string representation followed by name, host and port
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(super.toString());
+        sb.append(", name: ").append(name);
+        sb.append(", host: ").append(host);
+        sb.append(", port: ").append(port);
+        return sb.toString();
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/system/Frontend.java 
b/fe/src/main/java/org/apache/doris/system/Frontend.java
index 4fbb1212..32f7a8f2 100644
--- a/fe/src/main/java/org/apache/doris/system/Frontend.java
+++ b/fe/src/main/java/org/apache/doris/system/Frontend.java
@@ -22,6 +22,7 @@
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.system.HeartbeatResponse.HbStatus;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -33,11 +34,16 @@
     private String nodeName;
     private String host;
     private int editLogPort;
-
-    // We cannot add other ports (http, query, etc...) here,
-    // because we don't these ports when we process ADD FRONTEND stmt.
-    // And there is no such 'Heartbeat' thing between frontends than can sync 
these ports' info.
     
+    private int queryPort;
+    private int rpcPort;
+
+    private long replayedJournalId;
+    private long lastUpdateTime;
+    private String msg;
+
+    private boolean isAlive = false;
+
     public Frontend() {
         role = FrontendNodeType.UNKNOWN;
         host = "";
@@ -71,6 +77,18 @@ public String getNodeName() {
         return nodeName;
     }
 
+    public int getQueryPort() {
+        return queryPort;
+    }
+
+    public int getRpcPort() {
+        return rpcPort;
+    }
+
+    public boolean isAlive() {
+        return isAlive;
+    }
+
     public void setEditLogPort(int editLogPort) {
         this.editLogPort = editLogPort;
     }
@@ -79,6 +97,44 @@ public int getEditLogPort() {
         return this.editLogPort;
     }
 
+    public long getReplayedJournalId() {
+        return replayedJournalId;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public long getLastUpdateTime() {
+        return lastUpdateTime;
+    }
+
+    /*
+     * handle Frontend's heartbeat response.
+     * Because the replayed journal id is very likely to change with each
+     * heartbeat response, we simply return true whenever the heartbeat status is OK.
+     * If the heartbeat status is BAD, return true only on the first
+     * transition from alive to dead.
+     */
+    public boolean handleHbResponse(FrontendHbResponse hbResponse) {
+        boolean isChanged = false;
+        if (hbResponse.getStatus() == HbStatus.OK) {
+            // take over everything the response reports and clear the error message
+            isAlive = true;
+            queryPort = hbResponse.getQueryPort();
+            rpcPort = hbResponse.getRpcPort();
+            replayedJournalId = hbResponse.getReplayedJournalId();
+            lastUpdateTime = hbResponse.getHbTime();
+            msg = "";
+            isChanged = true;
+        } else {
+            if (isAlive) {
+                isAlive = false;
+                isChanged = true;
+            }
+            // the message is refreshed on every BAD response, even when isChanged stays false
+            msg = hbResponse.getMsg();
+        }
+        return isChanged;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, role.name());
diff --git a/fe/src/main/java/org/apache/doris/system/FrontendHbResponse.java 
b/fe/src/main/java/org/apache/doris/system/FrontendHbResponse.java
new file mode 100644
index 00000000..a7d43495
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/system/FrontendHbResponse.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/*
+ * Frontend heartbeat response contains the Frontend's name, query port, rpc port
+ * and current replayed journal id.
+ * (http port is supposed to be the same, so it does not need to be carried
+ * in the heartbeat response)
+ */
+public class FrontendHbResponse extends HeartbeatResponse implements Writable {
+
+    private String name;
+    private int queryPort;
+    private int rpcPort;
+    private long replayedJournalId;
+
+    // empty response; read() uses it before readFields() fills in the fields
+    public FrontendHbResponse() {
+        super(HeartbeatResponse.Type.FRONTEND);
+    }
+
+    // response with status OK, carrying the reported ports, journal id and heartbeat time
+    public FrontendHbResponse(String name, int queryPort, int rpcPort, long 
replayedJournalId, long hbTime) {
+        super(HeartbeatResponse.Type.FRONTEND);
+        this.status = HbStatus.OK;
+        this.name = name;
+        this.queryPort = queryPort;
+        this.rpcPort = rpcPort;
+        this.replayedJournalId = replayedJournalId;
+        this.hbTime = hbTime;
+    }
+
+    // response with status BAD; ports, journal id and hbTime are not assigned here
+    public FrontendHbResponse(String name, String errMsg) {
+        super(HeartbeatResponse.Type.FRONTEND);
+        this.status = HbStatus.BAD;
+        this.name = name;
+        this.msg = errMsg;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getQueryPort() {
+        return queryPort;
+    }
+
+    public int getRpcPort() {
+        return rpcPort;
+    }
+
+    public long getReplayedJournalId() {
+        return replayedJournalId;
+    }
+
+    // create an empty response and fill it from the given input
+    public static FrontendHbResponse read(DataInput in) throws IOException {
+        FrontendHbResponse result = new FrontendHbResponse();
+        result.readFields(in);
+        return result;
+    }
+
+    // layout: the parent's fields first, then name, queryPort, rpcPort, replayedJournalId
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Text.writeString(out, name);
+        out.writeInt(queryPort);
+        out.writeInt(rpcPort);
+        out.writeLong(replayedJournalId);
+    }
+
+    // must read the fields in exactly the order write() emits them
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        name = Text.readString(in);
+        queryPort = in.readInt();
+        rpcPort = in.readInt();
+        replayedJournalId = in.readLong();
+    }
+
+    // the parent's string representation followed by this class's own fields
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(super.toString());
+        sb.append(", name: ").append(name);
+        sb.append(", queryPort: ").append(queryPort);
+        sb.append(", rpcPort: ").append(rpcPort);
+        sb.append(", replayedJournalId: ").append(replayedJournalId);
+        return sb.toString();
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
new file mode 100644
index 00000000..7c8d4226
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.Daemon;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.http.rest.BootstrapFinishAction;
+import org.apache.doris.persist.HbPackage;
+import org.apache.doris.system.BackendEvent.BackendEventType;
+import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.HeartbeatService;
+import org.apache.doris.thrift.TBackendInfo;
+import org.apache.doris.thrift.TBrokerOperationStatus;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+import org.apache.doris.thrift.TBrokerPingBrokerRequest;
+import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.THeartbeatResult;
+import org.apache.doris.thrift.TMasterInfo;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloBrokerService;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.json.JSONObject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+/*
+ * Heartbeat manager runs as a daemon at a fixed interval.
+ * For now, it sends heartbeats to all Frontends, Backends and Brokers.
+ */
+public class HeartbeatMgr extends Daemon {
+    private static final Logger LOG = LogManager.getLogger(HeartbeatMgr.class);
+
+    private final ExecutorService executor;
+    private SystemInfoService nodeMgr;
+
+    private static volatile AtomicReference<TMasterInfo> masterInfo = new 
AtomicReference<TMasterInfo>();
+
+    public HeartbeatMgr(SystemInfoService nodeMgr) {
+        super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
+        this.nodeMgr = nodeMgr;
+        this.executor = Executors.newCachedThreadPool();
+    }
+
+    public void setMaster(String masterHost, int masterPort, int clusterId, 
String token, long epoch) {
+        TMasterInfo tMasterInfo = new TMasterInfo(new 
TNetworkAddress(masterHost, masterPort), clusterId, epoch);
+        tMasterInfo.setToken(token);
+        masterInfo.set(tMasterInfo);
+    }
+
+    /*
+     * At each round:
+     * 1. send heartbeat to all nodes
+     * 2. collect the heartbeat response from all nodes, and handle them
+     */
+    @Override
+    protected void runOneCycle() {
+        List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();
+        
+        // send backend heartbeat
+        for (Backend backend : nodeMgr.getIdToBackend().values()) {
+            BackendHeartbeatHandler handler = new 
BackendHeartbeatHandler(backend);
+            hbResponses.add(executor.submit(handler));
+        }
+
+        // send frontend heartbeat
+        List<Frontend> frontends = 
Catalog.getCurrentCatalog().getFrontends(null);
+        String masterFeNodeName = "";
+        for (Frontend frontend : frontends) {
+            if 
(frontend.getHost().equals(masterInfo.get().getNetwork_address().getHostname()))
 {
+                masterFeNodeName = frontend.getNodeName();
+            }
+            FrontendHeartbeatHandler handler = new 
FrontendHeartbeatHandler(frontend,
+                    Catalog.getCurrentCatalog().getClusterId(),
+                    Catalog.getCurrentCatalog().getToken());
+            hbResponses.add(executor.submit(handler));
+        }
+
+        // send broker heartbeat;
+        Map<String, List<FsBroker>> brokerMap = Maps.newHashMap(
+                Catalog.getCurrentCatalog().getBrokerMgr().getBrokerListMap());
+        for (Map.Entry<String, List<FsBroker>> entry : brokerMap.entrySet()) {
+            for (FsBroker brokerAddress : entry.getValue()) {
+                BrokerHeartbeatHandler handler = new 
BrokerHeartbeatHandler(entry.getKey(), brokerAddress,
+                        masterInfo.get().getNetwork_address().getHostname());
+                hbResponses.add(executor.submit(handler));
+            }
+        }
+
+        // collect all heartbeat responses and handle them.
+        // we also find which nodes' info has changed; if any has changed, we need to collect them and write
+        // an edit log to synchronize the info to other Frontends
+        HbPackage hbPackage = new HbPackage();
+        for (Future<HeartbeatResponse> future : hbResponses) {
+            boolean isChanged = false;
+            try {
+                // the heartbeat rpc's timeout is 5 seconds, so we will not be blocked here very long.
+                HeartbeatResponse response = future.get();
+                LOG.info("get heartbeat response: {}", response);
+                isChanged = handleHbResponse(response, false);
+
+                if (isChanged) {
+                    hbPackage.addHbResponse(response);
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.warn("got exception when doing heartbeat", e);
+                continue;
+            }
+        } // end for all results
+
+        // we also add a 'mocked' master Frontend heartbeat response to synchronize master info to other Frontends.
+        hbPackage.addHbResponse(new FrontendHbResponse(masterFeNodeName,
+                Config.query_port, Config.rpc_port, 
Catalog.getCurrentCatalog().getEditLog().getMaxJournalId(),
+                System.currentTimeMillis()));
+
+        // write edit log
+        Catalog.getCurrentCatalog().getEditLog().logHeartbeat(hbPackage);
+    }
+
+    private boolean handleHbResponse(HeartbeatResponse response, boolean 
isReplay) {
+        switch (response.getType()) {
+            case FRONTEND: {
+                FrontendHbResponse hbResponse = (FrontendHbResponse) response;
+                Frontend fe = 
Catalog.getCurrentCatalog().getFeByName(hbResponse.getName());
+                if (fe != null) {
+                    return fe.handleHbResponse(hbResponse);
+                }
+                break;
+            }
+            case BACKEND: {
+                BackendHbResponse hbResponse = (BackendHbResponse) response;
+                Backend be = nodeMgr.getBackend(hbResponse.getBeId());
+                if (be != null) {
+                    boolean isChanged = be.handleHbResponse(hbResponse);
+                    if (hbResponse.getStatus() != HbStatus.OK && !isReplay) {
+                        nodeMgr.getEventBus().post(new 
BackendEvent(BackendEventType.BACKEND_DOWN,
+                                "missing heartbeat", 
Long.valueOf(hbResponse.getBeId())));
+                    }
+                    return isChanged;
+                }
+                break;
+            }
+            case BROKER: {
+                BrokerHbResponse hbResponse = (BrokerHbResponse) response;
+                FsBroker broker = 
Catalog.getCurrentCatalog().getBrokerMgr().getBroker(
+                        hbResponse.getName(), hbResponse.getHost(), 
hbResponse.getPort());
+                if (broker != null) {
+                    return broker.handleHbResponse(hbResponse);
+                }
+                break;
+            }
+            default:
+                break;
+        }
+        return false;
+    }
+
+    // backend heartbeat
+    private class BackendHeartbeatHandler implements 
Callable<HeartbeatResponse> {
+        private Backend backend;
+
+        public BackendHeartbeatHandler(Backend backend) {
+            this.backend = backend;
+        }
+
+        @Override
+        public HeartbeatResponse call() {
+            long backendId = backend.getId();
+            HeartbeatService.Client client = null;
+            TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), 
backend.getHeartbeatPort());
+            boolean ok = false;
+            try {
+                client = ClientPool.heartbeatPool.borrowObject(beAddr);
+                TMasterInfo copiedMasterInfo = new 
TMasterInfo(masterInfo.get());
+                copiedMasterInfo.setBackend_ip(backend.getHost());
+                THeartbeatResult result = client.heartbeat(copiedMasterInfo);
+
+                ok = true;
+                if (result.getStatus().getStatus_code() == TStatusCode.OK) {
+                    TBackendInfo tBackendInfo = result.getBackend_info();
+                    int bePort = tBackendInfo.getBe_port();
+                    int httpPort = tBackendInfo.getHttp_port();
+                    int beRpcPort = tBackendInfo.getBe_rpc_port();
+                    int brpcPort = -1;
+                    if (tBackendInfo.isSetBrpc_port()) {
+                        brpcPort = tBackendInfo.getBrpc_port();
+                    }
+                    // backend.updateOnce(bePort, httpPort, beRpcPort, brpcPort);
+                    return new BackendHbResponse(backendId, bePort, httpPort, 
brpcPort, System.currentTimeMillis());
+                } else {
+                    return new BackendHbResponse(backendId, 
result.getStatus().getError_msgs().isEmpty() ? "Unknown error"
+                            : result.getStatus().getError_msgs().get(0));
+                }
+            } catch (Exception e) {
+                return new BackendHbResponse(backendId,
+                        Strings.isNullOrEmpty(e.getMessage()) ? "got 
exception" : e.getMessage());
+            } finally {
+                if (ok) {
+                    ClientPool.heartbeatPool.returnObject(beAddr, client);
+                } else {
+                    ClientPool.heartbeatPool.invalidateObject(beAddr, client);
+                }
+            }
+        }
+    }
+
+    // frontend heartbeat
+    public static class FrontendHeartbeatHandler implements 
Callable<HeartbeatResponse> {
+        private Frontend fe;
+        private int clusterId;
+        private String token;
+
+        public FrontendHeartbeatHandler(Frontend fe, int clusterId, String 
token) {
+            this.fe = fe;
+            this.clusterId = clusterId;
+            this.token = token;
+        }
+
+        @Override
+        public HeartbeatResponse call() {
+            String url = "http://" + fe.getHost() + ":" + Config.http_port
+                    + "/api/bootstrap?cluster_id=" + clusterId + "&token=" + 
token;
+            try {
+                String result = Util.getResultForUrl(url, null, 2000, 2000);
+                /*
+                 * return:
+                 * {"replayedJournalId":191224,"queryPort":9131,"rpcPort":9121,"status":"OK","msg":"Success"}
+                 * {"replayedJournalId":0,"queryPort":0,"rpcPort":0,"status":"FAILED","msg":"unfinished"}
+                 */
+                JSONObject root = new JSONObject(result);
+                String status = root.getString("status");
+                if (!status.equals("OK")) {
+                    return new FrontendHbResponse(fe.getNodeName(), 
root.getString("msg"));
+                } else {
+                    long replayedJournalId = 
root.getLong(BootstrapFinishAction.REPLAYED_JOURNAL_ID);
+                    int queryPort = 
root.getInt(BootstrapFinishAction.QUERY_PORT);
+                    int rpcPort = root.getInt(BootstrapFinishAction.RPC_PORT);
+                    return new FrontendHbResponse(fe.getNodeName(), queryPort, 
rpcPort, replayedJournalId,
+                            System.currentTimeMillis());
+                }
+            } catch (Exception e) {
+                return new FrontendHbResponse(fe.getNodeName(),
+                        Strings.isNullOrEmpty(e.getMessage()) ? "got 
exception" : e.getMessage());
+            }
+        }
+    }
+
+    // broker heartbeat handler
+    public static class BrokerHeartbeatHandler implements 
Callable<HeartbeatResponse> {
+        private String brokerName;
+        private FsBroker broker;
+        private String clientId;
+
+        public BrokerHeartbeatHandler(String brokerName, FsBroker broker, 
String clientId) {
+            this.brokerName = brokerName;
+            this.broker = broker;
+            this.clientId = clientId;
+        }
+
+        @Override
+        public HeartbeatResponse call() {
+            TPaloBrokerService.Client client = null;
+            TNetworkAddress addr = new TNetworkAddress(broker.ip, broker.port);
+            boolean ok = false;
+            try {
+                client = ClientPool.brokerPool.borrowObject(addr);
+                TBrokerPingBrokerRequest request = new 
TBrokerPingBrokerRequest(TBrokerVersion.VERSION_ONE,
+                        clientId);
+                TBrokerOperationStatus status = client.ping(request);
+                ok = true;
+
+                if (status.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    return new BrokerHbResponse(brokerName, broker.ip, 
broker.port, status.getMessage());
+                } else {
+                    return new BrokerHbResponse(brokerName, broker.ip, 
broker.port, System.currentTimeMillis());
+                }
+
+            } catch (Exception e) {
+                return new BrokerHbResponse(brokerName, broker.ip, broker.port,
+                        Strings.isNullOrEmpty(e.getMessage()) ? "got 
exception" : e.getMessage());
+            } finally {
+                if (ok) {
+                    ClientPool.brokerPool.returnObject(addr, client);
+                } else {
+                    ClientPool.brokerPool.invalidateObject(addr, client);
+                }
+            }
+        }
+    }
+
+    public void replayHearbeat(HbPackage hbPackage) {
+        for (HeartbeatResponse hbResult : hbPackage.getHbResults()) {
+            handleHbResponse(hbResult, true);
+        }
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatResponse.java 
b/fe/src/main/java/org/apache/doris/system/HeartbeatResponse.java
new file mode 100644
index 00000000..5d20ec25
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/system/HeartbeatResponse.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/*
+ * This is the superclass of all kinds of heartbeat responses.
+ */
+public class HeartbeatResponse implements Writable {
+    public enum Type {
+        FRONTEND,
+        BACKEND,
+        BROKER
+    }
+
+    public enum HbStatus {
+        OK, BAD
+    }
+
+    protected Type type;
+    protected boolean isTypeRead = false;
+
+    protected HbStatus status;
+
+    // msg and hbTime do not need to be synchronized to other Frontends;
+    // only the Master Frontend has this info
+    protected String msg;
+    protected long hbTime;
+
+    public HeartbeatResponse(Type type) {
+        this.type = type;
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public HbStatus getStatus() {
+        return status;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public long getHbTime() {
+        return hbTime;
+    }
+
+    public void setTypeRead(boolean isTypeRead) {
+        this.isTypeRead = isTypeRead;
+    }
+
+    public static HeartbeatResponse read(DataInput in) throws IOException {
+        HeartbeatResponse result = null;
+        Type type = Type.valueOf(Text.readString(in));
+        if (type == Type.FRONTEND) {
+            result = new FrontendHbResponse();
+        } else if (type == Type.BACKEND) {
+            result = new BackendHbResponse();
+        } else if (type == Type.BROKER) {
+            result = new BrokerHbResponse();
+        } else {
+            throw new IOException("Unknown job type: " + type.name());
+        }
+
+        result.setTypeRead(true);
+        result.readFields(in);
+        return result;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, type.name());
+        Text.writeString(out, status.name());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        if (!isTypeRead) {
+            type = Type.valueOf(Text.readString(in));
+            isTypeRead = true;
+        }
+
+        status = HbStatus.valueOf(Text.readString(in));
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("type: ").append(type.name());
+        sb.append(", status: ").append(status.name());
+        sb.append(", msg: ").append(msg);
+        return sb.toString();
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoObserver.java 
b/fe/src/main/java/org/apache/doris/system/SystemInfoObserver.java
deleted file mode 100644
index fcc97053..00000000
--- a/fe/src/main/java/org/apache/doris/system/SystemInfoObserver.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.system;
-
-import com.google.common.eventbus.DeadEvent;
-import com.google.common.eventbus.Subscribe;
-
-public abstract class SystemInfoObserver {
-
-    private String name;
-
-    public SystemInfoObserver() {
-    }
-    
-    public SystemInfoObserver(String name) {
-        this.name = name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-    
-    public String getName() {
-        return name;
-    }
-
-    @Subscribe
-    public void listen(DeadEvent deadEvent) {
-        // do nothing
-    }
-
-    @Subscribe
-    public abstract void listen(BackendEvent backendEvent);
-}
diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
index 5979bfaa..f8e9e36d 100644
--- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -21,21 +21,12 @@
 import org.apache.doris.catalog.Database;
 import org.apache.doris.cluster.Cluster;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.Daemon;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.system.Backend.BackendState;
 import org.apache.doris.system.BackendEvent.BackendEventType;
-import org.apache.doris.thrift.HeartbeatService;
-import org.apache.doris.thrift.TBackendInfo;
-import org.apache.doris.thrift.THeartbeatResult;
-import org.apache.doris.thrift.TMasterInfo;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -63,29 +54,19 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class SystemInfoService extends Daemon {
+public class SystemInfoService {
     private static final Logger LOG = 
LogManager.getLogger(SystemInfoService.class);
 
     public static final String DEFAULT_CLUSTER = "default_cluster";
 
     private volatile AtomicReference<ImmutableMap<Long, Backend>> 
idToBackendRef;
-    private volatile AtomicReference<ImmutableMap<Long, HeartbeatHandler>> 
idToHeartbeatHandlerRef;
-    private volatile AtomicReference<ImmutableMap<Long, AtomicLong>> 
idToReportVersionRef; // no
-                                                                               
            // need
-                                                                               
            // to
-                                                                               
            // persist
-
-    private final ExecutorService executor;
+    private volatile AtomicReference<ImmutableMap<Long, AtomicLong>> 
idToReportVersionRef;
 
     private final EventBus eventBus;
 
-    private static volatile AtomicReference<TMasterInfo> masterInfo = new 
AtomicReference<TMasterInfo>();
-
     // last backend id used by round robin for sequential choosing backends for
     // tablet creation
     private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
@@ -106,19 +87,13 @@ public int compare(List<Backend> list1, List<Backend> 
list2) {
                 return 1;
             }
         }
-
     };
 
     public SystemInfoService() {
-        super("cluster info service", FeConstants.heartbeat_interval_second * 
1000);
         idToBackendRef = new AtomicReference<ImmutableMap<Long, 
Backend>>(ImmutableMap.<Long, Backend> of());
-        idToHeartbeatHandlerRef = new AtomicReference<ImmutableMap<Long, 
HeartbeatHandler>>(
-                ImmutableMap.<Long, HeartbeatHandler> of());
         idToReportVersionRef = new AtomicReference<ImmutableMap<Long, 
AtomicLong>>(
                 ImmutableMap.<Long, AtomicLong> of());
 
-        executor = Executors.newCachedThreadPool();
-
         eventBus = new EventBus("backendEvent");
 
         lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
@@ -129,12 +104,6 @@ public EventBus getEventBus() {
         return this.eventBus;
     }
 
-    public void setMaster(String masterHost, int masterPort, int clusterId, 
String token, long epoch) {
-        TMasterInfo tMasterInfo = new TMasterInfo(new 
TNetworkAddress(masterHost, masterPort), clusterId, epoch);
-        tMasterInfo.setToken(token);
-        masterInfo.set(tMasterInfo);
-    }
-
     // for deploy manager
     public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean 
isFree) throws DdlException {
         addBackends(hostPortPairs, isFree, "");
@@ -191,14 +160,6 @@ private void addBackend(String host, int heartbeatPort, 
boolean isFree, String d
         ImmutableMap<Long, AtomicLong> newIdToReportVersion = 
ImmutableMap.copyOf(copiedReportVerions);
         idToReportVersionRef.set(newIdToReportVersion);
 
-        // update idToHeartbeatHandler
-        Map<Long, HeartbeatHandler> copiedHeartbeatHandlersMap = 
Maps.newHashMap(idToHeartbeatHandlerRef.get());
-        TNetworkAddress tNetworkAddress = new 
TNetworkAddress(newBackend.getHost(), newBackend.getHeartbeatPort());
-        HeartbeatHandler heartbeatHandler = new HeartbeatHandler(newBackend, 
tNetworkAddress);
-        copiedHeartbeatHandlersMap.put(newBackend.getId(), heartbeatHandler);
-        ImmutableMap<Long, HeartbeatHandler> newIdToHeartbeatHandler = 
ImmutableMap.copyOf(copiedHeartbeatHandlersMap);
-        idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler);
-
         if (!Strings.isNullOrEmpty(destCluster)) {
          // add backend to destCluster
             setBackendOwner(newBackend, destCluster);
@@ -272,12 +233,6 @@ public void dropBackend(String host, int heartbeatPort) 
throws DdlException {
         ImmutableMap<Long, AtomicLong> newIdToReportVersion = 
ImmutableMap.copyOf(copiedReportVerions);
         idToReportVersionRef.set(newIdToReportVersion);
 
-        // update idToHeartbeatHandler
-        Map<Long, HeartbeatHandler> copiedHeartbeatHandlersMap = 
Maps.newHashMap(idToHeartbeatHandlerRef.get());
-        copiedHeartbeatHandlersMap.remove(droppedBackend.getId());
-        ImmutableMap<Long, HeartbeatHandler> newIdToHeartbeatHandler = 
ImmutableMap.copyOf(copiedHeartbeatHandlersMap);
-        idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler);
-
         // update cluster
         final Cluster cluster = 
Catalog.getInstance().getCluster(droppedBackend.getOwnerClusterName());
         if (null != cluster) {
@@ -297,12 +252,8 @@ public void dropBackend(String host, int heartbeatPort) 
throws DdlException {
     public void dropAllBackend() {
         // update idToBackend
         idToBackendRef.set(ImmutableMap.<Long, Backend> of());
-
         // update idToReportVersion
         idToReportVersionRef.set(ImmutableMap.<Long, AtomicLong> of());
-
-        // update idToHeartbeatHandler
-        idToHeartbeatHandlerRef.set(ImmutableMap.<Long, HeartbeatHandler> 
of());
     }
 
     public Backend getBackend(long backendId) {
@@ -974,19 +925,9 @@ public long loadBackends(DataInputStream dis, long 
checksum) throws IOException
 
     public void clear() {
         this.idToBackendRef = null;
-        this.idToHeartbeatHandlerRef = null;
         this.idToReportVersionRef = null;
     }
 
-    public void registerObserver(SystemInfoObserver observer) {
-        LOG.info("register observer {} {}: ", observer.getName(), 
observer.getClass());
-        this.eventBus.register(observer);
-    }
-
-    public void unregisterObserver(SystemInfoObserver observer) {
-        this.eventBus.unregister(observer);
-    }
-
     public static Pair<String, Integer> validateHostAndPort(String hostPort) 
throws AnalysisException {
         hostPort = hostPort.replaceAll("\\s+", "");
         if (hostPort.isEmpty()) {
@@ -1047,14 +988,6 @@ public void replayAddBackend(Backend newBackend) {
         ImmutableMap<Long, AtomicLong> newIdToReportVersion = 
ImmutableMap.copyOf(copiedReportVerions);
         idToReportVersionRef.set(newIdToReportVersion);
 
-        // update idToHeartbeatHandler
-        Map<Long, HeartbeatHandler> copiedHeartbeatHandlersMap = 
Maps.newHashMap(idToHeartbeatHandlerRef.get());
-        TNetworkAddress tNetworkAddress = new 
TNetworkAddress(newBackend.getHost(), newBackend.getHeartbeatPort());
-        HeartbeatHandler heartbeatHandler = new HeartbeatHandler(newBackend, 
tNetworkAddress);
-        copiedHeartbeatHandlersMap.put(newBackend.getId(), heartbeatHandler);
-        ImmutableMap<Long, HeartbeatHandler> newIdToHeartbeatHandler = 
ImmutableMap.copyOf(copiedHeartbeatHandlersMap);
-        idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler);
-
         // to add be to DEFAULT_CLUSTER
         if (newBackend.getBackendState() == BackendState.using) {
             final Cluster cluster = 
Catalog.getInstance().getCluster(DEFAULT_CLUSTER);
@@ -1082,12 +1015,6 @@ public void replayDropBackend(Backend backend) {
         ImmutableMap<Long, AtomicLong> newIdToReportVersion = 
ImmutableMap.copyOf(copiedReportVerions);
         idToReportVersionRef.set(newIdToReportVersion);
 
-        // update idToHeartbeatHandler
-        Map<Long, HeartbeatHandler> copiedHeartbeatHandlersMap = 
Maps.newHashMap(idToHeartbeatHandlerRef.get());
-        copiedHeartbeatHandlersMap.remove(backend.getId());
-        ImmutableMap<Long, HeartbeatHandler> newIdToHeartbeatHandler = 
ImmutableMap.copyOf(copiedHeartbeatHandlersMap);
-        idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler);
-
         // update cluster
         final Cluster cluster = 
Catalog.getInstance().getCluster(backend.getOwnerClusterName());
         if (null != cluster) {
@@ -1137,66 +1064,6 @@ public void checkClusterCapacity(String clusterName) 
throws DdlException {
         }
     }
 
-    @Override
-    protected void runOneCycle() {
-        ImmutableMap<Long, HeartbeatHandler> idToHeartbeatHandler = 
idToHeartbeatHandlerRef.get();
-        Iterator<HeartbeatHandler> iterator = 
idToHeartbeatHandler.values().iterator();
-        while (iterator.hasNext()) {
-            HeartbeatHandler heartbeatHandler = iterator.next();
-            executor.submit(heartbeatHandler);
-        }
-    }
-
-    private class HeartbeatHandler implements Runnable {
-        private Backend backend;
-        private TNetworkAddress address;
-
-        public HeartbeatHandler(Backend backend, TNetworkAddress 
networkAddress) {
-            this.backend = backend;
-            this.address = networkAddress;
-        }
-
-        @Override
-        public void run() {
-            long backendId = backend.getId();
-            HeartbeatService.Client client = null;
-            boolean ok = false;
-            try {
-                client = ClientPool.heartbeatPool.borrowObject(address);
-                TMasterInfo copiedMasterInfo = new 
TMasterInfo(masterInfo.get());
-                copiedMasterInfo.setBackend_ip(backend.getHost());
-                THeartbeatResult result = client.heartbeat(copiedMasterInfo);
-
-                if (result.getStatus().getStatus_code() == TStatusCode.OK) {
-                    TBackendInfo tBackendInfo = result.getBackend_info();
-                    int bePort = tBackendInfo.getBe_port();
-                    int httpPort = tBackendInfo.getHttp_port();
-                    int beRpcPort = tBackendInfo.getBe_rpc_port();
-                    int brpcPort = -1;
-                    if (tBackendInfo.isSetBrpc_port()) {
-                        brpcPort = tBackendInfo.getBrpc_port();
-                    }
-                    backend.updateOnce(bePort, httpPort, beRpcPort, brpcPort);
-                } else {
-                    LOG.warn("failed to heartbeat backend[" + backendId + "]: 
" + result.getStatus().toString());
-                    backend.setBad(eventBus, 
result.getStatus().getError_msgs().isEmpty() ? "Unknown error"
-                            : result.getStatus().getError_msgs().get(0));
-                }
-                ok = true;
-                LOG.debug("backend[{}] host: {}, port: {}", backendId, 
backend.getHost(), backend.getHeartbeatPort());
-            } catch (Exception e) {
-                LOG.warn("backend[" + backendId + "] got Exception: ", e);
-                backend.setBad(eventBus, e.getMessage());
-            } finally {
-                if (ok) {
-                    ClientPool.heartbeatPool.returnObject(address, client);
-                } else {
-                    ClientPool.heartbeatPool.invalidateObject(address, client);
-                }
-            }
-        }
-    }
-
     /*
      * Try to randomly get a backend id by given host.
      * If not found, return -1
diff --git a/fe/src/main/java/org/apache/doris/task/DownloadTask.java 
b/fe/src/main/java/org/apache/doris/task/DownloadTask.java
index 3bb5ddec..314c7d29 100644
--- a/fe/src/main/java/org/apache/doris/task/DownloadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/DownloadTask.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.task;
 
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.thrift.TDownloadReq;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResourceInfo;
@@ -29,11 +29,11 @@
 
     private long jobId;
     private Map<String, String> srcToDestPath;
-    private BrokerAddress brokerAddr;
+    private FsBroker brokerAddr;
     private Map<String, String> brokerProperties;
 
     public DownloadTask(TResourceInfo resourceInfo, long backendId, long 
signature, long jobId, long dbId,
-            Map<String, String> srcToDestPath, BrokerAddress brokerAddr, 
Map<String, String> brokerProperties) {
+            Map<String, String> srcToDestPath, FsBroker brokerAddr, 
Map<String, String> brokerProperties) {
         super(resourceInfo, backendId, TTaskType.DOWNLOAD, dbId, -1, -1, -1, 
-1, signature);
         this.jobId = jobId;
         this.srcToDestPath = srcToDestPath;
@@ -49,7 +49,7 @@ public long getJobId() {
         return srcToDestPath;
     }
 
-    public BrokerAddress getBrokerAddr() {
+    public FsBroker getBrokerAddr() {
         return brokerAddr;
     }
 
diff --git a/fe/src/main/java/org/apache/doris/task/ExportExportingTask.java 
b/fe/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 599a7fa2..99b7a751 100644
--- a/fe/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -17,13 +17,13 @@
 
 package org.apache.doris.task;
 
-import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
@@ -267,16 +267,16 @@ private void registerProfile() {
     }
 
     private Status moveTmpFiles() {
-        BrokerMgr.BrokerAddress brokerAddress = null;
+        FsBroker broker = null;
         try {
             String localIP = FrontendOptions.getLocalHostAddress();
-            brokerAddress = 
Catalog.getInstance().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), 
localIP);
+            broker = 
Catalog.getInstance().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), 
localIP);
         } catch (AnalysisException e) {
             String failMsg = "get broker failed. msg=" + e.getMessage();
             LOG.warn(failMsg);
             return new Status(TStatusCode.CANCELLED, failMsg);
         }
-        TNetworkAddress address = new TNetworkAddress(brokerAddress.ip, 
brokerAddress.port);
+        TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
         TPaloBrokerService.Client client = null;
         try {
             client = ClientPool.brokerPool.borrowObject(address);
diff --git a/fe/src/main/java/org/apache/doris/task/UploadTask.java 
b/fe/src/main/java/org/apache/doris/task/UploadTask.java
index 82646cb0..4dbd310f 100644
--- a/fe/src/main/java/org/apache/doris/task/UploadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/UploadTask.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.task;
 
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TTaskType;
@@ -30,15 +30,15 @@
     private long jobId;
 
     private Map<String, String> srcToDestPath;
-    private BrokerAddress brokerAddress;
+    private FsBroker broker;
     private Map<String, String> brokerProperties;
 
     public UploadTask(TResourceInfo resourceInfo, long backendId, long 
signature, long jobId, Long dbId,
-            Map<String, String> srcToDestPath, BrokerAddress brokerAddr, 
Map<String, String> brokerProperties) {
+            Map<String, String> srcToDestPath, FsBroker broker, Map<String, 
String> brokerProperties) {
         super(resourceInfo, backendId, TTaskType.UPLOAD, dbId, -1, -1, -1, -1, 
signature);
         this.jobId = jobId;
         this.srcToDestPath = srcToDestPath;
-        this.brokerAddress = brokerAddr;
+        this.broker = broker;
         this.brokerProperties = brokerProperties;
     }
 
@@ -50,8 +50,8 @@ public long getJobId() {
         return srcToDestPath;
     }
 
-    public BrokerAddress getBrokerAddress() {
-        return brokerAddress;
+    public FsBroker getBrokerAddress() {
+        return broker;
     }
 
     public Map<String, String> getBrokerProperties() {
@@ -59,7 +59,7 @@ public BrokerAddress getBrokerAddress() {
     }
 
     public TUploadReq toThrift() {
-        TNetworkAddress address = new TNetworkAddress(brokerAddress.ip, 
brokerAddress.port);
+        TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
         TUploadReq request = new TUploadReq(jobId, srcToDestPath, address);
         request.setBroker_prop(brokerProperties);
         return request;
diff --git a/fe/src/test/java/org/apache/doris/backup/RepositoryTest.java 
b/fe/src/test/java/org/apache/doris/backup/RepositoryTest.java
index ed95cbbe..19b6b4b3 100644
--- a/fe/src/test/java/org/apache/doris/backup/RepositoryTest.java
+++ b/fe/src/test/java/org/apache/doris/backup/RepositoryTest.java
@@ -19,7 +19,7 @@
 
 import org.apache.doris.analysis.ShowRepositoriesStmt;
 import org.apache.doris.catalog.BrokerMgr;
-import org.apache.doris.catalog.BrokerMgr.BrokerAddress;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.service.FrontendOptions;
 
@@ -84,8 +84,8 @@ public void setUp() {
 
         new MockUp<BrokerMgr>() {
             @Mock
-            public BrokerAddress getBroker(String name, String host) throws 
AnalysisException {
-                return new BrokerAddress("10.74.167.16", 8111);
+            public FsBroker getBroker(String name, String host) throws 
AnalysisException {
+                return new FsBroker("10.74.167.16", 8111);
             }
         };
 
diff --git 
a/fe/src/test/java/org/apache/doris/deploy/AmbariDeployManagerTest.java 
b/fe/src/test/java/org/apache/doris/deploy/AmbariDeployManagerTest.java
index f2d0a197..8507c640 100644
--- a/fe/src/test/java/org/apache/doris/deploy/AmbariDeployManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/deploy/AmbariDeployManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.deploy;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.deploy.impl.AmbariDeployManager;
 
 import org.junit.Before;
@@ -84,20 +85,15 @@ public void getHostTest() throws NoSuchMethodException, 
SecurityException, Illeg
     }
 
     private String getBlueprint() throws NoSuchMethodException, 
IllegalAccessException, InvocationTargetException {
-        Method getResultForUrlM = 
manager.getClass().getDeclaredMethod("getResultForUrl", String.class);
-        getResultForUrlM.setAccessible(true);
-        String res = (String) getResultForUrlM.invoke(manager,
-                                                      
"http://180.76.168.210:8080/api/v1/clusters/BDP?format=blueprint");
+        String res = 
Util.getResultForUrl("http://180.76.168.210:8080/api/v1/clusters/BDP?format=blueprint",
+                null, 2000, 2000);
         return res;
     }
 
     private String getComponent(String comp)
             throws NoSuchMethodException, IllegalAccessException, 
InvocationTargetException {
-        Method getResultForUrlM = 
manager.getClass().getDeclaredMethod("getResultForUrl", String.class);
-        getResultForUrlM.setAccessible(true);
-        String res = (String) getResultForUrlM.invoke(manager,
-                                                      
"http://180.76.168.210:8080/api/v1/clusters/BDP/services/PALO/components/"
-                                                              + comp);
+        String res = 
Util.getResultForUrl("http://180.76.168.210:8080/api/v1/clusters/BDP/services/PALO/components/"
+                + comp, null, 2000, 2000);
 
         return res;
     }
diff --git a/fe/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java 
b/fe/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
new file mode 100644
index 00000000..510fcef1
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.common.GenericPool;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.system.HeartbeatMgr.BrokerHeartbeatHandler;
+import org.apache.doris.system.HeartbeatMgr.FrontendHeartbeatHandler;
+import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.TBrokerOperationStatus;
+import org.apache.doris.thrift.TBrokerPingBrokerRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloBrokerService;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+
+public class HeartbeatMgrTest {
+
+    @Before
+    public void setUp() {
+
+    }
+
+    @Test
+    public void testFrontendHbHandler() {
+        new MockUp<Util>() {
+            @Mock
+            public String getResultForUrl(String urlStr, String 
encodedAuthInfo, int connectTimeoutMs,
+                    int readTimeoutMs) {
+                if (urlStr.contains("192.168.1.1")) {
+                    return 
"{\"replayedJournalId\":191224,\"queryPort\":9131,\"rpcPort\":9121,\"status\":\"OK\",\"msg\":\"Success\"}";
+                } else {
+                    return 
"{\"replayedJournalId\":0,\"queryPort\":0,\"rpcPort\":0,\"status\":\"FAILED\",\"msg\":\"unfinished\"}";
+                }
+            }
+        };
+
+        Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", 
"192.168.1.1", 9010);
+        FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 
12345, "abcd");
+        HeartbeatResponse response = handler.call();
+        
+        Assert.assertTrue(response instanceof FrontendHbResponse);
+        FrontendHbResponse hbResponse = (FrontendHbResponse) response;
+        Assert.assertEquals(191224, hbResponse.getReplayedJournalId());
+        Assert.assertEquals(9121, hbResponse.getRpcPort());
+        Assert.assertEquals(9131, hbResponse.getQueryPort());
+        Assert.assertEquals(HbStatus.OK, hbResponse.getStatus());
+
+        Frontend fe2 = new Frontend(FrontendNodeType.FOLLOWER, "test2", 
"192.168.1.2", 9010);
+        handler = new FrontendHeartbeatHandler(fe2, 12345, "abcd");
+        response = handler.call();
+
+        Assert.assertTrue(response instanceof FrontendHbResponse);
+        hbResponse = (FrontendHbResponse) response;
+        Assert.assertEquals(0, hbResponse.getReplayedJournalId());
+        Assert.assertEquals(0, hbResponse.getRpcPort());
+        Assert.assertEquals(0, hbResponse.getQueryPort());
+        Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus());
+
+    }
+
+    @Test
+    public void testBrokerHbHandler(@Mocked TPaloBrokerService.Client client) 
throws Exception {
+        TBrokerOperationStatus status = new TBrokerOperationStatus();
+
+        new MockUp<GenericPool<TPaloBrokerService.Client>>() {
+            @Mock
+            public TPaloBrokerService.Client borrowObject(TNetworkAddress 
address) throws Exception {
+                return client;
+            }
+
+            @Mock
+            public void returnObject(TNetworkAddress address, 
TPaloBrokerService.Client object) {
+                return;
+            }
+
+            @Mock
+            public void invalidateObject(TNetworkAddress address, 
TPaloBrokerService.Client object) {
+                return;
+            }
+        };
+
+        new NonStrictExpectations() {
+            {
+                client.ping((TBrokerPingBrokerRequest) any);
+                result = status;
+            }
+        };
+
+        FsBroker broker = new FsBroker("192.168.1.1", 8111);
+        BrokerHeartbeatHandler handler = new BrokerHeartbeatHandler("hdfs", 
broker, "abc");
+        HeartbeatResponse response = handler.call();
+
+        Assert.assertTrue(response instanceof BrokerHbResponse);
+        BrokerHbResponse hbResponse = (BrokerHbResponse) response;
+        System.out.println(hbResponse.toString());
+        Assert.assertEquals(HbStatus.OK, hbResponse.getStatus());
+    }
+
+}
diff --git 
a/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
index 9cda0297..517671dd 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
@@ -141,7 +141,8 @@ public void testListPaths() {
         properties.put("username", "user");
         properties.put("password", "passwd");
         
-        List<TBrokerFileStatus> files2 = 
fileSystemManager.listPath(testHdfsHost + "/data/abc/logs/*.out", properties);
+        List<TBrokerFileStatus> files2 = 
fileSystemManager.listPath(testHdfsHost + "/data/abc/logs/*.out",
+                false, properties);
         assertEquals(files2.size(), 2);
     }
     
@@ -168,7 +169,7 @@ public void testOpenFileStream() {
         assertTrue(isPathExist);
         
         // check file size
-        List<TBrokerFileStatus> files = fileSystemManager.listPath(tempFile, 
properties);
+        List<TBrokerFileStatus> files = fileSystemManager.listPath(tempFile, 
false, properties);
         assertEquals(files.size(), 1);
         assertFalse(files.get(0).isDir);
         assertEquals(1256, files.get(0).size);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to