Copilot commented on code in PR #57898:
URL: https://github.com/apache/doris/pull/57898#discussion_r2559243788
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -149,17 +166,17 @@ public long randomAvailableWorker(Map<TNetworkAddress, Long> addressToBackendID)
}
@Override
- public List<Backend> getAllBackends(boolean needAlive) {
+ public List<Backend> getAllBackends(long catalogId, boolean needAlive) {
List<Backend> backends = null;
if (needAlive) {
backends = Lists.newArrayList();
- for (Map.Entry<Long, Backend> entry : this.allClusterBackends.get().entrySet()) {
+ for (Map.Entry<Long, Backend> entry : this.allClusterBackends.get(catalogId).get().entrySet()) {
if (entry.getValue().isQueryAvailable()) {
backends.add(entry.getValue());
}
}
} else {
- backends = Lists.newArrayList(this.allClusterBackends.get().values());
+ backends = Lists.newArrayList(this.allClusterBackends.get(catalogId).get().values());
Review Comment:
Potential NullPointerException: as at line 179, `allClusterBackends.get(catalogId)`
returns null when no backends are registered for `catalogId`, so the chained
`.get()` throws.
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -4393,6 +4402,7 @@ public void setEnablePushDownMinMaxOnUnique(boolean enablePushDownMinMaxOnUnique
this.enablePushDownMinMaxOnUnique = enablePushDownMinMaxOnUnique;
}
+ /** canUseNereidsDistributePlanner */
Review Comment:
Incorrect JavaDoc comment: The comment `/** canUseNereidsDistributePlanner */`
on line 4405 is placed above the wrong method (`isEnablePushDownStringMinMax`).
This appears to be a copy-paste error. Remove or correct this misplaced comment.
```suggestion
```
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -4507,6 +4516,102 @@ public TGetTableTDEInfoResult getTableTDEInfo(TGetTableTDEInfoRequest request) t
return result;
}
+ /**
+ * only copy basic meta about table
+ * do not copy the partitions
+ * @param request
+ * @return
+ * @throws TException
+ */
+ @Override
+ public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest request) throws TException {
+ String clientAddr = getClientAddrAsString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive getOlapTableMeta request: {}, client: {}", request, clientAddr);
+ }
+ TGetOlapTableMetaResult result = new TGetOlapTableMetaResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+ try {
+ checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
+ request.getTable(), clientAddr, PrivPredicate.SELECT);
+ String dbName = request.getDb();
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
+ if (db == null) {
+ throw new UserException("unknown database, database=" + dbName);
+ }
+ OlapTable table = (OlapTable) db.getTableNullable(request.getTable());
+ if (table == null) {
+ throw new UserException("unknown table, table=" + request.getTable());
+ }
+ MetaContext metaContext = new MetaContext();
+ metaContext.setMetaVersion(FeConstants.meta_version);
+ metaContext.setThreadLocalInfo();
+ table.readLock();
+ try (ByteArrayOutputStream bOutputStream = new ByteArrayOutputStream(8192)) {
+ OlapTable copyTable = table.copyTableMeta();
+ try (DataOutputStream out = new DataOutputStream(bOutputStream)) {
+ copyTable.write(out);
+ out.flush();
+ result.setTableMeta(bOutputStream.toByteArray());
+ }
+ Set<Long> updatedPartitionIds = Sets.newHashSet(table.getPartitionIds());
+ List<TPartitionMeta> partitionMetas = request.getPartitionsSize() == 0 ? Lists.newArrayList()
+ : request.getPartitions();
+ for (TPartitionMeta partitionMeta : partitionMetas) {
+ if (request.getTableId() != table.getId()) {
+ result.addToRemovedPartitions(partitionMeta.getId());
+ continue;
+ }
+ Partition partition = table.getPartition(partitionMeta.getId());
+ if (partition == null) {
+ result.addToRemovedPartitions(partitionMeta.getId());
+ continue;
+ }
+ if (partition.getVisibleVersion() == partitionMeta.getVisibleVersion()
+ && partition.getVisibleVersionTime() == partitionMeta.getVisibleVersionTime()) {
+ updatedPartitionIds.remove(partitionMeta.getId());
+ }
Review Comment:
Loop-invariant check in partition handling: The check `if (request.getTableId() != table.getId())`
on line 4562 sits inside the loop over partitions (line 4561) even though it does
not depend on the current partition. When the table IDs differ, every requested
partition is marked as removed, one iteration at a time; hoisting the check produces
the same result but states that intent directly and skips the per-partition
comparison. Move the check outside the loop, before iterating over partitions.
```suggestion
// Move table ID check outside the loop for correct logic
if (request.getTableId() != table.getId()) {
for (TPartitionMeta partitionMeta : partitionMetas) {
result.addToRemovedPartitions(partitionMeta.getId());
}
} else {
for (TPartitionMeta partitionMeta : partitionMetas) {
Partition partition = table.getPartition(partitionMeta.getId());
if (partition == null) {
result.addToRemovedPartitions(partitionMeta.getId());
continue;
}
if (partition.getVisibleVersion() == partitionMeta.getVisibleVersion()
&& partition.getVisibleVersionTime() == partitionMeta.getVisibleVersionTime()) {
updatedPartitionIds.remove(partitionMeta.getId());
}
}
```
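To check that hoisting the table-ID test leaves the outcome unchanged, the classification can be sketched in isolation. This is a minimal, self-contained sketch, not the PR's code: `PartitionVersion`, `PartitionDiff`, and `PartitionDiffSketch.diff` are hypothetical stand-ins for `TPartitionMeta`, the result's removed/updated partition sets, and the body of `getOlapTableMeta`; serialization of the updated partitions is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for TPartitionMeta: partition id plus version info. */
final class PartitionVersion {
    final long id;
    final long visibleVersion;
    final long visibleVersionTime;

    PartitionVersion(long id, long visibleVersion, long visibleVersionTime) {
        this.id = id;
        this.visibleVersion = visibleVersion;
        this.visibleVersionTime = visibleVersionTime;
    }
}

/** Hypothetical result: partitions the client must drop or re-fetch. */
final class PartitionDiff {
    final List<Long> removed = new ArrayList<>();
    final Set<Long> updated = new HashSet<>();
}

final class PartitionDiffSketch {
    static PartitionDiff diff(long requestTableId, long serverTableId,
                              Map<Long, PartitionVersion> serverPartitions,
                              List<PartitionVersion> cachedPartitions) {
        PartitionDiff result = new PartitionDiff();
        // Start from "every server partition is updated", like updatedPartitionIds.
        result.updated.addAll(serverPartitions.keySet());
        // Hoisted loop-invariant check: a different table id invalidates the whole cache.
        if (requestTableId != serverTableId) {
            for (PartitionVersion cached : cachedPartitions) {
                result.removed.add(cached.id);
            }
            return result;
        }
        for (PartitionVersion cached : cachedPartitions) {
            PartitionVersion current = serverPartitions.get(cached.id);
            if (current == null) {
                result.removed.add(cached.id);    // partition no longer exists on the server
            } else if (current.visibleVersion == cached.visibleVersion
                    && current.visibleVersionTime == cached.visibleVersionTime) {
                result.updated.remove(cached.id); // cached copy is still current
            }
        }
        return result;
    }
}
```

A partition the client holds at the same visible version and version time drops out of the update set, a partition missing on the server is reported as removed, and a table-ID mismatch reports every cached partition as removed while leaving all server partitions in the update set.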
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemoteOlapTable extends OlapTable {
+ private static final Logger LOG = LogManager.getLogger(RemoteOlapTable.class);
+
+ private RemoteDorisExternalCatalog catalog;
+ private RemoteDorisExternalDatabase database;
+
+ public RemoteDorisExternalCatalog getCatalog() {
+ return catalog;
+ }
+
+ public void setCatalog(RemoteDorisExternalCatalog catalog) {
+ this.catalog = catalog;
+ }
+
+ @Override
+ public RemoteDorisExternalDatabase getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(RemoteDorisExternalDatabase database) {
+ this.database = database;
+ }
+
+ public static RemoteOlapTable fromOlapTable(OlapTable olapTable) {
+ try {
+ RemoteOlapTable externalOlapTable = new RemoteOlapTable();
+ Class<?> currentClass = olapTable.getClass();
+ while (currentClass != null) {
+ for (Field field : currentClass.getDeclaredFields()) {
+ if (Modifier.isStatic(field.getModifiers())) {
+ continue;
+ }
+ field.setAccessible(true);
+ field.set(externalOlapTable, field.get(olapTable));
+ }
+ currentClass = currentClass.getSuperclass();
+ }
+ return externalOlapTable;
+ } catch (Exception e) {
+ throw new RuntimeException("failed to initial external olap table", e);
+ }
Review Comment:
Security risk: Using reflection with `setAccessible(true)` on all fields
bypasses Java's access control and could expose private fields. This approach
is fragile and may break with future Java versions. Consider implementing a
proper copy constructor or builder pattern instead of using reflection.
```suggestion
return new RemoteOlapTable(olapTable);
}
/**
* Copy constructor to create a RemoteOlapTable from an OlapTable.
* Only copies fields that are accessible and relevant.
*/
public RemoteOlapTable(OlapTable olapTable) {
super(olapTable.getId(), olapTable.getName(),
olapTable.getBaseSchema(), olapTable.getKeysType(),
olapTable.getPartitionInfo(),
olapTable.getDistributionInfo());
// Copy other relevant fields as needed
this.idToPartition = new ConcurrentHashMap<>(olapTable.getIdToPartition());
this.nameToPartition = Maps.newTreeMap();
for (Partition partition : this.idToPartition.values()) {
this.nameToPartition.put(partition.getName(), partition);
}
this.indexIdToMeta = Maps.newHashMap(olapTable.getIndexIdToMeta());
this.visibleIndexId = olapTable.getVisibleIndexId();
this.state = olapTable.getState();
this.createTime = olapTable.getCreateTime();
this.lastSchemaUpdateTime = olapTable.getLastSchemaUpdateTime();
this.lastUpdateTime = olapTable.getLastUpdateTime();
this.comment = olapTable.getComment();
this.baseIndexId = olapTable.getBaseIndexId();
this.colUniqueIdGenerator = olapTable.getColUniqueIdGenerator();
this.indexNameToId = Maps.newHashMap(olapTable.getIndexNameToId());
this.partitionColumnNames = olapTable.getPartitionColumnNames() != null
? List.copyOf(olapTable.getPartitionColumnNames()) : null;
this.defaultDistributionBucketNum = olapTable.getDefaultDistributionBucketNum();
this.storageType = olapTable.getStorageType();
this.storageFormat = olapTable.getStorageFormat();
this.enablePersistentIndex = olapTable.enablePersistentIndex();
this.enableUniqueKeyMergeOnWrite = olapTable.enableUniqueKeyMergeOnWrite();
this.enableSingleReplicaCompaction = olapTable.enableSingleReplicaCompaction();
// Add more fields as necessary, depending on what needs to be copied.
```
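Reflection-based copying picks up every field automatically, which is its appeal; a copy constructor trades that for explicit, compile-checked copying. A minimal sketch of the pattern, assuming hypothetical `BaseTable` and `RemoteTable` classes in place of `OlapTable` and `RemoteOlapTable` (the real classes have many more fields and their own constructors):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base class standing in for OlapTable. */
class BaseTable {
    protected final long id;
    protected final String name;
    protected final Map<Long, String> partitions;

    BaseTable(long id, String name, Map<Long, String> partitions) {
        this.id = id;
        this.name = name;
        this.partitions = partitions;
    }

    /** Copy constructor: copies the fields this class declares, no reflection. */
    protected BaseTable(BaseTable other) {
        this.id = other.id;
        this.name = other.name;
        // Defensive copy so the two tables do not share one mutable map.
        this.partitions = new HashMap<>(other.partitions);
    }
}

/** Hypothetical subclass standing in for RemoteOlapTable. */
class RemoteTable extends BaseTable {
    private final long catalogId;

    RemoteTable(BaseTable source, long catalogId) {
        super(source); // base-class fields are copied by the base class itself
        this.catalogId = catalogId;
    }

    long getCatalogId() {
        return catalogId;
    }
}
```

Each class copies the fields it declares, so the subclass never needs access to private state in its parent. The trade-off is maintenance: with `final` fields, as here, the compiler rejects a copy constructor that forgets one, but a non-final field added later is silently left at its default unless the copy constructor is updated too, which the reflective version avoids.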
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -115,28 +122,38 @@ private ImmutableMap<Long, Backend> checkAndInitClusterBackends(
}
@Override
- public DistributedPlanWorker getWorker(long backendId) {
- ImmutableMap<Long, Backend> backends = this.allClusterBackends.get();
+ public void addBackends(long catalogId, ImmutableMap<Long, Backend> backends) {
+ if (!currentClusterBackends.containsKey(catalogId)) {
+ currentClusterBackends.put(catalogId, backends);
+ }
+ if (!allClusterBackends.containsKey(catalogId)) {
+ allClusterBackends.put(catalogId, Suppliers.ofInstance(backends));
+ }
+ }
+
+ @Override
+ public DistributedPlanWorker getWorker(long catalogId, long backendId) {
+ ImmutableMap<Long, Backend> backends = this.allClusterBackends.get(catalogId).get();
Backend backend = backends.get(backendId);
if (backend == null) {
throw new IllegalStateException("Backend " + backendId + " is not exist");
}
- return new BackendWorker(backend);
+ return new BackendWorker(catalogId, backend);
}
@Override
- public DistributedPlanWorker getWorker(Backend backend) {
- return new BackendWorker(backend);
+ public DistributedPlanWorker getWorker(long catalogId, Backend backend) {
+ return new BackendWorker(catalogId, backend);
}
@Override
- public DistributedPlanWorker randomAvailableWorker() {
+ public DistributedPlanWorker randomAvailableWorker(long catalogId) {
try {
Reference<Long> selectedBackendId = new Reference<>();
- ImmutableMap<Long, Backend> backends = this.currentClusterBackends;
+ ImmutableMap<Long, Backend> backends = this.currentClusterBackends.get(catalogId);
Review Comment:
Potential NullPointerException: If `catalogId` is not present in
`currentClusterBackends`, calling `get(catalogId)` will return null, and then
calling `SimpleScheduler.getHost()` with null will likely cause issues. Add a
null check or ensure the catalog exists before calling this method.
```suggestion
ImmutableMap<Long, Backend> backends = this.currentClusterBackends.get(catalogId);
if (backends == null) {
throw new IllegalStateException("No backends found for catalogId: " + catalogId);
}
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -470,7 +471,21 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
case MAX_COMPUTE_EXTERNAL_TABLE:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
case LAKESOUl_EXTERNAL_TABLE:
+ return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
+ qualifierWithoutTableName, ImmutableList.of(),
+ unboundRelation.getTableSample(),
+ unboundRelation.getTableSnapshot(),
+ Optional.ofNullable(unboundRelation.getScanParams()), Optional.empty());
case DORIS_EXTERNAL_TABLE:
+ ConnectContext ctx = cascadesContext.getConnectContext();
+ if (Config.isNotCloudMode() && ctx != null
+ && ctx.getSessionVariable().isQueryDorisExternalTableAsOlapTable()
+ // use isEnableNereidsDistributePlanner instead of canUseNereidsDistributePlanner
+ // bacause it cannot work in explain command
Review Comment:
Typo in comment: "bacause" should be "because".
```suggestion
// because it cannot work in explain command
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -149,17 +166,17 @@ public long randomAvailableWorker(Map<TNetworkAddress, Long> addressToBackendID)
}
@Override
- public List<Backend> getAllBackends(boolean needAlive) {
+ public List<Backend> getAllBackends(long catalogId, boolean needAlive) {
List<Backend> backends = null;
if (needAlive) {
backends = Lists.newArrayList();
- for (Map.Entry<Long, Backend> entry : this.allClusterBackends.get().entrySet()) {
+ for (Map.Entry<Long, Backend> entry : this.allClusterBackends.get(catalogId).get().entrySet()) {
Review Comment:
Potential NullPointerException: Similar to line 136, if `catalogId` is not
present in `allClusterBackends`, this will throw a NullPointerException. Add
proper null checking or validation.
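One way to address both this comment and the one it references at line 136 is a single lookup helper that fails with a descriptive message instead of an NPE. A sketch under stated assumptions: `CatalogBackends` and `backendsOf` are hypothetical names, `String` stands in for `Backend`, and the map holds `java.util.function.Supplier` values to mirror the `Suppliers.ofInstance(...)` wrapping in `addBackends`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical per-catalog backend registry; String stands in for Backend. */
final class CatalogBackends {
    private final Map<Long, Supplier<Map<Long, String>>> allClusterBackends =
            new ConcurrentHashMap<>();

    void addBackends(long catalogId, Map<Long, String> backends) {
        Map<Long, String> readOnly = Collections.unmodifiableMap(backends);
        // putIfAbsent keeps the first registration, like the containsKey/put pair in the hunk,
        // but as a single atomic operation on the ConcurrentHashMap.
        allClusterBackends.putIfAbsent(catalogId, () -> readOnly);
    }

    /** Returns the backends for a catalog, failing clearly if none were registered. */
    Map<Long, String> backendsOf(long catalogId) {
        Supplier<Map<Long, String>> supplier = allClusterBackends.get(catalogId);
        if (supplier == null) {
            throw new IllegalStateException("No backends registered for catalogId: " + catalogId);
        }
        return supplier.get();
    }
}
```

`getWorker` and `getAllBackends` could then call such a helper instead of chaining `.get(catalogId).get()` directly.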
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemoteOlapTable extends OlapTable {
+ private static final Logger LOG = LogManager.getLogger(RemoteOlapTable.class);
+
+ private RemoteDorisExternalCatalog catalog;
+ private RemoteDorisExternalDatabase database;
+
+ public RemoteDorisExternalCatalog getCatalog() {
+ return catalog;
+ }
+
+ public void setCatalog(RemoteDorisExternalCatalog catalog) {
+ this.catalog = catalog;
+ }
+
+ @Override
+ public RemoteDorisExternalDatabase getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(RemoteDorisExternalDatabase database) {
+ this.database = database;
+ }
+
+ public static RemoteOlapTable fromOlapTable(OlapTable olapTable) {
+ try {
+ RemoteOlapTable externalOlapTable = new RemoteOlapTable();
+ Class<?> currentClass = olapTable.getClass();
+ while (currentClass != null) {
+ for (Field field : currentClass.getDeclaredFields()) {
+ if (Modifier.isStatic(field.getModifiers())) {
+ continue;
+ }
+ field.setAccessible(true);
+ field.set(externalOlapTable, field.get(olapTable));
+ }
+ currentClass = currentClass.getSuperclass();
+ }
+ return externalOlapTable;
+ } catch (Exception e) {
+ throw new RuntimeException("failed to initial external olap table", e);
+ }
+ }
+
+ public void rebuildPartitions(List<Partition> oldPartitions, List<Partition> updatedPartitions,
+ List<Long> removedPartitions)
+ throws AnalysisException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("rebuildPartitions oldPartitions: " + oldPartitions.size() + ", updatedPartitions: "
+ + updatedPartitions.size() + ", removedPartitions: " + removedPartitions.size());
+ }
+ ConcurrentHashMap<Long, Partition> newIdToPartition = new ConcurrentHashMap<>();
+ for (Partition oldPartition : oldPartitions) {
+ newIdToPartition.put(oldPartition.getId(), oldPartition);
+ }
+ for (Long removedPartition : removedPartitions) {
+ newIdToPartition.remove(removedPartition);
+ }
+ for (Partition updatedPartition : updatedPartitions) {
+ newIdToPartition.put(updatedPartition.getId(), updatedPartition);
+ }
+ Map<String, Partition> newNameToPartition = Maps.newTreeMap();
+ for (Partition partition : newIdToPartition.values()) {
+ newNameToPartition.put(partition.getName(), partition);
+ }
+ this.idToPartition = newIdToPartition;
+ this.nameToPartition = newNameToPartition;
+ }
+
+ public void invalidateBackendsIfNeed() {
+ ImmutableMap<Long, Backend> backends = getAllBackendsByAllCluster();
+ for (Partition partition : getPartitions()) {
+ for (Tablet tablet : partition.getBaseIndex().getTablets()) {
+ for (long backendId : tablet.getBackendIds()) {
+ if (!backends.containsKey(backendId)) {
+ Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr()
+ .invalidateBackendCache(catalog.getId());
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public long getCatalogId() {
+ return catalog.getId();
+ }
+
+ public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() {
Review Comment:
Potential NullPointerException: If `catalog` is null, calling
`catalog.getId()` at line 123 or 127 will throw a NullPointerException. Ensure
catalog is set before these methods are called, or add null checks with
appropriate error handling.
```suggestion
public long getCatalogId() {
if (catalog == null) {
throw new IllegalStateException("Catalog is not set in RemoteOlapTable");
}
return catalog.getId();
}
public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() {
if (catalog == null) {
throw new IllegalStateException("Catalog is not set in RemoteOlapTable");
}
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java:
##########
@@ -3698,4 +3699,44 @@ public Index getInvertedIndex(Column column, List<String> subPath) {
.filter(Index::isAnalyzedInvertedIndex).findFirst().orElse(null);
}
}
+
+ /**
+ * caller should acquire the read lock and should not modify any field of the return obj
+ */
+ public OlapTable copyTableMeta() {
+ OlapTable table = new OlapTable();
+ // metaobj
+ table.signature = signature;
+ table.lastCheckTime = lastCheckTime;
+ // abstract table
+ table.id = id;
+ table.name = name;
+ table.qualifiedDbName = qualifiedDbName;
+ table.type = type;
+ table.createTime = createTime;
+ // olap table
+ // NOTE: currently do not need temp partitions, colocateGroup, autoIncrementGenerator
+ table.idToPartition = new ConcurrentHashMap<>();
+ table.tempPartitions = new TempPartitions();
+
+ table.state = state;
+ table.indexIdToMeta = indexIdToMeta;
+ table.indexNameToId = indexNameToId;
+ table.keysType = keysType;
+ table.partitionInfo = partitionInfo;
+ table.defaultDistributionInfo = defaultDistributionInfo;
+ table.bfColumns = bfColumns;
+ table.bfFpp = bfFpp;
+ table.indexes = indexes;
+ table.baseIndexId = baseIndexId;
+ return table;
Review Comment:
[nitpick] Incomplete shallow copy: The `copyTableMeta()` method performs a
shallow copy of reference fields like `indexIdToMeta`, `indexNameToId`,
`partitionInfo`, etc. (lines 3723-3731). This means modifications to these
shared objects in the copy will affect the original table. Since the JavaDoc
states "caller should not modify any field of the return obj", consider making
this explicit with defensive copies or using immutable wrappers to prevent
accidental modifications.
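If the copy must keep sharing the original's objects (to stay cheap while the read lock is held), an unmodifiable view at least makes accidental writes fail fast. A minimal sketch with a hypothetical `TableMeta` class holding a single map in place of `indexNameToId`; the real `copyTableMeta()` shares several more fields:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical table-metadata holder standing in for OlapTable. */
final class TableMeta {
    private Map<String, Long> indexNameToId = new HashMap<>();

    Map<String, Long> getIndexNameToId() {
        return indexNameToId;
    }

    void putIndex(String name, long id) {
        indexNameToId.put(name, id);
    }

    /**
     * Shallow copy: the map is still shared with this table, but exposed
     * read-only, so writes through the copy throw instead of silently
     * changing the source table.
     */
    TableMeta copyMeta() {
        TableMeta copy = new TableMeta();
        copy.indexNameToId = Collections.unmodifiableMap(indexNameToId);
        return copy;
    }
}
```

`Collections.unmodifiableMap` returns a view, not a snapshot: later changes to the original remain visible through the copy, and the values stored in the map stay mutable. Wrapping `new HashMap<>(indexNameToId)` instead would give a stable snapshot at the cost of copying.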
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java:
##########
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TGetBackendMetaRequest;
+import org.apache.doris.thrift.TGetBackendMetaResult;
+import org.apache.doris.thrift.TGetOlapTableMetaRequest;
+import org.apache.doris.thrift.TGetOlapTableMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPartitionMeta;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class FeServiceClient {
+ private static final Logger LOG = LogManager.getLogger(FeServiceClient.class);
+
+ private final Random random = new Random(System.currentTimeMillis());
+ private final String name;
+ private final List<TNetworkAddress> addresses;
+ private volatile TNetworkAddress master;
+ private final String user;
+ private final String password;
+ private final int retryCount;
+ private final int timeout;
+
+ public FeServiceClient(String name, List<TNetworkAddress> addresses, String user, String password,
+ int retryCount, int timeout) {
+ this.name = name;
+ this.addresses = addresses;
+ this.user = user;
+ this.password = password;
+ this.retryCount = retryCount;
+ this.timeout = timeout;
+ }
+
+ private List<TNetworkAddress> getAddresses() {
+ return addresses;
+ }
+
+ private FrontendService.Client getRemoteFeClient(TNetworkAddress address, int timeout) {
+ try {
+ return ClientPool.frontendPool.borrowObject(address, timeout);
+ } catch (Exception e) {
+ String msg = String.format("failed to get remote doris:%s fe connection", name);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ private void returnClient(TNetworkAddress address, FrontendService.Client client, boolean returnObj) {
+ if (returnObj) {
+ ClientPool.frontendPool.returnObject(address, client);
+ } else {
+ ClientPool.frontendPool.invalidateObject(address, client);
+ }
+ }
+
+ private <T> T randomCallWithRetry(ThriftCall<T> call, String errorMsg, int timeout) {
+ List<TNetworkAddress> addresses = getAddresses();
+ int retries = 0;
+ Exception lastException = null;
+ while (retries < retryCount) {
+ int index = random.nextInt(addresses.size());
+ FrontendService.Client client = null;
+ for (int i = 0; i < addresses.size() && retries < retryCount; i++) {
+ TNetworkAddress address = addresses.get((index + i) % addresses.size());
+ client = getRemoteFeClient(address, timeout);
+ boolean returnObj = false;
+ try {
+ T result = call.call(client);
+ returnObj = true;
+ return result;
+ } catch (TException | IOException e) {
+ lastException = e;
+ retries++;
+ } catch (Exception e) {
+ throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+ } finally {
+ returnClient(address, client, returnObj);
+ }
+ }
+ }
+ throw new RuntimeException(errorMsg + ":" + lastException.getMessage(), lastException);
+ }
+
+ private <T> T callFromMaster(ThriftCall<MasterResult<T>> call, String errorMsg, int timeout) {
+ TNetworkAddress address = master;
+ FrontendService.Client client = null;
+ Exception lastException = null;
+ if (address != null) {
+ client = getRemoteFeClient(address, timeout);
+ boolean returnObj = false;
+ try {
+ MasterResult<T> ret = call.call(client);
+ returnObj = true;
+ if (ret.isMaster) {
+ if (ret.hasError) {
+ throw new RuntimeException(ret.errorMsg);
+ }
+ return ret.result;
+ }
+ } catch (TException | IOException e) {
+ lastException = e;
+ } catch (Exception e) {
+ throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+ } finally {
+ returnClient(address, client, returnObj);
+ }
+ }
+ master = null;
+ List<TNetworkAddress> addresses = getAddresses();
+ int retries = 0;
+ while (retries < retryCount) {
+ int index = random.nextInt(addresses.size());
+ for (int i = 0; i < addresses.size() && retries < retryCount; i++) {
+ address = addresses.get((index + i) % addresses.size());
+ client = getRemoteFeClient(address, timeout);
+ boolean returnObj = false;
+ try {
+ MasterResult<T> ret = call.call(client);
+ returnObj = true;
+ if (ret.isMaster) {
+ master = address;
+ if (ret.hasError) {
+ throw new RuntimeException(ret.errorMsg);
+ }
+ return ret.result;
+ }
+ } catch (TException | IOException e) {
+ lastException = e;
+ retries++;
+ } catch (Exception e) {
+ throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+ } finally {
+ returnClient(address, client, returnObj);
+ }
+ }
+ }
+ throw new RuntimeException(errorMsg + ":" + lastException.getMessage(), lastException);
+ }
+
+ public List<Backend> listBackends() {
+ TGetBackendMetaRequest request = new TGetBackendMetaRequest();
+ request.setUser(user);
+ request.setPasswd(password);
+ String msg = String.format("failed to get backends from remote doris:%s", name);
+ return callFromMaster(client -> {
+ TGetBackendMetaResult result = client.getBackendMeta(request);
+ if (result.getStatus().getStatusCode() == TStatusCode.NOT_MASTER) {
+ return MasterResult.notMaster();
+ }
+ if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+ return MasterResult.masterWithError(result.getStatus().toString());
+ }
+ List<Backend> backends = result.getBackends().stream()
+ .map(b -> Backend.fromThrift(b))
+ .collect(Collectors.toList());
+ return MasterResult.withResult(backends);
+ }, msg, timeout);
+ }
+
+ public RemoteOlapTable getOlapTable(String dbName, String table, long tableId, List<Partition> partitions) {
+ TGetOlapTableMetaRequest request = new TGetOlapTableMetaRequest();
+ request.setDb(dbName);
+ request.setTable(table);
+ request.setTableId(tableId);
+ request.setUser(user);
+ request.setPasswd(password);
+ request.setVersion(FeConstants.meta_version);
+ for (Partition partition : partitions) {
+ TPartitionMeta meta = new TPartitionMeta();
+ meta.setId(partition.getId());
+ meta.setVisibleVersion(partition.getVisibleVersion());
+ meta.setVisibleVersionTime(partition.getVisibleVersionTime());
+ request.addToPartitions(meta);
+ }
+ String msg = String.format("failed to get table meta from remote doris:%s", name);
+ return randomCallWithRetry(client -> {
+ TGetOlapTableMetaResult result = client.getOlapTableMeta(request);
+ if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+ throw new UserException(result.getStatus().toString());
+ }
+ RemoteOlapTable remoteOlapTable = null;
+ try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(result.getTableMeta()))) {
+ OlapTable olapTable = OlapTable.read(in);
+ remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable);
+ }
+ List<Partition> updatedPartitions = new ArrayList<>(result.getUpdatedPartitionsSize());
+ if (result.getUpdatedPartitionsSize() > 0) {
+ for (ByteBuffer buffer : result.getUpdatedPartitions()) {
+ try (ByteArrayInputStream in =
+ new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
+ DataInputStream dataInputStream = new DataInputStream(in)) {
+ String partitionStr = Text.readString(dataInputStream);
+ Partition partition = GsonUtils.GSON.fromJson(partitionStr, Partition.class);
+ updatedPartitions.add(partition);
+ }
+ }
+ }
+ List<Long> removedPartitions = result.getRemovedPartitions();
+ if (removedPartitions == null) {
+ removedPartitions = new ArrayList<>();
+ }
+ remoteOlapTable.rebuildPartitions(partitions, updatedPartitions, removedPartitions);
+ return remoteOlapTable;
+ }, msg, timeout);
+ }
+
+ private interface ThriftCall<T> {
+ public T call(FrontendService.Client client) throws Exception;
+ }
+
+ private static class MasterResult<T> {
+ boolean isMaster = true;
+ T result;
+ boolean hasError = false;
+ String errorMsg;
+
+ static <T> MasterResult<T> notMaster() {
+ MasterResult ret = new MasterResult();
+ ret.isMaster = false;
+ return ret;
+ }
+
+ static <T> MasterResult<T> withResult(T result) {
+ MasterResult ret = new MasterResult();
+ ret.isMaster = true;
+ ret.hasError = false;
+ ret.result = result;
+ return ret;
+ }
+
+ // is master but has error code
+ static <T> MasterResult<T> masterWithError(String errorMsg) {
+ MasterResult ret = new MasterResult();
+ ret.isMaster = true;
+ ret.hasError = true;
+ ret.errorMsg = errorMsg;
+ return ret;
+ }
Review Comment:
Missing generic type parameter: The `MasterResult` instantiation is missing
the type parameter. This causes a raw type warning and potential type safety
issues. Change `MasterResult ret = new MasterResult();` to
`MasterResult<T> ret = new MasterResult<>();` for all three factory methods.
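With the type parameter applied, the three factory methods compile without raw-type or unchecked warnings. The sketch below is the `MasterResult` class from the hunk, pulled out as a package-private top-level class so it compiles on its own; only the declarations inside the factory methods change:

```java
/** MasterResult from the hunk, with parameterized instantiations instead of raw types. */
final class MasterResult<T> {
    boolean isMaster = true;
    T result;
    boolean hasError = false;
    String errorMsg;

    static <T> MasterResult<T> notMaster() {
        MasterResult<T> ret = new MasterResult<>();
        ret.isMaster = false;
        return ret;
    }

    static <T> MasterResult<T> withResult(T result) {
        MasterResult<T> ret = new MasterResult<>();
        ret.isMaster = true;
        ret.hasError = false;
        ret.result = result;
        return ret;
    }

    // is master but has error code
    static <T> MasterResult<T> masterWithError(String errorMsg) {
        MasterResult<T> ret = new MasterResult<>();
        ret.isMaster = true;
        ret.hasError = true;
        ret.errorMsg = errorMsg;
        return ret;
    }
}
```

Callers such as `MasterResult.notMaster()` inside the `callFromMaster` lambda then have `T` inferred from the target type, so no casts are needed.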
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java:
##########
@@ -119,6 +122,17 @@ public FragmentIdMapping<DistributedPlan> plan() {
}
}
+ private void addExternalBackends(BackendDistributedPlanWorkerManager workerManager) throws AnalysisException {
+ for (PlanFragment planFragment : idToFragments.values()) {
+ List<OlapScanNode> scanNodes = planFragment.getPlanRoot()
+ .collectInCurrentFragment(OlapScanNode.class::isInstance);
+ for (OlapScanNode scanNode : scanNodes) {
+ workerManager.addBackends(scanNode.getCatalogId(),
+ scanNode.getOlapTable().getAllBackendsByAllCluster());
Review Comment:
[nitpick] Missing error handling: If
`scanNode.getOlapTable().getAllBackendsByAllCluster()` throws an
`AnalysisException` (declared in the method signature at line 125), it
propagates to the caller with no indication of which scan node or table caused
it. Consider adding a try-catch around lines 130-131 that logs the plan
fragment, scan node, and table before rethrowing.
```suggestion
try {
    workerManager.addBackends(scanNode.getCatalogId(),
            scanNode.getOlapTable().getAllBackendsByAllCluster());
} catch (AnalysisException e) {
    LOG.error("Failed to get all backends for scan node. "
            + "PlanFragmentId: {}, ScanNode: {}, Table: {}",
            planFragment.getFragmentId(),
            scanNode.getId(),
            scanNode.getOlapTable() != null ? scanNode.getOlapTable().getName() : "null",
            e);
    throw e;
}
```
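The suggestion above logs and rethrows the original exception. One alternative, shown here as a hedged sketch rather than Doris code, is to attach the same context to a wrapping exception so callers see it without correlating log lines. `ScanInfo`, `BackendLookupException`, `lookupBackends`, and `collect` are hypothetical stand-ins for `OlapScanNode`, `AnalysisException`, `getAllBackendsByAllCluster()`, and the loop in `addExternalBackends`.

```java
import java.util.List;

// Hypothetical stand-in for AnalysisException: checked, with a cause constructor.
class BackendLookupException extends Exception {
    BackendLookupException(String msg) {
        super(msg);
    }

    BackendLookupException(String msg, Throwable cause) {
        super(msg, cause);
    }
}

// Hypothetical stand-in for the OlapScanNode fields that matter for diagnostics.
final class ScanInfo {
    final int fragmentId;
    final int scanNodeId;
    final String tableName;
    final boolean failing; // simulates a table whose backend lookup fails

    ScanInfo(int fragmentId, int scanNodeId, String tableName, boolean failing) {
        this.fragmentId = fragmentId;
        this.scanNodeId = scanNodeId;
        this.tableName = tableName;
        this.failing = failing;
    }
}

final class ScanBackendCollector {
    // Simulates getAllBackendsByAllCluster(): returns three backends or fails.
    static int lookupBackends(ScanInfo scan) throws BackendLookupException {
        if (scan.failing) {
            throw new BackendLookupException("no backends available");
        }
        return 3;
    }

    // Sums backend counts; on failure, rethrows with fragment/scan/table context
    // in the message and keeps the original exception as the cause.
    static int collect(List<ScanInfo> scans) throws BackendLookupException {
        int total = 0;
        for (ScanInfo scan : scans) {
            try {
                total += lookupBackends(scan);
            } catch (BackendLookupException e) {
                throw new BackendLookupException(String.format(
                        "Failed to get backends (fragment=%d, scanNode=%d, table=%s)",
                        scan.fragmentId, scan.scanNodeId, scan.tableName), e);
            }
        }
        return total;
    }
}
```

Whether to log, wrap, or do both is a project convention. Doing both at every layer tends to produce duplicate log entries for a single failure.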
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java:
##########
@@ -48,19 +49,28 @@ public void onCreateAssignedJob(AssignedJob assignedJob) {
BackendWorker worker = (BackendWorker) assignedJob.getAssignedWorker();
if (usedWorkers.add(worker)) {
Backend backend = worker.getBackend();
- usedWorkersAddressToBackendID.put(
+ usedWorkersAddressToBackendID.computeIfAbsent(worker.getCatalogId(), k -> Maps.newLinkedHashMap());
+ usedWorkersAddressToBackendID.get(worker.getCatalogId()).put(
new TNetworkAddress(backend.getHost(),
backend.getBePort()), backend.getId()
);
}
}
/** tryToSelectRandomUsedWorker */
public DistributedPlanWorker tryToSelectRandomUsedWorker() {
+ long catalogId = Env.getCurrentInternalCatalog().getId();
if (usedWorkers.isEmpty()) {
- return workerManager.randomAvailableWorker();
+ return workerManager.randomAvailableWorker(catalogId);
} else {
- long id = workerManager.randomAvailableWorker(usedWorkersAddressToBackendID);
- return workerManager.getWorker(id);
+ Map<TNetworkAddress, Long> backendIDs;
+ if (usedWorkersAddressToBackendID.containsKey(catalogId)) {
+ backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+ } else {
+ catalogId = usedWorkers.iterator().next().getCatalogId();
+ backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+ }
Review Comment:
Potential NullPointerException: On line 70,
`usedWorkersAddressToBackendID.get(catalogId)` could return null if the catalog
ID is not present in the map. This would cause a NullPointerException when
passed to `workerManager.randomAvailableWorker()` on line 72. Add a null check
or ensure the catalog exists in the map.
```suggestion
}
if (backendIDs == null || backendIDs.isEmpty()) {
    // Fallback: select a random available worker for the catalogId
    return workerManager.randomAvailableWorker(catalogId);
}
```
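A minimal sketch of the null-safe lookup the comment asks for, using plain `java.util` types. `UsedWorkers`, `record`, and `pick` are hypothetical names; addresses are strings instead of `TNetworkAddress`, and the fallback id stands in for `workerManager.randomAvailableWorker(catalogId)`. It also shows that `computeIfAbsent` returns the inner map, so the diff's `computeIfAbsent(...)` followed by `get(...).put(...)` can be a single chained call.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mirror of usedWorkersAddressToBackendID: catalogId -> (address -> backendId).
class UsedWorkers {
    private final Map<Long, Map<String, Long>> byCatalog = new HashMap<>();

    // computeIfAbsent returns the existing or newly created inner map,
    // so the lookup and the put can be chained in one expression.
    void record(long catalogId, String address, long backendId) {
        byCatalog.computeIfAbsent(catalogId, k -> new LinkedHashMap<>()).put(address, backendId);
    }

    // Returns a used backend for the catalog, or fallbackBackendId when the
    // catalog has no entry (get() returned null) or its map is empty.
    long pick(long catalogId, long fallbackBackendId) {
        Map<String, Long> backendIds = byCatalog.get(catalogId);
        if (backendIds == null || backendIds.isEmpty()) {
            return fallbackBackendId;
        }
        // Deterministic for the sketch: the first recorded backend, not a random one.
        return backendIds.values().iterator().next();
    }
}
```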
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java:
##########
@@ -115,28 +122,38 @@ private ImmutableMap<Long, Backend> checkAndInitClusterBackends(
}
@Override
- public DistributedPlanWorker getWorker(long backendId) {
- ImmutableMap<Long, Backend> backends = this.allClusterBackends.get();
+ public void addBackends(long catalogId, ImmutableMap<Long, Backend> backends) {
+ if (!currentClusterBackends.containsKey(catalogId)) {
+ currentClusterBackends.put(catalogId, backends);
+ }
+ if (!allClusterBackends.containsKey(catalogId)) {
+ allClusterBackends.put(catalogId, Suppliers.ofInstance(backends));
+ }
+ }
+
+ @Override
+ public DistributedPlanWorker getWorker(long catalogId, long backendId) {
+ ImmutableMap<Long, Backend> backends = this.allClusterBackends.get(catalogId).get();
Review Comment:
Potential NullPointerException: If `catalogId` is not present in
`allClusterBackends`, calling `get(catalogId)` will return null, and then
calling `get()` on the null Supplier will cause a NullPointerException. Add a
null check before accessing the backends.
```suggestion
Supplier<ImmutableMap<Long, Backend>> backendSupplier = this.allClusterBackends.get(catalogId);
if (backendSupplier == null) {
    throw new IllegalStateException("Catalog " + catalogId + " is not present in allClusterBackends");
}
ImmutableMap<Long, Backend> backends = backendSupplier.get();
```
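A self-contained sketch of the per-catalog supplier map with the explicit check, assuming `java.util.function.Supplier` and `String` backends in place of Guava's `Supplier` and Doris `Backend`. `CatalogBackends` is a hypothetical class, and its `putIfAbsent` reproduces the first-registration-wins behavior of the `containsKey`/`put` pair in `addBackends` for a single map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry mirroring allClusterBackends:
// catalogId -> supplier of (backendId -> backend), with backends as plain strings.
class CatalogBackends {
    private final Map<Long, Supplier<Map<Long, String>>> allClusterBackends = new HashMap<>();

    // First registration for a catalog wins; later calls for the same catalog are ignored.
    void addBackends(long catalogId, Map<Long, String> backends) {
        Map<Long, String> snapshot = Map.copyOf(backends);
        allClusterBackends.putIfAbsent(catalogId, () -> snapshot);
    }

    // Throws a descriptive IllegalStateException instead of an NPE when the
    // catalog was never registered; returns null for an unknown backendId.
    String getWorker(long catalogId, long backendId) {
        Supplier<Map<Long, String>> backendSupplier = allClusterBackends.get(catalogId);
        if (backendSupplier == null) {
            throw new IllegalStateException(
                    "Catalog " + catalogId + " is not present in allClusterBackends");
        }
        return backendSupplier.get().get(backendId);
    }
}
```

If the backing map were a `ConcurrentHashMap`, `putIfAbsent` would also make the check-then-put atomic, which a separate `containsKey`/`put` pair is not.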
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]