Copilot commented on code in PR #57898:
URL: https://github.com/apache/doris/pull/57898#discussion_r2559243788


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -149,17 +166,17 @@ public long randomAvailableWorker(Map<TNetworkAddress, 
Long> addressToBackendID)
     }
 
     @Override
-    public List<Backend> getAllBackends(boolean needAlive) {
+    public List<Backend> getAllBackends(long catalogId, boolean needAlive) {
         List<Backend> backends = null;
         if (needAlive) {
             backends = Lists.newArrayList();
-            for (Map.Entry<Long, Backend> entry : 
this.allClusterBackends.get().entrySet()) {
+            for (Map.Entry<Long, Backend> entry : 
this.allClusterBackends.get(catalogId).get().entrySet()) {
                 if (entry.getValue().isQueryAvailable()) {
                     backends.add(entry.getValue());
                 }
             }
         } else {
-            backends = 
Lists.newArrayList(this.allClusterBackends.get().values());
+            backends = 
Lists.newArrayList(this.allClusterBackends.get(catalogId).get().values());

Review Comment:
   Potential NullPointerException: Same issue as at line 179. When `catalogId`
   has not been registered, `allClusterBackends.get(catalogId)` returns null and
   the chained `.get()` call throws.
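
One way to address this in both branches is to route every lookup through a single null-checking helper. The following is a minimal, self-contained sketch rather than the PR's code: `CatalogBackends`, `backendsOf`, and the `String` stand-in for `Backend` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the worker manager; not the class from the PR. */
final class CatalogBackends {
    // catalogId -> supplier of (backendId -> backend); String replaces Backend here.
    private final Map<Long, Supplier<Map<Long, String>>> allClusterBackends = new HashMap<>();

    void addBackends(long catalogId, Map<Long, String> backends) {
        allClusterBackends.putIfAbsent(catalogId, () -> backends);
    }

    /** Single lookup point: fails with a descriptive message instead of an NPE. */
    Map<Long, String> backendsOf(long catalogId) {
        Supplier<Map<Long, String>> supplier = allClusterBackends.get(catalogId);
        if (supplier == null) {
            throw new IllegalStateException("No backends registered for catalogId: " + catalogId);
        }
        return supplier.get();
    }

    /** Callers never dereference the raw map entry themselves. */
    List<String> getAllBackends(long catalogId) {
        return new ArrayList<>(backendsOf(catalogId).values());
    }
}
```

With a helper of this shape, an unknown `catalogId` surfaces as an `IllegalStateException` naming the catalog, which is usually easier to diagnose than a bare NullPointerException.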



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -4393,6 +4402,7 @@ public void setEnablePushDownMinMaxOnUnique(boolean 
enablePushDownMinMaxOnUnique
         this.enablePushDownMinMaxOnUnique = enablePushDownMinMaxOnUnique;
     }
 
+    /** canUseNereidsDistributePlanner */

Review Comment:
   Incorrect JavaDoc comment: The comment `/** canUseNereidsDistributePlanner */`
   on line 4405 is placed above the wrong method (`isEnablePushDownStringMinMax`).
   This appears to be a copy-paste error. Remove or correct this misplaced
   comment.
   ```suggestion
   
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -4507,6 +4516,102 @@ public TGetTableTDEInfoResult 
getTableTDEInfo(TGetTableTDEInfoRequest request) t
         return result;
     }
 
+    /**
+     * only copy basic meta about table
+     * do not copy the partitions
+     * @param request
+     * @return
+     * @throws TException
+     */
+    @Override
+    public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest 
request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive getOlapTableMeta request: {}, client: {}", 
request, clientAddr);
+        }
+        TGetOlapTableMetaResult result = new TGetOlapTableMetaResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            checkSingleTablePasswordAndPrivs(request.getUser(), 
request.getPasswd(), request.getDb(),
+                    request.getTable(), clientAddr, PrivPredicate.SELECT);
+            String dbName = request.getDb();
+            Database db = 
Env.getCurrentInternalCatalog().getDbNullable(dbName);
+            if (db == null) {
+                throw new UserException("unknown database, database=" + 
dbName);
+            }
+            OlapTable table = (OlapTable) 
db.getTableNullable(request.getTable());
+            if (table == null) {
+                throw new UserException("unknown table, table=" + 
request.getTable());
+            }
+            MetaContext metaContext = new MetaContext();
+            metaContext.setMetaVersion(FeConstants.meta_version);
+            metaContext.setThreadLocalInfo();
+            table.readLock();
+            try (ByteArrayOutputStream bOutputStream = new 
ByteArrayOutputStream(8192)) {
+                OlapTable copyTable = table.copyTableMeta();
+                try (DataOutputStream out = new 
DataOutputStream(bOutputStream)) {
+                    copyTable.write(out);
+                    out.flush();
+                    result.setTableMeta(bOutputStream.toByteArray());
+                }
+                Set<Long> updatedPartitionIds = 
Sets.newHashSet(table.getPartitionIds());
+                List<TPartitionMeta> partitionMetas = 
request.getPartitionsSize() == 0 ? Lists.newArrayList()
+                        : request.getPartitions();
+                for (TPartitionMeta partitionMeta : partitionMetas) {
+                    if (request.getTableId() != table.getId()) {
+                        result.addToRemovedPartitions(partitionMeta.getId());
+                        continue;
+                    }
+                    Partition partition = 
table.getPartition(partitionMeta.getId());
+                    if (partition == null) {
+                        result.addToRemovedPartitions(partitionMeta.getId());
+                        continue;
+                    }
+                    if (partition.getVisibleVersion() == 
partitionMeta.getVisibleVersion()
+                            && partition.getVisibleVersionTime() == 
partitionMeta.getVisibleVersionTime()) {
+                        updatedPartitionIds.remove(partitionMeta.getId());
+                    }

Review Comment:
   Loop-invariant check inside the partition loop: The check
   `if (request.getTableId() != table.getId())` on line 4562 sits inside the
   loop over partitions (line 4561), although its result is the same for every
   iteration. When the table ID does not match, every requested partition is
   marked as removed. Moving the check above the loop evaluates it once and
   makes that intent explicit; the resulting partition sets are unchanged.
   ```suggestion
                   // Check the table ID once, before iterating over the partitions
                   if (request.getTableId() != table.getId()) {
                       for (TPartitionMeta partitionMeta : partitionMetas) {
                           result.addToRemovedPartitions(partitionMeta.getId());
                       }
                   } else {
                       for (TPartitionMeta partitionMeta : partitionMetas) {
                           Partition partition = table.getPartition(partitionMeta.getId());
                           if (partition == null) {
                               result.addToRemovedPartitions(partitionMeta.getId());
                               continue;
                           }
                           if (partition.getVisibleVersion() == partitionMeta.getVisibleVersion()
                                   && partition.getVisibleVersionTime() == partitionMeta.getVisibleVersionTime()) {
                               updatedPartitionIds.remove(partitionMeta.getId());
                           }
                       }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemoteOlapTable extends OlapTable {
+    private static final Logger LOG = 
LogManager.getLogger(RemoteOlapTable.class);
+
+    private RemoteDorisExternalCatalog catalog;
+    private RemoteDorisExternalDatabase database;
+
+    public RemoteDorisExternalCatalog getCatalog() {
+        return catalog;
+    }
+
+    public void setCatalog(RemoteDorisExternalCatalog catalog) {
+        this.catalog = catalog;
+    }
+
+    @Override
+    public RemoteDorisExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(RemoteDorisExternalDatabase database) {
+        this.database = database;
+    }
+
+    public static RemoteOlapTable fromOlapTable(OlapTable olapTable) {
+        try {
+            RemoteOlapTable externalOlapTable = new RemoteOlapTable();
+            Class<?> currentClass = olapTable.getClass();
+            while (currentClass != null) {
+                for (Field field : currentClass.getDeclaredFields()) {
+                    if (Modifier.isStatic(field.getModifiers())) {
+                        continue;
+                    }
+                    field.setAccessible(true);
+                    field.set(externalOlapTable, field.get(olapTable));
+                }
+                currentClass = currentClass.getSuperclass();
+            }
+            return externalOlapTable;
+        } catch (Exception e) {
+            throw new RuntimeException("failed to initial external olap 
table", e);
+        }

Review Comment:
   Security risk: Using reflection with `setAccessible(true)` on all fields 
bypasses Java's access control and could expose private fields. This approach 
is fragile and may break with future Java versions. Consider implementing a 
proper copy constructor or builder pattern instead of using reflection.
   ```suggestion
           return new RemoteOlapTable(olapTable);
       }
   
       /**
        * Copy constructor to create a RemoteOlapTable from an OlapTable.
        * Only copies fields that are accessible and relevant.
        */
       public RemoteOlapTable(OlapTable olapTable) {
           super(olapTable.getId(), olapTable.getName(), olapTable.getBaseSchema(), olapTable.getKeysType(),
                   olapTable.getPartitionInfo(), olapTable.getDistributionInfo());
           // Copy other relevant fields as needed
           this.idToPartition = new ConcurrentHashMap<>(olapTable.getIdToPartition());
           this.nameToPartition = Maps.newTreeMap();
           for (Partition partition : this.idToPartition.values()) {
               this.nameToPartition.put(partition.getName(), partition);
           }
           this.indexIdToMeta = Maps.newHashMap(olapTable.getIndexIdToMeta());
           this.visibleIndexId = olapTable.getVisibleIndexId();
           this.state = olapTable.getState();
           this.createTime = olapTable.getCreateTime();
           this.lastSchemaUpdateTime = olapTable.getLastSchemaUpdateTime();
           this.lastUpdateTime = olapTable.getLastUpdateTime();
           this.comment = olapTable.getComment();
           this.baseIndexId = olapTable.getBaseIndexId();
           this.colUniqueIdGenerator = olapTable.getColUniqueIdGenerator();
           this.indexNameToId = Maps.newHashMap(olapTable.getIndexNameToId());
           this.partitionColumnNames = olapTable.getPartitionColumnNames() != null
                   ? List.copyOf(olapTable.getPartitionColumnNames()) : null;
           this.defaultDistributionBucketNum = olapTable.getDefaultDistributionBucketNum();
           this.storageType = olapTable.getStorageType();
           this.storageFormat = olapTable.getStorageFormat();
           this.enablePersistentIndex = olapTable.enablePersistentIndex();
           this.enableUniqueKeyMergeOnWrite = olapTable.enableUniqueKeyMergeOnWrite();
           this.enableSingleReplicaCompaction = olapTable.enableSingleReplicaCompaction();
           // Add more fields as necessary, depending on what needs to be copied.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -115,28 +122,38 @@ private ImmutableMap<Long, Backend> 
checkAndInitClusterBackends(
     }
 
     @Override
-    public DistributedPlanWorker getWorker(long backendId) {
-        ImmutableMap<Long, Backend> backends = this.allClusterBackends.get();
+    public void addBackends(long catalogId, ImmutableMap<Long, Backend> 
backends) {
+        if (!currentClusterBackends.containsKey(catalogId)) {
+            currentClusterBackends.put(catalogId, backends);
+        }
+        if (!allClusterBackends.containsKey(catalogId)) {
+            allClusterBackends.put(catalogId, Suppliers.ofInstance(backends));
+        }
+    }
+
+    @Override
+    public DistributedPlanWorker getWorker(long catalogId, long backendId) {
+        ImmutableMap<Long, Backend> backends = 
this.allClusterBackends.get(catalogId).get();
         Backend backend = backends.get(backendId);
         if (backend == null) {
             throw new IllegalStateException("Backend " + backendId + " is not 
exist");
         }
-        return new BackendWorker(backend);
+        return new BackendWorker(catalogId, backend);
     }
 
     @Override
-    public DistributedPlanWorker getWorker(Backend backend) {
-        return new BackendWorker(backend);
+    public DistributedPlanWorker getWorker(long catalogId, Backend backend) {
+        return new BackendWorker(catalogId, backend);
     }
 
     @Override
-    public DistributedPlanWorker randomAvailableWorker() {
+    public DistributedPlanWorker randomAvailableWorker(long catalogId) {
         try {
             Reference<Long> selectedBackendId = new Reference<>();
-            ImmutableMap<Long, Backend> backends = this.currentClusterBackends;
+            ImmutableMap<Long, Backend> backends = 
this.currentClusterBackends.get(catalogId);

Review Comment:
   Potential NullPointerException: If `catalogId` is not present in 
`currentClusterBackends`, calling `get(catalogId)` will return null, and then 
calling `SimpleScheduler.getHost()` with null will likely cause issues. Add a 
null check or ensure the catalog exists before calling this method.
   ```suggestion
               ImmutableMap<Long, Backend> backends = this.currentClusterBackends.get(catalogId);
               if (backends == null) {
                   throw new IllegalStateException("No backends found for catalogId: " + catalogId);
               }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -470,7 +471,21 @@ private LogicalPlan getLogicalPlan(TableIf table, 
UnboundRelation unboundRelatio
                 case MAX_COMPUTE_EXTERNAL_TABLE:
                 case TRINO_CONNECTOR_EXTERNAL_TABLE:
                 case LAKESOUl_EXTERNAL_TABLE:
+                    return new 
LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
+                            qualifierWithoutTableName, ImmutableList.of(),
+                            unboundRelation.getTableSample(),
+                            unboundRelation.getTableSnapshot(),
+                            
Optional.ofNullable(unboundRelation.getScanParams()), Optional.empty());
                 case DORIS_EXTERNAL_TABLE:
+                    ConnectContext ctx = cascadesContext.getConnectContext();
+                    if (Config.isNotCloudMode() && ctx != null
+                            && 
ctx.getSessionVariable().isQueryDorisExternalTableAsOlapTable()
+                            // use isEnableNereidsDistributePlanner instead of 
canUseNereidsDistributePlanner
+                            // bacause it cannot work in explain command

Review Comment:
   Typo in comment: "bacause" should be "because".
   ```suggestion
                               // because it cannot work in explain command
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -149,17 +166,17 @@ public long randomAvailableWorker(Map<TNetworkAddress, 
Long> addressToBackendID)
     }
 
     @Override
-    public List<Backend> getAllBackends(boolean needAlive) {
+    public List<Backend> getAllBackends(long catalogId, boolean needAlive) {
         List<Backend> backends = null;
         if (needAlive) {
             backends = Lists.newArrayList();
-            for (Map.Entry<Long, Backend> entry : 
this.allClusterBackends.get().entrySet()) {
+            for (Map.Entry<Long, Backend> entry : 
this.allClusterBackends.get(catalogId).get().entrySet()) {

Review Comment:
   Potential NullPointerException: Similar to line 136, if `catalogId` is not 
present in `allClusterBackends`, this will throw a NullPointerException. Add 
proper null checking or validation.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemoteOlapTable extends OlapTable {
+    private static final Logger LOG = 
LogManager.getLogger(RemoteOlapTable.class);
+
+    private RemoteDorisExternalCatalog catalog;
+    private RemoteDorisExternalDatabase database;
+
+    public RemoteDorisExternalCatalog getCatalog() {
+        return catalog;
+    }
+
+    public void setCatalog(RemoteDorisExternalCatalog catalog) {
+        this.catalog = catalog;
+    }
+
+    @Override
+    public RemoteDorisExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(RemoteDorisExternalDatabase database) {
+        this.database = database;
+    }
+
+    public static RemoteOlapTable fromOlapTable(OlapTable olapTable) {
+        try {
+            RemoteOlapTable externalOlapTable = new RemoteOlapTable();
+            Class<?> currentClass = olapTable.getClass();
+            while (currentClass != null) {
+                for (Field field : currentClass.getDeclaredFields()) {
+                    if (Modifier.isStatic(field.getModifiers())) {
+                        continue;
+                    }
+                    field.setAccessible(true);
+                    field.set(externalOlapTable, field.get(olapTable));
+                }
+                currentClass = currentClass.getSuperclass();
+            }
+            return externalOlapTable;
+        } catch (Exception e) {
+            throw new RuntimeException("failed to initial external olap 
table", e);
+        }
+    }
+
+    public void rebuildPartitions(List<Partition> oldPartitions, 
List<Partition> updatedPartitions,
+            List<Long> removedPartitions)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rebuildPartitions oldPartitions: " + 
oldPartitions.size() + ", updatedPartitions: "
+                    + updatedPartitions.size() + ", removedPartitions: " + 
removedPartitions.size());
+        }
+        ConcurrentHashMap<Long, Partition> newIdToPartition = new 
ConcurrentHashMap<>();
+        for (Partition oldPartition : oldPartitions) {
+            newIdToPartition.put(oldPartition.getId(), oldPartition);
+        }
+        for (Long removedPartition : removedPartitions) {
+            newIdToPartition.remove(removedPartition);
+        }
+        for (Partition updatedPartition : updatedPartitions) {
+            newIdToPartition.put(updatedPartition.getId(), updatedPartition);
+        }
+        Map<String, Partition> newNameToPartition = Maps.newTreeMap();
+        for (Partition partition : newIdToPartition.values()) {
+            newNameToPartition.put(partition.getName(), partition);
+        }
+        this.idToPartition = newIdToPartition;
+        this.nameToPartition = newNameToPartition;
+    }
+
+    public void invalidateBackendsIfNeed() {
+        ImmutableMap<Long, Backend> backends = getAllBackendsByAllCluster();
+        for (Partition partition : getPartitions()) {
+            for (Tablet tablet : partition.getBaseIndex().getTablets()) {
+                for (long backendId : tablet.getBackendIds()) {
+                    if (!backends.containsKey(backendId)) {
+                        
Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr()
+                                .invalidateBackendCache(catalog.getId());
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public long getCatalogId() {
+        return catalog.getId();
+    }
+
+    public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() {

Review Comment:
   Potential NullPointerException: If `catalog` is null, calling 
`catalog.getId()` at line 123 or 127 will throw a NullPointerException. Ensure 
catalog is set before these methods are called, or add null checks with 
appropriate error handling.
   ```suggestion
       public long getCatalogId() {
           if (catalog == null) {
               throw new IllegalStateException("Catalog is not set in RemoteOlapTable");
           }
           return catalog.getId();
       }
   
       public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() {
           if (catalog == null) {
               throw new IllegalStateException("Catalog is not set in RemoteOlapTable");
           }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java:
##########
@@ -3698,4 +3699,44 @@ public Index getInvertedIndex(Column column, 
List<String> subPath) {
                                         
.filter(Index::isAnalyzedInvertedIndex).findFirst().orElse(null);
         }
     }
+
+    /**
+     * caller should acquire the read lock and should not modify any field of 
the return obj
+     */
+    public OlapTable copyTableMeta() {
+        OlapTable table = new OlapTable();
+        // metaobj
+        table.signature = signature;
+        table.lastCheckTime =  lastCheckTime;
+        // abstract table
+        table.id = id;
+        table.name = name;
+        table.qualifiedDbName = qualifiedDbName;
+        table.type = type;
+        table.createTime = createTime;
+        // olap table
+        // NOTE: currently do not need temp partitions, colocateGroup, 
autoIncrementGenerator
+        table.idToPartition = new ConcurrentHashMap<>();
+        table.tempPartitions = new TempPartitions();
+
+        table.state = state;
+        table.indexIdToMeta = indexIdToMeta;
+        table.indexNameToId = indexNameToId;
+        table.keysType = keysType;
+        table.partitionInfo = partitionInfo;
+        table.defaultDistributionInfo = defaultDistributionInfo;
+        table.bfColumns = bfColumns;
+        table.bfFpp = bfFpp;
+        table.indexes = indexes;
+        table.baseIndexId = baseIndexId;
+        return table;

Review Comment:
   [nitpick] Incomplete shallow copy: The `copyTableMeta()` method performs a 
shallow copy of reference fields like `indexIdToMeta`, `indexNameToId`, 
`partitionInfo`, etc. (lines 3723-3731). This means modifications to these 
shared objects in the copy will affect the original table. Since the JavaDoc 
states "caller should not modify any field of the return obj", consider making 
this explicit with defensive copies or using immutable wrappers to prevent 
accidental modifications.
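
As an illustration of the defensive-copy option, the sketch below snapshots a map and wraps it read-only before handing it to the copy. It is a simplified model under stated assumptions, not the `OlapTable` implementation: `TableMetaSketch` is a hypothetical class, and its single `indexNameToId` field only loosely mirrors the field of the same name in the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified table meta used only to illustrate the idea. */
final class TableMetaSketch {
    Map<String, Long> indexNameToId = new HashMap<>();

    /** Returns a copy whose map is a read-only snapshot of this table's map. */
    TableMetaSketch copyTableMeta() {
        TableMetaSketch copy = new TableMetaSketch();
        // new HashMap<>(...) detaches the copy from later changes to the original;
        // unmodifiableMap(...) rejects writes made through the copy.
        copy.indexNameToId = Collections.unmodifiableMap(new HashMap<>(indexNameToId));
        return copy;
    }
}
```

The snapshot costs an extra allocation while the read lock is held. Wrapping the original map with `Collections.unmodifiableMap` alone avoids that cost and still blocks writes through the copy, but the wrapper is a view, so it continues to reflect later changes to the original.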



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java:
##########
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TGetBackendMetaRequest;
+import org.apache.doris.thrift.TGetBackendMetaResult;
+import org.apache.doris.thrift.TGetOlapTableMetaRequest;
+import org.apache.doris.thrift.TGetOlapTableMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPartitionMeta;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class FeServiceClient {
+    private static final Logger LOG = 
LogManager.getLogger(FeServiceClient.class);
+
+    private final Random random = new Random(System.currentTimeMillis());
+    private final String name;
+    private final List<TNetworkAddress> addresses;
+    private volatile TNetworkAddress master;
+    private final String user;
+    private final String password;
+    private final int retryCount;
+    private final int timeout;
+
+    public FeServiceClient(String name, List<TNetworkAddress> addresses, 
String user, String password,
+            int retryCount, int timeout) {
+        this.name = name;
+        this.addresses = addresses;
+        this.user = user;
+        this.password = password;
+        this.retryCount = retryCount;
+        this.timeout = timeout;
+    }
+
+    private List<TNetworkAddress> getAddresses() {
+        return addresses;
+    }
+
+    private FrontendService.Client getRemoteFeClient(TNetworkAddress address, 
int timeout) {
+        try {
+            return ClientPool.frontendPool.borrowObject(address, timeout);
+        } catch (Exception e) {
+            String msg = String.format("failed to get remote doris:%s fe 
connection", name);
+            throw new RuntimeException(msg, e);
+        }
+    }
+
+    private void returnClient(TNetworkAddress address, FrontendService.Client 
client, boolean returnObj) {
+        if (returnObj) {
+            ClientPool.frontendPool.returnObject(address, client);
+        } else {
+            ClientPool.frontendPool.invalidateObject(address, client);
+        }
+    }
+
+    private <T> T randomCallWithRetry(ThriftCall<T> call, String errorMsg, int 
timeout) {
+        List<TNetworkAddress> addresses = getAddresses();
+        int retries = 0;
+        Exception lastException = null;
+        while (retries < retryCount) {
+            int index = random.nextInt(addresses.size());
+            FrontendService.Client client = null;
+            for (int i = 0; i < addresses.size() && retries < retryCount; i++) 
{
+                TNetworkAddress address = addresses.get((index + i) % 
addresses.size());
+                client = getRemoteFeClient(address, timeout);
+                boolean returnObj = false;
+                try {
+                    T result = call.call(client);
+                    returnObj = true;
+                    return result;
+                } catch (TException | IOException e) {
+                    lastException = e;
+                    retries++;
+                } catch (Exception e) {
+                    throw new RuntimeException(errorMsg + ":" + 
e.getMessage(), e);
+                } finally {
+                    returnClient(address, client, returnObj);
+                }
+            }
+        }
+        throw new RuntimeException(errorMsg + ":" + 
lastException.getMessage(), lastException);
+    }
+
+    private <T> T callFromMaster(ThriftCall<MasterResult<T>> call, String 
errorMsg, int timeout) {
+        TNetworkAddress address = master;
+        FrontendService.Client client = null;
+        Exception lastException = null;
+        if (address != null) {
+            client = getRemoteFeClient(address, timeout);
+            boolean returnObj = false;
+            try {
+                MasterResult<T> ret = call.call(client);
+                returnObj = true;
+                if (ret.isMaster) {
+                    if (ret.hasError) {
+                        throw new RuntimeException(ret.errorMsg);
+                    }
+                    return ret.result;
+                }
+            } catch (TException | IOException e) {
+                lastException = e;
+            } catch (Exception e) {
+                throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+            } finally {
+                returnClient(address, client, returnObj);
+            }
+        }
+        master = null;
+        List<TNetworkAddress> addresses = getAddresses();
+        int retries = 0;
+        while (retries < retryCount) {
+            int index = random.nextInt(addresses.size());
+            for (int i = 0; i < addresses.size() && retries < retryCount; i++) 
{
+                address = addresses.get((index + i) % addresses.size());
+                client = getRemoteFeClient(address, timeout);
+                boolean returnObj = false;
+                try {
+                    MasterResult<T> ret = call.call(client);
+                    returnObj = true;
+                    if (ret.isMaster) {
+                        master = address;
+                        if (ret.hasError) {
+                            throw new RuntimeException(ret.errorMsg);
+                        }
+                        return ret.result;
+                    }
+                } catch (TException | IOException e) {
+                    lastException = e;
+                    retries++;
+                } catch (Exception e) {
+                    throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+                } finally {
+                    returnClient(address, client, returnObj);
+                }
+            }
+        }
+        throw new RuntimeException(errorMsg + ":" + lastException.getMessage(), lastException);
+    }
+
+    public List<Backend> listBackends() {
+        TGetBackendMetaRequest request = new TGetBackendMetaRequest();
+        request.setUser(user);
+        request.setPasswd(password);
+        String msg = String.format("failed to get backends from remote doris:%s", name);
+        return callFromMaster(client -> {
+            TGetBackendMetaResult result = client.getBackendMeta(request);
+            if (result.getStatus().getStatusCode() == TStatusCode.NOT_MASTER) {
+                return MasterResult.notMaster();
+            }
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                return MasterResult.masterWithError(result.getStatus().toString());
+            }
+            List<Backend> backends = result.getBackends().stream()
+                    .map(b -> Backend.fromThrift(b))
+                    .collect(Collectors.toList());
+            return MasterResult.withResult(backends);
+        }, msg, timeout);
+    }
+
+    public RemoteOlapTable getOlapTable(String dbName, String table, long tableId, List<Partition> partitions) {
+        TGetOlapTableMetaRequest request = new TGetOlapTableMetaRequest();
+        request.setDb(dbName);
+        request.setTable(table);
+        request.setTableId(tableId);
+        request.setUser(user);
+        request.setPasswd(password);
+        request.setVersion(FeConstants.meta_version);
+        for (Partition partition : partitions) {
+            TPartitionMeta meta = new TPartitionMeta();
+            meta.setId(partition.getId());
+            meta.setVisibleVersion(partition.getVisibleVersion());
+            meta.setVisibleVersionTime(partition.getVisibleVersionTime());
+            request.addToPartitions(meta);
+        }
+        String msg = String.format("failed to get table meta from remote doris:%s", name);
+        return randomCallWithRetry(client -> {
+            TGetOlapTableMetaResult result = client.getOlapTableMeta(request);
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                throw new UserException(result.getStatus().toString());
+            }
+            RemoteOlapTable remoteOlapTable = null;
+            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(result.getTableMeta()))) {
+                OlapTable olapTable = OlapTable.read(in);
+                remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable);
+            }
+            List<Partition> updatedPartitions = new ArrayList<>(result.getUpdatedPartitionsSize());
+            if (result.getUpdatedPartitionsSize() > 0) {
+                for (ByteBuffer buffer : result.getUpdatedPartitions()) {
+                    try (ByteArrayInputStream in =
+                            new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
+                            DataInputStream dataInputStream = new DataInputStream(in)) {
+                        String partitionStr = Text.readString(dataInputStream);
+                        Partition partition = GsonUtils.GSON.fromJson(partitionStr, Partition.class);
+                        updatedPartitions.add(partition);
+                    }
+                }
+            }
+            List<Long> removedPartitions = result.getRemovedPartitions();
+            if (removedPartitions == null) {
+                removedPartitions = new ArrayList<>();
+            }
+            remoteOlapTable.rebuildPartitions(partitions, updatedPartitions, removedPartitions);
+            return remoteOlapTable;
+        }, msg, timeout);
+    }
+
+    private interface ThriftCall<T> {
+        public T call(FrontendService.Client client) throws Exception;
+    }
+
+    private static class MasterResult<T> {
+        boolean isMaster = true;
+        T result;
+        boolean hasError = false;
+        String errorMsg;
+
+        static <T> MasterResult<T> notMaster() {
+            MasterResult ret = new MasterResult();
+            ret.isMaster = false;
+            return ret;
+        }
+
+        static <T> MasterResult<T> withResult(T result) {
+            MasterResult ret = new MasterResult();
+            ret.isMaster = true;
+            ret.hasError = false;
+            ret.result = result;
+            return ret;
+        }
+
+        // is master but has error code
+        static <T> MasterResult<T> masterWithError(String errorMsg) {
+            MasterResult ret = new MasterResult();
+            ret.isMaster = true;
+            ret.hasError = true;
+            ret.errorMsg = errorMsg;
+            return ret;
+        }

Review Comment:
   Missing generic type parameter: All three factory methods (`notMaster`,
`withResult`, `masterWithError`) instantiate `MasterResult` as a raw type,
which produces raw-type and unchecked-conversion warnings and bypasses
compile-time type checking. Change `MasterResult ret = new MasterResult();` to
`MasterResult<T> ret = new MasterResult<>();` in each of them.
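
A minimal, self-contained sketch of the typed factories described above. The
enclosing class `MasterResultSketch` and the sample values in `main` are
hypothetical and exist only to make the example compile on its own; the field
and method names mirror the diff, but this is a simplified stand-in, not the
actual Doris class.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical wrapper class so the sketch compiles standalone.
final class MasterResultSketch {

    // Simplified stand-in for the MasterResult class in the diff.
    static final class MasterResult<T> {
        boolean isMaster = true;
        T result;
        boolean hasError = false;
        String errorMsg;

        // The queried FE is not the master; the caller should try another one.
        static <T> MasterResult<T> notMaster() {
            MasterResult<T> ret = new MasterResult<>();
            ret.isMaster = false;
            return ret;
        }

        // The queried FE is the master and the call succeeded.
        static <T> MasterResult<T> withResult(T result) {
            MasterResult<T> ret = new MasterResult<>();
            ret.result = result;
            return ret;
        }

        // The queried FE is the master but returned an error status.
        static <T> MasterResult<T> masterWithError(String errorMsg) {
            MasterResult<T> ret = new MasterResult<>();
            ret.hasError = true;
            ret.errorMsg = errorMsg;
            return ret;
        }
    }

    public static void main(String[] args) {
        MasterResult<List<String>> ok = MasterResult.withResult(Arrays.asList("be-1", "be-2"));
        MasterResult<List<String>> redirect = MasterResult.notMaster();
        MasterResult<List<String>> failed = MasterResult.masterWithError("status not OK");

        // No cast and no unchecked warning: the compiler knows the element type.
        List<String> backends = ok.result;
        System.out.println(backends.size() + " " + redirect.isMaster + " " + failed.errorMsg);
    }
}
```

With the diamond operator the compiler infers `T` from each factory's return
type, so callers such as `callFromMaster` can read `ret.result` without an
unchecked conversion.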



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java:
##########
@@ -119,6 +122,17 @@ public FragmentIdMapping<DistributedPlan> plan() {
         }
     }
 
+    private void addExternalBackends(BackendDistributedPlanWorkerManager workerManager) throws AnalysisException {
+        for (PlanFragment planFragment : idToFragments.values()) {
+            List<OlapScanNode> scanNodes = planFragment.getPlanRoot()
+                    .collectInCurrentFragment(OlapScanNode.class::isInstance);
+            for (OlapScanNode scanNode : scanNodes) {
+                workerManager.addBackends(scanNode.getCatalogId(),
+                        scanNode.getOlapTable().getAllBackendsByAllCluster());

Review Comment:
   [nitpick] Missing error handling: If
`scanNode.getOlapTable().getAllBackendsByAllCluster()` throws an
`AnalysisException` (declared in the method signature at line 125), the
exception propagates without any context about its origin. Consider wrapping
lines 130-131 in a try-catch that logs which plan fragment, scan node, and
table caused the failure before rethrowing.
   ```suggestion
                   try {
                       workerManager.addBackends(scanNode.getCatalogId(),
                               scanNode.getOlapTable().getAllBackendsByAllCluster());
                   } catch (AnalysisException e) {
                       LOG.error("Failed to get all backends for scan node. "
                               + "PlanFragmentId: {}, ScanNode: {}, Table: {}",
                               planFragment.getFragmentId(),
                               scanNode.getId(),
                               scanNode.getOlapTable() != null ? scanNode.getOlapTable().getName() : "null",
                               e);
                       throw e;
                   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java:
##########
@@ -48,19 +49,28 @@ public void onCreateAssignedJob(AssignedJob assignedJob) {
         BackendWorker worker = (BackendWorker) assignedJob.getAssignedWorker();
         if (usedWorkers.add(worker)) {
             Backend backend = worker.getBackend();
-            usedWorkersAddressToBackendID.put(
+            usedWorkersAddressToBackendID.computeIfAbsent(worker.getCatalogId(), k -> Maps.newLinkedHashMap());
+            usedWorkersAddressToBackendID.get(worker.getCatalogId()).put(
                     new TNetworkAddress(backend.getHost(), backend.getBePort()), backend.getId()
             );
         }
     }
 
     /** tryToSelectRandomUsedWorker */
     public DistributedPlanWorker tryToSelectRandomUsedWorker() {
+        long catalogId = Env.getCurrentInternalCatalog().getId();
         if (usedWorkers.isEmpty()) {
-            return workerManager.randomAvailableWorker();
+            return workerManager.randomAvailableWorker(catalogId);
         } else {
-            long id = workerManager.randomAvailableWorker(usedWorkersAddressToBackendID);
-            return workerManager.getWorker(id);
+            Map<TNetworkAddress, Long> backendIDs;
+            if (usedWorkersAddressToBackendID.containsKey(catalogId)) {
+                backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+            } else {
+                catalogId = usedWorkers.iterator().next().getCatalogId();
+                backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+            }

Review Comment:
   Potential NullPointerException: On line 70, 
`usedWorkersAddressToBackendID.get(catalogId)` could return null if the catalog 
ID is not present in the map. This would cause a NullPointerException when 
passed to `workerManager.randomAvailableWorker()` on line 72. Add a null check 
or ensure the catalog exists in the map.
   ```suggestion
               }
               if (backendIDs == null || backendIDs.isEmpty()) {
                   // Fallback: select a random available worker for the catalogId
                   return workerManager.randomAvailableWorker(catalogId);
               }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -115,28 +122,38 @@ private ImmutableMap<Long, Backend> checkAndInitClusterBackends(
     }
 
     @Override
-    public DistributedPlanWorker getWorker(long backendId) {
-        ImmutableMap<Long, Backend> backends = this.allClusterBackends.get();
+    public void addBackends(long catalogId, ImmutableMap<Long, Backend> backends) {
+        if (!currentClusterBackends.containsKey(catalogId)) {
+            currentClusterBackends.put(catalogId, backends);
+        }
+        if (!allClusterBackends.containsKey(catalogId)) {
+            allClusterBackends.put(catalogId, Suppliers.ofInstance(backends));
+        }
+    }
+
+    @Override
+    public DistributedPlanWorker getWorker(long catalogId, long backendId) {
+        ImmutableMap<Long, Backend> backends = this.allClusterBackends.get(catalogId).get();

Review Comment:
   Potential NullPointerException: If `catalogId` is not present in 
`allClusterBackends`, calling `get(catalogId)` will return null, and then 
calling `get()` on the null Supplier will cause a NullPointerException. Add a 
null check before accessing the backends.
   ```suggestion
           Supplier<ImmutableMap<Long, Backend>> backendSupplier = this.allClusterBackends.get(catalogId);
           if (backendSupplier == null) {
               throw new IllegalStateException("Catalog " + catalogId + " is not present in allClusterBackends");
           }
           ImmutableMap<Long, Backend> backends = backendSupplier.get();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

