This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5cd3734089c branch-4.0: [Feature] doris cross-cluster query #57898 
(#58571)
5cd3734089c is described below

commit 5cd3734089c9a693db9289a651b4c20a1ea84ad7
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 9 17:18:59 2025 +0800

    branch-4.0: [Feature] doris cross-cluster query #57898 (#58571)
    
    Cherry-picked from #57898
    
    ---------
    
    Co-authored-by: HonestManXin <[email protected]>
    Co-authored-by: morningman <[email protected]>
---
 .../java/org/apache/doris/catalog/OlapTable.java   |  49 ++-
 .../java/org/apache/doris/catalog/TableIf.java     |   2 +
 .../doris/datasource/ExternalMetaCacheMgr.java     |   9 +
 .../doris/DorisExternalMetaCacheMgr.java           |  83 +++++
 .../doris/datasource/doris/FeServiceClient.java    | 278 ++++++++++++++++
 .../doris/RemoteDorisExternalCatalog.java          |  37 ++-
 .../datasource/doris/RemoteDorisExternalTable.java |  79 ++++-
 .../doris/datasource/doris/RemoteOlapTable.java    | 130 ++++++++
 .../property/constants/RemoteDorisProperties.java  |   4 +
 .../doris/nereids/rules/analysis/BindRelation.java |  18 ++
 .../LogicalOlapScanToPhysicalOlapScan.java         |   3 +-
 .../trees/plans/distribute/DistributePlanner.java  |  14 +
 .../trees/plans/distribute/SelectedWorkers.java    |  20 +-
 .../BackendDistributedPlanWorkerManager.java       |  57 ++--
 .../plans/distribute/worker/BackendWorker.java     |  14 +-
 .../distribute/worker/DistributedPlanWorker.java   |   2 +
 .../worker/DistributedPlanWorkerManager.java       |  12 +-
 .../trees/plans/distribute/worker/DummyWorker.java |   7 +
 .../worker/LoadBalanceScanWorkerSelector.java      |  11 +-
 .../worker/job/AbstractUnassignedScanJob.java      |   7 +-
 .../distribute/worker/job/UnassignedAllBEJob.java  |  22 +-
 .../worker/job/UnassignedGroupCommitJob.java       |   4 +-
 .../job/UnassignedScanBucketOlapTableJob.java      |   6 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  13 +-
 .../java/org/apache/doris/planner/ScanNode.java    |   5 +
 .../org/apache/doris/qe/AbstractJobProcessor.java  |   6 +-
 .../doris/qe/runtime/PipelineExecutionTask.java    |  11 +-
 .../qe/runtime/PipelineExecutionTaskBuilder.java   |   6 +-
 .../apache/doris/service/FrontendServiceImpl.java  | 105 +++++++
 .../main/java/org/apache/doris/system/Backend.java |  12 +
 gensrc/thrift/FrontendService.thrift               |  25 ++
 ...est_query_remote_doris_as_olap_table_select.out |  48 +++
 .../pipeline/external/conf/regression-conf.groovy  |   1 +
 ..._query_remote_doris_as_olap_table_select.groovy | 350 +++++++++++++++++++++
 .../test_remote_doris_all_types_select.groovy      |   8 +-
 .../test_remote_doris_all_types_show.groovy        |   8 +-
 .../remote_doris/test_remote_doris_catalog.groovy  |   8 +-
 .../remote_doris/test_remote_doris_predict.groovy  |   8 +-
 .../remote_doris/test_remote_doris_refresh.groovy  |   8 +-
 .../test_remote_doris_statistics.groovy            |   8 +-
 .../test_remote_doris_table_stats.groovy           |   8 +-
 41 files changed, 1424 insertions(+), 82 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index f3fb4914ff5..33bbff84f01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -95,6 +95,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -182,10 +183,10 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
     private PartitionInfo partitionInfo;
     @SerializedName(value = "itp", alternate = {"idToPartition"})
     @Getter
-    private ConcurrentHashMap<Long, Partition> idToPartition = new 
ConcurrentHashMap<>();
+    protected ConcurrentHashMap<Long, Partition> idToPartition = new 
ConcurrentHashMap<>();
     // handled in postgsonprocess
     @Getter
-    private Map<String, Partition> nameToPartition = Maps.newTreeMap();
+    protected Map<String, Partition> nameToPartition = Maps.newTreeMap();
 
     @SerializedName(value = "di", alternate = {"distributionInfo"})
     private DistributionInfo defaultDistributionInfo;
@@ -3698,4 +3699,48 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
                                         
.filter(Index::isAnalyzedInvertedIndex).findFirst().orElse(null);
         }
     }
+
+    /**
+     * caller should acquire the read lock and should not modify any field of 
the return obj
+     */
+    public OlapTable copyTableMeta() {
+        OlapTable table = new OlapTable();
+        // metaobj
+        table.signature = signature;
+        table.lastCheckTime =  lastCheckTime;
+        // abstract table
+        table.id = id;
+        table.name = name;
+        table.qualifiedDbName = qualifiedDbName;
+        table.type = type;
+        table.createTime = createTime;
+        table.fullSchema = fullSchema;
+        table.comment = comment;
+        table.tableAttributes = tableAttributes;
+        // olap table
+        // NOTE: currently do not need temp partitions, colocateGroup, 
autoIncrementGenerator
+        table.idToPartition = new ConcurrentHashMap<>();
+        table.tempPartitions = new TempPartitions();
+
+        table.state = state;
+        table.indexIdToMeta = ImmutableMap.copyOf(indexIdToMeta);
+        table.indexNameToId = ImmutableMap.copyOf(indexNameToId);
+        table.keysType = keysType;
+        table.partitionInfo = partitionInfo;
+        table.defaultDistributionInfo = defaultDistributionInfo;
+        table.bfColumns = bfColumns;
+        table.bfFpp = bfFpp;
+        table.indexes = indexes;
+        table.baseIndexId = baseIndexId;
+        table.tableProperty = tableProperty;
+        return table;
+    }
+
+    public long getCatalogId() {
+        return Env.getCurrentInternalCatalog().getId();
+    }
+
+    public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws 
AnalysisException {
+        return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 1dd34b43cc6..778137d1f73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -437,6 +437,8 @@ public interface TableIf {
                     return "iceberg";
                 case DICTIONARY:
                     return "dictionary";
+                case DORIS_EXTERNAL_TABLE:
+                    return "External_Doris";
                 default:
                     return null;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index e777285a07f..4932f3aa8f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -22,6 +22,7 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.datasource.doris.DorisExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -100,6 +101,7 @@ public class ExternalMetaCacheMgr {
     private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
     private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
     private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
+    private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;
 
     public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
         rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
@@ -131,6 +133,7 @@ public class ExternalMetaCacheMgr {
         icebergMetadataCacheMgr = new 
IcebergMetadataCacheMgr(commonRefreshExecutor);
         maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
         paimonMetadataCacheMgr = new 
PaimonMetadataCacheMgr(commonRefreshExecutor);
+        dorisExternalMetaCacheMgr = new 
DorisExternalMetaCacheMgr(commonRefreshExecutor);
     }
 
     private ExecutorService newThreadPool(boolean isCheckpointCatalog, int 
numThread, int queueSize,
@@ -219,6 +222,10 @@ public class ExternalMetaCacheMgr {
         return rowCountCache;
     }
 
+    public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
+        return dorisExternalMetaCacheMgr;
+    }
+
     public void removeCache(long catalogId) {
         if (cacheMap.remove(catalogId) != null) {
             LOG.info("remove hive metastore cache for catalog {}", catalogId);
@@ -232,6 +239,7 @@ public class ExternalMetaCacheMgr {
         icebergMetadataCacheMgr.removeCache(catalogId);
         maxComputeMetadataCacheMgr.removeCache(catalogId);
         paimonMetadataCacheMgr.removeCache(catalogId);
+        dorisExternalMetaCacheMgr.removeCache(catalogId);
     }
 
     public void invalidateTableCache(ExternalTable dorisTable) {
@@ -288,6 +296,7 @@ public class ExternalMetaCacheMgr {
         icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
         maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
         paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
+        dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid catalog cache for {}", catalogId);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
new file mode 100644
index 00000000000..70f92853ccc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.system.Backend;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class DorisExternalMetaCacheMgr {
+    private static final Logger LOG = 
LogManager.getLogger(DorisExternalMetaCacheMgr.class);
+    private final LoadingCache<Long, ImmutableMap<Long, Backend>> 
backendsCache;
+
+    public DorisExternalMetaCacheMgr(ExecutorService executor) {
+        CacheFactory cacheFactory = new CacheFactory(
+                
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+                OptionalLong.of(Config.external_cache_refresh_time_minutes * 
60),
+                20,
+                true,
+                null);
+        backendsCache = cacheFactory.buildCache(key -> loadBackends(key), 
executor);
+    }
+
+    private ImmutableMap<Long, Backend> loadBackends(Long catalogId) {
+        RemoteDorisExternalCatalog catalog = (RemoteDorisExternalCatalog) 
Env.getCurrentEnv().getCatalogMgr()
+                .getCatalog(catalogId);
+        List<Backend> backends = catalog.getFeServiceClient().listBackends();
+        if (LOG.isDebugEnabled()) {
+            List<String> names = backends.stream().map(b -> 
b.getAddress()).collect(Collectors.toList());
+            LOG.debug("load backends:{} from:{}", String.join(",", names), 
catalog.getName());
+        }
+        Map<Long, Backend> backendMap = Maps.newHashMap();
+        backends.forEach(backend -> backendMap.put(backend.getId(), backend));
+        return ImmutableMap.copyOf(backendMap);
+    }
+
+    public void removeCache(long catalogId) {
+        backendsCache.invalidate(catalogId);
+    }
+
+    public void invalidateBackendCache(long catalogId) {
+        backendsCache.invalidate(catalogId);
+    }
+
+    public void invalidateCatalogCache(long catalogId) {
+        invalidateBackendCache(catalogId);
+    }
+
+    public ImmutableMap<Long, Backend> getBackends(long catalogId) {
+        ImmutableMap<Long, Backend> backends = backendsCache.get(catalogId);
+        if (backends == null) {
+            return ImmutableMap.of();
+        }
+        return backends;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
new file mode 100644
index 00000000000..56e28f605c6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TGetBackendMetaRequest;
+import org.apache.doris.thrift.TGetBackendMetaResult;
+import org.apache.doris.thrift.TGetOlapTableMetaRequest;
+import org.apache.doris.thrift.TGetOlapTableMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPartitionMeta;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class FeServiceClient {
+    private static final Logger LOG = 
LogManager.getLogger(FeServiceClient.class);
+
+    private final Random random = new Random(System.currentTimeMillis());
+    private final String name;
+    private final List<TNetworkAddress> addresses;
+    private volatile TNetworkAddress master;
+    private final String user;
+    private final String password;
+    private final int retryCount;
+    private final int timeout;
+
+    public FeServiceClient(String name, List<TNetworkAddress> addresses, 
String user, String password,
+            int retryCount, int timeout) {
+        this.name = name;
+        this.addresses = addresses;
+        this.user = user;
+        this.password = password;
+        this.retryCount = retryCount;
+        this.timeout = timeout;
+    }
+
+    private List<TNetworkAddress> getAddresses() {
+        return addresses;
+    }
+
+    private FrontendService.Client getRemoteFeClient(TNetworkAddress address, 
int timeout) {
+        try {
+            return ClientPool.frontendPool.borrowObject(address, timeout);
+        } catch (Exception e) {
+            String msg = String.format("failed to get remote doris:%s fe 
connection", name);
+            throw new RuntimeException(msg, e);
+        }
+    }
+
+    private void returnClient(TNetworkAddress address, FrontendService.Client 
client, boolean returnObj) {
+        if (returnObj) {
+            ClientPool.frontendPool.returnObject(address, client);
+        } else {
+            ClientPool.frontendPool.invalidateObject(address, client);
+        }
+    }
+
+    private <T> T randomCallWithRetry(ThriftCall<T> call, String errorMsg, int 
timeout) {
+        List<TNetworkAddress> addresses = getAddresses();
+        int retries = 0;
+        Exception lastException = null;
+        while (retries < retryCount) {
+            int index = random.nextInt(addresses.size());
+            FrontendService.Client client = null;
+            for (int i = 0; i < addresses.size() && retries < retryCount; i++) 
{
+                TNetworkAddress address = addresses.get((index + i) % 
addresses.size());
+                client = getRemoteFeClient(address, timeout);
+                boolean returnObj = false;
+                try {
+                    T result = call.call(client);
+                    returnObj = true;
+                    return result;
+                } catch (TException | IOException e) {
+                    lastException = e;
+                    retries++;
+                } catch (Exception e) {
+                    throw new RuntimeException(errorMsg + ":" + 
e.getMessage(), e);
+                } finally {
+                    returnClient(address, client, returnObj);
+                }
+            }
+        }
+        throw new RuntimeException(errorMsg + ":" + 
lastException.getMessage(), lastException);
+    }
+
+    private <T> T callFromMaster(ThriftCall<MasterResult<T>> call, String 
errorMsg, int timeout) {
+        TNetworkAddress address = master;
+        FrontendService.Client client = null;
+        Exception lastException = null;
+        if (address != null) {
+            client = getRemoteFeClient(address, timeout);
+            boolean returnObj = false;
+            try {
+                MasterResult<T> ret = call.call(client);
+                returnObj = true;
+                if (ret.isMaster) {
+                    if (ret.hasError) {
+                        throw new RuntimeException(ret.errorMsg);
+                    }
+                    return ret.result;
+                }
+            } catch (TException | IOException e) {
+                lastException = e;
+            } catch (Exception e) {
+                throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+            } finally {
+                returnClient(address, client, returnObj);
+            }
+        }
+        master = null;
+        List<TNetworkAddress> addresses = getAddresses();
+        int retries = 0;
+        while (retries < retryCount) {
+            int index = random.nextInt(addresses.size());
+            for (int i = 0; i < addresses.size() && retries < retryCount; i++) 
{
+                address = addresses.get((index + i) % addresses.size());
+                client = getRemoteFeClient(address, timeout);
+                boolean returnObj = false;
+                try {
+                    MasterResult<T> ret = call.call(client);
+                    returnObj = true;
+                    if (ret.isMaster) {
+                        master = address;
+                        if (ret.hasError) {
+                            throw new RuntimeException(ret.errorMsg);
+                        }
+                        return ret.result;
+                    }
+                } catch (TException | IOException e) {
+                    lastException = e;
+                    retries++;
+                } catch (Exception e) {
+                    throw new RuntimeException(errorMsg + ":" + 
e.getMessage(), e);
+                } finally {
+                    returnClient(address, client, returnObj);
+                }
+            }
+        }
+        throw new RuntimeException(errorMsg + ":" + 
lastException.getMessage(), lastException);
+    }
+
+    public List<Backend> listBackends() {
+        TGetBackendMetaRequest request = new TGetBackendMetaRequest();
+        request.setUser(user);
+        request.setPasswd(password);
+        String msg = String.format("failed to get backends from remote 
doris:%s", name);
+        return callFromMaster(client -> {
+            TGetBackendMetaResult result = client.getBackendMeta(request);
+            if (result.getStatus().getStatusCode() == TStatusCode.NOT_MASTER) {
+                return MasterResult.notMaster();
+            }
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                return 
MasterResult.masterWithError(result.getStatus().toString());
+            }
+            List<Backend> backends = result.getBackends().stream()
+                    .map(b -> Backend.fromThrift(b))
+                    .collect(Collectors.toList());
+            return MasterResult.withResult(backends);
+        }, msg, timeout);
+    }
+
+    public RemoteOlapTable getOlapTable(String dbName, String table, long 
tableId, List<Partition> partitions) {
+        TGetOlapTableMetaRequest request = new TGetOlapTableMetaRequest();
+        request.setDb(dbName);
+        request.setTable(table);
+        request.setTableId(tableId);
+        request.setUser(user);
+        request.setPasswd(password);
+        request.setVersion(FeConstants.meta_version);
+        for (Partition partition : partitions) {
+            TPartitionMeta meta = new TPartitionMeta();
+            meta.setId(partition.getId());
+            meta.setVisibleVersion(partition.getVisibleVersion());
+            meta.setVisibleVersionTime(partition.getVisibleVersionTime());
+            request.addToPartitions(meta);
+        }
+        String msg = String.format("failed to get table meta from remote 
doris:%s", name);
+        return randomCallWithRetry(client -> {
+            TGetOlapTableMetaResult result = client.getOlapTableMeta(request);
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                throw new UserException(result.getStatus().toString());
+            }
+            RemoteOlapTable remoteOlapTable = null;
+            try (DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(result.getTableMeta()))) {
+                OlapTable olapTable = OlapTable.read(in);
+                remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable);
+            }
+            List<Partition> updatedPartitions = new 
ArrayList<>(result.getUpdatedPartitionsSize());
+            if (result.getUpdatedPartitionsSize() > 0) {
+                for (ByteBuffer buffer : result.getUpdatedPartitions()) {
+                    try (ByteArrayInputStream in =
+                            new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.remaining());
+                            DataInputStream dataInputStream = new 
DataInputStream(in)) {
+                        String partitionStr = Text.readString(dataInputStream);
+                        Partition partition = 
GsonUtils.GSON.fromJson(partitionStr, Partition.class);
+                        updatedPartitions.add(partition);
+                    }
+                }
+            }
+            List<Long> removedPartitions = result.getRemovedPartitions();
+            if (removedPartitions == null) {
+                removedPartitions = new ArrayList<>();
+            }
+            remoteOlapTable.rebuildPartitions(partitions, updatedPartitions, 
removedPartitions);
+            return remoteOlapTable;
+        }, msg, timeout);
+    }
+
+    private interface ThriftCall<T> {
+        public T call(FrontendService.Client client) throws Exception;
+    }
+
+    private static class MasterResult<T> {
+        boolean isMaster = true;
+        T result;
+        boolean hasError = false;
+        String errorMsg;
+
+        static <T> MasterResult<T> notMaster() {
+            MasterResult<T> ret = new MasterResult();
+            ret.isMaster = false;
+            return ret;
+        }
+
+        static <T> MasterResult<T> withResult(T result) {
+            MasterResult<T> ret = new MasterResult();
+            ret.isMaster = true;
+            ret.hasError = false;
+            ret.result = result;
+            return ret;
+        }
+
+        // is master but has error code
+        static <T> MasterResult<T> masterWithError(String errorMsg) {
+            MasterResult<T> ret = new MasterResult();
+            ret.isMaster = true;
+            ret.hasError = true;
+            ret.errorMsg = errorMsg;
+            return ret;
+        }
+
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
index b63a2a03b1f..76d42b70b42 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
@@ -17,17 +17,20 @@
 
 package org.apache.doris.datasource.doris;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.property.constants.RemoteDorisProperties;
+import org.apache.doris.thrift.TNetworkAddress;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -36,11 +39,14 @@ public class RemoteDorisExternalCatalog extends 
ExternalCatalog {
     private static final Logger LOG = 
LogManager.getLogger(RemoteDorisExternalCatalog.class);
 
     private RemoteDorisRestClient dorisRestClient;
+    private FeServiceClient client;
     private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
+            RemoteDorisProperties.FE_THRIFT_HOSTS,
             RemoteDorisProperties.FE_HTTP_HOSTS,
             RemoteDorisProperties.FE_ARROW_HOSTS,
             RemoteDorisProperties.USER,
-            RemoteDorisProperties.PASSWORD
+            RemoteDorisProperties.PASSWORD,
+            RemoteDorisProperties.USE_ARROW_FLIGHT
     );
 
     /**
@@ -61,6 +67,11 @@ public class RemoteDorisExternalCatalog extends 
ExternalCatalog {
                 throw new DdlException("Required property '" + 
requiredProperty + "' is missing");
             }
         }
+        if (!useArrowFlight() && Config.isCloudMode()) {
+            // TODO we not validate it in cloud mode, so currently not support 
it
+            throw new DdlException("Cloud mode is not supported when "
+                    + RemoteDorisProperties.USE_ARROW_FLIGHT + " is false");
+        }
     }
 
     public List<String> getFeNodes() {
@@ -71,6 +82,19 @@ public class RemoteDorisExternalCatalog extends 
ExternalCatalog {
         return 
parseArrowHosts(catalogProperty.getOrDefault(RemoteDorisProperties.FE_ARROW_HOSTS,
 ""));
     }
 
+    public List<TNetworkAddress> getFeThriftNodes() {
+        String addresses = 
catalogProperty.getOrDefault(RemoteDorisProperties.FE_THRIFT_HOSTS, "");
+        List<TNetworkAddress> tAddresses = new ArrayList<>();
+        for (String address : addresses.split(",")) {
+            int index = address.lastIndexOf(":");
+            String host = address.substring(0, index);
+            int port = Integer.parseInt(address.substring(index + 1));
+            TNetworkAddress thriftAddress = new TNetworkAddress(host, port);
+            tAddresses.add(thriftAddress);
+        }
+        return tAddresses;
+    }
+
     public String getUsername() {
         return catalogProperty.getOrDefault(RemoteDorisProperties.USER, "");
     }
@@ -139,6 +163,11 @@ public class RemoteDorisExternalCatalog extends 
ExternalCatalog {
             "0"));
     }
 
+    public boolean useArrowFlight() {
+        return 
Boolean.parseBoolean(catalogProperty.getOrDefault(RemoteDorisProperties.USE_ARROW_FLIGHT,
+                "true"));
+    }
+
     @Override
     protected void initLocalObjectsImpl() {
         if (isCompatible()) {
@@ -158,6 +187,8 @@ public class RemoteDorisExternalCatalog extends 
ExternalCatalog {
             throw new RuntimeException("Failed to connect to Doris cluster,"
                 + " please check your Doris cluster or your Doris catalog 
configuration.");
         }
+        client = new FeServiceClient(name, getFeThriftNodes(), getUsername(), 
getPassword(),
+                getMetadataSyncRetryCount(), getMetadataReadTimeoutSec());
     }
 
     protected List<String> listDatabaseNames() {
@@ -181,6 +212,10 @@ public class RemoteDorisExternalCatalog extends 
ExternalCatalog {
         return dorisRestClient;
     }
 
+    public FeServiceClient getFeServiceClient() {
+        return client;
+    }
+
     private List<String> parseHttpHosts(String hosts) {
         String[] hostUrls = hosts.trim().split(",");
         fillUrlsWithSchema(hostUrls, enableSsl());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
index 4b6117aa313..c86d91af92b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
@@ -18,9 +18,13 @@
 package org.apache.doris.datasource.doris;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -28,6 +32,7 @@ import org.apache.doris.thrift.TRemoteDorisTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -36,9 +41,14 @@ import java.util.Optional;
 
 public class RemoteDorisExternalTable extends ExternalTable {
     private static final Logger LOG = 
LogManager.getLogger(RemoteDorisExternalTable.class);
+    private volatile List<Partition> partitions = Lists.newArrayList();
+    private volatile long tableId = -1;
+    private volatile boolean isSyncOlapTable = false;
+    private volatile RemoteOlapTable remoteOlapTable = null;
+    private volatile Exception lastException = null;
 
     public RemoteDorisExternalTable(long id, String name, String remoteName,
-                                    RemoteDorisExternalCatalog catalog, 
ExternalDatabase db) {
+            RemoteDorisExternalCatalog catalog, ExternalDatabase db) {
         super(id, name, remoteName, catalog, db, 
TableType.DORIS_EXTERNAL_TABLE);
     }
 
@@ -50,6 +60,73 @@ public class RemoteDorisExternalTable extends ExternalTable {
         }
     }
 
+    private RemoteOlapTable getDorisOlapTable() {
+        if (!isSyncOlapTable) {
+            synchronized (this) {
+                if (!isSyncOlapTable) {
+                    try {
+                        isSyncOlapTable = true;
+                        remoteOlapTable = null;
+                        lastException = null; // clear previous exception
+
+                        List<Partition> cachedPartitions = 
Lists.newArrayList(partitions);
+                        RemoteOlapTable olapTable = 
((RemoteDorisExternalCatalog) catalog).getFeServiceClient()
+                                .getOlapTable(dbName, remoteName, tableId, 
cachedPartitions);
+                        olapTable.setCatalog((RemoteDorisExternalCatalog) 
catalog);
+                        olapTable.setDatabase((RemoteDorisExternalDatabase) 
db);
+
+                        // Remove redundant nested synchronized block
+                        tableId = olapTable.getId();
+                        partitions = 
Lists.newArrayList(olapTable.getPartitions());
+
+                        olapTable.setId(id); // change id in case of possible 
conflicts
+                        olapTable.invalidateBackendsIfNeed();
+                        remoteOlapTable = olapTable;
+                    } catch (Exception e) {
+                        // Save exception for waiting threads
+                        lastException = e;
+                        LOG.warn("Failed to get remote doris olap table: 
{}.{}", dbName, remoteName, e);
+                        throw e; // Re-throw the exception
+                    } finally {
+                        isSyncOlapTable = false;
+                        this.notifyAll();
+                    }
+                    return remoteOlapTable;
+                }
+            }
+        }
+
+        synchronized (this) {
+            while (isSyncOlapTable) {
+                try {
+                    this.wait();
+                } catch (InterruptedException e) {
+                    throw new AnalysisException("interrupted while getting 
doris olap table", e);
+                }
+            }
+
+            // If there is a saved exception, throw it with more details
+            if (remoteOlapTable == null) {
+                if (lastException != null) {
+                    throw new AnalysisException(
+                            "failed to get remote doris olap table: " + 
Util.getRootCauseMessage(lastException),
+                            lastException);
+                }
+                throw new AnalysisException("failed to get remote doris olap 
table");
+            }
+            return remoteOlapTable;
+        }
+    }
+
+    public OlapTable getOlapTable() {
+        makeSureInitialized();
+        return getDorisOlapTable();
+    }
+
+    public boolean useArrowFlight() {
+        return ((RemoteDorisExternalCatalog) catalog).useArrowFlight();
+    }
+
     @Override
     public TTableDescriptor toThrift() {
         List<Column> schema = getFullSchema();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
new file mode 100644
index 00000000000..80089b3a14c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemoteOlapTable extends OlapTable {
+    private static final Logger LOG = 
LogManager.getLogger(RemoteOlapTable.class);
+
+    private RemoteDorisExternalCatalog catalog;
+    private RemoteDorisExternalDatabase database;
+
+    public RemoteDorisExternalCatalog getCatalog() {
+        return catalog;
+    }
+
+    public void setCatalog(RemoteDorisExternalCatalog catalog) {
+        this.catalog = catalog;
+    }
+
+    @Override
+    public RemoteDorisExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(RemoteDorisExternalDatabase database) {
+        this.database = database;
+    }
+
+    public static RemoteOlapTable fromOlapTable(OlapTable olapTable) {
+        try {
+            RemoteOlapTable externalOlapTable = new RemoteOlapTable();
+            Class<?> currentClass = olapTable.getClass();
+            while (currentClass != null) {
+                for (Field field : currentClass.getDeclaredFields()) {
+                    if (Modifier.isStatic(field.getModifiers())) {
+                        continue;
+                    }
+                    field.setAccessible(true);
+                    field.set(externalOlapTable, field.get(olapTable));
+                }
+                currentClass = currentClass.getSuperclass();
+            }
+            return externalOlapTable;
+        } catch (Exception e) {
+            throw new RuntimeException("failed to initial external olap 
table", e);
+        }
+    }
+
+    public void rebuildPartitions(List<Partition> oldPartitions, 
List<Partition> updatedPartitions,
+            List<Long> removedPartitions)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rebuildPartitions oldPartitions: " + 
oldPartitions.size() + ", updatedPartitions: "
+                    + updatedPartitions.size() + ", removedPartitions: " + 
removedPartitions.size());
+        }
+        ConcurrentHashMap<Long, Partition> newIdToPartition = new 
ConcurrentHashMap<>();
+        for (Partition oldPartition : oldPartitions) {
+            newIdToPartition.put(oldPartition.getId(), oldPartition);
+        }
+        for (Long removedPartition : removedPartitions) {
+            newIdToPartition.remove(removedPartition);
+        }
+        for (Partition updatedPartition : updatedPartitions) {
+            newIdToPartition.put(updatedPartition.getId(), updatedPartition);
+        }
+        Map<String, Partition> newNameToPartition = Maps.newTreeMap();
+        for (Partition partition : newIdToPartition.values()) {
+            newNameToPartition.put(partition.getName(), partition);
+        }
+        this.idToPartition = newIdToPartition;
+        this.nameToPartition = newNameToPartition;
+    }
+
+    public void invalidateBackendsIfNeed() {
+        ImmutableMap<Long, Backend> backends =
+                
Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr().getBackends(catalog.getId());
+        for (Partition partition : getPartitions()) {
+            for (Tablet tablet : partition.getBaseIndex().getTablets()) {
+                for (long backendId : tablet.getBackendIds()) {
+                    if (!backends.containsKey(backendId)) {
+                        
Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr()
+                                .invalidateBackendCache(catalog.getId());
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public long getCatalogId() {
+        return catalog.getId();
+    }
+
+    public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() {
+        return 
Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr().getBackends(catalog.getId());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
index c6bceed94cb..54c5051b244 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.property.constants;
 
 public class RemoteDorisProperties {
+    public static final String FE_THRIFT_HOSTS = "fe_thrift_hosts";
     public static final String FE_HTTP_HOSTS = "fe_http_hosts";
     public static final String FE_ARROW_HOSTS = "fe_arrow_hosts";
 
@@ -26,6 +27,9 @@ public class RemoteDorisProperties {
 
     public static final String ENABLE_PARALLEL_RESULT_SINK = 
"enable_parallel_result_sink";
 
+    // query remote doris use arrow flight or treat it as olap table
+    public static final String USE_ARROW_FLIGHT = "use_arrow_flight";
+
     // Supports older versions of remote Doris; enabling this may introduce 
some inaccuracies in schema parsing.
     public static final String COMPATIBLE = "compatible";
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 2dbfb0f3e42..73d32f3326b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.ExternalView;
+import org.apache.doris.datasource.doris.RemoteDorisExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
@@ -469,7 +470,24 @@ public class BindRelation extends OneAnalysisRuleFactory {
                 case MAX_COMPUTE_EXTERNAL_TABLE:
                 case TRINO_CONNECTOR_EXTERNAL_TABLE:
                 case LAKESOUl_EXTERNAL_TABLE:
+                    return new 
LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
+                            qualifierWithoutTableName, ImmutableList.of(),
+                            unboundRelation.getTableSample(),
+                            unboundRelation.getTableSnapshot(),
+                            
Optional.ofNullable(unboundRelation.getScanParams()));
                 case DORIS_EXTERNAL_TABLE:
+                    ConnectContext ctx = cascadesContext.getConnectContext();
+                    RemoteDorisExternalTable externalTable = 
(RemoteDorisExternalTable) table;
+                    if (!externalTable.useArrowFlight()) {
+                        if 
(!ctx.getSessionVariable().isEnableNereidsDistributePlanner()) {
+                            // use isEnableNereidsDistributePlanner instead of 
canUseNereidsDistributePlanner
+                            // because it cannot work in explain command
+                            throw new AnalysisException("query remote doris 
only support NereidsDistributePlanner"
+                                    + " when catalog use_arrow_flight is 
false");
+                        }
+                        OlapTable olapTable = externalTable.getOlapTable();
+                        return makeOlapScan(olapTable, unboundRelation, 
qualifierWithoutTableName, cascadesContext);
+                    }
                     return new 
LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
                             qualifierWithoutTableName, ImmutableList.of(),
                             unboundRelation.getTableSample(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index dd7f83ecd07..002956ee17f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -80,7 +80,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends 
OneImplementationRuleFact
         // rounded robin algorithm. Therefore, the hashDistributedSpec can be 
broken except they are in
         // the same stable colocateGroup(CG)
         boolean isBelongStableCG = 
colocateTableIndex.isColocateTable(olapTable.getId())
-                && 
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()));
+                && 
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))
+                && olapTable.getCatalogId() == 
Env.getCurrentInternalCatalog().getId();
         boolean isSelectUnpartition = olapTable.getPartitionInfo().getType() 
== PartitionType.UNPARTITIONED
                 || olapScan.getSelectedPartitionIds().size() == 1;
         // TODO: find a better way to handle both tablet num == 1 and colocate 
table together in future
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index f103c627143..20aa06b9fa0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.distribute;
 
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.nereids.StatementContext;
@@ -39,6 +40,7 @@ import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.MultiCastDataSink;
 import org.apache.doris.planner.MultiCastPlanFragment;
+import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.qe.ConnectContext;
@@ -85,6 +87,7 @@ public class DistributePlanner {
         try {
             BackendDistributedPlanWorkerManager workerManager = new 
BackendDistributedPlanWorkerManager(
                             statementContext.getConnectContext(), 
notNeedBackend, isLoadJob);
+            addExternalBackends(workerManager);
             LoadBalanceScanWorkerSelector workerSelector = new 
LoadBalanceScanWorkerSelector(workerManager);
             FragmentIdMapping<UnassignedJob> fragmentJobs
                     = UnassignedJobBuilder.buildJobs(workerSelector, 
statementContext, idToFragments);
@@ -119,6 +122,17 @@ public class DistributePlanner {
         }
     }
 
+    private void addExternalBackends(BackendDistributedPlanWorkerManager 
workerManager) throws AnalysisException {
+        for (PlanFragment planFragment : idToFragments.values()) {
+            List<OlapScanNode> scanNodes = planFragment.getPlanRoot()
+                    .collectInCurrentFragment(OlapScanNode.class::isInstance);
+            for (OlapScanNode scanNode : scanNodes) {
+                workerManager.addBackends(scanNode.getCatalogId(),
+                        scanNode.getOlapTable().getAllBackendsByAllCluster());
+            }
+        }
+    }
+
     private FragmentIdMapping<DistributedPlan> buildDistributePlans(
             Map<PlanFragmentId, UnassignedJob> idToUnassignedJobs,
             ListMultimap<PlanFragmentId, AssignedJob> idToAssignedJobs) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
index f67cd86891d..52a31351c8c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.distribute;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
@@ -34,7 +35,7 @@ import java.util.Set;
 /** SelectedWorkers */
 public class SelectedWorkers {
     private final DistributedPlanWorkerManager workerManager;
-    private final Map<TNetworkAddress, Long> usedWorkersAddressToBackendID;
+    private final Map<Long, Map<TNetworkAddress, Long>> 
usedWorkersAddressToBackendID;
     private final Set<DistributedPlanWorker> usedWorkers;
 
     public SelectedWorkers(DistributedPlanWorkerManager workerManager) {
@@ -48,7 +49,8 @@ public class SelectedWorkers {
         BackendWorker worker = (BackendWorker) assignedJob.getAssignedWorker();
         if (usedWorkers.add(worker)) {
             Backend backend = worker.getBackend();
-            usedWorkersAddressToBackendID.put(
+            
usedWorkersAddressToBackendID.computeIfAbsent(worker.getCatalogId(), k -> 
Maps.newLinkedHashMap());
+            usedWorkersAddressToBackendID.get(worker.getCatalogId()).put(
                     new TNetworkAddress(backend.getHost(), 
backend.getBePort()), backend.getId()
             );
         }
@@ -56,11 +58,19 @@ public class SelectedWorkers {
 
     /** tryToSelectRandomUsedWorker */
     public DistributedPlanWorker tryToSelectRandomUsedWorker() {
+        long catalogId = Env.getCurrentInternalCatalog().getId();
         if (usedWorkers.isEmpty()) {
-            return workerManager.randomAvailableWorker();
+            return workerManager.randomAvailableWorker(catalogId);
         } else {
-            long id = 
workerManager.randomAvailableWorker(usedWorkersAddressToBackendID);
-            return workerManager.getWorker(id);
+            Map<TNetworkAddress, Long> backendIDs;
+            if (usedWorkersAddressToBackendID.containsKey(catalogId)) {
+                backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+            } else {
+                catalogId = usedWorkers.iterator().next().getCatalogId();
+                backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+            }
+            long id = workerManager.randomAvailableWorker(backendIDs);
+            return workerManager.getWorker(catalogId, id);
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
index 5ca2c4bc6bf..e36165f10b4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
@@ -53,19 +53,26 @@ public class BackendDistributedPlanWorkerManager implements 
DistributedPlanWorke
         DUMMY_BACKEND.setAlive(true);
     }
 
-    private final Supplier<ImmutableMap<Long, Backend>> allClusterBackends = 
Suppliers.memoize(() -> {
-        try {
-            return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
-        } catch (Exception t) {
-            throw new NereidsException("Can not get backends: " + t, t);
-        }
-    });
+    private final Map<Long, Supplier<ImmutableMap<Long, Backend>>> 
allClusterBackends = Maps.newHashMap();
 
-    private final ImmutableMap<Long, Backend> currentClusterBackends;
+    private final Map<Long, ImmutableMap<Long, Backend>> 
currentClusterBackends;
 
+    /**
+     * Constructor
+     */
     public BackendDistributedPlanWorkerManager(
             ConnectContext context, boolean notNeedBackend, boolean isLoadJob) 
throws UserException {
-        this.currentClusterBackends = checkAndInitClusterBackends(context, 
notNeedBackend, isLoadJob);
+        this.currentClusterBackends = Maps.newHashMap();
+        ImmutableMap<Long, Backend> internalBackends = 
checkAndInitClusterBackends(context, notNeedBackend, isLoadJob);
+        
this.currentClusterBackends.put(Env.getCurrentInternalCatalog().getId(), 
internalBackends);
+        allClusterBackends.put(Env.getCurrentInternalCatalog().getId(),
+                Suppliers.memoize(() -> {
+                    try {
+                        return 
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+                    } catch (Exception t) {
+                        throw new NereidsException("Can not get backends: " + 
t, t);
+                    }
+                }));
     }
 
     private ImmutableMap<Long, Backend> checkAndInitClusterBackends(
@@ -115,28 +122,38 @@ public class BackendDistributedPlanWorkerManager 
implements DistributedPlanWorke
     }
 
     @Override
-    public DistributedPlanWorker getWorker(long backendId) {
-        ImmutableMap<Long, Backend> backends = this.allClusterBackends.get();
+    public void addBackends(long catalogId, ImmutableMap<Long, Backend> 
backends) {
+        if (!currentClusterBackends.containsKey(catalogId)) {
+            currentClusterBackends.put(catalogId, backends);
+        }
+        if (!allClusterBackends.containsKey(catalogId)) {
+            allClusterBackends.put(catalogId, Suppliers.ofInstance(backends));
+        }
+    }
+
+    @Override
+    public DistributedPlanWorker getWorker(long catalogId, long backendId) {
+        ImmutableMap<Long, Backend> backends = 
this.allClusterBackends.get(catalogId).get();
         Backend backend = backends.get(backendId);
         if (backend == null) {
             throw new IllegalStateException("Backend " + backendId + " is not 
exist");
         }
-        return new BackendWorker(backend);
+        return new BackendWorker(catalogId, backend);
     }
 
     @Override
-    public DistributedPlanWorker getWorker(Backend backend) {
-        return new BackendWorker(backend);
+    public DistributedPlanWorker getWorker(long catalogId, Backend backend) {
+        return new BackendWorker(catalogId, backend);
     }
 
     @Override
-    public DistributedPlanWorker randomAvailableWorker() {
+    public DistributedPlanWorker randomAvailableWorker(long catalogId) {
         try {
             Reference<Long> selectedBackendId = new Reference<>();
-            ImmutableMap<Long, Backend> backends = this.currentClusterBackends;
+            ImmutableMap<Long, Backend> backends = 
this.currentClusterBackends.get(catalogId);
             SimpleScheduler.getHost(backends, selectedBackendId);
             Backend selctedBackend = backends.get(selectedBackendId.getRef());
-            return new BackendWorker(selctedBackend);
+            return new BackendWorker(catalogId, selctedBackend);
         } catch (Exception t) {
             throw new NereidsException("Can not get backends: " + t, t);
         }
@@ -149,17 +166,17 @@ public class BackendDistributedPlanWorkerManager 
implements DistributedPlanWorke
     }
 
     @Override
-    public List<Backend> getAllBackends(boolean needAlive) {
+    public List<Backend> getAllBackends(long catalogId, boolean needAlive) {
         List<Backend> backends = null;
         if (needAlive) {
             backends = Lists.newArrayList();
-            for (Map.Entry<Long, Backend> entry : 
this.allClusterBackends.get().entrySet()) {
+            for (Map.Entry<Long, Backend> entry : 
this.allClusterBackends.get(catalogId).get().entrySet()) {
                 if (entry.getValue().isQueryAvailable()) {
                     backends.add(entry.getValue());
                 }
             }
         } else {
-            backends = 
Lists.newArrayList(this.allClusterBackends.get().values());
+            backends = 
Lists.newArrayList(this.allClusterBackends.get(catalogId).get().values());
         }
         return backends;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
index e76934cf847..99f03be0e6a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
@@ -24,15 +24,22 @@ import java.util.Objects;
 /** BackendWorker */
 public class BackendWorker implements DistributedPlanWorker {
     private final Backend backend;
+    private final long catalogId;
 
-    public BackendWorker(Backend backend) {
+    public BackendWorker(long catalogId, Backend backend) {
         this.backend = Objects.requireNonNull(backend, "backend can not be 
null");
+        this.catalogId = catalogId;
     }
 
     public Backend getBackend() {
         return backend;
     }
 
+    @Override
+    public long getCatalogId() {
+        return catalogId;
+    }
+
     @Override
     public long id() {
         return backend.getId();
@@ -70,7 +77,7 @@ public class BackendWorker implements DistributedPlanWorker {
 
     @Override
     public int hashCode() {
-        return Objects.hash(backend.getId());
+        return Objects.hash(backend.getId(), catalogId);
     }
 
     @Override
@@ -78,7 +85,8 @@ public class BackendWorker implements DistributedPlanWorker {
         if (!(obj instanceof BackendWorker)) {
             return false;
         }
-        return backend.getId() == ((BackendWorker) obj).backend.getId();
+        return backend.getId() == ((BackendWorker) obj).backend.getId()
+                && getCatalogId() == ((BackendWorker) obj).getCatalogId();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
index 79f8b482d88..8def27bd68e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
@@ -21,6 +21,8 @@ package 
org.apache.doris.nereids.trees.plans.distribute.worker;
  * DistributedPlanWorker: a worker who can execute the assigned job(instance) 
of the DistributedPlan
  */
 public interface DistributedPlanWorker extends 
Comparable<DistributedPlanWorker> {
+    long getCatalogId();
+
     long id();
 
     // ipv4/ipv6 address
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
index 35aa1701a1c..1efc024d574 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
@@ -20,18 +20,22 @@ package 
org.apache.doris.nereids.trees.plans.distribute.worker;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 
+import com.google.common.collect.ImmutableMap;
+
 import java.util.List;
 import java.util.Map;
 
 /** DistributedPlanWorkerManager */
 public interface DistributedPlanWorkerManager {
-    DistributedPlanWorker getWorker(long backendId);
+    void addBackends(long catalogId, ImmutableMap<Long, Backend> backends);
+
+    DistributedPlanWorker getWorker(long catalogId, long backendId);
 
-    DistributedPlanWorker getWorker(Backend backend);
+    DistributedPlanWorker getWorker(long catalogId, Backend backend);
 
-    DistributedPlanWorker randomAvailableWorker();
+    DistributedPlanWorker randomAvailableWorker(long catalogId);
 
     long randomAvailableWorker(Map<TNetworkAddress, Long> addressToBackendID);
 
-    List<Backend> getAllBackends(boolean needAlive);
+    List<Backend> getAllBackends(long catalogId, boolean needAlive);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
index 9a7d2f42476..09fbd40d8d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
@@ -17,12 +17,19 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker;
 
+import org.apache.doris.catalog.Env;
+
 /** DummyWorker */
 public class DummyWorker implements DistributedPlanWorker {
     public static final DummyWorker INSTANCE = new DummyWorker();
 
     private DummyWorker() {}
 
+    @Override
+    public long getCatalogId() {
+        return Env.getCurrentInternalCatalog().getId();
+    }
+
     @Override
     public long id() {
         return 0;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
index b479d24a0c9..4293408fa88 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
@@ -100,7 +100,7 @@ public class LoadBalanceScanWorkerSelector implements 
ScanWorkerSelector {
             long bytes = getScanRangeSize(scanNode, 
onePartitionOneScanRangeLocation);
 
             WorkerScanRanges assigned = selectScanReplicaAndMinWorkloadWorker(
-                    onePartitionOneScanRangeLocation, bytes, 
orderedScanRangeLocations);
+                    onePartitionOneScanRangeLocation, bytes, 
orderedScanRangeLocations, scanNode.getCatalogId());
             UninstancedScanSource scanRanges = 
workerScanRanges.computeIfAbsent(
                     assigned.worker,
                     w -> new UninstancedScanSource(
@@ -202,7 +202,8 @@ public class LoadBalanceScanWorkerSelector implements 
ScanWorkerSelector {
                     WorkerScanRanges replicaAndWorker = 
selectScanReplicaAndMinWorkloadWorker(
                             allPartitionTabletsInOneBucketInOneTable.get(0),
                             allScanNodeScanBytesInOneBucket,
-                            orderedScanRangeLocations
+                            orderedScanRangeLocations,
+                            scanNode.getCatalogId()
                     );
                     selectedWorker = replicaAndWorker.worker;
                     break;
@@ -240,11 +241,11 @@ public class LoadBalanceScanWorkerSelector implements 
ScanWorkerSelector {
     }
 
     private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker(
-            TScanRangeLocations tabletLocation, long tabletBytes, boolean 
orderedScanRangeLocations) {
+            TScanRangeLocations tabletLocation, long tabletBytes, boolean 
orderedScanRangeLocations, long catalogId) {
         List<TScanRangeLocation> replicaLocations = 
tabletLocation.getLocations();
         if (replicaLocations.size() == 1) {
             TScanRangeLocation replicaLocation = replicaLocations.get(0);
-            DistributedPlanWorker worker = 
workerManager.getWorker(replicaLocation.getBackendId());
+            DistributedPlanWorker worker = workerManager.getWorker(catalogId, 
replicaLocation.getBackendId());
             ScanRanges scanRanges = new ScanRanges();
             TScanRangeParams scanReplicaParams =
                     ScanWorkerSelector.buildScanReplicaParams(tabletLocation, 
replicaLocation);
@@ -265,7 +266,7 @@ public class LoadBalanceScanWorkerSelector implements 
ScanWorkerSelector {
 
         for (int i = 0; i < replicaNum; i++) {
             TScanRangeLocation replicaLocation = replicaLocations.get(i);
-            DistributedPlanWorker worker = 
workerManager.getWorker(replicaLocation.getBackendId());
+            DistributedPlanWorker worker = workerManager.getWorker(catalogId, 
replicaLocation.getBackendId());
             if (!worker.available()) {
                 continue;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index 2657be975b2..167ea3dc334 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -188,10 +189,14 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
     }
 
     protected List<AssignedJob> 
fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
+        long catalogId = Env.getCurrentInternalCatalog().getId();
+        if (scanNodes != null && scanNodes.size() > 0) {
+            catalogId = scanNodes.get(0).getCatalogId();
+        }
         return ImmutableList.of(
                 assignWorkerAndDataSources(0,
                         statementContext.getConnectContext().nextInstanceId(),
-                        workerManager.randomAvailableWorker(),
+                        workerManager.randomAvailableWorker(catalogId),
                         DefaultScanSource.empty())
         );
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
index 045b1af8a03..e8b30730103 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -64,6 +65,10 @@ public class UnassignedAllBEJob extends 
AbstractUnassignedJob {
         DictionarySink sink = (DictionarySink) fragment.getSink();
         // it may be ScanNode or optimized to EmptySetNode. use universay 
function to get the deepest source.
         PlanNode rootNode = fragment.getDeepestLinearSource();
+        long catalogId = Env.getCurrentInternalCatalog().getId();
+        if (rootNode instanceof OlapScanNode) {
+            catalogId = ((OlapScanNode) rootNode).getCatalogId();
+        }
         List<Backend> bes;
         if (sink.allowAdaptiveLoad() && rootNode instanceof OlapScanNode) {
             Dictionary dictionary = sink.getDictionary();
@@ -75,21 +80,21 @@ public class UnassignedAllBEJob extends 
AbstractUnassignedJob {
             }
             if (usingVersion > lastVersion) {
                 // load new data
-                bes = computeFullLoad(workerManager, inputJobs);
+                bes = computeFullLoad(workerManager, inputJobs, catalogId);
             } else {
                 // try to load only for the BEs which is outdated
-                bes = computePartiallLoad(workerManager, inputJobs, 
dictionary, sink);
+                bes = computePartiallLoad(workerManager, inputJobs, 
dictionary, sink, catalogId);
                 statementContext.setPartialLoadDictionary(true);
             }
         } else {
             // we explicitly request all BEs to load data. or ExternalTable. 
(or EmptySetNode - should not happen)
-            bes = computeFullLoad(workerManager, inputJobs);
+            bes = computeFullLoad(workerManager, inputJobs, catalogId);
         }
 
         List<AssignedJob> assignedJobs = Lists.newArrayList();
         for (int i = 0; i < bes.size(); ++i) {
             // every time one BE is selected
-            DistributedPlanWorker worker = workerManager.getWorker(bes.get(i));
+            DistributedPlanWorker worker = workerManager.getWorker(catalogId, 
bes.get(i));
             if (worker != null) {
                 assignedJobs.add(assignWorkerAndDataSources(i, 
connectContext.nextInstanceId(), worker,
                         new DefaultScanSource(ImmutableMap.of())));
@@ -122,7 +127,7 @@ public class UnassignedAllBEJob extends 
AbstractUnassignedJob {
     }
 
     private List<Backend> computeFullLoad(DistributedPlanWorkerManager 
workerManager,
-            ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
+            ListMultimap<ExchangeNode, AssignedJob> inputJobs, long catalogId) 
{
         // input jobs from upstream fragment - may have many instances.
         ExchangeNode exchange = inputJobs.keySet().iterator().next(); // 
random one - should be same for any exchange.
         int expectInstanceNum = exchange.getNumInstances();
@@ -130,7 +135,7 @@ public class UnassignedAllBEJob extends 
AbstractUnassignedJob {
         // for Coordinator to know the right parallelism of DictionarySink
         exchange.getFragment().setParallelExecNum(expectInstanceNum);
 
-        List<Backend> bes = workerManager.getAllBackends(true);
+        List<Backend> bes = workerManager.getAllBackends(catalogId, true);
         if (bes.size() != expectInstanceNum) {
             // BE number changed when planning
             throw new IllegalArgumentException("BE number should be " + 
expectInstanceNum + ", but is " + bes.size());
@@ -139,10 +144,11 @@ public class UnassignedAllBEJob extends 
AbstractUnassignedJob {
     }
 
     private List<Backend> computePartiallLoad(DistributedPlanWorkerManager 
workerManager,
-            ListMultimap<ExchangeNode, AssignedJob> inputJobs, Dictionary 
dictionary, DictionarySink sink) {
+            ListMultimap<ExchangeNode, AssignedJob> inputJobs, Dictionary 
dictionary, DictionarySink sink,
+            long catalogId) {
         // dictionary's src version(bundled with dictionary's version) is same 
with usingVersion(otherwise FullLoad)
         // so we can just use the src version to find the outdated backends
-        List<Backend> outdateBEs = 
dictionary.filterOutdatedBEs(workerManager.getAllBackends(true));
+        List<Backend> outdateBEs = 
dictionary.filterOutdatedBEs(workerManager.getAllBackends(catalogId, true));
 
         // reset all exchange node's instance number to the number of outdated 
backends
         PlanFragment fragment = 
inputJobs.keySet().iterator().next().getFragment(); // random one exchange
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
index bcd7218f9f4..d4f32cce896 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
@@ -42,7 +43,8 @@ public class UnassignedGroupCommitJob extends 
AbstractUnassignedJob {
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
         TUniqueId instanceId = 
statementContext.getConnectContext().nextInstanceId();
-        BackendWorker selectBackend = new 
BackendWorker(statementContext.getGroupCommitMergeBackend());
+        BackendWorker selectBackend = new 
BackendWorker(Env.getCurrentEnv().getInternalCatalog().getId(),
+                statementContext.getGroupCommitMergeBackend());
         return ImmutableList.of(
                 new StaticAssignedJob(
                         0, instanceId, this, selectBackend, 
DefaultScanSource.empty()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 472fafe4513..44731efd3c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -455,7 +455,7 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         for (Integer bucketIndex : selectBucketIndexes) {
             Long tabletIdInBucket = tabletIdsInOrder.get(bucketIndex);
             Tablet tabletInBucket = partition.getTablet(tabletIdInBucket);
-            List<DistributedPlanWorker> workers = 
getWorkersByReplicas(tabletInBucket);
+            List<DistributedPlanWorker> workers = 
getWorkersByReplicas(tabletInBucket, olapScanNode.getCatalogId());
             if (workers.isEmpty()) {
                 throw new IllegalStateException("Can not found available 
replica for bucket " + bucketIndex
                         + ", table: " + olapScanNode);
@@ -466,12 +466,12 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         return fillUpWorkerToBuckets;
     }
 
-    private List<DistributedPlanWorker> getWorkersByReplicas(Tablet tablet) {
+    private List<DistributedPlanWorker> getWorkersByReplicas(Tablet tablet, 
long catalogId) {
         DistributedPlanWorkerManager workerManager = 
scanWorkerSelector.getWorkerManager();
         List<Replica> replicas = tablet.getReplicas();
         List<DistributedPlanWorker> workers = 
Lists.newArrayListWithCapacity(replicas.size());
         for (Replica replica : replicas) {
-            DistributedPlanWorker worker = 
workerManager.getWorker(replica.getBackendIdWithoutException());
+            DistributedPlanWorker worker = workerManager.getWorker(catalogId, 
replica.getBackendIdWithoutException());
             if (worker.available()) {
                 workers.add(worker);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 2bc03096bbb..cbc7acf4fac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -33,7 +33,6 @@ import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.DistributionInfo;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -497,7 +496,7 @@ public class OlapScanNode extends ScanNode {
         boolean isInvalidComputeGroup = 
ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup);
         boolean isNotCloudComputeGroup = computeGroup != null && 
!Config.isCloudMode();
 
-        ImmutableMap<Long, Backend> allBackends = 
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+        ImmutableMap<Long, Backend> allBackends = 
olapTable.getAllBackendsByAllCluster();
         long partitionVisibleVersion = visibleVersion;
         String partitionVisibleVersionStr = fastToString(visibleVersion);
         for (Tablet tablet : tablets) {
@@ -887,7 +886,7 @@ public class OlapScanNode extends ScanNode {
         Preconditions.checkState(scanBackendIds.isEmpty());
         Preconditions.checkState(scanTabletIds.isEmpty());
         Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
-        for (Backend backend : 
Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
+        for (Backend backend : 
olapTable.getAllBackendsByAllCluster().values()) {
             Set<Long> hashSet = Sets.newLinkedHashSet();
             for (DiskInfo diskInfo : backend.getDisks().values()) {
                 if (diskInfo.isAlive()) {
@@ -1415,4 +1414,12 @@ public class OlapScanNode extends ScanNode {
     public void setGlobalRowIdColumn(Column globalRowIdColumn) {
         this.globalRowIdColumn = globalRowIdColumn;
     }
+
+    @Override
+    public long getCatalogId() {
+        if (olapTable != null) {
+            return olapTable.getCatalogId();
+        }
+        return super.getCatalogId();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index be7f85875ee..bb7eca6accc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -36,6 +36,7 @@ import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PartitionInfo;
@@ -716,4 +717,8 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
     public void setDesc(TupleDescriptor desc) {
         this.desc = desc;
     }
+
+    public long getCatalogId() {
+        return Env.getCurrentInternalCatalog().getId();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
index 3970acfd5fe..54e607dd27a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
 import org.apache.doris.qe.runtime.BackendFragmentId;
 import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
 import org.apache.doris.qe.runtime.PipelineExecutionTask;
@@ -122,8 +123,9 @@ public abstract class AbstractJobProcessor implements 
JobProcessor {
             PipelineExecutionTask executionTask) {
         ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> 
backendFragmentTasks
                 = ImmutableMap.builder();
-        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : 
executionTask.getChildrenTasks().entrySet()) {
-            Long backendId = backendTask.getKey();
+        for (Entry<BackendWorker, MultiFragmentsPipelineTask> backendTask :
+                executionTask.getChildrenTasks().entrySet()) {
+            Long backendId = backendTask.getKey().id();
             for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : 
backendTask.getValue()
                     .getChildrenTasks().entrySet()) {
                 Integer fragmentId = fragmentIdToTask.getKey();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
index 4d2da16ff26..6a52e3a6d9f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
 import org.apache.doris.qe.CoordinatorContext;
 import org.apache.doris.qe.SimpleScheduler;
@@ -56,7 +57,7 @@ import java.util.stream.Collectors;
  *
  * This class is used to describe which backend process which fragments
  */
-public class PipelineExecutionTask extends AbstractRuntimeTask<Long, 
MultiFragmentsPipelineTask> {
+public class PipelineExecutionTask extends AbstractRuntimeTask<BackendWorker, 
MultiFragmentsPipelineTask> {
     private static final Logger LOG = 
LogManager.getLogger(PipelineExecutionTask.class);
 
     // immutable parameters
@@ -68,7 +69,7 @@ public class PipelineExecutionTask extends 
AbstractRuntimeTask<Long, MultiFragme
     public PipelineExecutionTask(
             CoordinatorContext coordinatorContext,
             BackendServiceProxy backendServiceProxy,
-            Map<Long, MultiFragmentsPipelineTask> fragmentTasks) {
+            Map<BackendWorker, MultiFragmentsPipelineTask> fragmentTasks) {
         // insert into stmt need latch to wait finish, but query stmt not need 
because result receiver can wait finish
         super(new ChildrenRuntimeTasks<>(fragmentTasks));
         this.coordinatorContext = Objects.requireNonNull(coordinatorContext, 
"coordinatorContext can not be null");
@@ -78,13 +79,13 @@ public class PipelineExecutionTask extends 
AbstractRuntimeTask<Long, MultiFragme
         // flatten to fragment tasks to quickly index by BackendFragmentId, 
when receive the report message
         ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> 
backendFragmentTasks
                 = ImmutableMap.builder();
-        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : 
fragmentTasks.entrySet()) {
-            Long backendId = backendTask.getKey();
+        for (Entry<BackendWorker, MultiFragmentsPipelineTask> backendTask : 
fragmentTasks.entrySet()) {
+            BackendWorker worker = backendTask.getKey();
             for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : 
backendTask.getValue()
                     .getChildrenTasks().entrySet()) {
                 Integer fragmentId = fragmentIdToTask.getKey();
                 SingleFragmentPipelineTask fragmentTask = 
fragmentIdToTask.getValue();
-                backendFragmentTasks.put(new BackendFragmentId(backendId, 
fragmentId), fragmentTask);
+                backendFragmentTasks.put(new BackendFragmentId(worker.id(), 
fragmentId), fragmentTask);
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
index 0da6f4a5fe2..d9503e3145a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
@@ -65,13 +65,13 @@ public class PipelineExecutionTaskBuilder {
         return pipelineExecutionTask;
     }
 
-    private Map<Long, MultiFragmentsPipelineTask> buildMultiFragmentTasks(
+    private Map<BackendWorker, MultiFragmentsPipelineTask> 
buildMultiFragmentTasks(
             CoordinatorContext coordinatorContext, BackendServiceProxy 
backendServiceProxy,
             Map<DistributedPlanWorker, TPipelineFragmentParamsList> 
workerToFragmentsParam) {
 
         Map<DistributedPlanWorker, ByteString> workerToSerializeFragments = 
serializeFragments(workerToFragmentsParam);
 
-        Map<Long, MultiFragmentsPipelineTask> fragmentTasks = 
Maps.newLinkedHashMap();
+        Map<BackendWorker, MultiFragmentsPipelineTask> fragmentTasks = 
Maps.newLinkedHashMap();
         for (Entry<DistributedPlanWorker, TPipelineFragmentParamsList> kv :
                 workerToFragmentsParam.entrySet()) {
             BackendWorker worker = (BackendWorker) kv.getKey();
@@ -80,7 +80,7 @@ public class PipelineExecutionTaskBuilder {
 
             Backend backend = worker.getBackend();
             fragmentTasks.put(
-                    worker.id(),
+                    worker,
                     new MultiFragmentsPipelineTask(
                             coordinatorContext,
                             backend,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 17a47cabf09..7061942b384 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -64,6 +64,7 @@ import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
@@ -76,6 +77,7 @@ import org.apache.doris.common.ThriftServerContext;
 import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.common.util.Util;
@@ -94,6 +96,7 @@ import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.master.MasterImpl;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanNodeAndHash;
@@ -193,6 +196,8 @@ import org.apache.doris.thrift.TGetMetaDB;
 import org.apache.doris.thrift.TGetMetaRequest;
 import org.apache.doris.thrift.TGetMetaResult;
 import org.apache.doris.thrift.TGetMetaTable;
+import org.apache.doris.thrift.TGetOlapTableMetaRequest;
+import org.apache.doris.thrift.TGetOlapTableMetaResult;
 import org.apache.doris.thrift.TGetQueryStatsRequest;
 import org.apache.doris.thrift.TGetSnapshotRequest;
 import org.apache.doris.thrift.TGetSnapshotResult;
@@ -228,6 +233,7 @@ import org.apache.doris.thrift.TNodeInfo;
 import org.apache.doris.thrift.TNullableStringLiteral;
 import org.apache.doris.thrift.TOlapTableIndexTablets;
 import org.apache.doris.thrift.TOlapTablePartition;
+import org.apache.doris.thrift.TPartitionMeta;
 import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TPlsqlPackageResult;
@@ -302,7 +308,10 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -4561,6 +4570,102 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         return result;
     }
 
+    /**
+     * only copy basic meta about table
+     * do not copy the partitions
+     * @param request
+     * @return
+     * @throws TException
+     */
+    @Override
+    public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest 
request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive getOlapTableMeta request: {}, client: {}", 
request, clientAddr);
+        }
+        TGetOlapTableMetaResult result = new TGetOlapTableMetaResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            checkSingleTablePasswordAndPrivs(request.getUser(), 
request.getPasswd(), request.getDb(),
+                    request.getTable(), clientAddr, PrivPredicate.SELECT);
+            String dbName = request.getDb();
+            Database db = 
Env.getCurrentInternalCatalog().getDbNullable(dbName);
+            if (db == null) {
+                throw new UserException("unknown database, database=" + 
dbName);
+            }
+            OlapTable table = (OlapTable) 
db.getTableNullable(request.getTable());
+            if (table == null) {
+                throw new UserException("unknown table, table=" + 
request.getTable());
+            }
+            MetaContext metaContext = new MetaContext();
+            metaContext.setMetaVersion(FeConstants.meta_version);
+            metaContext.setThreadLocalInfo();
+            table.readLock();
+            try (ByteArrayOutputStream bOutputStream = new 
ByteArrayOutputStream(8192)) {
+                OlapTable copyTable = table.copyTableMeta();
+                try (DataOutputStream out = new 
DataOutputStream(bOutputStream)) {
+                    copyTable.write(out);
+                    out.flush();
+                    result.setTableMeta(bOutputStream.toByteArray());
+                }
+                Set<Long> updatedPartitionIds = 
Sets.newHashSet(table.getPartitionIds());
+                List<TPartitionMeta> partitionMetas = 
request.getPartitionsSize() == 0 ? Lists.newArrayList()
+                        : request.getPartitions();
+                for (TPartitionMeta partitionMeta : partitionMetas) {
+                    if (request.getTableId() != table.getId()) {
+                        result.addToRemovedPartitions(partitionMeta.getId());
+                        continue;
+                    }
+                    Partition partition = 
table.getPartition(partitionMeta.getId());
+                    if (partition == null) {
+                        result.addToRemovedPartitions(partitionMeta.getId());
+                        continue;
+                    }
+                    if (partition.getVisibleVersion() == 
partitionMeta.getVisibleVersion()
+                            && partition.getVisibleVersionTime() == 
partitionMeta.getVisibleVersionTime()) {
+                        updatedPartitionIds.remove(partitionMeta.getId());
+                    }
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("receive getOlapTableMeta  db: {} table:{} 
update partitions: {} removed partition:{}",
+                            request.getDb(), request.getTable(), 
updatedPartitionIds.size(),
+                            result.getRemovedPartitionsSize());
+                }
+                for (Long partitionId : updatedPartitionIds) {
+                    bOutputStream.reset();
+                    Partition partition = table.getPartition(partitionId);
+                    try (DataOutputStream out = new 
DataOutputStream(bOutputStream)) {
+                        Text.writeString(out, 
GsonUtils.GSON.toJson(partition));
+                        out.flush();
+                        
result.addToUpdatedPartitions(ByteBuffer.wrap(bOutputStream.toByteArray()));
+                    }
+                }
+                return result;
+            } finally {
+                table.readUnlock();
+                MetaContext.remove();
+            }
+        } catch (AuthenticationException e) {
+            LOG.warn("failed to check user auth: {}", e);
+            status.setStatusCode(TStatusCode.NOT_AUTHORIZED);
+            status.addToErrorMsgs(e.getMessage());
+            return result;
+        } catch (UserException e) {
+            LOG.warn("failed to get table meta db:{} table:{} : {}",
+                    request.getDb(), request.getTable(), e);
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        } catch (Exception e) {
+            LOG.warn("unknown exception when get table meta db:{} table:{} : 
{}",
+                    request.getDb(), request.getTable(), e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+            return result;
+        }
+    }
+
     private TStatus checkMaster() {
         TStatus status = new TStatus(TStatusCode.OK);
         if (!Env.getCurrentEnv().isMaster()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 3f54db60a86..3eb001ba624 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -33,6 +33,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.SimpleScheduler;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.TBackend;
 import org.apache.doris.thrift.TDisk;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStorageMedium;
@@ -1125,5 +1126,16 @@ public class Backend implements Writable {
         }
     }
 
+    public static Backend fromThrift(TBackend backend) {
+        Backend result = new Backend();
+        result.id = backend.getId();
+        result.host = backend.getHost();
+        result.httpPort = backend.getHttpPort();
+        result.brpcPort = backend.getBrpcPort();
+        result.bePort = backend.getBePort();
+        result.setAlive(backend.isIsAlive());
+        return result;
+    }
+
 }
 
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index f27dcd44b4e..9afc7570225 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1650,6 +1650,29 @@ struct TGetTableTDEInfoResult {
     2: optional AgentService.TEncryptionAlgorithm algorithm
 }
 
+struct TPartitionMeta {
+    1: optional i64 id
+    2: optional i64 visible_version
+    3: optional i64 visible_version_time
+}
+
+struct TGetOlapTableMetaRequest {
+    1: required string user
+    2: required string passwd
+    3: required string db
+    4: required string table
+    5: required i64 table_id
+    6: optional i32 version // todo serialize according to the version
+    7: optional list<TPartitionMeta> partitions // client owned partition meta
+}
+
+struct TGetOlapTableMetaResult {
+    1: required Status.TStatus status
+    2: required binary table_meta
+    3: optional list<binary> updated_partitions
+    4: optional list<i64> removed_partitions
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1754,4 +1777,6 @@ service FrontendService {
     TGetEncryptionKeysResult getEncryptionKeys(1: TGetEncryptionKeysRequest 
request)
 
     TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request)
+
+    TGetOlapTableMetaResult getOlapTableMeta(1: TGetOlapTableMetaRequest 
request)
 }
diff --git 
a/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out
 
b/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out
new file mode 100644
index 00000000000..1bb50b2cc13
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out
@@ -0,0 +1,48 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+2025-05-18T01:00       true    -128    -32768  -2147483648     
-9223372036854775808    -1234567890123456790    -123.456        -123456.789     
-123457 -123456789012346        -1234567890123456789012345678   1970-01-01      
0000-01-01T00:00        A       Hello   Hello, Doris!   ["apple", "banana", 
"orange"]   {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"}
+2025-05-18T02:00       \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N
+2025-05-18T03:00       false   127     32767   2147483647      
9223372036854775807     1234567890123456789     123.456 123456.789      123457  
123456789012346 1234567890123456789012345678    9999-12-31      
9999-12-31T23:59:59                             []      {}      {"f1":11, 
"f2":3.14, "f3":"Emily"}
+2025-05-18T04:00       true    0       0       0       0       0       0.0     
0       0       0       0       2023-10-01      2023-10-01T12:34:56     A       
Hello   Hello, Doris!   ["apple", "banana", "orange"]   {"Emily":101, "age":25} 
{"f1":11, "f2":3.14, "f3":"Emily"}
+
+-- !sql --
+2025-05-18T01:00       [1]     [-128]  [-32768]        [-2147483648]   
[-9223372036854775808]  [-1234567890123456790]  [-123.456]      [-123456.789]   
[-123457]       [-123456789012346]      [-1234567890123456789012345678] 
["0000-01-01"]  ["0000-01-01 00:00:00"] ["A"]   ["Hello"]       ["Hello, 
Doris!"]
+2025-05-18T02:00       [null]  [null]  [null]  [null]  [null]  [null]  [null]  
[null]  [null]  [null]  [null]  [null]  [null]  [null]  [null]  [null]
+2025-05-18T03:00       [0]     [127]   [32767] [2147483647]    
[9223372036854775807]   [1234567890123456789]   [123.456]       [123456.789]    
[123457]        [123456789012346]       [1234567890123456789012345678]  
["9999-12-31"]  ["9999-12-31 23:59:59"] [""]    [""]    [""]
+2025-05-18T04:00       [1]     [0]     [0]     [0]     [0]     [0]     [0]     
[0]     [0]     [0]     [0]     ["2023-10-01"]  ["2023-10-01 12:34:56"] ["A"]   
["Hello"]       ["Hello, Doris!"]
+
+-- !sql --
+2025-05-18T01:00       2025-05-18T01:00        2025-05-18T01:00:00.100 
2025-05-18T01:00:00.110 2025-05-18T01:00:00.111 2025-05-18T01:00:00.111100      
2025-05-18T01:00:00.111110      2025-05-18T01:00:00.111111
+
+-- !query_after_insert --
+2025-05-18T01:00       2025-05-18T01:00        2025-05-18T01:00:00.100 
2025-05-18T01:00:00.110 2025-05-18T01:00:00.111 2025-05-18T01:00:00.111100      
2025-05-18T01:00:00.111110      2025-05-18T01:00:00.111111
+2025-05-19T01:00       2025-05-19T01:00        2025-05-19T01:00:00.100 
2025-05-19T01:00:00.110 2025-05-19T01:00:00.111 2025-05-19T01:00:00.111100      
2025-05-19T01:00:00.111110      2025-05-19T01:00:00.111111
+
+-- !after_insert_cmd --
+2025-05-18T01:00       true    -128    -32768  -2147483648     
-9223372036854775808    -1234567890123456790    -123.456        -123456.789     
-123457 -123456789012346        -1234567890123456789012345678   1970-01-01      
0000-01-01T00:00        A       Hello   Hello, Doris!   ["apple", "banana", 
"orange"]   {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"}
+2025-05-18T02:00       \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N
+2025-05-18T03:00       false   127     32767   2147483647      
9223372036854775807     1234567890123456789     123.456 123456.789      123457  
123456789012346 1234567890123456789012345678    9999-12-31      
9999-12-31T23:59:59                             []      {}      {"f1":11, 
"f2":3.14, "f3":"Emily"}
+2025-05-18T04:00       true    0       0       0       0       0       0.0     
0       0       0       0       2023-10-01      2023-10-01T12:34:56     A       
Hello   Hello, Doris!   ["apple", "banana", "orange"]   {"Emily":101, "age":25} 
{"f1":11, "f2":3.14, "f3":"Emily"}
+
+-- !after_insert_overwrite_cmd --
+2025-05-18T01:00       true    -128    -32768  -2147483648     
-9223372036854775808    -1234567890123456790    -123.456        -123456.789     
-123457 -123456789012346        -1234567890123456789012345678   1970-01-01      
0000-01-01T00:00        A       Hello   Hello, Doris!   ["apple", "banana", 
"orange"]   {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"}
+2025-05-18T02:00       \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N
+2025-05-18T03:00       false   127     32767   2147483647      
9223372036854775807     1234567890123456789     123.456 123456.789      123457  
123456789012346 1234567890123456789012345678    9999-12-31      
9999-12-31T23:59:59                             []      {}      {"f1":11, 
"f2":3.14, "f3":"Emily"}
+2025-05-18T04:00       true    0       0       0       0       0       0.0     
0       0       0       0       2023-10-01      2023-10-01T12:34:56     A       
Hello   Hello, Doris!   ["apple", "banana", "orange"]   {"Emily":101, "age":25} 
{"f1":11, "f2":3.14, "f3":"Emily"}
+
+-- !join --
+1      reason1 2023-01-01      1       100     error1  1000    2023-01-01T00:00
+2      reason2 2023-01-01      2       200     error2  2000    2023-01-01T00:00
+3      reason3 2023-01-01      3       300     error3  3000    2023-01-01T00:00
+
+-- !join_predicate --
+2      reason2 2023-01-01      2       200     error2  2000    2023-01-01T00:00
+
+-- !join_partition --
+1      2023-01-01      reason1 2023-01-01      1       100     error1  1000    
2023-01-01T00:00
+2      2023-01-02      reason2 2023-01-02      2       200     error2  2000    
2023-01-02T00:00
+3      2023-01-03      reason3 2023-01-03      3       300     error3  3000    
2023-01-03T00:00
+
+-- !join_partition_predicate --
+2      2023-01-02      reason2 2023-01-02      2       200     error2  2000    
2023-01-02T00:00
+
diff --git a/regression-test/pipeline/external/conf/regression-conf.groovy 
b/regression-test/pipeline/external/conf/regression-conf.groovy
index 0d3e94ace11..75299d79082 100644
--- a/regression-test/pipeline/external/conf/regression-conf.groovy
+++ b/regression-test/pipeline/external/conf/regression-conf.groovy
@@ -31,6 +31,7 @@ extArrowFlightSqlPort = 8081
 extArrowFlightSqlUser = "root"
 extArrowFlightSqlPassword= ""
 extArrowFlightHttpPort= 8131
+extFeThriftPort = 9020
 
 ccrDownstreamUrl = 
"jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLoadLocalInfile=true"
 ccrDownstreamUser = "root"
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy
new file mode 100644
index 00000000000..50cb13deca8
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_query_remote_doris_as_olap_table_select", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String remote_doris_host = 
context.config.otherConfigs.get("extArrowFlightSqlHost")
+    String remote_doris_arrow_port = 
context.config.otherConfigs.get("extArrowFlightSqlPort")
+    String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
+    String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
+    String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
+
+    def showres = sql "show frontends";
+    remote_doris_arrow_port = showres[0][6]
+    remote_doris_http_port = showres[0][3]
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
+
+    def showres2 = sql "show backends";
+    log.info("show backends log = ${showres2}")
+
+    def db_name = "test_query_remote_doris_as_olap_table_select_db"
+    def catalog_name = "test_query_remote_doris_as_olap_table_select_catalog"
+
+    sql """DROP DATABASE IF EXISTS ${db_name}"""
+
+    sql """CREATE DATABASE IF NOT EXISTS ${db_name}"""
+
+    sql """
+        CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t` (
+          `id` datetime(3) NOT NULL,
+          `c_boolean` boolean NULL,
+          `c_tinyint` tinyint NULL,
+          `c_smallint` smallint NULL,
+          `c_int` int NULL,
+          `c_bigint` bigint NULL,
+          `c_largeint` largeint NULL,
+          `c_float` float NULL,
+          `c_double` double NULL,
+          `c_decimal9` decimal(9,0) NULL,
+          `c_decimal18` decimal(18,0) NULL,
+          `c_decimal32` decimal(32,0) NULL,
+          `c_date` date NULL,
+          `c_datetime` datetime NULL,
+          `c_char` char(1) NULL,
+          `c_varchar` varchar(65533) NULL,
+          `c_string` text NULL,
+          `c_array_s` array<text> NULL,
+          `c_map` MAP<STRING, INT> NULL,
+          `c_struct` STRUCT<f1:INT,f2:FLOAT,f3:STRING>  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` 
values('2025-05-18 01:00:00.000', true, -128, -32768, -2147483648, 
-9223372036854775808, -1234567890123456790, -123.456, -123456.789, -123457, 
-123456789012346, -1234567890123456789012345678, '1970-01-01', '0000-01-01 
00:00:00', 'A', 'Hello', 'Hello, Doris!', '["apple", "banana", "orange"]', 
{"Emily":101,"age":25} , {11, 3.14, "Emily"})
+    """
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` 
values('2025-05-18 02:00:00.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 
NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
+    """
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` 
values('2025-05-18 03:00:00.000', false, 127, 32767, 2147483647, 
9223372036854775807, 1234567890123456789, 123.456, 123456.789, 123457, 
123456789012346, 1234567890123456789012345678, '9999-12-31', '9999-12-31 
23:59:59', '', '', '', [], {}, {11, 3.14, "Emily"})
+    """
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` 
values('2025-05-18 04:00:00.000', true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
'2023-10-01', '2023-10-01 12:34:56', 'A', 'Hello', 'Hello, Doris!', '["apple", 
"banana", "orange"]', {"Emily":101,"age":25} , {11, 3.14, "Emily"});
+    """
+
+    sql """
+        CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t2` (
+          `id` datetime(3) NOT NULL,
+          `a_boolean` array<boolean> NULL,
+          `a_tinyint` array<tinyint> NULL,
+          `a_smallint` array<smallint> NULL,
+          `a_int` array<int> NULL,
+          `a_bigint` array<bigint> NULL,
+          `a_largeint` array<largeint> NULL,
+          `a_float` array<float> NULL,
+          `a_double` array<double> NULL,
+          `a_decimal9` array<decimal(9,0)> NULL,
+          `a_decimal18` array<decimal(18,0)> NULL,
+          `a_decimal32` array<decimal(32,0)> NULL,
+          `a_date` array<date> NULL,
+          `a_datetime` array<datetime> NULL,
+          `a_char` array<char(1)> NULL,
+          `a_varchar` array<varchar(65533)> NULL,
+          `a_string` array<text> NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` 
values('2025-05-18 01:00:00.000', [true], [-128], [-32768], [-2147483648], 
[-9223372036854775808], [-1234567890123456790], [-123.456], [-123456.789], 
[-123457], [-123456789012346], [-1234567890123456789012345678], ['0000-01-01'], 
['0000-01-01 00:00:00'], ['A'], ['Hello'], ['Hello, Doris!'])
+    """
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` 
values('2025-05-18 02:00:00.000', [NULL], [NULL], [NULL], [NULL], [NULL], 
[NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], 
[NULL])
+    """
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` 
values('2025-05-18 03:00:00.000', [false], [127], [32767], [2147483647], 
[9223372036854775807], [1234567890123456789], [123.456], [123456.789], 
[123457], [123456789012346], [1234567890123456789012345678], ['9999-12-31'], 
['9999-12-31 23:59:59'], [''], [''], [''])
+    """
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` 
values('2025-05-18 04:00:00.000', [true], [0], [0], [0], [0], [0], [0], [0], 
[0], [0], [0], ['2023-10-01'], ['2023-10-01 12:34:56'], ['A'], ['Hello'], 
['Hello, Doris!']);
+    """
+
+    sql """
+        CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t3` (
+          `id` datetime NOT NULL,
+          `datetime_0` datetime(0) NULL,
+          `datetime_1` datetime(1) NULL,
+          `datetime_3` datetime(2) NULL,
+          `datetime_4` datetime(3) NULL,
+          `datetime_5` datetime(4) NULL,
+          `datetime_6` datetime(5) NULL,
+          `datetime_7` datetime(6) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t3` 
values('2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 
01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', 
'2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 
01:00:00.111111');
+    """
+
+
+    sql """
+        DROP CATALOG IF EXISTS `${catalog_name}`
+    """
+
+
+    sql """
+        CREATE CATALOG `${catalog_name}` PROPERTIES (
+                'type' = 'doris',
+                'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
+                'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
+                'user' = '${remote_doris_user}',
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'false'
+        );
+    """
+
+    qt_sql """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from 
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t` order by 
id
+    """
+
+    qt_sql """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from 
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t2` order by 
id
+    """
+
+    qt_sql """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from 
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t3` order by 
id
+    """
+
+    // test select after insert also get correct data, it seems we can newly 
partition version
+    sql """
+        INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t3` 
values('2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 
01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', 
'2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 
01:00:00.111111');
+    """
+    qt_query_after_insert """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from 
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t3` order by 
id
+    """
+
+    // test insert command
+    sql """
+        CREATE TABLE `${db_name}`.`test_remote_doris_all_types_insert` (
+          `id` datetime(3) NOT NULL,
+          `c_boolean` boolean NULL,
+          `c_tinyint` tinyint NULL,
+          `c_smallint` smallint NULL,
+          `c_int` int NULL,
+          `c_bigint` bigint NULL,
+          `c_largeint` largeint NULL,
+          `c_float` float NULL,
+          `c_double` double NULL,
+          `c_decimal9` decimal(9,0) NULL,
+          `c_decimal18` decimal(18,0) NULL,
+          `c_decimal32` decimal(32,0) NULL,
+          `c_date` date NULL,
+          `c_datetime` datetime NULL,
+          `c_char` char(1) NULL,
+          `c_varchar` varchar(65533) NULL,
+          `c_string` text NULL,
+          `c_array_s` array<text> NULL,
+          `c_map` MAP<STRING, INT> NULL,
+          `c_struct` STRUCT<f1:INT,f2:FLOAT,f3:STRING>  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+    // test insert into
+    explain {
+        sql("insert into `${db_name}`.`test_remote_doris_all_types_insert` 
select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from 
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`")
+        contains("VOlapScanNode")
+    }
+    sql """
+        insert into `${db_name}`.`test_remote_doris_all_types_insert` select 
/*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * 
from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`
+    """
+    qt_after_insert_cmd """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from 
`${db_name}`.`test_remote_doris_all_types_insert` order by id
+    """
+    // test insert overwrite
+    explain {
+        sql("insert OVERWRITE table 
`${db_name}`.`test_remote_doris_all_types_insert` select /*+ 
SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * 
from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`")
+        contains("VOlapScanNode")
+    }
+    sql """
+        insert OVERWRITE table 
`${db_name}`.`test_remote_doris_all_types_insert` select /*+ 
SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * 
from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`
+    """
+    qt_after_insert_overwrite_cmd """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from 
`${db_name}`.`test_remote_doris_all_types_insert` order by id
+    """
+
+    // test join operation
+    sql """
+        CREATE TABLE `${db_name}`.`left_inner_table` (
+          log_type        INT            NOT NULL,
+          reason       VARCHAR(1024) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`log_type`)
+        DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+    sql """
+        INSERT INTO `${db_name}`.`left_inner_table` VALUES
+        (1,'reason1'),
+        (2,'reason2'),
+        (3,'reason3');
+    """
+    sql """
+        CREATE TABLE `${db_name}`.`right_remote_table` (
+          log_time        DATE       NOT NULL,
+          log_type        INT            NOT NULL,
+          error_code      INT,
+          error_msg       VARCHAR(1024),
+          op_id           BIGINT,
+          op_time         DATETIME
+        ) ENGINE=OLAP
+        DUPLICATE KEY(log_time, log_type, error_code)
+        DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+    sql """
+        INSERT INTO `${db_name}`.`right_remote_table` VALUES
+        ('2023-01-01',1,100,'error1',1000,'2023-01-01 00:00:00'),
+        ('2023-01-01',2,200,'error2',2000,'2023-01-01 00:00:00'),
+        ('2023-01-01',3,300,'error3',3000,'2023-01-01 00:00:00');
+    """
+    qt_join """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table` a
+            join `${catalog_name}`.`${db_name}`.`right_remote_table` b on 
a.`log_type` = b.`log_type` order by a.`log_type`
+    """
+    qt_join_predicate """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table` a
+        join `${catalog_name}`.`${db_name}`.`right_remote_table` b on 
a.`log_type` = b.`log_type` and b.op_id=2000 order by a.`log_type`
+    """
+
+    // test partition table
+    sql """
+        CREATE TABLE `${db_name}`.`left_inner_table_partition` (
+          log_type        INT            NOT NULL,
+          `log_time`       date           NOT NULL,
+          reason       VARCHAR(1024) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`log_type`)
+        PARTITION BY RANGE(`log_time`)
+        (PARTITION p20230101 VALUES [('2023-01-01'), ('2023-01-02')),
+        PARTITION p20230102 VALUES [('2023-01-02'), ('2023-01-03')),
+        PARTITION p20230103 VALUES [('2023-01-03'), ('2023-01-04')))
+        DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        INSERT INTO `${db_name}`.`left_inner_table_partition` VALUES
+        (1,'2023-01-01','reason1'),
+        (2,'2023-01-02','reason2'),
+        (3,'2023-01-03','reason3');
+    """
+ 
+    sql """
+        CREATE TABLE `${db_name}`.`right_remote_table_partition` (
+          log_time        DATE       NOT NULL,
+          log_type        INT            NOT NULL,
+          error_code      INT,
+          error_msg       VARCHAR(1024),
+          op_id           BIGINT,
+          op_time         DATETIME
+        ) ENGINE=OLAP
+        DUPLICATE KEY(log_time, log_type, error_code)
+        PARTITION BY RANGE(`log_time`)
+        (PARTITION p20230101 VALUES [('2023-01-01'), ('2023-01-02')),
+        PARTITION p20230102 VALUES [('2023-01-02'), ('2023-01-03')),
+        PARTITION p20230103 VALUES [('2023-01-03'), ('2023-01-04')))
+        DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        INSERT INTO `${db_name}`.`right_remote_table_partition` VALUES
+        ('2023-01-01',1,100,'error1',1000,'2023-01-01 00:00:00'),
+        ('2023-01-02',2,200,'error2',2000,'2023-01-02 00:00:00'),
+        ('2023-01-03',3,300,'error3',3000,'2023-01-03 00:00:00');
+    """
+
+    qt_join_partition """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table_partition` a
+            join `${catalog_name}`.`${db_name}`.`right_remote_table_partition` 
b on a.`log_type` = b.`log_type` and a.`log_time` = b.`log_time` order by 
a.`log_type`
+    """
+
+    qt_join_partition_predicate """
+        select /*+ SET_VAR(enable_nereids_distribute_planner=true, 
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table_partition` a
+            join `${catalog_name}`.`${db_name}`.`right_remote_table_partition` 
b on a.`log_type` = b.`log_type` and a.`log_time` = b.`log_time` and 
b.log_time='2023-01-02' order by a.`log_type`
+    """
+
+    sql """ DROP DATABASE IF EXISTS ${db_name} """
+    sql """ DROP CATALOG IF EXISTS `${catalog_name}` """
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
index a77d21f59f5..ef5bfcae8f6 100644
--- 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_all_types_select", 
"p0,external,doris,external_docker,e
     String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
     String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
     String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
 
     def showres = sql "show frontends";
     remote_doris_arrow_port = showres[0][6]
     remote_doris_http_port = showres[0][3]
-    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
 
     def showres2 = sql "show backends";
     log.info("show backends log = ${showres2}")
@@ -150,8 +152,10 @@ suite("test_remote_doris_all_types_select", 
"p0,external,doris,external_docker,e
                 'type' = 'doris',
                 'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
                 'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
                 'user' = '${remote_doris_user}',
-                'password' = '${remote_doris_psw}'
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'true'
         );
     """
 
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
index ab9f89019e2..5438617e09c 100644
--- 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_all_types_show", 
"p0,external,doris,external_docker,ext
     String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
     String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
     String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
 
     def showres = sql "show frontends";
     remote_doris_arrow_port = showres[0][6]
     remote_doris_http_port = showres[0][3]
-    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
 
     def showres2 = sql "show backends";
     log.info("show backends log = ${showres2}")
@@ -141,10 +143,12 @@ suite("test_remote_doris_all_types_show", 
"p0,external,doris,external_docker,ext
     sql """
      CREATE CATALOG `test_remote_doris_all_types_catalog` PROPERTIES (
                 'type' = 'doris',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
                 'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
                 'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
                 'user' = '${remote_doris_user}',
-                'password' = '${remote_doris_psw}'
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'true'
         );
     """
 
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
index 82e2b2e3550..710ee5ecb54 100644
--- 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_catalog", 
"p0,external,doris,external_docker,external_d
     String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
     String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
     String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
 
     def showres = sql "show frontends";
     remote_doris_arrow_port = showres[0][6]
     remote_doris_http_port = showres[0][3]
-    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
 
     def showres2 = sql "show backends";
     log.info("show backends log = ${showres2}")
@@ -39,10 +41,12 @@ suite("test_remote_doris_catalog", 
"p0,external,doris,external_docker,external_d
     sql """
        CREATE CATALOG `test_remote_doris_catalog_catalog` PROPERTIES (
                 'type' = 'doris',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
                 'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
                 'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
                 'user' = '${remote_doris_user}',
-                'password' = '${remote_doris_psw}'
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'true'
         );
     """
 
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
index 3782f5c50a7..cf7dfd5ea31 100644
--- 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_predict", 
"p0,external,doris,external_docker,external_d
     String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
     String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
     String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
 
     def showres = sql "show frontends";
     remote_doris_arrow_port = showres[0][6]
     remote_doris_http_port = showres[0][3]
-    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
 
     def showres2 = sql "show backends";
     log.info("show backends log = ${showres2}")
@@ -63,10 +65,12 @@ suite("test_remote_doris_predict", 
"p0,external,doris,external_docker,external_d
     sql """
        CREATE CATALOG `test_remote_doris_predict_catalog` PROPERTIES (
                 'type' = 'doris',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
                 'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
                 'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
                 'user' = '${remote_doris_user}',
-                'password' = '${remote_doris_psw}'
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'true'
         );
     """
 
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
index e7596eafffb..02d1028c26b 100644
--- 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_refresh", 
"p0,external,doris,external_docker,external_d
     String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
     String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
     String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
 
     def showres = sql "show frontends";
     remote_doris_arrow_port = showres[0][6]
     remote_doris_http_port = showres[0][3]
-    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
 
     def showres2 = sql "show backends";
     log.info("show backends log = ${showres2}")
@@ -39,10 +41,12 @@ suite("test_remote_doris_refresh", 
"p0,external,doris,external_docker,external_d
     sql """
     CREATE CATALOG `test_remote_doris_refresh_catalog` PROPERTIES (
                 'type' = 'doris',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
                 'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
                 'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
                 'user' = '${remote_doris_user}',
-                'password' = '${remote_doris_psw}'
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'true'
         );
     """
 
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
index f2633b8cd28..223c294d811 100644
--- 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_statistics", 
"p0,external,doris,external_docker,externa
     String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
     String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
     String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
 
     def showres = sql "show frontends";
     remote_doris_arrow_port = showres[0][6]
     remote_doris_http_port = showres[0][3]
-    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
 
     def showres2 = sql "show backends";
     log.info("show backends log = ${showres2}")
@@ -71,10 +73,12 @@ suite("test_remote_doris_statistics", 
"p0,external,doris,external_docker,externa
     sql """
     CREATE CATALOG `test_remote_doris_statistics_catalog` PROPERTIES (
                 'type' = 'doris',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
                 'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
                 'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
                 'user' = '${remote_doris_user}',
-                'password' = '${remote_doris_psw}'
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'true'
         );
     """
 
diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
index a9dd083ffd9..e6de44e005a 100644
--- 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
+++ 
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_table_stats", 
"p0,external,doris,external_docker,extern
     String remote_doris_http_port = 
context.config.otherConfigs.get("extArrowFlightHttpPort")
     String remote_doris_user = 
context.config.otherConfigs.get("extArrowFlightSqlUser")
     String remote_doris_psw = 
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+    String remote_doris_thrift_port = 
context.config.otherConfigs.get("extFeThriftPort")
 
     def showres = sql "show frontends";
     remote_doris_arrow_port = showres[0][6]
     remote_doris_http_port = showres[0][3]
-    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+    remote_doris_thrift_port = showres[0][5]
+    log.info("show frontends log = ${showres}, arrow: 
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: 
${remote_doris_thrift_port}")
 
     def showres2 = sql "show backends";
     log.info("show backends log = ${showres2}")
@@ -71,10 +73,12 @@ suite("test_remote_doris_table_stats", 
"p0,external,doris,external_docker,extern
     sql """
     CREATE CATALOG `test_remote_doris_table_stats_catalog` PROPERTIES (
                 'type' = 'doris',
+                'fe_thrift_hosts' = 
'${remote_doris_host}:${remote_doris_thrift_port}',
                 'fe_http_hosts' = 
'http://${remote_doris_host}:${remote_doris_http_port}',
                 'fe_arrow_hosts' = 
'${remote_doris_host}:${remote_doris_arrow_port}',
                 'user' = '${remote_doris_user}',
-                'password' = '${remote_doris_psw}'
+                'password' = '${remote_doris_psw}',
+                'use_arrow_flight' = 'true'
         );
     """
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to