This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5cd3734089c branch-4.0: [Feature] doris cross-cluster query #57898
(#58571)
5cd3734089c is described below
commit 5cd3734089c9a693db9289a651b4c20a1ea84ad7
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 9 17:18:59 2025 +0800
branch-4.0: [Feature] doris cross-cluster query #57898 (#58571)
Cherry-picked from #57898
---------
Co-authored-by: HonestManXin <[email protected]>
Co-authored-by: morningman <[email protected]>
---
.../java/org/apache/doris/catalog/OlapTable.java | 49 ++-
.../java/org/apache/doris/catalog/TableIf.java | 2 +
.../doris/datasource/ExternalMetaCacheMgr.java | 9 +
.../doris/DorisExternalMetaCacheMgr.java | 83 +++++
.../doris/datasource/doris/FeServiceClient.java | 278 ++++++++++++++++
.../doris/RemoteDorisExternalCatalog.java | 37 ++-
.../datasource/doris/RemoteDorisExternalTable.java | 79 ++++-
.../doris/datasource/doris/RemoteOlapTable.java | 130 ++++++++
.../property/constants/RemoteDorisProperties.java | 4 +
.../doris/nereids/rules/analysis/BindRelation.java | 18 ++
.../LogicalOlapScanToPhysicalOlapScan.java | 3 +-
.../trees/plans/distribute/DistributePlanner.java | 14 +
.../trees/plans/distribute/SelectedWorkers.java | 20 +-
.../BackendDistributedPlanWorkerManager.java | 57 ++--
.../plans/distribute/worker/BackendWorker.java | 14 +-
.../distribute/worker/DistributedPlanWorker.java | 2 +
.../worker/DistributedPlanWorkerManager.java | 12 +-
.../trees/plans/distribute/worker/DummyWorker.java | 7 +
.../worker/LoadBalanceScanWorkerSelector.java | 11 +-
.../worker/job/AbstractUnassignedScanJob.java | 7 +-
.../distribute/worker/job/UnassignedAllBEJob.java | 22 +-
.../worker/job/UnassignedGroupCommitJob.java | 4 +-
.../job/UnassignedScanBucketOlapTableJob.java | 6 +-
.../org/apache/doris/planner/OlapScanNode.java | 13 +-
.../java/org/apache/doris/planner/ScanNode.java | 5 +
.../org/apache/doris/qe/AbstractJobProcessor.java | 6 +-
.../doris/qe/runtime/PipelineExecutionTask.java | 11 +-
.../qe/runtime/PipelineExecutionTaskBuilder.java | 6 +-
.../apache/doris/service/FrontendServiceImpl.java | 105 +++++++
.../main/java/org/apache/doris/system/Backend.java | 12 +
gensrc/thrift/FrontendService.thrift | 25 ++
...est_query_remote_doris_as_olap_table_select.out | 48 +++
.../pipeline/external/conf/regression-conf.groovy | 1 +
..._query_remote_doris_as_olap_table_select.groovy | 350 +++++++++++++++++++++
.../test_remote_doris_all_types_select.groovy | 8 +-
.../test_remote_doris_all_types_show.groovy | 8 +-
.../remote_doris/test_remote_doris_catalog.groovy | 8 +-
.../remote_doris/test_remote_doris_predict.groovy | 8 +-
.../remote_doris/test_remote_doris_refresh.groovy | 8 +-
.../test_remote_doris_statistics.groovy | 8 +-
.../test_remote_doris_table_stats.groovy | 8 +-
41 files changed, 1424 insertions(+), 82 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index f3fb4914ff5..33bbff84f01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -95,6 +95,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
@@ -182,10 +183,10 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
private PartitionInfo partitionInfo;
@SerializedName(value = "itp", alternate = {"idToPartition"})
@Getter
- private ConcurrentHashMap<Long, Partition> idToPartition = new
ConcurrentHashMap<>();
+ protected ConcurrentHashMap<Long, Partition> idToPartition = new
ConcurrentHashMap<>();
// handled in postgsonprocess
@Getter
- private Map<String, Partition> nameToPartition = Maps.newTreeMap();
+ protected Map<String, Partition> nameToPartition = Maps.newTreeMap();
@SerializedName(value = "di", alternate = {"distributionInfo"})
private DistributionInfo defaultDistributionInfo;
@@ -3698,4 +3699,48 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
.filter(Index::isAnalyzedInvertedIndex).findFirst().orElse(null);
}
}
+
+ /**
+ * caller should acquire the read lock and should not modify any field of
the return obj
+ */
+ public OlapTable copyTableMeta() {
+ OlapTable table = new OlapTable();
+ // metaobj
+ table.signature = signature;
+ table.lastCheckTime = lastCheckTime;
+ // abstract table
+ table.id = id;
+ table.name = name;
+ table.qualifiedDbName = qualifiedDbName;
+ table.type = type;
+ table.createTime = createTime;
+ table.fullSchema = fullSchema;
+ table.comment = comment;
+ table.tableAttributes = tableAttributes;
+ // olap table
+ // NOTE: currently do not need temp partitions, colocateGroup,
autoIncrementGenerator
+ table.idToPartition = new ConcurrentHashMap<>();
+ table.tempPartitions = new TempPartitions();
+
+ table.state = state;
+ table.indexIdToMeta = ImmutableMap.copyOf(indexIdToMeta);
+ table.indexNameToId = ImmutableMap.copyOf(indexNameToId);
+ table.keysType = keysType;
+ table.partitionInfo = partitionInfo;
+ table.defaultDistributionInfo = defaultDistributionInfo;
+ table.bfColumns = bfColumns;
+ table.bfFpp = bfFpp;
+ table.indexes = indexes;
+ table.baseIndexId = baseIndexId;
+ table.tableProperty = tableProperty;
+ return table;
+ }
+
+ public long getCatalogId() {
+ return Env.getCurrentInternalCatalog().getId();
+ }
+
+ public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws
AnalysisException {
+ return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 1dd34b43cc6..778137d1f73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -437,6 +437,8 @@ public interface TableIf {
return "iceberg";
case DICTIONARY:
return "dictionary";
+ case DORIS_EXTERNAL_TABLE:
+ return "External_Doris";
default:
return null;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index e777285a07f..4932f3aa8f9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -22,6 +22,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.datasource.doris.DorisExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -100,6 +101,7 @@ public class ExternalMetaCacheMgr {
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
+ private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;
public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
@@ -131,6 +133,7 @@ public class ExternalMetaCacheMgr {
icebergMetadataCacheMgr = new
IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new
PaimonMetadataCacheMgr(commonRefreshExecutor);
+ dorisExternalMetaCacheMgr = new
DorisExternalMetaCacheMgr(commonRefreshExecutor);
}
private ExecutorService newThreadPool(boolean isCheckpointCatalog, int
numThread, int queueSize,
@@ -219,6 +222,10 @@ public class ExternalMetaCacheMgr {
return rowCountCache;
}
+ public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
+ return dorisExternalMetaCacheMgr;
+ }
+
public void removeCache(long catalogId) {
if (cacheMap.remove(catalogId) != null) {
LOG.info("remove hive metastore cache for catalog {}", catalogId);
@@ -232,6 +239,7 @@ public class ExternalMetaCacheMgr {
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
+ dorisExternalMetaCacheMgr.removeCache(catalogId);
}
public void invalidateTableCache(ExternalTable dorisTable) {
@@ -288,6 +296,7 @@ public class ExternalMetaCacheMgr {
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
+ dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid catalog cache for {}", catalogId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
new file mode 100644
index 00000000000..70f92853ccc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.system.Backend;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class DorisExternalMetaCacheMgr {
+ private static final Logger LOG =
LogManager.getLogger(DorisExternalMetaCacheMgr.class);
+ private final LoadingCache<Long, ImmutableMap<Long, Backend>>
backendsCache;
+
+ public DorisExternalMetaCacheMgr(ExecutorService executor) {
+ CacheFactory cacheFactory = new CacheFactory(
+
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
+ 20,
+ true,
+ null);
+ backendsCache = cacheFactory.buildCache(key -> loadBackends(key),
executor);
+ }
+
+ private ImmutableMap<Long, Backend> loadBackends(Long catalogId) {
+ RemoteDorisExternalCatalog catalog = (RemoteDorisExternalCatalog)
Env.getCurrentEnv().getCatalogMgr()
+ .getCatalog(catalogId);
+ List<Backend> backends = catalog.getFeServiceClient().listBackends();
+ if (LOG.isDebugEnabled()) {
+ List<String> names = backends.stream().map(b ->
b.getAddress()).collect(Collectors.toList());
+ LOG.debug("load backends:{} from:{}", String.join(",", names),
catalog.getName());
+ }
+ Map<Long, Backend> backendMap = Maps.newHashMap();
+ backends.forEach(backend -> backendMap.put(backend.getId(), backend));
+ return ImmutableMap.copyOf(backendMap);
+ }
+
+ public void removeCache(long catalogId) {
+ backendsCache.invalidate(catalogId);
+ }
+
+ public void invalidateBackendCache(long catalogId) {
+ backendsCache.invalidate(catalogId);
+ }
+
+ public void invalidateCatalogCache(long catalogId) {
+ invalidateBackendCache(catalogId);
+ }
+
+ public ImmutableMap<Long, Backend> getBackends(long catalogId) {
+ ImmutableMap<Long, Backend> backends = backendsCache.get(catalogId);
+ if (backends == null) {
+ return ImmutableMap.of();
+ }
+ return backends;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
new file mode 100644
index 00000000000..56e28f605c6
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TGetBackendMetaRequest;
+import org.apache.doris.thrift.TGetBackendMetaResult;
+import org.apache.doris.thrift.TGetOlapTableMetaRequest;
+import org.apache.doris.thrift.TGetOlapTableMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPartitionMeta;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class FeServiceClient {
+ private static final Logger LOG =
LogManager.getLogger(FeServiceClient.class);
+
+ private final Random random = new Random(System.currentTimeMillis());
+ private final String name;
+ private final List<TNetworkAddress> addresses;
+ private volatile TNetworkAddress master;
+ private final String user;
+ private final String password;
+ private final int retryCount;
+ private final int timeout;
+
+ public FeServiceClient(String name, List<TNetworkAddress> addresses,
String user, String password,
+ int retryCount, int timeout) {
+ this.name = name;
+ this.addresses = addresses;
+ this.user = user;
+ this.password = password;
+ this.retryCount = retryCount;
+ this.timeout = timeout;
+ }
+
+ private List<TNetworkAddress> getAddresses() {
+ return addresses;
+ }
+
+ private FrontendService.Client getRemoteFeClient(TNetworkAddress address,
int timeout) {
+ try {
+ return ClientPool.frontendPool.borrowObject(address, timeout);
+ } catch (Exception e) {
+ String msg = String.format("failed to get remote doris:%s fe
connection", name);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ private void returnClient(TNetworkAddress address, FrontendService.Client
client, boolean returnObj) {
+ if (returnObj) {
+ ClientPool.frontendPool.returnObject(address, client);
+ } else {
+ ClientPool.frontendPool.invalidateObject(address, client);
+ }
+ }
+
+ private <T> T randomCallWithRetry(ThriftCall<T> call, String errorMsg, int
timeout) {
+ List<TNetworkAddress> addresses = getAddresses();
+ int retries = 0;
+ Exception lastException = null;
+ while (retries < retryCount) {
+ int index = random.nextInt(addresses.size());
+ FrontendService.Client client = null;
+ for (int i = 0; i < addresses.size() && retries < retryCount; i++)
{
+ TNetworkAddress address = addresses.get((index + i) %
addresses.size());
+ client = getRemoteFeClient(address, timeout);
+ boolean returnObj = false;
+ try {
+ T result = call.call(client);
+ returnObj = true;
+ return result;
+ } catch (TException | IOException e) {
+ lastException = e;
+ retries++;
+ } catch (Exception e) {
+ throw new RuntimeException(errorMsg + ":" +
e.getMessage(), e);
+ } finally {
+ returnClient(address, client, returnObj);
+ }
+ }
+ }
+ throw new RuntimeException(errorMsg + ":" +
lastException.getMessage(), lastException);
+ }
+
+ private <T> T callFromMaster(ThriftCall<MasterResult<T>> call, String
errorMsg, int timeout) {
+ TNetworkAddress address = master;
+ FrontendService.Client client = null;
+ Exception lastException = null;
+ if (address != null) {
+ client = getRemoteFeClient(address, timeout);
+ boolean returnObj = false;
+ try {
+ MasterResult<T> ret = call.call(client);
+ returnObj = true;
+ if (ret.isMaster) {
+ if (ret.hasError) {
+ throw new RuntimeException(ret.errorMsg);
+ }
+ return ret.result;
+ }
+ } catch (TException | IOException e) {
+ lastException = e;
+ } catch (Exception e) {
+ throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+ } finally {
+ returnClient(address, client, returnObj);
+ }
+ }
+ master = null;
+ List<TNetworkAddress> addresses = getAddresses();
+ int retries = 0;
+ while (retries < retryCount) {
+ int index = random.nextInt(addresses.size());
+ for (int i = 0; i < addresses.size() && retries < retryCount; i++)
{
+ address = addresses.get((index + i) % addresses.size());
+ client = getRemoteFeClient(address, timeout);
+ boolean returnObj = false;
+ try {
+ MasterResult<T> ret = call.call(client);
+ returnObj = true;
+ if (ret.isMaster) {
+ master = address;
+ if (ret.hasError) {
+ throw new RuntimeException(ret.errorMsg);
+ }
+ return ret.result;
+ }
+ } catch (TException | IOException e) {
+ lastException = e;
+ retries++;
+ } catch (Exception e) {
+ throw new RuntimeException(errorMsg + ":" +
e.getMessage(), e);
+ } finally {
+ returnClient(address, client, returnObj);
+ }
+ }
+ }
+ throw new RuntimeException(errorMsg + ":" +
lastException.getMessage(), lastException);
+ }
+
+ public List<Backend> listBackends() {
+ TGetBackendMetaRequest request = new TGetBackendMetaRequest();
+ request.setUser(user);
+ request.setPasswd(password);
+ String msg = String.format("failed to get backends from remote
doris:%s", name);
+ return callFromMaster(client -> {
+ TGetBackendMetaResult result = client.getBackendMeta(request);
+ if (result.getStatus().getStatusCode() == TStatusCode.NOT_MASTER) {
+ return MasterResult.notMaster();
+ }
+ if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+ return
MasterResult.masterWithError(result.getStatus().toString());
+ }
+ List<Backend> backends = result.getBackends().stream()
+ .map(b -> Backend.fromThrift(b))
+ .collect(Collectors.toList());
+ return MasterResult.withResult(backends);
+ }, msg, timeout);
+ }
+
+ public RemoteOlapTable getOlapTable(String dbName, String table, long
tableId, List<Partition> partitions) {
+ TGetOlapTableMetaRequest request = new TGetOlapTableMetaRequest();
+ request.setDb(dbName);
+ request.setTable(table);
+ request.setTableId(tableId);
+ request.setUser(user);
+ request.setPasswd(password);
+ request.setVersion(FeConstants.meta_version);
+ for (Partition partition : partitions) {
+ TPartitionMeta meta = new TPartitionMeta();
+ meta.setId(partition.getId());
+ meta.setVisibleVersion(partition.getVisibleVersion());
+ meta.setVisibleVersionTime(partition.getVisibleVersionTime());
+ request.addToPartitions(meta);
+ }
+ String msg = String.format("failed to get table meta from remote
doris:%s", name);
+ return randomCallWithRetry(client -> {
+ TGetOlapTableMetaResult result = client.getOlapTableMeta(request);
+ if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+ throw new UserException(result.getStatus().toString());
+ }
+ RemoteOlapTable remoteOlapTable = null;
+ try (DataInputStream in = new DataInputStream(new
ByteArrayInputStream(result.getTableMeta()))) {
+ OlapTable olapTable = OlapTable.read(in);
+ remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable);
+ }
+ List<Partition> updatedPartitions = new
ArrayList<>(result.getUpdatedPartitionsSize());
+ if (result.getUpdatedPartitionsSize() > 0) {
+ for (ByteBuffer buffer : result.getUpdatedPartitions()) {
+ try (ByteArrayInputStream in =
+ new ByteArrayInputStream(buffer.array(),
buffer.position(), buffer.remaining());
+ DataInputStream dataInputStream = new
DataInputStream(in)) {
+ String partitionStr = Text.readString(dataInputStream);
+ Partition partition =
GsonUtils.GSON.fromJson(partitionStr, Partition.class);
+ updatedPartitions.add(partition);
+ }
+ }
+ }
+ List<Long> removedPartitions = result.getRemovedPartitions();
+ if (removedPartitions == null) {
+ removedPartitions = new ArrayList<>();
+ }
+ remoteOlapTable.rebuildPartitions(partitions, updatedPartitions,
removedPartitions);
+ return remoteOlapTable;
+ }, msg, timeout);
+ }
+
+ private interface ThriftCall<T> {
+ public T call(FrontendService.Client client) throws Exception;
+ }
+
+ private static class MasterResult<T> {
+ boolean isMaster = true;
+ T result;
+ boolean hasError = false;
+ String errorMsg;
+
+ static <T> MasterResult<T> notMaster() {
+ MasterResult<T> ret = new MasterResult();
+ ret.isMaster = false;
+ return ret;
+ }
+
+ static <T> MasterResult<T> withResult(T result) {
+ MasterResult<T> ret = new MasterResult();
+ ret.isMaster = true;
+ ret.hasError = false;
+ ret.result = result;
+ return ret;
+ }
+
+ // is master but has error code
+ static <T> MasterResult<T> masterWithError(String errorMsg) {
+ MasterResult<T> ret = new MasterResult();
+ ret.isMaster = true;
+ ret.hasError = true;
+ ret.errorMsg = errorMsg;
+ return ret;
+ }
+
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
index b63a2a03b1f..76d42b70b42 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
@@ -17,17 +17,20 @@
package org.apache.doris.datasource.doris;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.constants.RemoteDorisProperties;
+import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -36,11 +39,14 @@ public class RemoteDorisExternalCatalog extends
ExternalCatalog {
private static final Logger LOG =
LogManager.getLogger(RemoteDorisExternalCatalog.class);
private RemoteDorisRestClient dorisRestClient;
+ private FeServiceClient client;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
+ RemoteDorisProperties.FE_THRIFT_HOSTS,
RemoteDorisProperties.FE_HTTP_HOSTS,
RemoteDorisProperties.FE_ARROW_HOSTS,
RemoteDorisProperties.USER,
- RemoteDorisProperties.PASSWORD
+ RemoteDorisProperties.PASSWORD,
+ RemoteDorisProperties.USE_ARROW_FLIGHT
);
/**
@@ -61,6 +67,11 @@ public class RemoteDorisExternalCatalog extends
ExternalCatalog {
throw new DdlException("Required property '" +
requiredProperty + "' is missing");
}
}
+ if (!useArrowFlight() && Config.isCloudMode()) {
+ // TODO we not validate it in cloud mode, so currently not support
it
+ throw new DdlException("Cloud mode is not supported when "
+ + RemoteDorisProperties.USE_ARROW_FLIGHT + " is false");
+ }
}
public List<String> getFeNodes() {
@@ -71,6 +82,19 @@ public class RemoteDorisExternalCatalog extends
ExternalCatalog {
return
parseArrowHosts(catalogProperty.getOrDefault(RemoteDorisProperties.FE_ARROW_HOSTS,
""));
}
+ public List<TNetworkAddress> getFeThriftNodes() {
+ String addresses =
catalogProperty.getOrDefault(RemoteDorisProperties.FE_THRIFT_HOSTS, "");
+ List<TNetworkAddress> tAddresses = new ArrayList<>();
+ for (String address : addresses.split(",")) {
+ int index = address.lastIndexOf(":");
+ String host = address.substring(0, index);
+ int port = Integer.parseInt(address.substring(index + 1));
+ TNetworkAddress thriftAddress = new TNetworkAddress(host, port);
+ tAddresses.add(thriftAddress);
+ }
+ return tAddresses;
+ }
+
public String getUsername() {
return catalogProperty.getOrDefault(RemoteDorisProperties.USER, "");
}
@@ -139,6 +163,11 @@ public class RemoteDorisExternalCatalog extends
ExternalCatalog {
"0"));
}
+ public boolean useArrowFlight() {
+ return
Boolean.parseBoolean(catalogProperty.getOrDefault(RemoteDorisProperties.USE_ARROW_FLIGHT,
+ "true"));
+ }
+
@Override
protected void initLocalObjectsImpl() {
if (isCompatible()) {
@@ -158,6 +187,8 @@ public class RemoteDorisExternalCatalog extends
ExternalCatalog {
throw new RuntimeException("Failed to connect to Doris cluster,"
+ " please check your Doris cluster or your Doris catalog
configuration.");
}
+ client = new FeServiceClient(name, getFeThriftNodes(), getUsername(),
getPassword(),
+ getMetadataSyncRetryCount(), getMetadataReadTimeoutSec());
}
protected List<String> listDatabaseNames() {
@@ -181,6 +212,10 @@ public class RemoteDorisExternalCatalog extends
ExternalCatalog {
return dorisRestClient;
}
+ public FeServiceClient getFeServiceClient() {
+ return client;
+ }
+
private List<String> parseHttpHosts(String hosts) {
String[] hostUrls = hosts.trim().split(",");
fillUrlsWithSchema(hostUrls, enableSsl());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
index 4b6117aa313..c86d91af92b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
@@ -18,9 +18,13 @@
package org.apache.doris.datasource.doris;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -28,6 +32,7 @@ import org.apache.doris.thrift.TRemoteDorisTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
+import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,9 +41,14 @@ import java.util.Optional;
public class RemoteDorisExternalTable extends ExternalTable {
private static final Logger LOG =
LogManager.getLogger(RemoteDorisExternalTable.class);
+ private volatile List<Partition> partitions = Lists.newArrayList();
+ private volatile long tableId = -1;
+ private volatile boolean isSyncOlapTable = false;
+ private volatile RemoteOlapTable remoteOlapTable = null;
+ private volatile Exception lastException = null;
public RemoteDorisExternalTable(long id, String name, String remoteName,
- RemoteDorisExternalCatalog catalog,
ExternalDatabase db) {
+ RemoteDorisExternalCatalog catalog, ExternalDatabase db) {
super(id, name, remoteName, catalog, db,
TableType.DORIS_EXTERNAL_TABLE);
}
@@ -50,6 +60,73 @@ public class RemoteDorisExternalTable extends ExternalTable {
}
}
+ private RemoteOlapTable getDorisOlapTable() {
+ if (!isSyncOlapTable) {
+ synchronized (this) {
+ if (!isSyncOlapTable) {
+ try {
+ isSyncOlapTable = true;
+ remoteOlapTable = null;
+ lastException = null; // clear previous exception
+
+ List<Partition> cachedPartitions =
Lists.newArrayList(partitions);
+ RemoteOlapTable olapTable =
((RemoteDorisExternalCatalog) catalog).getFeServiceClient()
+ .getOlapTable(dbName, remoteName, tableId,
cachedPartitions);
+ olapTable.setCatalog((RemoteDorisExternalCatalog)
catalog);
+ olapTable.setDatabase((RemoteDorisExternalDatabase)
db);
+
+ // Remove redundant nested synchronized block
+ tableId = olapTable.getId();
+ partitions =
Lists.newArrayList(olapTable.getPartitions());
+
+ olapTable.setId(id); // change id in case of possible
conflicts
+ olapTable.invalidateBackendsIfNeed();
+ remoteOlapTable = olapTable;
+ } catch (Exception e) {
+ // Save exception for waiting threads
+ lastException = e;
+ LOG.warn("Failed to get remote doris olap table:
{}.{}", dbName, remoteName, e);
+ throw e; // Re-throw the exception
+ } finally {
+ isSyncOlapTable = false;
+ this.notifyAll();
+ }
+ return remoteOlapTable;
+ }
+ }
+ }
+
+ synchronized (this) {
+ while (isSyncOlapTable) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ throw new AnalysisException("interrupted while getting
doris olap table", e);
+ }
+ }
+
+ // If there is a saved exception, throw it with more details
+ if (remoteOlapTable == null) {
+ if (lastException != null) {
+ throw new AnalysisException(
+ "failed to get remote doris olap table: " +
Util.getRootCauseMessage(lastException),
+ lastException);
+ }
+ throw new AnalysisException("failed to get remote doris olap
table");
+ }
+ return remoteOlapTable;
+ }
+ }
+
+ public OlapTable getOlapTable() {
+ makeSureInitialized();
+ return getDorisOlapTable();
+ }
+
+ public boolean useArrowFlight() {
+ return ((RemoteDorisExternalCatalog) catalog).useArrowFlight();
+ }
+
@Override
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
new file mode 100644
index 00000000000..80089b3a14c
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemoteOlapTable extends OlapTable {
+ private static final Logger LOG =
LogManager.getLogger(RemoteOlapTable.class);
+
+ private RemoteDorisExternalCatalog catalog;
+ private RemoteDorisExternalDatabase database;
+
+ public RemoteDorisExternalCatalog getCatalog() {
+ return catalog;
+ }
+
+ public void setCatalog(RemoteDorisExternalCatalog catalog) {
+ this.catalog = catalog;
+ }
+
+ @Override
+ public RemoteDorisExternalDatabase getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(RemoteDorisExternalDatabase database) {
+ this.database = database;
+ }
+
+ public static RemoteOlapTable fromOlapTable(OlapTable olapTable) {
+ try {
+ RemoteOlapTable externalOlapTable = new RemoteOlapTable();
+ Class<?> currentClass = olapTable.getClass();
+ while (currentClass != null) {
+ for (Field field : currentClass.getDeclaredFields()) {
+ if (Modifier.isStatic(field.getModifiers())) {
+ continue;
+ }
+ field.setAccessible(true);
+ field.set(externalOlapTable, field.get(olapTable));
+ }
+ currentClass = currentClass.getSuperclass();
+ }
+ return externalOlapTable;
+ } catch (Exception e) {
+ throw new RuntimeException("failed to initial external olap
table", e);
+ }
+ }
+
+ public void rebuildPartitions(List<Partition> oldPartitions,
List<Partition> updatedPartitions,
+ List<Long> removedPartitions)
+ throws AnalysisException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("rebuildPartitions oldPartitions: " +
oldPartitions.size() + ", updatedPartitions: "
+ + updatedPartitions.size() + ", removedPartitions: " +
removedPartitions.size());
+ }
+ ConcurrentHashMap<Long, Partition> newIdToPartition = new
ConcurrentHashMap<>();
+ for (Partition oldPartition : oldPartitions) {
+ newIdToPartition.put(oldPartition.getId(), oldPartition);
+ }
+ for (Long removedPartition : removedPartitions) {
+ newIdToPartition.remove(removedPartition);
+ }
+ for (Partition updatedPartition : updatedPartitions) {
+ newIdToPartition.put(updatedPartition.getId(), updatedPartition);
+ }
+ Map<String, Partition> newNameToPartition = Maps.newTreeMap();
+ for (Partition partition : newIdToPartition.values()) {
+ newNameToPartition.put(partition.getName(), partition);
+ }
+ this.idToPartition = newIdToPartition;
+ this.nameToPartition = newNameToPartition;
+ }
+
+ public void invalidateBackendsIfNeed() {
+ ImmutableMap<Long, Backend> backends =
+
Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr().getBackends(catalog.getId());
+ for (Partition partition : getPartitions()) {
+ for (Tablet tablet : partition.getBaseIndex().getTablets()) {
+ for (long backendId : tablet.getBackendIds()) {
+ if (!backends.containsKey(backendId)) {
+
Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr()
+ .invalidateBackendCache(catalog.getId());
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public long getCatalogId() {
+ return catalog.getId();
+ }
+
+ public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() {
+ return
Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr().getBackends(catalog.getId());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
index c6bceed94cb..54c5051b244 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
@@ -18,6 +18,7 @@
package org.apache.doris.datasource.property.constants;
public class RemoteDorisProperties {
+ public static final String FE_THRIFT_HOSTS = "fe_thrift_hosts";
public static final String FE_HTTP_HOSTS = "fe_http_hosts";
public static final String FE_ARROW_HOSTS = "fe_arrow_hosts";
@@ -26,6 +27,9 @@ public class RemoteDorisProperties {
public static final String ENABLE_PARALLEL_RESULT_SINK =
"enable_parallel_result_sink";
+    // whether to query the remote Doris via Arrow Flight, or treat its tables as local OLAP tables
+ public static final String USE_ARROW_FLIGHT = "use_arrow_flight";
+
// Supports older versions of remote Doris; enabling this may introduce
some inaccuracies in schema parsing.
public static final String COMPATIBLE = "compatible";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 2dbfb0f3e42..73d32f3326b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.ExternalView;
+import org.apache.doris.datasource.doris.RemoteDorisExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
@@ -469,7 +470,24 @@ public class BindRelation extends OneAnalysisRuleFactory {
case MAX_COMPUTE_EXTERNAL_TABLE:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
case LAKESOUl_EXTERNAL_TABLE:
+ return new
LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
+ qualifierWithoutTableName, ImmutableList.of(),
+ unboundRelation.getTableSample(),
+ unboundRelation.getTableSnapshot(),
+
Optional.ofNullable(unboundRelation.getScanParams()));
case DORIS_EXTERNAL_TABLE:
+ ConnectContext ctx = cascadesContext.getConnectContext();
+ RemoteDorisExternalTable externalTable =
(RemoteDorisExternalTable) table;
+ if (!externalTable.useArrowFlight()) {
+ if
(!ctx.getSessionVariable().isEnableNereidsDistributePlanner()) {
+                        // use isEnableNereidsDistributePlanner instead of canUseNereidsDistributePlanner,
+                        // because canUseNereidsDistributePlanner does not work for EXPLAIN commands
+ throw new AnalysisException("query remote doris
only support NereidsDistributePlanner"
+ + " when catalog use_arrow_flight is
false");
+ }
+ OlapTable olapTable = externalTable.getOlapTable();
+ return makeOlapScan(olapTable, unboundRelation,
qualifierWithoutTableName, cascadesContext);
+ }
return new
LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
qualifierWithoutTableName, ImmutableList.of(),
unboundRelation.getTableSample(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index dd7f83ecd07..002956ee17f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -80,7 +80,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends
OneImplementationRuleFact
// rounded robin algorithm. Therefore, the hashDistributedSpec can be
broken except they are in
// the same stable colocateGroup(CG)
boolean isBelongStableCG =
colocateTableIndex.isColocateTable(olapTable.getId())
- &&
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()));
+ &&
!colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))
+ && olapTable.getCatalogId() ==
Env.getCurrentInternalCatalog().getId();
boolean isSelectUnpartition = olapTable.getPartitionInfo().getType()
== PartitionType.UNPARTITIONED
|| olapScan.getSelectedPartitionIds().size() == 1;
// TODO: find a better way to handle both tablet num == 1 and colocate
table together in future
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index f103c627143..20aa06b9fa0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.distribute;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.nereids.StatementContext;
@@ -39,6 +40,7 @@ import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.MultiCastPlanFragment;
+import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.qe.ConnectContext;
@@ -85,6 +87,7 @@ public class DistributePlanner {
try {
BackendDistributedPlanWorkerManager workerManager = new
BackendDistributedPlanWorkerManager(
statementContext.getConnectContext(),
notNeedBackend, isLoadJob);
+ addExternalBackends(workerManager);
LoadBalanceScanWorkerSelector workerSelector = new
LoadBalanceScanWorkerSelector(workerManager);
FragmentIdMapping<UnassignedJob> fragmentJobs
= UnassignedJobBuilder.buildJobs(workerSelector,
statementContext, idToFragments);
@@ -119,6 +122,17 @@ public class DistributePlanner {
}
}
    /**
     * Registers, per catalog id, the backends of every {@link OlapScanNode}'s
     * table with the worker manager, so backends of external (cross-cluster)
     * catalogs are available to the distribute planner alongside local ones.
     *
     * @param workerManager the manager that accumulates catalog-to-backend maps
     * @throws AnalysisException if resolving a table's backends fails
     */
    private void addExternalBackends(BackendDistributedPlanWorkerManager workerManager) throws AnalysisException {
        for (PlanFragment planFragment : idToFragments.values()) {
            List<OlapScanNode> scanNodes = planFragment.getPlanRoot()
                    .collectInCurrentFragment(OlapScanNode.class::isInstance);
            for (OlapScanNode scanNode : scanNodes) {
                workerManager.addBackends(scanNode.getCatalogId(),
                        scanNode.getOlapTable().getAllBackendsByAllCluster());
            }
        }
    }
+
private FragmentIdMapping<DistributedPlan> buildDistributePlans(
Map<PlanFragmentId, UnassignedJob> idToUnassignedJobs,
ListMultimap<PlanFragmentId, AssignedJob> idToAssignedJobs) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
index f67cd86891d..52a31351c8c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.distribute;
+import org.apache.doris.catalog.Env;
import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
@@ -34,7 +35,7 @@ import java.util.Set;
/** SelectedWorkers */
public class SelectedWorkers {
private final DistributedPlanWorkerManager workerManager;
- private final Map<TNetworkAddress, Long> usedWorkersAddressToBackendID;
+ private final Map<Long, Map<TNetworkAddress, Long>>
usedWorkersAddressToBackendID;
private final Set<DistributedPlanWorker> usedWorkers;
public SelectedWorkers(DistributedPlanWorkerManager workerManager) {
@@ -48,7 +49,8 @@ public class SelectedWorkers {
BackendWorker worker = (BackendWorker) assignedJob.getAssignedWorker();
if (usedWorkers.add(worker)) {
Backend backend = worker.getBackend();
- usedWorkersAddressToBackendID.put(
+
usedWorkersAddressToBackendID.computeIfAbsent(worker.getCatalogId(), k ->
Maps.newLinkedHashMap());
+ usedWorkersAddressToBackendID.get(worker.getCatalogId()).put(
new TNetworkAddress(backend.getHost(),
backend.getBePort()), backend.getId()
);
}
@@ -56,11 +58,19 @@ public class SelectedWorkers {
/** tryToSelectRandomUsedWorker */
public DistributedPlanWorker tryToSelectRandomUsedWorker() {
+ long catalogId = Env.getCurrentInternalCatalog().getId();
if (usedWorkers.isEmpty()) {
- return workerManager.randomAvailableWorker();
+ return workerManager.randomAvailableWorker(catalogId);
} else {
- long id =
workerManager.randomAvailableWorker(usedWorkersAddressToBackendID);
- return workerManager.getWorker(id);
+ Map<TNetworkAddress, Long> backendIDs;
+ if (usedWorkersAddressToBackendID.containsKey(catalogId)) {
+ backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+ } else {
+ catalogId = usedWorkers.iterator().next().getCatalogId();
+ backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+ }
+ long id = workerManager.randomAvailableWorker(backendIDs);
+ return workerManager.getWorker(catalogId, id);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
index 5ca2c4bc6bf..e36165f10b4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
@@ -53,19 +53,26 @@ public class BackendDistributedPlanWorkerManager implements
DistributedPlanWorke
DUMMY_BACKEND.setAlive(true);
}
- private final Supplier<ImmutableMap<Long, Backend>> allClusterBackends =
Suppliers.memoize(() -> {
- try {
- return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
- } catch (Exception t) {
- throw new NereidsException("Can not get backends: " + t, t);
- }
- });
+ private final Map<Long, Supplier<ImmutableMap<Long, Backend>>>
allClusterBackends = Maps.newHashMap();
- private final ImmutableMap<Long, Backend> currentClusterBackends;
+ private final Map<Long, ImmutableMap<Long, Backend>>
currentClusterBackends;
+ /**
+ * Constructor
+ */
public BackendDistributedPlanWorkerManager(
ConnectContext context, boolean notNeedBackend, boolean isLoadJob)
throws UserException {
- this.currentClusterBackends = checkAndInitClusterBackends(context,
notNeedBackend, isLoadJob);
+ this.currentClusterBackends = Maps.newHashMap();
+ ImmutableMap<Long, Backend> internalBackends =
checkAndInitClusterBackends(context, notNeedBackend, isLoadJob);
+
this.currentClusterBackends.put(Env.getCurrentInternalCatalog().getId(),
internalBackends);
+ allClusterBackends.put(Env.getCurrentInternalCatalog().getId(),
+ Suppliers.memoize(() -> {
+ try {
+ return
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+ } catch (Exception t) {
+ throw new NereidsException("Can not get backends: " +
t, t);
+ }
+ }));
}
private ImmutableMap<Long, Backend> checkAndInitClusterBackends(
@@ -115,28 +122,38 @@ public class BackendDistributedPlanWorkerManager
implements DistributedPlanWorke
}
@Override
- public DistributedPlanWorker getWorker(long backendId) {
- ImmutableMap<Long, Backend> backends = this.allClusterBackends.get();
+ public void addBackends(long catalogId, ImmutableMap<Long, Backend>
backends) {
+ if (!currentClusterBackends.containsKey(catalogId)) {
+ currentClusterBackends.put(catalogId, backends);
+ }
+ if (!allClusterBackends.containsKey(catalogId)) {
+ allClusterBackends.put(catalogId, Suppliers.ofInstance(backends));
+ }
+ }
+
+ @Override
+ public DistributedPlanWorker getWorker(long catalogId, long backendId) {
+ ImmutableMap<Long, Backend> backends =
this.allClusterBackends.get(catalogId).get();
Backend backend = backends.get(backendId);
if (backend == null) {
throw new IllegalStateException("Backend " + backendId + " is not
exist");
}
- return new BackendWorker(backend);
+ return new BackendWorker(catalogId, backend);
}
@Override
- public DistributedPlanWorker getWorker(Backend backend) {
- return new BackendWorker(backend);
+ public DistributedPlanWorker getWorker(long catalogId, Backend backend) {
+ return new BackendWorker(catalogId, backend);
}
@Override
- public DistributedPlanWorker randomAvailableWorker() {
+ public DistributedPlanWorker randomAvailableWorker(long catalogId) {
try {
Reference<Long> selectedBackendId = new Reference<>();
- ImmutableMap<Long, Backend> backends = this.currentClusterBackends;
+ ImmutableMap<Long, Backend> backends =
this.currentClusterBackends.get(catalogId);
SimpleScheduler.getHost(backends, selectedBackendId);
Backend selctedBackend = backends.get(selectedBackendId.getRef());
- return new BackendWorker(selctedBackend);
+ return new BackendWorker(catalogId, selctedBackend);
} catch (Exception t) {
throw new NereidsException("Can not get backends: " + t, t);
}
@@ -149,17 +166,17 @@ public class BackendDistributedPlanWorkerManager
implements DistributedPlanWorke
}
@Override
- public List<Backend> getAllBackends(boolean needAlive) {
+ public List<Backend> getAllBackends(long catalogId, boolean needAlive) {
List<Backend> backends = null;
if (needAlive) {
backends = Lists.newArrayList();
- for (Map.Entry<Long, Backend> entry :
this.allClusterBackends.get().entrySet()) {
+ for (Map.Entry<Long, Backend> entry :
this.allClusterBackends.get(catalogId).get().entrySet()) {
if (entry.getValue().isQueryAvailable()) {
backends.add(entry.getValue());
}
}
} else {
- backends =
Lists.newArrayList(this.allClusterBackends.get().values());
+ backends =
Lists.newArrayList(this.allClusterBackends.get(catalogId).get().values());
}
return backends;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
index e76934cf847..99f03be0e6a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
@@ -24,15 +24,22 @@ import java.util.Objects;
/** BackendWorker */
public class BackendWorker implements DistributedPlanWorker {
private final Backend backend;
+ private final long catalogId;
- public BackendWorker(Backend backend) {
+ public BackendWorker(long catalogId, Backend backend) {
this.backend = Objects.requireNonNull(backend, "backend can not be
null");
+ this.catalogId = catalogId;
}
public Backend getBackend() {
return backend;
}
+ @Override
+ public long getCatalogId() {
+ return catalogId;
+ }
+
@Override
public long id() {
return backend.getId();
@@ -70,7 +77,7 @@ public class BackendWorker implements DistributedPlanWorker {
@Override
public int hashCode() {
- return Objects.hash(backend.getId());
+ return Objects.hash(backend.getId(), catalogId);
}
@Override
@@ -78,7 +85,8 @@ public class BackendWorker implements DistributedPlanWorker {
if (!(obj instanceof BackendWorker)) {
return false;
}
- return backend.getId() == ((BackendWorker) obj).backend.getId();
+ return backend.getId() == ((BackendWorker) obj).backend.getId()
+ && getCatalogId() == ((BackendWorker) obj).getCatalogId();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
index 79f8b482d88..8def27bd68e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
@@ -21,6 +21,8 @@ package
org.apache.doris.nereids.trees.plans.distribute.worker;
* DistributedPlanWorker: a worker who can execute the assigned job(instance)
of the DistributedPlan
*/
public interface DistributedPlanWorker extends
Comparable<DistributedPlanWorker> {
+ long getCatalogId();
+
long id();
// ipv4/ipv6 address
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
index 35aa1701a1c..1efc024d574 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
@@ -20,18 +20,22 @@ package
org.apache.doris.nereids.trees.plans.distribute.worker;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
+import com.google.common.collect.ImmutableMap;
+
import java.util.List;
import java.util.Map;
/** DistributedPlanWorkerManager */
public interface DistributedPlanWorkerManager {
- DistributedPlanWorker getWorker(long backendId);
+ void addBackends(long catalogId, ImmutableMap<Long, Backend> backends);
+
+ DistributedPlanWorker getWorker(long catalogId, long backendId);
- DistributedPlanWorker getWorker(Backend backend);
+ DistributedPlanWorker getWorker(long catalogId, Backend backend);
- DistributedPlanWorker randomAvailableWorker();
+ DistributedPlanWorker randomAvailableWorker(long catalogId);
long randomAvailableWorker(Map<TNetworkAddress, Long> addressToBackendID);
- List<Backend> getAllBackends(boolean needAlive);
+ List<Backend> getAllBackends(long catalogId, boolean needAlive);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
index 9a7d2f42476..09fbd40d8d7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
@@ -17,12 +17,19 @@
package org.apache.doris.nereids.trees.plans.distribute.worker;
+import org.apache.doris.catalog.Env;
+
/** DummyWorker */
public class DummyWorker implements DistributedPlanWorker {
public static final DummyWorker INSTANCE = new DummyWorker();
private DummyWorker() {}
+ @Override
+ public long getCatalogId() {
+ return Env.getCurrentInternalCatalog().getId();
+ }
+
@Override
public long id() {
return 0;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
index b479d24a0c9..4293408fa88 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
@@ -100,7 +100,7 @@ public class LoadBalanceScanWorkerSelector implements
ScanWorkerSelector {
long bytes = getScanRangeSize(scanNode,
onePartitionOneScanRangeLocation);
WorkerScanRanges assigned = selectScanReplicaAndMinWorkloadWorker(
- onePartitionOneScanRangeLocation, bytes,
orderedScanRangeLocations);
+ onePartitionOneScanRangeLocation, bytes,
orderedScanRangeLocations, scanNode.getCatalogId());
UninstancedScanSource scanRanges =
workerScanRanges.computeIfAbsent(
assigned.worker,
w -> new UninstancedScanSource(
@@ -202,7 +202,8 @@ public class LoadBalanceScanWorkerSelector implements
ScanWorkerSelector {
WorkerScanRanges replicaAndWorker =
selectScanReplicaAndMinWorkloadWorker(
allPartitionTabletsInOneBucketInOneTable.get(0),
allScanNodeScanBytesInOneBucket,
- orderedScanRangeLocations
+ orderedScanRangeLocations,
+ scanNode.getCatalogId()
);
selectedWorker = replicaAndWorker.worker;
break;
@@ -240,11 +241,11 @@ public class LoadBalanceScanWorkerSelector implements
ScanWorkerSelector {
}
private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker(
- TScanRangeLocations tabletLocation, long tabletBytes, boolean
orderedScanRangeLocations) {
+ TScanRangeLocations tabletLocation, long tabletBytes, boolean
orderedScanRangeLocations, long catalogId) {
List<TScanRangeLocation> replicaLocations =
tabletLocation.getLocations();
if (replicaLocations.size() == 1) {
TScanRangeLocation replicaLocation = replicaLocations.get(0);
- DistributedPlanWorker worker =
workerManager.getWorker(replicaLocation.getBackendId());
+ DistributedPlanWorker worker = workerManager.getWorker(catalogId,
replicaLocation.getBackendId());
ScanRanges scanRanges = new ScanRanges();
TScanRangeParams scanReplicaParams =
ScanWorkerSelector.buildScanReplicaParams(tabletLocation,
replicaLocation);
@@ -265,7 +266,7 @@ public class LoadBalanceScanWorkerSelector implements
ScanWorkerSelector {
for (int i = 0; i < replicaNum; i++) {
TScanRangeLocation replicaLocation = replicaLocations.get(i);
- DistributedPlanWorker worker =
workerManager.getWorker(replicaLocation.getBackendId());
+ DistributedPlanWorker worker = workerManager.getWorker(catalogId,
replicaLocation.getBackendId());
if (!worker.available()) {
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index 2657be975b2..167ea3dc334 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+import org.apache.doris.catalog.Env;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -188,10 +189,14 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
}
protected List<AssignedJob>
fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
+ long catalogId = Env.getCurrentInternalCatalog().getId();
+ if (scanNodes != null && scanNodes.size() > 0) {
+ catalogId = scanNodes.get(0).getCatalogId();
+ }
return ImmutableList.of(
assignWorkerAndDataSources(0,
statementContext.getConnectContext().nextInstanceId(),
- workerManager.randomAvailableWorker(),
+ workerManager.randomAvailableWorker(catalogId),
DefaultScanSource.empty())
);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
index 045b1af8a03..e8b30730103 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -64,6 +65,10 @@ public class UnassignedAllBEJob extends
AbstractUnassignedJob {
DictionarySink sink = (DictionarySink) fragment.getSink();
// it may be ScanNode or optimized to EmptySetNode. use universay
function to get the deepest source.
PlanNode rootNode = fragment.getDeepestLinearSource();
+ long catalogId = Env.getCurrentInternalCatalog().getId();
+ if (rootNode instanceof OlapScanNode) {
+ catalogId = ((OlapScanNode) rootNode).getCatalogId();
+ }
List<Backend> bes;
if (sink.allowAdaptiveLoad() && rootNode instanceof OlapScanNode) {
Dictionary dictionary = sink.getDictionary();
@@ -75,21 +80,21 @@ public class UnassignedAllBEJob extends
AbstractUnassignedJob {
}
if (usingVersion > lastVersion) {
// load new data
- bes = computeFullLoad(workerManager, inputJobs);
+ bes = computeFullLoad(workerManager, inputJobs, catalogId);
} else {
// try to load only for the BEs which is outdated
- bes = computePartiallLoad(workerManager, inputJobs,
dictionary, sink);
+ bes = computePartiallLoad(workerManager, inputJobs,
dictionary, sink, catalogId);
statementContext.setPartialLoadDictionary(true);
}
} else {
// we explicitly request all BEs to load data. or ExternalTable.
(or EmptySetNode - should not happen)
- bes = computeFullLoad(workerManager, inputJobs);
+ bes = computeFullLoad(workerManager, inputJobs, catalogId);
}
List<AssignedJob> assignedJobs = Lists.newArrayList();
for (int i = 0; i < bes.size(); ++i) {
// every time one BE is selected
- DistributedPlanWorker worker = workerManager.getWorker(bes.get(i));
+ DistributedPlanWorker worker = workerManager.getWorker(catalogId,
bes.get(i));
if (worker != null) {
assignedJobs.add(assignWorkerAndDataSources(i,
connectContext.nextInstanceId(), worker,
new DefaultScanSource(ImmutableMap.of())));
@@ -122,7 +127,7 @@ public class UnassignedAllBEJob extends
AbstractUnassignedJob {
}
private List<Backend> computeFullLoad(DistributedPlanWorkerManager
workerManager,
- ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs, long catalogId)
{
// input jobs from upstream fragment - may have many instances.
ExchangeNode exchange = inputJobs.keySet().iterator().next(); //
random one - should be same for any exchange.
int expectInstanceNum = exchange.getNumInstances();
@@ -130,7 +135,7 @@ public class UnassignedAllBEJob extends
AbstractUnassignedJob {
// for Coordinator to know the right parallelism of DictionarySink
exchange.getFragment().setParallelExecNum(expectInstanceNum);
- List<Backend> bes = workerManager.getAllBackends(true);
+ List<Backend> bes = workerManager.getAllBackends(catalogId, true);
if (bes.size() != expectInstanceNum) {
// BE number changed when planning
throw new IllegalArgumentException("BE number should be " +
expectInstanceNum + ", but is " + bes.size());
@@ -139,10 +144,11 @@ public class UnassignedAllBEJob extends
AbstractUnassignedJob {
}
private List<Backend> computePartiallLoad(DistributedPlanWorkerManager
workerManager,
- ListMultimap<ExchangeNode, AssignedJob> inputJobs, Dictionary
dictionary, DictionarySink sink) {
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs, Dictionary
dictionary, DictionarySink sink,
+ long catalogId) {
// dictionary's src version(bundled with dictionary's version) is same
with usingVersion(otherwise FullLoad)
// so we can just use the src version to find the outdated backends
- List<Backend> outdateBEs =
dictionary.filterOutdatedBEs(workerManager.getAllBackends(true));
+ List<Backend> outdateBEs =
dictionary.filterOutdatedBEs(workerManager.getAllBackends(catalogId, true));
// reset all exchange node's instance number to the number of outdated
backends
PlanFragment fragment =
inputJobs.keySet().iterator().next().getFragment(); // random one exchange
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
index bcd7218f9f4..d4f32cce896 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+import org.apache.doris.catalog.Env;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
@@ -42,7 +43,8 @@ public class UnassignedGroupCommitJob extends
AbstractUnassignedJob {
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
TUniqueId instanceId =
statementContext.getConnectContext().nextInstanceId();
- BackendWorker selectBackend = new
BackendWorker(statementContext.getGroupCommitMergeBackend());
+ BackendWorker selectBackend = new
BackendWorker(Env.getCurrentEnv().getInternalCatalog().getId(),
+ statementContext.getGroupCommitMergeBackend());
return ImmutableList.of(
new StaticAssignedJob(
0, instanceId, this, selectBackend,
DefaultScanSource.empty()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 472fafe4513..44731efd3c7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -455,7 +455,7 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
for (Integer bucketIndex : selectBucketIndexes) {
Long tabletIdInBucket = tabletIdsInOrder.get(bucketIndex);
Tablet tabletInBucket = partition.getTablet(tabletIdInBucket);
- List<DistributedPlanWorker> workers =
getWorkersByReplicas(tabletInBucket);
+ List<DistributedPlanWorker> workers =
getWorkersByReplicas(tabletInBucket, olapScanNode.getCatalogId());
if (workers.isEmpty()) {
throw new IllegalStateException("Can not found available
replica for bucket " + bucketIndex
+ ", table: " + olapScanNode);
@@ -466,12 +466,12 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
return fillUpWorkerToBuckets;
}
- private List<DistributedPlanWorker> getWorkersByReplicas(Tablet tablet) {
+ private List<DistributedPlanWorker> getWorkersByReplicas(Tablet tablet,
long catalogId) {
DistributedPlanWorkerManager workerManager =
scanWorkerSelector.getWorkerManager();
List<Replica> replicas = tablet.getReplicas();
List<DistributedPlanWorker> workers =
Lists.newArrayListWithCapacity(replicas.size());
for (Replica replica : replicas) {
- DistributedPlanWorker worker =
workerManager.getWorker(replica.getBackendIdWithoutException());
+ DistributedPlanWorker worker = workerManager.getWorker(catalogId,
replica.getBackendIdWithoutException());
if (worker.available()) {
workers.add(worker);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 2bc03096bbb..cbc7acf4fac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -33,7 +33,6 @@ import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.MaterializedIndex;
@@ -497,7 +496,7 @@ public class OlapScanNode extends ScanNode {
boolean isInvalidComputeGroup =
ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup);
boolean isNotCloudComputeGroup = computeGroup != null &&
!Config.isCloudMode();
- ImmutableMap<Long, Backend> allBackends =
Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+ ImmutableMap<Long, Backend> allBackends =
olapTable.getAllBackendsByAllCluster();
long partitionVisibleVersion = visibleVersion;
String partitionVisibleVersionStr = fastToString(visibleVersion);
for (Tablet tablet : tablets) {
@@ -887,7 +886,7 @@ public class OlapScanNode extends ScanNode {
Preconditions.checkState(scanBackendIds.isEmpty());
Preconditions.checkState(scanTabletIds.isEmpty());
Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
- for (Backend backend :
Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
+ for (Backend backend :
olapTable.getAllBackendsByAllCluster().values()) {
Set<Long> hashSet = Sets.newLinkedHashSet();
for (DiskInfo diskInfo : backend.getDisks().values()) {
if (diskInfo.isAlive()) {
@@ -1415,4 +1414,12 @@ public class OlapScanNode extends ScanNode {
public void setGlobalRowIdColumn(Column globalRowIdColumn) {
this.globalRowIdColumn = globalRowIdColumn;
}
+
+ @Override
+ public long getCatalogId() {
+ if (olapTable != null) {
+ return olapTable.getCatalogId();
+ }
+ return super.getCatalogId();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index be7f85875ee..bb7eca6accc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -36,6 +36,7 @@ import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
@@ -716,4 +717,8 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
public void setDesc(TupleDescriptor desc) {
this.desc = desc;
}
+
+ public long getCatalogId() {
+ return Env.getCurrentInternalCatalog().getId();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
index 3970acfd5fe..54e607dd27a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
import org.apache.doris.qe.runtime.BackendFragmentId;
import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
import org.apache.doris.qe.runtime.PipelineExecutionTask;
@@ -122,8 +123,9 @@ public abstract class AbstractJobProcessor implements
JobProcessor {
PipelineExecutionTask executionTask) {
ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask>
backendFragmentTasks
= ImmutableMap.builder();
- for (Entry<Long, MultiFragmentsPipelineTask> backendTask :
executionTask.getChildrenTasks().entrySet()) {
- Long backendId = backendTask.getKey();
+ for (Entry<BackendWorker, MultiFragmentsPipelineTask> backendTask :
+ executionTask.getChildrenTasks().entrySet()) {
+ Long backendId = backendTask.getKey().id();
for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask :
backendTask.getValue()
.getChildrenTasks().entrySet()) {
Integer fragmentId = fragmentIdToTask.getKey();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
index 4d2da16ff26..6a52e3a6d9f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
import org.apache.doris.qe.CoordinatorContext;
import org.apache.doris.qe.SimpleScheduler;
@@ -56,7 +57,7 @@ import java.util.stream.Collectors;
*
* This class is used to describe which backend process which fragments
*/
-public class PipelineExecutionTask extends AbstractRuntimeTask<Long,
MultiFragmentsPipelineTask> {
+public class PipelineExecutionTask extends AbstractRuntimeTask<BackendWorker,
MultiFragmentsPipelineTask> {
private static final Logger LOG =
LogManager.getLogger(PipelineExecutionTask.class);
// immutable parameters
@@ -68,7 +69,7 @@ public class PipelineExecutionTask extends
AbstractRuntimeTask<Long, MultiFragme
public PipelineExecutionTask(
CoordinatorContext coordinatorContext,
BackendServiceProxy backendServiceProxy,
- Map<Long, MultiFragmentsPipelineTask> fragmentTasks) {
+ Map<BackendWorker, MultiFragmentsPipelineTask> fragmentTasks) {
// insert into stmt need latch to wait finish, but query stmt not need
because result receiver can wait finish
super(new ChildrenRuntimeTasks<>(fragmentTasks));
this.coordinatorContext = Objects.requireNonNull(coordinatorContext,
"coordinatorContext can not be null");
@@ -78,13 +79,13 @@ public class PipelineExecutionTask extends
AbstractRuntimeTask<Long, MultiFragme
// flatten to fragment tasks to quickly index by BackendFragmentId,
when receive the report message
ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask>
backendFragmentTasks
= ImmutableMap.builder();
- for (Entry<Long, MultiFragmentsPipelineTask> backendTask :
fragmentTasks.entrySet()) {
- Long backendId = backendTask.getKey();
+ for (Entry<BackendWorker, MultiFragmentsPipelineTask> backendTask :
fragmentTasks.entrySet()) {
+ BackendWorker worker = backendTask.getKey();
for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask :
backendTask.getValue()
.getChildrenTasks().entrySet()) {
Integer fragmentId = fragmentIdToTask.getKey();
SingleFragmentPipelineTask fragmentTask =
fragmentIdToTask.getValue();
- backendFragmentTasks.put(new BackendFragmentId(backendId,
fragmentId), fragmentTask);
+ backendFragmentTasks.put(new BackendFragmentId(worker.id(),
fragmentId), fragmentTask);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
index 0da6f4a5fe2..d9503e3145a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
@@ -65,13 +65,13 @@ public class PipelineExecutionTaskBuilder {
return pipelineExecutionTask;
}
- private Map<Long, MultiFragmentsPipelineTask> buildMultiFragmentTasks(
+ private Map<BackendWorker, MultiFragmentsPipelineTask>
buildMultiFragmentTasks(
CoordinatorContext coordinatorContext, BackendServiceProxy
backendServiceProxy,
Map<DistributedPlanWorker, TPipelineFragmentParamsList>
workerToFragmentsParam) {
Map<DistributedPlanWorker, ByteString> workerToSerializeFragments =
serializeFragments(workerToFragmentsParam);
- Map<Long, MultiFragmentsPipelineTask> fragmentTasks =
Maps.newLinkedHashMap();
+ Map<BackendWorker, MultiFragmentsPipelineTask> fragmentTasks =
Maps.newLinkedHashMap();
for (Entry<DistributedPlanWorker, TPipelineFragmentParamsList> kv :
workerToFragmentsParam.entrySet()) {
BackendWorker worker = (BackendWorker) kv.getKey();
@@ -80,7 +80,7 @@ public class PipelineExecutionTaskBuilder {
Backend backend = worker.getBackend();
fragmentTasks.put(
- worker.id(),
+ worker,
new MultiFragmentsPipelineTask(
coordinatorContext,
backend,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 17a47cabf09..7061942b384 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -64,6 +64,7 @@ import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
@@ -76,6 +77,7 @@ import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
+import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.Util;
@@ -94,6 +96,7 @@ import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.master.MasterImpl;
+import org.apache.doris.meta.MetaContext;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanNodeAndHash;
@@ -193,6 +196,8 @@ import org.apache.doris.thrift.TGetMetaDB;
import org.apache.doris.thrift.TGetMetaRequest;
import org.apache.doris.thrift.TGetMetaResult;
import org.apache.doris.thrift.TGetMetaTable;
+import org.apache.doris.thrift.TGetOlapTableMetaRequest;
+import org.apache.doris.thrift.TGetOlapTableMetaResult;
import org.apache.doris.thrift.TGetQueryStatsRequest;
import org.apache.doris.thrift.TGetSnapshotRequest;
import org.apache.doris.thrift.TGetSnapshotResult;
@@ -228,6 +233,7 @@ import org.apache.doris.thrift.TNodeInfo;
import org.apache.doris.thrift.TNullableStringLiteral;
import org.apache.doris.thrift.TOlapTableIndexTablets;
import org.apache.doris.thrift.TOlapTablePartition;
+import org.apache.doris.thrift.TPartitionMeta;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TPlsqlPackageResult;
@@ -302,7 +308,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
@@ -4561,6 +4570,102 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+ /**
+ * only copy basic meta about table
+ * do not copy the partitions
+ * @param request
+ * @return
+ * @throws TException
+ */
+ @Override
+ public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest
request) throws TException {
+ String clientAddr = getClientAddrAsString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive getOlapTableMeta request: {}, client: {}",
request, clientAddr);
+ }
+ TGetOlapTableMetaResult result = new TGetOlapTableMetaResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+ try {
+ checkSingleTablePasswordAndPrivs(request.getUser(),
request.getPasswd(), request.getDb(),
+ request.getTable(), clientAddr, PrivPredicate.SELECT);
+ String dbName = request.getDb();
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(dbName);
+ if (db == null) {
+ throw new UserException("unknown database, database=" +
dbName);
+ }
+ OlapTable table = (OlapTable)
db.getTableNullable(request.getTable());
+ if (table == null) {
+ throw new UserException("unknown table, table=" +
request.getTable());
+ }
+ MetaContext metaContext = new MetaContext();
+ metaContext.setMetaVersion(FeConstants.meta_version);
+ metaContext.setThreadLocalInfo();
+ table.readLock();
+ try (ByteArrayOutputStream bOutputStream = new
ByteArrayOutputStream(8192)) {
+ OlapTable copyTable = table.copyTableMeta();
+ try (DataOutputStream out = new
DataOutputStream(bOutputStream)) {
+ copyTable.write(out);
+ out.flush();
+ result.setTableMeta(bOutputStream.toByteArray());
+ }
+ Set<Long> updatedPartitionIds =
Sets.newHashSet(table.getPartitionIds());
+ List<TPartitionMeta> partitionMetas =
request.getPartitionsSize() == 0 ? Lists.newArrayList()
+ : request.getPartitions();
+ for (TPartitionMeta partitionMeta : partitionMetas) {
+ if (request.getTableId() != table.getId()) {
+ result.addToRemovedPartitions(partitionMeta.getId());
+ continue;
+ }
+ Partition partition =
table.getPartition(partitionMeta.getId());
+ if (partition == null) {
+ result.addToRemovedPartitions(partitionMeta.getId());
+ continue;
+ }
+ if (partition.getVisibleVersion() ==
partitionMeta.getVisibleVersion()
+ && partition.getVisibleVersionTime() ==
partitionMeta.getVisibleVersionTime()) {
+ updatedPartitionIds.remove(partitionMeta.getId());
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive getOlapTableMeta db: {} table:{}
update partitions: {} removed partition:{}",
+ request.getDb(), request.getTable(),
updatedPartitionIds.size(),
+ result.getRemovedPartitionsSize());
+ }
+ for (Long partitionId : updatedPartitionIds) {
+ bOutputStream.reset();
+ Partition partition = table.getPartition(partitionId);
+ try (DataOutputStream out = new
DataOutputStream(bOutputStream)) {
+ Text.writeString(out,
GsonUtils.GSON.toJson(partition));
+ out.flush();
+
result.addToUpdatedPartitions(ByteBuffer.wrap(bOutputStream.toByteArray()));
+ }
+ }
+ return result;
+ } finally {
+ table.readUnlock();
+ MetaContext.remove();
+ }
+ } catch (AuthenticationException e) {
+ LOG.warn("failed to check user auth: {}", e);
+ status.setStatusCode(TStatusCode.NOT_AUTHORIZED);
+ status.addToErrorMsgs(e.getMessage());
+ return result;
+ } catch (UserException e) {
+ LOG.warn("failed to get table meta db:{} table:{} : {}",
+ request.getDb(), request.getTable(), e);
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+ return result;
+ } catch (Exception e) {
+ LOG.warn("unknown exception when get table meta db:{} table:{} :
{}",
+ request.getDb(), request.getTable(), e);
+ status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+ status.addToErrorMsgs(e.getMessage());
+ return result;
+ }
+ }
+
private TStatus checkMaster() {
TStatus status = new TStatus(TStatusCode.OK);
if (!Env.getCurrentEnv().isMaster()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 3f54db60a86..3eb001ba624 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -33,6 +33,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStorageMedium;
@@ -1125,5 +1126,16 @@ public class Backend implements Writable {
}
}
+ public static Backend fromThrift(TBackend backend) {
+ Backend result = new Backend();
+ result.id = backend.getId();
+ result.host = backend.getHost();
+ result.httpPort = backend.getHttpPort();
+ result.brpcPort = backend.getBrpcPort();
+ result.bePort = backend.getBePort();
+ result.setAlive(backend.isIsAlive());
+ return result;
+ }
+
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index f27dcd44b4e..9afc7570225 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1650,6 +1650,29 @@ struct TGetTableTDEInfoResult {
2: optional AgentService.TEncryptionAlgorithm algorithm
}
+struct TPartitionMeta {
+ 1: optional i64 id
+ 2: optional i64 visible_version
+ 3: optional i64 visible_version_time
+}
+
+struct TGetOlapTableMetaRequest {
+ 1: required string user
+ 2: required string passwd
+ 3: required string db
+ 4: required string table
+ 5: required i64 table_id
+ 6: optional i32 version // todo serialize according to the version
+ 7: optional list<TPartitionMeta> partitions // client owned partition meta
+}
+
+struct TGetOlapTableMetaResult {
+ 1: required Status.TStatus status
+ 2: required binary table_meta
+ 3: optional list<binary> updated_partitions
+ 4: optional list<i64> removed_partitions
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1754,4 +1777,6 @@ service FrontendService {
TGetEncryptionKeysResult getEncryptionKeys(1: TGetEncryptionKeysRequest
request)
TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request)
+
+ TGetOlapTableMetaResult getOlapTableMeta(1: TGetOlapTableMetaRequest
request)
}
diff --git
a/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out
b/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out
new file mode 100644
index 00000000000..1bb50b2cc13
--- /dev/null
+++
b/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out
@@ -0,0 +1,48 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+2025-05-18T01:00 true -128 -32768 -2147483648
-9223372036854775808 -1234567890123456790 -123.456 -123456.789
-123457 -123456789012346 -1234567890123456789012345678 1970-01-01
0000-01-01T00:00 A Hello Hello, Doris! ["apple", "banana",
"orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"}
+2025-05-18T02:00 \N \N \N \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N
+2025-05-18T03:00 false 127 32767 2147483647
9223372036854775807 1234567890123456789 123.456 123456.789 123457
123456789012346 1234567890123456789012345678 9999-12-31
9999-12-31T23:59:59 [] {} {"f1":11,
"f2":3.14, "f3":"Emily"}
+2025-05-18T04:00 true 0 0 0 0 0 0.0
0 0 0 0 2023-10-01 2023-10-01T12:34:56 A
Hello Hello, Doris! ["apple", "banana", "orange"] {"Emily":101, "age":25}
{"f1":11, "f2":3.14, "f3":"Emily"}
+
+-- !sql --
+2025-05-18T01:00 [1] [-128] [-32768] [-2147483648]
[-9223372036854775808] [-1234567890123456790] [-123.456] [-123456.789]
[-123457] [-123456789012346] [-1234567890123456789012345678]
["0000-01-01"] ["0000-01-01 00:00:00"] ["A"] ["Hello"] ["Hello,
Doris!"]
+2025-05-18T02:00 [null] [null] [null] [null] [null] [null] [null]
[null] [null] [null] [null] [null] [null] [null] [null] [null]
+2025-05-18T03:00 [0] [127] [32767] [2147483647]
[9223372036854775807] [1234567890123456789] [123.456] [123456.789]
[123457] [123456789012346] [1234567890123456789012345678]
["9999-12-31"] ["9999-12-31 23:59:59"] [""] [""] [""]
+2025-05-18T04:00 [1] [0] [0] [0] [0] [0] [0]
[0] [0] [0] [0] ["2023-10-01"] ["2023-10-01 12:34:56"] ["A"]
["Hello"] ["Hello, Doris!"]
+
+-- !sql --
+2025-05-18T01:00 2025-05-18T01:00 2025-05-18T01:00:00.100
2025-05-18T01:00:00.110 2025-05-18T01:00:00.111 2025-05-18T01:00:00.111100
2025-05-18T01:00:00.111110 2025-05-18T01:00:00.111111
+
+-- !query_after_insert --
+2025-05-18T01:00 2025-05-18T01:00 2025-05-18T01:00:00.100
2025-05-18T01:00:00.110 2025-05-18T01:00:00.111 2025-05-18T01:00:00.111100
2025-05-18T01:00:00.111110 2025-05-18T01:00:00.111111
+2025-05-19T01:00 2025-05-19T01:00 2025-05-19T01:00:00.100
2025-05-19T01:00:00.110 2025-05-19T01:00:00.111 2025-05-19T01:00:00.111100
2025-05-19T01:00:00.111110 2025-05-19T01:00:00.111111
+
+-- !after_insert_cmd --
+2025-05-18T01:00 true -128 -32768 -2147483648
-9223372036854775808 -1234567890123456790 -123.456 -123456.789
-123457 -123456789012346 -1234567890123456789012345678 1970-01-01
0000-01-01T00:00 A Hello Hello, Doris! ["apple", "banana",
"orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"}
+2025-05-18T02:00 \N \N \N \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N
+2025-05-18T03:00 false 127 32767 2147483647
9223372036854775807 1234567890123456789 123.456 123456.789 123457
123456789012346 1234567890123456789012345678 9999-12-31
9999-12-31T23:59:59 [] {} {"f1":11,
"f2":3.14, "f3":"Emily"}
+2025-05-18T04:00 true 0 0 0 0 0 0.0
0 0 0 0 2023-10-01 2023-10-01T12:34:56 A
Hello Hello, Doris! ["apple", "banana", "orange"] {"Emily":101, "age":25}
{"f1":11, "f2":3.14, "f3":"Emily"}
+
+-- !after_insert_overwrite_cmd --
+2025-05-18T01:00 true -128 -32768 -2147483648
-9223372036854775808 -1234567890123456790 -123.456 -123456.789
-123457 -123456789012346 -1234567890123456789012345678 1970-01-01
0000-01-01T00:00 A Hello Hello, Doris! ["apple", "banana",
"orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"}
+2025-05-18T02:00 \N \N \N \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N
+2025-05-18T03:00 false 127 32767 2147483647
9223372036854775807 1234567890123456789 123.456 123456.789 123457
123456789012346 1234567890123456789012345678 9999-12-31
9999-12-31T23:59:59 [] {} {"f1":11,
"f2":3.14, "f3":"Emily"}
+2025-05-18T04:00 true 0 0 0 0 0 0.0
0 0 0 0 2023-10-01 2023-10-01T12:34:56 A
Hello Hello, Doris! ["apple", "banana", "orange"] {"Emily":101, "age":25}
{"f1":11, "f2":3.14, "f3":"Emily"}
+
+-- !join --
+1 reason1 2023-01-01 1 100 error1 1000 2023-01-01T00:00
+2 reason2 2023-01-01 2 200 error2 2000 2023-01-01T00:00
+3 reason3 2023-01-01 3 300 error3 3000 2023-01-01T00:00
+
+-- !join_predicate --
+2 reason2 2023-01-01 2 200 error2 2000 2023-01-01T00:00
+
+-- !join_partition --
+1 2023-01-01 reason1 2023-01-01 1 100 error1 1000
2023-01-01T00:00
+2 2023-01-02 reason2 2023-01-02 2 200 error2 2000
2023-01-02T00:00
+3 2023-01-03 reason3 2023-01-03 3 300 error3 3000
2023-01-03T00:00
+
+-- !join_partition_predicate --
+2 2023-01-02 reason2 2023-01-02 2 200 error2 2000
2023-01-02T00:00
+
diff --git a/regression-test/pipeline/external/conf/regression-conf.groovy
b/regression-test/pipeline/external/conf/regression-conf.groovy
index 0d3e94ace11..75299d79082 100644
--- a/regression-test/pipeline/external/conf/regression-conf.groovy
+++ b/regression-test/pipeline/external/conf/regression-conf.groovy
@@ -31,6 +31,7 @@ extArrowFlightSqlPort = 8081
extArrowFlightSqlUser = "root"
extArrowFlightSqlPassword= ""
extArrowFlightHttpPort= 8131
+extFeThriftPort = 9020
ccrDownstreamUrl =
"jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLoadLocalInfile=true"
ccrDownstreamUser = "root"
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy
new file mode 100644
index 00000000000..50cb13deca8
--- /dev/null
+++
b/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_query_remote_doris_as_olap_table_select",
"p0,external,doris,external_docker,external_docker_doris") {
+ String remote_doris_host =
context.config.otherConfigs.get("extArrowFlightSqlHost")
+ String remote_doris_arrow_port =
context.config.otherConfigs.get("extArrowFlightSqlPort")
+ String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
+ String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
+ String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
+
+ def showres = sql "show frontends";
+ remote_doris_arrow_port = showres[0][6]
+ remote_doris_http_port = showres[0][3]
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
+
+ def showres2 = sql "show backends";
+ log.info("show backends log = ${showres2}")
+
+ def db_name = "test_query_remote_doris_as_olap_table_select_db"
+ def catalog_name = "test_query_remote_doris_as_olap_table_select_catalog"
+
+ sql """DROP DATABASE IF EXISTS ${db_name}"""
+
+ sql """CREATE DATABASE IF NOT EXISTS ${db_name}"""
+
+ sql """
+ CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t` (
+ `id` datetime(3) NOT NULL,
+ `c_boolean` boolean NULL,
+ `c_tinyint` tinyint NULL,
+ `c_smallint` smallint NULL,
+ `c_int` int NULL,
+ `c_bigint` bigint NULL,
+ `c_largeint` largeint NULL,
+ `c_float` float NULL,
+ `c_double` double NULL,
+ `c_decimal9` decimal(9,0) NULL,
+ `c_decimal18` decimal(18,0) NULL,
+ `c_decimal32` decimal(32,0) NULL,
+ `c_date` date NULL,
+ `c_datetime` datetime NULL,
+ `c_char` char(1) NULL,
+ `c_varchar` varchar(65533) NULL,
+ `c_string` text NULL,
+ `c_array_s` array<text> NULL,
+ `c_map` MAP<STRING, INT> NULL,
+ `c_struct` STRUCT<f1:INT,f2:FLOAT,f3:STRING> NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t`
values('2025-05-18 01:00:00.000', true, -128, -32768, -2147483648,
-9223372036854775808, -1234567890123456790, -123.456, -123456.789, -123457,
-123456789012346, -1234567890123456789012345678, '1970-01-01', '0000-01-01
00:00:00', 'A', 'Hello', 'Hello, Doris!', '["apple", "banana", "orange"]',
{"Emily":101,"age":25} , {11, 3.14, "Emily"})
+ """
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t`
values('2025-05-18 02:00:00.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
+ """
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t`
values('2025-05-18 03:00:00.000', false, 127, 32767, 2147483647,
9223372036854775807, 1234567890123456789, 123.456, 123456.789, 123457,
123456789012346, 1234567890123456789012345678, '9999-12-31', '9999-12-31
23:59:59', '', '', '', [], {}, {11, 3.14, "Emily"})
+ """
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t`
values('2025-05-18 04:00:00.000', true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
'2023-10-01', '2023-10-01 12:34:56', 'A', 'Hello', 'Hello, Doris!', '["apple",
"banana", "orange"]', {"Emily":101,"age":25} , {11, 3.14, "Emily"});
+ """
+
+ sql """
+ CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t2` (
+ `id` datetime(3) NOT NULL,
+ `a_boolean` array<boolean> NULL,
+ `a_tinyint` array<tinyint> NULL,
+ `a_smallint` array<smallint> NULL,
+ `a_int` array<int> NULL,
+ `a_bigint` array<bigint> NULL,
+ `a_largeint` array<largeint> NULL,
+ `a_float` array<float> NULL,
+ `a_double` array<double> NULL,
+ `a_decimal9` array<decimal(9,0)> NULL,
+ `a_decimal18` array<decimal(18,0)> NULL,
+ `a_decimal32` array<decimal(32,0)> NULL,
+ `a_date` array<date> NULL,
+ `a_datetime` array<datetime> NULL,
+ `a_char` array<char(1)> NULL,
+ `a_varchar` array<varchar(65533)> NULL,
+ `a_string` array<text> NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2`
values('2025-05-18 01:00:00.000', [true], [-128], [-32768], [-2147483648],
[-9223372036854775808], [-1234567890123456790], [-123.456], [-123456.789],
[-123457], [-123456789012346], [-1234567890123456789012345678], ['0000-01-01'],
['0000-01-01 00:00:00'], ['A'], ['Hello'], ['Hello, Doris!'])
+ """
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2`
values('2025-05-18 02:00:00.000', [NULL], [NULL], [NULL], [NULL], [NULL],
[NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL],
[NULL])
+ """
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2`
values('2025-05-18 03:00:00.000', [false], [127], [32767], [2147483647],
[9223372036854775807], [1234567890123456789], [123.456], [123456.789],
[123457], [123456789012346], [1234567890123456789012345678], ['9999-12-31'],
['9999-12-31 23:59:59'], [''], [''], [''])
+ """
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2`
values('2025-05-18 04:00:00.000', [true], [0], [0], [0], [0], [0], [0], [0],
[0], [0], [0], ['2023-10-01'], ['2023-10-01 12:34:56'], ['A'], ['Hello'],
['Hello, Doris!']);
+ """
+
+ sql """
+ CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t3` (
+ `id` datetime NOT NULL,
+ `datetime_0` datetime(0) NULL,
+ `datetime_1` datetime(1) NULL,
+ `datetime_3` datetime(2) NULL,
+ `datetime_4` datetime(3) NULL,
+ `datetime_5` datetime(4) NULL,
+ `datetime_6` datetime(5) NULL,
+ `datetime_7` datetime(6) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t3`
values('2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18
01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111',
'2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18
01:00:00.111111');
+ """
+
+
+ sql """
+ DROP CATALOG IF EXISTS `${catalog_name}`
+ """
+
+
+ sql """
+ CREATE CATALOG `${catalog_name}` PROPERTIES (
+ 'type' = 'doris',
+ 'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
+ 'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
+ 'user' = '${remote_doris_user}',
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'false'
+ );
+ """
+
+ qt_sql """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t` order by
id
+ """
+
+ qt_sql """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t2` order by
id
+ """
+
+ qt_sql """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t3` order by
id
+ """
+
+    // test that a select after an insert still returns correct data, i.e. we
observe the newly updated partition version
+ sql """
+ INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t3`
values('2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19
01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111',
'2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19
01:00:00.111111');
+ """
+ qt_query_after_insert """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t3` order by
id
+ """
+
+ // test insert command
+ sql """
+ CREATE TABLE `${db_name}`.`test_remote_doris_all_types_insert` (
+ `id` datetime(3) NOT NULL,
+ `c_boolean` boolean NULL,
+ `c_tinyint` tinyint NULL,
+ `c_smallint` smallint NULL,
+ `c_int` int NULL,
+ `c_bigint` bigint NULL,
+ `c_largeint` largeint NULL,
+ `c_float` float NULL,
+ `c_double` double NULL,
+ `c_decimal9` decimal(9,0) NULL,
+ `c_decimal18` decimal(18,0) NULL,
+ `c_decimal32` decimal(32,0) NULL,
+ `c_date` date NULL,
+ `c_datetime` datetime NULL,
+ `c_char` char(1) NULL,
+ `c_varchar` varchar(65533) NULL,
+ `c_string` text NULL,
+ `c_array_s` array<text> NULL,
+ `c_map` MAP<STRING, INT> NULL,
+ `c_struct` STRUCT<f1:INT,f2:FLOAT,f3:STRING> NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ // test insert into
+ explain {
+ sql("insert into `${db_name}`.`test_remote_doris_all_types_insert`
select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`")
+ contains("VOlapScanNode")
+ }
+ sql """
+ insert into `${db_name}`.`test_remote_doris_all_types_insert` select
/*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ *
from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`
+ """
+ qt_after_insert_cmd """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from
`${db_name}`.`test_remote_doris_all_types_insert` order by id
+ """
+ // test insert overwrite
+ explain {
+ sql("insert OVERWRITE table
`${db_name}`.`test_remote_doris_all_types_insert` select /*+
SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ *
from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`")
+ contains("VOlapScanNode")
+ }
+ sql """
+ insert OVERWRITE table
`${db_name}`.`test_remote_doris_all_types_insert` select /*+
SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ *
from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`
+ """
+ qt_after_insert_overwrite_cmd """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from
`${db_name}`.`test_remote_doris_all_types_insert` order by id
+ """
+
+ // test join operation
+ sql """
+ CREATE TABLE `${db_name}`.`left_inner_table` (
+ log_type INT NOT NULL,
+ reason VARCHAR(1024) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`log_type`)
+ DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ sql """
+ INSERT INTO `${db_name}`.`left_inner_table` VALUES
+ (1,'reason1'),
+ (2,'reason2'),
+ (3,'reason3');
+ """
+ sql """
+ CREATE TABLE `${db_name}`.`right_remote_table` (
+ log_time DATE NOT NULL,
+ log_type INT NOT NULL,
+ error_code INT,
+ error_msg VARCHAR(1024),
+ op_id BIGINT,
+ op_time DATETIME
+ ) ENGINE=OLAP
+ DUPLICATE KEY(log_time, log_type, error_code)
+ DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ sql """
+ INSERT INTO `${db_name}`.`right_remote_table` VALUES
+ ('2023-01-01',1,100,'error1',1000,'2023-01-01 00:00:00'),
+ ('2023-01-01',2,200,'error2',2000,'2023-01-01 00:00:00'),
+ ('2023-01-01',3,300,'error3',3000,'2023-01-01 00:00:00');
+ """
+ qt_join """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table` a
+ join `${catalog_name}`.`${db_name}`.`right_remote_table` b on
a.`log_type` = b.`log_type` order by a.`log_type`
+ """
+ qt_join_predicate """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table` a
+ join `${catalog_name}`.`${db_name}`.`right_remote_table` b on
a.`log_type` = b.`log_type` and b.op_id=2000 order by a.`log_type`
+ """
+
+ // test partition table
+ sql """
+ CREATE TABLE `${db_name}`.`left_inner_table_partition` (
+ log_type INT NOT NULL,
+ `log_time` date NOT NULL,
+ reason VARCHAR(1024) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`log_type`)
+ PARTITION BY RANGE(`log_time`)
+ (PARTITION p20230101 VALUES [('2023-01-01'), ('2023-01-02')),
+ PARTITION p20230102 VALUES [('2023-01-02'), ('2023-01-03')),
+ PARTITION p20230103 VALUES [('2023-01-03'), ('2023-01-04')))
+ DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ INSERT INTO `${db_name}`.`left_inner_table_partition` VALUES
+ (1,'2023-01-01','reason1'),
+ (2,'2023-01-02','reason2'),
+ (3,'2023-01-03','reason3');
+ """
+
+ sql """
+ CREATE TABLE `${db_name}`.`right_remote_table_partition` (
+ log_time DATE NOT NULL,
+ log_type INT NOT NULL,
+ error_code INT,
+ error_msg VARCHAR(1024),
+ op_id BIGINT,
+ op_time DATETIME
+ ) ENGINE=OLAP
+ DUPLICATE KEY(log_time, log_type, error_code)
+ PARTITION BY RANGE(`log_time`)
+ (PARTITION p20230101 VALUES [('2023-01-01'), ('2023-01-02')),
+ PARTITION p20230102 VALUES [('2023-01-02'), ('2023-01-03')),
+ PARTITION p20230103 VALUES [('2023-01-03'), ('2023-01-04')))
+ DISTRIBUTED BY HASH(`log_type`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ INSERT INTO `${db_name}`.`right_remote_table_partition` VALUES
+ ('2023-01-01',1,100,'error1',1000,'2023-01-01 00:00:00'),
+ ('2023-01-02',2,200,'error2',2000,'2023-01-02 00:00:00'),
+ ('2023-01-03',3,300,'error3',3000,'2023-01-03 00:00:00');
+ """
+
+ qt_join_partition """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table_partition` a
+ join `${catalog_name}`.`${db_name}`.`right_remote_table_partition`
b on a.`log_type` = b.`log_type` and a.`log_time` = b.`log_time` order by
a.`log_type`
+ """
+
+ qt_join_partition_predicate """
+ select /*+ SET_VAR(enable_nereids_distribute_planner=true,
enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table_partition` a
+ join `${catalog_name}`.`${db_name}`.`right_remote_table_partition`
b on a.`log_type` = b.`log_type` and a.`log_time` = b.`log_time` and
b.log_time='2023-01-02' order by a.`log_type`
+ """
+
+ sql """ DROP DATABASE IF EXISTS ${db_name} """
+ sql """ DROP CATALOG IF EXISTS `${catalog_name}` """
+}
\ No newline at end of file
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
index a77d21f59f5..ef5bfcae8f6 100644
---
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
+++
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_all_types_select",
"p0,external,doris,external_docker,e
String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
def showres = sql "show frontends";
remote_doris_arrow_port = showres[0][6]
remote_doris_http_port = showres[0][3]
- log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
def showres2 = sql "show backends";
log.info("show backends log = ${showres2}")
@@ -150,8 +152,10 @@ suite("test_remote_doris_all_types_select",
"p0,external,doris,external_docker,e
'type' = 'doris',
'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
'user' = '${remote_doris_user}',
- 'password' = '${remote_doris_psw}'
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'true'
);
"""
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
index ab9f89019e2..5438617e09c 100644
---
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
+++
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_all_types_show",
"p0,external,doris,external_docker,ext
String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
def showres = sql "show frontends";
remote_doris_arrow_port = showres[0][6]
remote_doris_http_port = showres[0][3]
- log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
def showres2 = sql "show backends";
log.info("show backends log = ${showres2}")
@@ -141,10 +143,12 @@ suite("test_remote_doris_all_types_show",
"p0,external,doris,external_docker,ext
sql """
CREATE CATALOG `test_remote_doris_all_types_catalog` PROPERTIES (
'type' = 'doris',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
'user' = '${remote_doris_user}',
- 'password' = '${remote_doris_psw}'
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'true'
);
"""
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
index 82e2b2e3550..710ee5ecb54 100644
---
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
+++
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_catalog",
"p0,external,doris,external_docker,external_d
String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
def showres = sql "show frontends";
remote_doris_arrow_port = showres[0][6]
remote_doris_http_port = showres[0][3]
- log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
def showres2 = sql "show backends";
log.info("show backends log = ${showres2}")
@@ -39,10 +41,12 @@ suite("test_remote_doris_catalog",
"p0,external,doris,external_docker,external_d
sql """
CREATE CATALOG `test_remote_doris_catalog_catalog` PROPERTIES (
'type' = 'doris',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
'user' = '${remote_doris_user}',
- 'password' = '${remote_doris_psw}'
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'true'
);
"""
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
index 3782f5c50a7..cf7dfd5ea31 100644
---
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
+++
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_predict",
"p0,external,doris,external_docker,external_d
String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
def showres = sql "show frontends";
remote_doris_arrow_port = showres[0][6]
remote_doris_http_port = showres[0][3]
- log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
def showres2 = sql "show backends";
log.info("show backends log = ${showres2}")
@@ -63,10 +65,12 @@ suite("test_remote_doris_predict",
"p0,external,doris,external_docker,external_d
sql """
CREATE CATALOG `test_remote_doris_predict_catalog` PROPERTIES (
'type' = 'doris',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
'user' = '${remote_doris_user}',
- 'password' = '${remote_doris_psw}'
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'true'
);
"""
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
index e7596eafffb..02d1028c26b 100644
---
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
+++
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_refresh",
"p0,external,doris,external_docker,external_d
String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
def showres = sql "show frontends";
remote_doris_arrow_port = showres[0][6]
remote_doris_http_port = showres[0][3]
- log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
def showres2 = sql "show backends";
log.info("show backends log = ${showres2}")
@@ -39,10 +41,12 @@ suite("test_remote_doris_refresh",
"p0,external,doris,external_docker,external_d
sql """
CREATE CATALOG `test_remote_doris_refresh_catalog` PROPERTIES (
'type' = 'doris',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
'user' = '${remote_doris_user}',
- 'password' = '${remote_doris_psw}'
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'true'
);
"""
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
index f2633b8cd28..223c294d811 100644
---
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
+++
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_statistics",
"p0,external,doris,external_docker,externa
String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
def showres = sql "show frontends";
remote_doris_arrow_port = showres[0][6]
remote_doris_http_port = showres[0][3]
- log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
def showres2 = sql "show backends";
log.info("show backends log = ${showres2}")
@@ -71,10 +73,12 @@ suite("test_remote_doris_statistics",
"p0,external,doris,external_docker,externa
sql """
CREATE CATALOG `test_remote_doris_statistics_catalog` PROPERTIES (
'type' = 'doris',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
'user' = '${remote_doris_user}',
- 'password' = '${remote_doris_psw}'
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'true'
);
"""
diff --git
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
index a9dd083ffd9..e6de44e005a 100644
---
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
+++
b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy
@@ -21,11 +21,13 @@ suite("test_remote_doris_table_stats",
"p0,external,doris,external_docker,extern
String remote_doris_http_port =
context.config.otherConfigs.get("extArrowFlightHttpPort")
String remote_doris_user =
context.config.otherConfigs.get("extArrowFlightSqlUser")
String remote_doris_psw =
context.config.otherConfigs.get("extArrowFlightSqlPassword")
+ String remote_doris_thrift_port =
context.config.otherConfigs.get("extFeThriftPort")
def showres = sql "show frontends";
remote_doris_arrow_port = showres[0][6]
remote_doris_http_port = showres[0][3]
- log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}")
+ remote_doris_thrift_port = showres[0][5]
+ log.info("show frontends log = ${showres}, arrow:
${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift:
${remote_doris_thrift_port}")
def showres2 = sql "show backends";
log.info("show backends log = ${showres2}")
@@ -71,10 +73,12 @@ suite("test_remote_doris_table_stats",
"p0,external,doris,external_docker,extern
sql """
CREATE CATALOG `test_remote_doris_table_stats_catalog` PROPERTIES (
'type' = 'doris',
+ 'fe_thrift_hosts' =
'${remote_doris_host}:${remote_doris_thrift_port}',
'fe_http_hosts' =
'http://${remote_doris_host}:${remote_doris_http_port}',
'fe_arrow_hosts' =
'${remote_doris_host}:${remote_doris_arrow_port}',
'user' = '${remote_doris_user}',
- 'password' = '${remote_doris_psw}'
+ 'password' = '${remote_doris_psw}',
+ 'use_arrow_flight' = 'true'
);
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]