This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dc00b991a6 [feature](merge-cloud) Add fe cloud partition and replica 
definition (#29855)
3dc00b991a6 is described below

commit 3dc00b991a6e1a55b5a31429d194cf03b72e5c2f
Author: walter <[email protected]>
AuthorDate: Thu Jan 11 20:19:49 2024 +0800

    [feature](merge-cloud) Add fe cloud partition and replica definition 
(#29855)
    
    Co-authored-by: Luwei <[email protected]>
    Co-authored-by: deardeng <[email protected]>
    Co-authored-by: Gavin Chou <[email protected]>
    Co-authored-by: Lightman <[email protected]>
    Co-authored-by: zhengyu <[email protected]>
    Co-authored-by: Lei Zhang <[email protected]>
    Co-authored-by: AlexYue <[email protected]>
    Co-authored-by: Xiaocc <[email protected]>
    Co-authored-by: panDing19 <[email protected]>
    Co-authored-by: plat1ko <[email protected]>
    Co-authored-by: zhangdong <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  15 +
 .../apache/doris/analysis/ResourceTypeEnum.java    |  25 ++
 .../main/java/org/apache/doris/catalog/Env.java    |  22 ++
 .../java/org/apache/doris/catalog/Partition.java   |   6 +-
 .../cloud/catalog/CloudInstanceStatusChecker.java  |  52 +++
 .../apache/doris/cloud/catalog/CloudPartition.java | 422 +++++++++++++++++++++
 .../apache/doris/cloud/catalog/CloudReplica.java   | 394 +++++++++++++++++++
 .../java/org/apache/doris/common/ErrorCode.java    |   5 +-
 .../doris/common/profile/SummaryProfile.java       |  66 +++-
 .../org/apache/doris/mysql/privilege/Auth.java     |   8 +
 .../java/org/apache/doris/qe/ConnectContext.java   |  44 ++-
 .../java/org/apache/doris/qe/SessionVariable.java  |  32 ++
 .../main/java/org/apache/doris/resource/Tag.java   |   7 +
 .../main/java/org/apache/doris/system/Backend.java |  51 +++
 .../org/apache/doris/system/SystemInfoService.java | 173 +++++++++
 15 files changed, 1305 insertions(+), 17 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a84185b673b..60ae7ec1463 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2450,6 +2450,21 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int meta_service_connection_age_base_minutes = 5;
 
+    @ConfField
+    public static int cloud_meta_service_rpc_failed_retry_times = 200;
+
+    @ConfField
+    public static int default_get_version_from_ms_timeout_second = 3;
+
+    @ConfField(mutable = true)
+    public static boolean enable_cloud_multi_replica = false;
+
+    @ConfField(mutable = true)
+    public static int cloud_replica_num = 3;
+
+    @ConfField(mutable = true)
+    public static int cloud_cold_read_percent = 10; // 10%
+
     
//==========================================================================
     //                      end of cloud config
     
//==========================================================================
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceTypeEnum.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceTypeEnum.java
new file mode 100644
index 00000000000..cb6972b6d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceTypeEnum.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+/**
+ * Resource type enum
+ **/
+public enum ResourceTypeEnum {
+    GENERAL, CLUSTER, STAGE
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 2f33e12da47..28c74c749a8 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -69,6 +69,7 @@ import org.apache.doris.analysis.RecoverDbStmt;
 import org.apache.doris.analysis.RecoverPartitionStmt;
 import org.apache.doris.analysis.RecoverTableStmt;
 import org.apache.doris.analysis.ReplacePartitionClause;
+import org.apache.doris.analysis.ResourceTypeEnum;
 import org.apache.doris.analysis.RestoreStmt;
 import org.apache.doris.analysis.RollupRenameClause;
 import org.apache.doris.analysis.SetType;
@@ -4970,6 +4971,27 @@ public class Env {
         this.alter.getClusterHandler().cancel(stmt);
     }
 
+    public void checkCloudClusterPriv(String clusterName) throws DdlException {
+        // check resource usage privilege
+        if 
(!Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
+                clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
+            throw new DdlException("USAGE denied to user "
+                    + ConnectContext.get().getQualifiedUser() + "'@'" + 
ConnectContext.get().getRemoteIP()
+                    + "' for cloud cluster '" + clusterName + "'", 
ErrorCode.ERR_CLUSTER_NO_PERMISSIONS);
+        }
+
+        if 
(!Env.getCurrentSystemInfo().getCloudClusterNames().contains(clusterName)) {
+            LOG.debug("current instance does not have a cluster name :{}", 
clusterName);
+            throw new DdlException(String.format("Cluster %s not exist", 
clusterName),
+                    ErrorCode.ERR_CLOUD_CLUSTER_ERROR);
+        }
+    }
+
+    public static void waitForAutoStart(final String clusterName) throws 
DdlException {
+        // TODO: merge from cloud.
+        throw new DdlException("Env.waitForAutoStart unimplemented");
+    }
+
     // Switch catalog of this sesseion.
     public void changeCatalog(ConnectContext ctx, String catalogName) throws 
DdlException {
         CatalogIf catalogIf = catalogMgr.getCatalogNullable(catalogName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 76c3097a033..755e5f14a29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -95,14 +95,14 @@ public class Partition extends MetaObject implements 
Writable {
     @SerializedName(value = "visibleVersionHash")
     private long visibleVersionHash;
     @SerializedName(value = "nextVersion")
-    private long nextVersion;
+    protected long nextVersion;
     @Deprecated
     @SerializedName(value = "nextVersionHash")
     private long nextVersionHash;
     @SerializedName(value = "distributionInfo")
     private DistributionInfo distributionInfo;
 
-    private Partition() {
+    protected Partition() {
     }
 
     public Partition(long id, String name,
@@ -181,7 +181,7 @@ public class Partition extends MetaObject implements 
Writable {
     }
 
     // The method updateVisibleVersionAndVersionHash is called when fe 
restart, the visibleVersionTime is updated
-    private void setVisibleVersion(long visibleVersion) {
+    protected void setVisibleVersion(long visibleVersion) {
         this.visibleVersion = visibleVersion;
         this.visibleVersionTime = System.currentTimeMillis();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
new file mode 100644
index 00000000000..b8329deffbb
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CloudInstanceStatusChecker extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(CloudInstanceStatusChecker.class);
+
+    public CloudInstanceStatusChecker() {
+        super("cloud instance check");
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        try {
+            Cloud.GetInstanceResponse response =
+                    Env.getCurrentSystemInfo().getCloudInstance();
+            LOG.debug("get from ms response {}", response);
+            if (!response.hasStatus() || !response.getStatus().hasCode()
+                    || response.getStatus().getCode() != 
Cloud.MetaServiceCode.OK) {
+                LOG.warn("failed to get cloud instance due to incomplete 
response, "
+                        + "cloud_unique_id={}, response={}", 
Config.cloud_unique_id, response);
+            } else {
+                
Env.getCurrentSystemInfo().setInstanceStatus(response.getInstance().getStatus());
+            }
+        } catch (Exception e) {
+            LOG.warn("get instance from ms exception", e);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
new file mode 100644
index 00000000000..fba1daa512f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -0,0 +1,422 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.rpc.RpcException;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Internal representation of partition-related metadata.
+ */
+public class CloudPartition extends Partition {
+    // Every partition starts from version 1, version 1 has no data
+    public static long EMPTY_VERSION = 1;
+
+    private static final Logger LOG = 
LogManager.getLogger(CloudPartition.class);
+
+    // not Serialized
+    @SerializedName(value = "dbId")
+    private long dbId;
+    @SerializedName(value = "tableId")
+    private long tableId;
+
+    public CloudPartition(long id, String name, MaterializedIndex baseIndex,
+                          DistributionInfo distributionInfo, long dbId, long 
tableId) {
+        super(id, name, baseIndex, distributionInfo);
+        super.nextVersion = -1;
+        this.dbId = dbId;
+        this.tableId = tableId;
+    }
+
+    public CloudPartition() {
+        super();
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public void setDbId(long dbId) {
+        this.dbId = dbId;
+    }
+
+    public long getTableId() {
+        return this.tableId;
+    }
+
+    public void setTableId(long tableId) {
+        this.tableId = tableId;
+    }
+
+    protected void setVisibleVersion(long visibleVersion) {
+        LOG.debug("setVisibleVersion use CloudPartition {}", super.getName());
+        return;
+    }
+
+    protected void setCachedVisibleVersion(long version) {
+        super.setVisibleVersion(version);
+    }
+
+    @Override
+    public long getVisibleVersion() {
+        LOG.debug("getVisibleVersion use CloudPartition {}", super.getName());
+
+        Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
+                .setDbId(this.dbId)
+                .setTableId(this.tableId)
+                .setPartitionId(super.getId())
+                .setBatchMode(false)
+                .build();
+
+        try {
+            Cloud.GetVersionResponse resp = getVersionFromMeta(request);
+            long version = -1;
+            if (resp.getStatus().getCode() == MetaServiceCode.OK) {
+                version = resp.getVersion();
+            } else {
+                assert resp.getStatus().getCode() == 
MetaServiceCode.VERSION_NOT_FOUND;
+                version = 0;
+            }
+            LOG.debug("get version from meta service, version: {}, partition: 
{}", version, super.getId());
+            // Cache visible version, see hasData() for details.
+            super.setVisibleVersion(version);
+            if (version == 0 && isEmptyPartitionPruneDisabled()) {
+                version = 1;
+            }
+            return version;
+        } catch (RpcException e) {
+            throw new RuntimeException("get version from meta service failed");
+        }
+    }
+
+    // Select the non-empty partitions and return the ids.
+    public static List<Long> selectNonEmptyPartitionIds(List<CloudPartition> 
partitions) {
+        List<Long> nonEmptyPartitionIds = partitions.stream()
+                .filter(CloudPartition::hasDataCached)
+                .map(CloudPartition::getId)
+                .collect(Collectors.toList());
+        if (nonEmptyPartitionIds.size() == partitions.size()) {
+            return nonEmptyPartitionIds;
+        }
+
+        List<CloudPartition> unknowns = partitions.stream()
+                .filter(p -> !p.hasDataCached())
+                .collect(Collectors.toList());
+
+        SummaryProfile profile = getSummaryProfile();
+        if (profile != null) {
+            profile.incGetPartitionVersionByHasDataCount();
+        }
+
+        try {
+            List<Long> versions = 
CloudPartition.getSnapshotVisibleVersion(unknowns);
+
+            int size = versions.size();
+            for (int i = 0; i < size; i++) {
+                if (versions.get(i) > CloudPartition.EMPTY_VERSION) {
+                    nonEmptyPartitionIds.add(unknowns.get(i).getId());
+                }
+            }
+
+            return nonEmptyPartitionIds;
+        } catch (RpcException e) {
+            throw new RuntimeException("get version from meta service failed");
+        }
+    }
+
+    // Get visible version from the specified partitions;
+    //
+    // Return the visible version in order of the specified partition ids, -1 
means version NOT FOUND.
+    public static List<Long> getSnapshotVisibleVersion(List<CloudPartition> 
partitions) throws RpcException {
+        if (partitions.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<Long> dbIds = new ArrayList<>();
+        List<Long> tableIds = new ArrayList<>();
+        List<Long> partitionIds = new ArrayList<>();
+        for (CloudPartition partition : partitions) {
+            dbIds.add(partition.getDbId());
+            tableIds.add(partition.getTableId());
+            partitionIds.add(partition.getId());
+        }
+
+        List<Long> versions = getSnapshotVisibleVersion(dbIds, tableIds, 
partitionIds);
+
+        // Cache visible version, see hasData() for details.
+        int size = versions.size();
+        for (int i = 0; i < size; ++i) {
+            Long version = versions.get(i);
+            if (version > EMPTY_VERSION) {
+                partitions.get(i).setCachedVisibleVersion(versions.get(i));
+            }
+        }
+
+        return versions;
+    }
+
+    // Get visible versions for the specified partitions.
+    //
+    // Return the visible version in order of the specified partition ids, -1 
means version NOT FOUND.
+    public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, 
List<Long> tableIds, List<Long> partitionIds)
+            throws RpcException {
+        assert dbIds.size() == partitionIds.size() :
+                "partition ids size: " + partitionIds.size() + " should equals 
to db ids size: " + dbIds.size();
+        assert tableIds.size() == partitionIds.size() :
+                "partition ids size: " + partitionIds.size() + " should equals 
to tablet ids size: " + tableIds.size();
+
+        Cloud.GetVersionRequest req = Cloud.GetVersionRequest.newBuilder()
+                .setDbId(-1)
+                .setTableId(-1)
+                .setPartitionId(-1)
+                .setBatchMode(true)
+                .addAllDbIds(dbIds)
+                .addAllTableIds(tableIds)
+                .addAllPartitionIds(partitionIds)
+                .build();
+
+        LOG.debug("getVisibleVersion use CloudPartition {}", 
partitionIds.toString());
+        Cloud.GetVersionResponse resp = getVersionFromMeta(req);
+        if (resp.getStatus().getCode() != MetaServiceCode.OK) {
+            throw new RpcException("get visible version", "unexpected status " 
+ resp.getStatus());
+        }
+
+        List<Long> versions = resp.getVersionsList();
+        if (versions.size() != partitionIds.size()) {
+            throw new RpcException("get visible version",
+                    "wrong number of versions, required " + 
partitionIds.size() + ", but got " + versions.size());
+        }
+
+        LOG.debug("get version from meta service, partitions: {}, versions: 
{}", partitionIds, versions);
+
+        if (isEmptyPartitionPruneDisabled()) {
+            ArrayList<Long> news = new ArrayList<>();
+            for (Long v : versions) {
+                news.add(v == -1 ? 1 : v);
+            }
+            return news;
+        }
+
+        return versions;
+    }
+
+    @Override
+    public long getNextVersion() {
+        // use meta service visibleVersion
+        LOG.debug("getNextVersion use CloudPartition {}", super.getName());
+        return -1;
+    }
+
+    @Override
+    public void setNextVersion(long nextVersion) {
+        // use meta service visibleVersion
+        LOG.debug("setNextVersion use CloudPartition {} Version {}", 
super.getName(), nextVersion);
+        return;
+    }
+
+    @Override
+    public void updateVersionForRestore(long visibleVersion) {
+        LOG.debug("updateVersionForRestore use CloudPartition {} version for 
restore: visible: {}",
+                super.getName(), visibleVersion);
+        return;
+    }
+
+    @Override
+    public void updateVisibleVersion(long visibleVersion) {
+        // use meta service visibleVersion
+        LOG.debug("updateVisibleVersion use CloudPartition {} version for 
restore: visible: {}",
+                super.getName(), visibleVersion);
+
+        return;
+    }
+
+    @Override
+    public void updateVisibleVersionAndTime(long visibleVersion, long 
visibleVersionTime) {
+    }
+
+    // Determine whether data this partition has, according to the cached 
visible version.
+    public boolean hasDataCached() {
+        // In order to determine whether a partition is empty, a get_version 
RPC is issued to
+        // the meta service. The pruning process will be very slow when there 
are lots of empty
+        // partitions. This option disables the empty partition prune 
optimization to speed SQL
+        // analysis/plan phase.
+        if (isEmptyPartitionPruneDisabled()) {
+            return true;
+        }
+
+        // Every partition starts from version 1, version 1 has no data.
+        // So as long as version is greater than 1, it can be determined that 
there is data here.
+        return super.getVisibleVersion() > EMPTY_VERSION;
+    }
+
+    /**
+     * CloudPartition always has data
+     */
+    @Override
+    public boolean hasData() {
+        // To avoid sending an RPC request, see the cached visible version 
here first.
+        if (hasDataCached()) {
+            return true;
+        }
+
+        SummaryProfile profile = getSummaryProfile();
+        if (profile != null) {
+            profile.incGetPartitionVersionByHasDataCount();
+        }
+
+        return getVisibleVersion() > EMPTY_VERSION;
+    }
+
+    private static Cloud.GetVersionResponse 
getVersionFromMeta(Cloud.GetVersionRequest req)
+            throws RpcException {
+        long startAt = System.nanoTime();
+        try {
+            return getVersionFromMetaInner(req);
+        } finally {
+            SummaryProfile profile = getSummaryProfile();
+            if (profile != null) {
+                profile.addGetPartitionVersionTime(System.nanoTime() - 
startAt);
+            }
+        }
+    }
+
+    private static Cloud.GetVersionResponse 
getVersionFromMetaInner(Cloud.GetVersionRequest req)
+            throws RpcException {
+        for (int retryTime = 0; retryTime < 
Config.cloud_meta_service_rpc_failed_retry_times; retryTime++) {
+            try {
+                long deadline = System.currentTimeMillis() + 
Config.default_get_version_from_ms_timeout_second * 1000L;
+                Future<Cloud.GetVersionResponse> future =
+                        
MetaServiceProxy.getInstance().getVisibleVersionAsync(req);
+
+                Cloud.GetVersionResponse resp = null;
+                while (resp == null) {
+                    try {
+                        resp = future.get(Math.max(0, deadline - 
System.currentTimeMillis()), TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        LOG.warn("get version from meta service: future get 
interrupted exception");
+                    }
+                }
+
+                if (resp.hasStatus() && (resp.getStatus().getCode() == 
MetaServiceCode.OK
+                            || resp.getStatus().getCode() == 
MetaServiceCode.VERSION_NOT_FOUND)) {
+                    LOG.debug("get version from meta service, code: {}", 
resp.getStatus().getCode());
+                    return resp;
+                }
+
+                LOG.warn("get version from meta service failed, status: {}, 
retry time: {}",
+                        resp.getStatus(), retryTime);
+            } catch (RpcException | ExecutionException | TimeoutException | 
RuntimeException e) {
+                LOG.warn("get version from meta service failed, retry times: 
{} exception: ", retryTime, e);
+            }
+
+            // sleep random millis [20, 200] ms, retry rpc failed
+            int randomMillis = 20 + (int) (Math.random() * (200 - 20));
+            if (retryTime > Config.cloud_meta_service_rpc_failed_retry_times / 
2) {
+                // sleep random millis [500, 1000] ms, retry rpc failed
+                randomMillis = 500 + (int) (Math.random() * (1000 - 500));
+            }
+            try {
+                Thread.sleep(randomMillis);
+            } catch (InterruptedException e) {
+                LOG.warn("get version from meta service: sleep get interrupted 
exception");
+            }
+        }
+
+        LOG.warn("get version from meta service failed after retry {} times",
+                Config.cloud_meta_service_rpc_failed_retry_times);
+        throw new RpcException("get version from meta service", "failed after 
retry n times");
+    }
+
+    private static boolean isEmptyPartitionPruneDisabled() {
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx != null && 
ctx.getSessionVariable().getDisableEmptyPartitionPrune()) {
+            return true;
+        }
+        return false;
+    }
+
+    private static SummaryProfile getSummaryProfile() {
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx != null) {
+            StmtExecutor executor = ctx.getExecutor();
+            if (executor != null) {
+                return executor.getSummaryProfile();
+            }
+        }
+        return null;
+    }
+
+    public static CloudPartition read(DataInput in) throws IOException {
+        CloudPartition partition = new CloudPartition();
+        partition.readFields(in);
+        partition.setDbId(in.readLong());
+        partition.setTableId(in.readLong());
+        return partition;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeLong(this.dbId);
+        out.writeLong(this.tableId);
+    }
+
+    public boolean equals(Object obj) {
+        if (!super.equals(obj)) {
+            return false;
+        }
+
+        if (!(obj instanceof CloudPartition)) {
+            return false;
+        }
+        CloudPartition cloudPartition = (CloudPartition) obj;
+        return (dbId == cloudPartition.dbId) && (tableId == 
cloudPartition.tableId);
+    }
+
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append(super.toString());
+        buffer.append("dbId: ").append(this.dbId).append("; ");
+        buffer.append("tableId: ").append(this.tableId).append("; ");
+        return buffer.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
new file mode 100644
index 00000000000..cf503d9bc40
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+
+import com.google.common.base.Strings;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CloudReplica extends Replica {
+    private static final Logger LOG = LogManager.getLogger(CloudReplica.class);
+
+    // In the future, a replica may be mapped to multiple BEs in a cluster,
+    // so this value is a list of BEs
+    private Map<String, List<Long>> clusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
+    @SerializedName(value = "dbId")
+    private long dbId = -1;
+    @SerializedName(value = "tableId")
+    private long tableId = -1;
+    @SerializedName(value = "partitionId")
+    private long partitionId = -1;
+    @SerializedName(value = "indexId")
+    private long indexId = -1;
+    @SerializedName(value = "idx")
+    private long idx = -1;
+
+    private Random rand = new Random();
+
+    private Map<String, List<Long>> memClusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
+
+    public CloudReplica() {
+    }
+
+    public CloudReplica(long replicaId, List<Long> backendIds, ReplicaState 
state, long version, int schemaHash,
+            long dbId, long tableId, long partitionId, long indexId, long idx) 
{
+        super(replicaId, -1, state, version, schemaHash);
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.indexId = indexId;
+        this.idx = idx;
+    }
+
+    private boolean isColocated() {
+        return Env.getCurrentColocateIndex().isColocateTable(tableId);
+    }
+
+    private long getColocatedBeId(String cluster) {
+        List<Backend> bes = 
Env.getCurrentSystemInfo().getBackendsByClusterId(cluster);
+        List<Backend> availableBes = new ArrayList<>();
+        for (Backend be : bes) {
+            if (be.isAlive()) {
+                availableBes.add(be);
+            }
+        }
+        if (availableBes == null || availableBes.size() == 0) {
+            LOG.warn("failed to get available be, clusterId: {}", cluster);
+            return -1;
+        }
+
+        // Tablets with the same idx will be hashed to the same BE, which
+        // meets the requirements of a colocated table.
+        long index = idx % availableBes.size();
+        long pickedBeId = availableBes.get((int) index).getId();
+
+        return pickedBeId;
+    }
+
+    @Override
+    public long getBackendId() {
+        String cluster = null;
+        // ConnectContext may be null when not in a connect session
+        ConnectContext context = ConnectContext.get();
+        if (context != null) {
+            if 
(!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
+                cluster = context.getSessionVariable().getCloudCluster();
+                try {
+                    Env.getCurrentEnv().checkCloudClusterPriv(cluster);
+                } catch (Exception e) {
+                    LOG.warn("get cluster by session context exception");
+                    return -1;
+                }
+                LOG.debug("get cluster by session context cluster: {}", 
cluster);
+            } else {
+                cluster = context.getCloudCluster();
+                LOG.debug("get cluster by context {}", cluster);
+            }
+        } else {
+            LOG.debug("connect context is null in getBackendId");
+            return -1;
+        }
+
+        // check default cluster valid.
+        if (!Strings.isNullOrEmpty(cluster)) {
+            boolean exist = 
Env.getCurrentSystemInfo().getCloudClusterNames().contains(cluster);
+            if (!exist) {
+                // can't use this default cluster, please choose another one
+                LOG.warn("cluster: {} is not existed", cluster);
+                return -1;
+            }
+        } else {
+            LOG.warn("failed to get available be, clusterName: {}", cluster);
+            return -1;
+        }
+
+        // if cluster is SUSPENDED, wait
+        try {
+            Env.waitForAutoStart(cluster);
+        } catch (DdlException e) {
+            // this function can't throw an exception, so just log it
+            LOG.warn("cant resume cluster {}", cluster);
+        }
+        String clusterId = 
Env.getCurrentSystemInfo().getCloudClusterIdByName(cluster);
+
+        if (isColocated()) {
+            return getColocatedBeId(clusterId);
+        }
+
+        if (Config.enable_cloud_multi_replica) {
+            int indexRand = rand.nextInt(Config.cloud_replica_num);
+            int coldReadRand = rand.nextInt(100);
+            boolean allowColdRead = coldReadRand < 
Config.cloud_cold_read_percent;
+            boolean replicaEnough = memClusterToBackends.get(clusterId) != null
+                    && memClusterToBackends.get(clusterId).size() > indexRand;
+
+            long backendId = -1;
+            if (replicaEnough) {
+                backendId = memClusterToBackends.get(clusterId).get(indexRand);
+            }
+
+            if (!replicaEnough && !allowColdRead && 
clusterToBackends.containsKey(clusterId)) {
+                backendId = clusterToBackends.get(clusterId).get(0);
+            }
+
+            if (backendId > 0) {
+                Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+                if (be != null && be.isQueryAvailable()) {
+                    LOG.debug("backendId={} ", backendId);
+                    return backendId;
+                }
+            }
+
+            List<Long> res = hashReplicaToBes(clusterId, false, 
Config.cloud_replica_num);
+            if (res.size() < indexRand + 1) {
+                if (res.isEmpty()) {
+                    return -1;
+                } else {
+                    return res.get(0);
+                }
+            } else {
+                return res.get(indexRand);
+            }
+        }
+
+        if (clusterToBackends.containsKey(clusterId)) {
+            long backendId = clusterToBackends.get(clusterId).get(0);
+            Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+            if (be != null && be.isQueryAvailable()) {
+                LOG.debug("backendId={} ", backendId);
+                return backendId;
+            }
+        }
+
+        return hashReplicaToBe(clusterId, false);
+    }
+
+    public long hashReplicaToBe(String clusterId, boolean isBackGround) {
+        // TODO(luwei) list should be sorted
+        List<Backend> clusterBes = 
Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId);
+        // use alive be to exec sql
+        List<Backend> availableBes = new ArrayList<>();
+        for (Backend be : clusterBes) {
+            long lastUpdateMs = be.getLastUpdateMs();
+            long missTimeMs = Math.abs(lastUpdateMs - 
System.currentTimeMillis());
+            // a BE crash (core) or restart must be detected within heartbeat_interval_second
+            if ((be.isAlive() || missTimeMs <= 
Config.heartbeat_interval_second * 1000L)
+                    && !be.isSmoothUpgradeSrc()) {
+                availableBes.add(be);
+            }
+        }
+        if (availableBes == null || availableBes.size() == 0) {
+            if (!isBackGround) {
+                LOG.warn("failed to get available be, clusterId: {}", 
clusterId);
+            }
+            return -1;
+        }
+        LOG.debug("availableBes={}", availableBes);
+        long index = -1;
+        HashCode hashCode = null;
+        if (idx == -1) {
+            index = getId() % availableBes.size();
+        } else {
+            hashCode = Hashing.murmur3_128().hashLong(partitionId);
+            int beNum = availableBes.size();
+            // (hashCode.asLong() + idx) % beNum may be a negative value, so we
+            // need to take the modulus of beNum again to ensure that index is
+            // a positive value
+            index = ((hashCode.asLong() + idx) % beNum + beNum) % beNum;
+        }
+        long pickedBeId = availableBes.get((int) index).getId();
+        LOG.info("picked beId {}, replicaId {}, partitionId {}, beNum {}, 
replicaIdx {}, picked Index {}, hashVal {}",
+                pickedBeId, getId(), partitionId, availableBes.size(), idx, 
index,
+                hashCode == null ? -1 : hashCode.asLong());
+
+        // save to clusterToBackends map
+        List<Long> bes = new ArrayList<Long>();
+        bes.add(pickedBeId);
+        clusterToBackends.put(clusterId, bes);
+
+        return pickedBeId;
+    }
+
+    public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, 
int replicaNum) {
+        // TODO(luwei) list should be sorted
+        List<Backend> clusterBes = 
Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId);
+        // use alive be to exec sql
+        List<Backend> availableBes = new ArrayList<>();
+        for (Backend be : clusterBes) {
+            long lastUpdateMs = be.getLastUpdateMs();
+            long missTimeMs = Math.abs(lastUpdateMs - 
System.currentTimeMillis());
+            // a BE crash (core) or restart must be detected within heartbeat_interval_second
+            if ((be.isAlive() || missTimeMs <= 
Config.heartbeat_interval_second * 1000L)
+                    && !be.isSmoothUpgradeSrc()) {
+                availableBes.add(be);
+            }
+        }
+        if (availableBes == null || availableBes.size() == 0) {
+            if (!isBackGround) {
+                LOG.warn("failed to get available be, clusterId: {}", 
clusterId);
+            }
+            return new ArrayList<Long>();
+        }
+        LOG.debug("availableBes={}", availableBes);
+
+        int realReplicaNum = replicaNum > availableBes.size() ? 
availableBes.size() : replicaNum;
+        List<Long> bes = new ArrayList<Long>();
+        for (int i = 0; i < realReplicaNum; ++i) {
+            long index = -1;
+            HashCode hashCode = null;
+            if (idx == -1) {
+                index = getId() % availableBes.size();
+            } else {
+                hashCode = Hashing.murmur3_128().hashLong(partitionId + i);
+                int beNum = availableBes.size();
+                // (hashCode.asLong() + idx) % beNum may be a negative value, 
so we
+                // need to take the modulus of beNum again to ensure that 
index is
+                // a positive value
+                index = ((hashCode.asLong() + idx) % beNum + beNum) % beNum;
+            }
+            long pickedBeId = availableBes.get((int) index).getId();
+            availableBes.remove((int) index);
+            LOG.info("picked beId {}, replicaId {}, partId {}, beNum {}, 
replicaIdx {}, picked Index {}, hashVal {}",
+                    pickedBeId, getId(), partitionId, availableBes.size(), 
idx, index,
+                    hashCode == null ? -1 : hashCode.asLong());
+            // save to memClusterToBackends map
+            bes.add(pickedBeId);
+        }
+
+        memClusterToBackends.put(clusterId, bes);
+
+        return bes;
+    }
+
+    @Override
+    public boolean checkVersionCatchUp(long expectedVersion, boolean 
ignoreAlter) {
+        // ATTN: expectedVersion is not used here, and 
OlapScanNode.addScanRangeLocations
+        // depends on this feature to implement snapshot partition version. See 
comments in
+        // OlapScanNode.addScanRangeLocations for details.
+        if (ignoreAlter && getState() == ReplicaState.ALTER
+                && getVersion() == Partition.PARTITION_INIT_VERSION) {
+            return true;
+        }
+
+        if (expectedVersion == Partition.PARTITION_INIT_VERSION) {
+            // no data is loaded into this replica, just return true
+            return true;
+        }
+
+        return true;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        dbId = in.readLong();
+        tableId = in.readLong();
+        partitionId = in.readLong();
+        indexId = in.readLong();
+        idx = in.readLong();
+        int count = in.readInt();
+        for (int i = 0; i < count; ++i) {
+            String clusterId = Text.readString(in);
+            String realClusterId = 
Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterId);
+            LOG.debug("cluster Id {}, real cluster Id {}", clusterId, 
realClusterId);
+
+            if (!Strings.isNullOrEmpty(realClusterId)) {
+                clusterId = realClusterId;
+            }
+
+            long beId = in.readLong();
+            List<Long> bes = new ArrayList<Long>();
+            bes.add(beId);
+            clusterToBackends.put(clusterId, bes);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeLong(dbId);
+        out.writeLong(tableId);
+        out.writeLong(partitionId);
+        out.writeLong(indexId);
+        out.writeLong(idx);
+        out.writeInt(clusterToBackends.size());
+        for (Map.Entry<String, List<Long>> entry : 
clusterToBackends.entrySet()) {
+            Text.writeString(out, entry.getKey());
+            out.writeLong(entry.getValue().get(0));
+        }
+    }
+
+    public static CloudReplica read(DataInput in) throws IOException {
+        CloudReplica replica = new CloudReplica();
+        replica.readFields(in);
+        // TODO(luwei): persist and fill-up clusterToBackends to take full 
advantage of data cache
+        return replica;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public long getPartitionId() {
+        return partitionId;
+    }
+
+    public long getIndexId() {
+        return indexId;
+    }
+
+    public long getIdx() {
+        return idx;
+    }
+
+    public Map<String, List<Long>> getClusterToBackends() {
+        return clusterToBackends;
+    }
+
+    public void updateClusterToBe(String cluster, long beId) {
+        // NOTE(review): comment said "write lock" but no explicit lock is taken;
+        // presumably relies on clusterToBackends being a ConcurrentHashMap — confirm intent
+        List<Long> bes = new ArrayList<Long>();
+        bes.add(beId);
+        clusterToBackends.put(cluster, bes);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index c36821fd996..750da42235d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1208,7 +1208,10 @@ public enum ErrorCode {
             "the auto increment is only supported in duplicate table and 
unique table."),
 
     ERR_ARROW_FLIGHT_SQL_MUST_ONLY_RESULT_STMT(5097, new byte[]{'4', '2', '0', 
'0', '0'},
-            "There can only be one stmt that returns the result and it is at 
the end.");
+            "There can only be one stmt that returns the result and it is at 
the end."),
+
+    ERR_CLOUD_CLUSTER_ERROR(5093, new byte[]{'4', '2', '0', '0', '0'},
+            "Cluster %s not exist, use SQL 'SHOW CLUSTERS' to get a valid 
cluster");
 
     // This is error code
     private final int code;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index c478ea0fd29..01fc64e0747 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -70,6 +70,9 @@ public class SummaryProfile {
     public static final String FETCH_RESULT_TIME = "Fetch Result Time";
     public static final String WRITE_RESULT_TIME = "Write Result Time";
     public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result 
Time";
+    public static final String GET_PARTITION_VERSION_TIME = "Get Partition 
Version Time";
+    public static final String GET_PARTITION_VERSION_COUNT = "Get Partition 
Version Count";
+    public static final String GET_PARTITION_VERSION_BY_HAS_DATA_COUNT = "Get 
Partition Version Count (hasData)";
 
     public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
     public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
@@ -87,23 +90,31 @@ public class SummaryProfile {
             WORKLOAD_GROUP, ANALYSIS_TIME,
             PLAN_TIME, JOIN_REORDER_TIME, CREATE_SINGLE_NODE_TIME, 
QUERY_DISTRIBUTED_TIME,
             INIT_SCAN_NODE_TIME, FINALIZE_SCAN_NODE_TIME, GET_SPLITS_TIME, 
GET_PARTITIONS_TIME,
-            GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME, SCHEDULE_TIME, 
FETCH_RESULT_TIME,
+            GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME, 
GET_PARTITION_VERSION_TIME,
+            GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 
GET_PARTITION_VERSION_COUNT, SCHEDULE_TIME, FETCH_RESULT_TIME,
             WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME, DORIS_VERSION, 
IS_NEREIDS, IS_PIPELINE,
             IS_CACHED, TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, 
PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID);
 
     // Ident of each item. Default is 0, which doesn't need to present in this 
Map.
     // Please set this map for new profile items if they need ident.
-    public static ImmutableMap<String, Integer> 
EXECUTION_SUMMARY_KEYS_IDENTATION = ImmutableMap.of(
-            JOIN_REORDER_TIME, 1,
-            CREATE_SINGLE_NODE_TIME, 1,
-            QUERY_DISTRIBUTED_TIME, 1,
-            INIT_SCAN_NODE_TIME, 1,
-            FINALIZE_SCAN_NODE_TIME, 1,
-            GET_SPLITS_TIME, 2,
-            GET_PARTITIONS_TIME, 3,
-            GET_PARTITION_FILES_TIME, 3,
-            CREATE_SCAN_RANGE_TIME, 2
-    );
+    public static ImmutableMap<String, Integer> 
EXECUTION_SUMMARY_KEYS_IDENTATION = ImmutableMap.of();
+
+    {
+        ImmutableMap.Builder<String, Integer> builder = new 
ImmutableMap.Builder<>();
+        builder.put(JOIN_REORDER_TIME, 1);
+        builder.put(CREATE_SINGLE_NODE_TIME, 1);
+        builder.put(QUERY_DISTRIBUTED_TIME, 1);
+        builder.put(INIT_SCAN_NODE_TIME, 1);
+        builder.put(FINALIZE_SCAN_NODE_TIME, 1);
+        builder.put(GET_SPLITS_TIME, 2);
+        builder.put(GET_PARTITIONS_TIME, 3);
+        builder.put(GET_PARTITION_FILES_TIME, 3);
+        builder.put(CREATE_SCAN_RANGE_TIME, 2);
+        builder.put(GET_PARTITION_VERSION_TIME, 1);
+        builder.put(GET_PARTITION_VERSION_COUNT, 1);
+        builder.put(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 1);
+        EXECUTION_SUMMARY_KEYS_IDENTATION = builder.build();
+    }
 
     private RuntimeProfile summaryProfile;
     private RuntimeProfile executionSummaryProfile;
@@ -140,6 +151,9 @@ public class SummaryProfile {
     private long tempStarTime = -1;
     private long queryFetchResultConsumeTime = 0;
     private long queryWriteResultConsumeTime = 0;
+    private long getPartitionVersionTime = 0;
+    private long getPartitionVersionCount = 0;
+    private long getPartitionVersionByHasDataCount = 0;
 
     public SummaryProfile(RuntimeProfile rootProfile) {
         summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
@@ -210,6 +224,10 @@ public class SummaryProfile {
         executionSummaryProfile.addInfoString(WRITE_RESULT_TIME,
                 RuntimeProfile.printCounter(queryWriteResultConsumeTime, 
TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, 
getPrettyQueryFetchResultFinishTime());
+        executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_TIME, 
getPrettyGetPartitionVersionTime());
+        executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_COUNT, 
getPrettyGetPartitionVersionCount());
+        
executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT,
+                getPrettyGetPartitionVersionByHasDataCount());
     }
 
     public void setNereidsAnalysisTime() {
@@ -308,6 +326,15 @@ public class SummaryProfile {
         this.queryWriteResultConsumeTime += TimeUtils.getStartTimeMs() - 
tempStarTime;
     }
 
+    public void addGetPartitionVersionTime(long ns) {
+        this.getPartitionVersionTime += ns;
+        this.getPartitionVersionCount += 1;
+    }
+
+    public void incGetPartitionVersionByHasDataCount() {
+        this.getPartitionVersionByHasDataCount += 1;
+    }
+
     public long getQueryBeginTime() {
         return queryBeginTime;
     }
@@ -528,4 +555,19 @@ public class SummaryProfile {
         }
         return RuntimeProfile.printCounter(queryFetchResultFinishTime - 
queryScheduleFinishTime, TUnit.TIME_MS);
     }
+
+    private String getPrettyGetPartitionVersionTime() {
+        if (getPartitionVersionTime == 0) {
+            return "N/A";
+        }
+        return RuntimeProfile.printCounter(getPartitionVersionTime, 
TUnit.TIME_NS);
+    }
+
+    private String getPrettyGetPartitionVersionByHasDataCount() {
+        return RuntimeProfile.printCounter(getPartitionVersionByHasDataCount, 
TUnit.UNIT);
+    }
+
+    private String getPrettyGetPartitionVersionCount() {
+        return RuntimeProfile.printCounter(getPartitionVersionCount, 
TUnit.UNIT);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
index c9b0dfcf1ae..2a097c38e2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.GrantStmt;
 import org.apache.doris.analysis.PasswordOptions;
 import org.apache.doris.analysis.RefreshLdapStmt;
 import org.apache.doris.analysis.ResourcePattern;
+import org.apache.doris.analysis.ResourceTypeEnum;
 import org.apache.doris.analysis.RevokeStmt;
 import org.apache.doris.analysis.SetLdapPassVar;
 import org.apache.doris.analysis.SetPassVar;
@@ -83,6 +84,7 @@ import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+
 public class Auth implements Writable {
     private static final Logger LOG = LogManager.getLogger(Auth.class);
 
@@ -389,6 +391,12 @@ public class Auth implements Writable {
         }
     }
 
+    // ==== cloud ====
+    public boolean checkCloudPriv(UserIdentity currentUser, String cloudName,
+                                  PrivPredicate wanted, ResourceTypeEnum type) 
{
+        throw new RuntimeException("Auth.checkCloudPriv is not implemented");
+    }
+
     // ==== Other ====
     /*
      * Check if current user has certain privilege.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 8a1a16999b6..59833d0ea5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -135,7 +135,10 @@ public class ConnectContext {
     protected volatile long currentDbId = -1;
     // Transaction
     protected volatile TransactionEntry txnEntry = null;
-
+    // cluster name
+    protected volatile String clusterName = "";
+    // used for ShowSqlAction, which doesn't allow a user account
+    protected volatile boolean noAuth = false;
     // username@host of current login user
     protected volatile String qualifiedUser;
     // LDAP authenticated but the Doris account does not exist,
@@ -176,6 +179,9 @@ public class ConnectContext {
     // So in the query planning stage, do not use any value in this attribute.
     protected QueryDetail queryDetail = null;
 
+    // cloud cluster name
+    protected volatile String cloudCluster = null;
+
     // If set to true, the nondeterministic function will not be rewrote to 
constant.
     private boolean notEvalNondeterministicFunction = false;
     // The resource tag is used to limit the node resources that the user can 
use for query.
@@ -525,6 +531,14 @@ public class ConnectContext {
         return env;
     }
 
+    public boolean getNoAuth() {
+        return noAuth;
+    }
+
+    public void setNoAuth(boolean noAuth) {
+        this.noAuth = noAuth;
+    }
+
     public String getQualifiedUser() {
         return qualifiedUser;
     }
@@ -798,6 +812,14 @@ public class ConnectContext {
         return queryId;
     }
 
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setCluster(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
     public String getSqlHash() {
         return sqlHash;
     }
@@ -1002,6 +1024,26 @@ public class ConnectContext {
         return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
     }
 
+    // maybe user set cluster by SQL hint of session variable: cloud_cluster
+    // so first check it and then get from connect context.
+    public String getCurrentCloudCluster() {
+        String cluster = getSessionVariable().getCloudCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = getCloudCluster();
+        }
+        return cluster;
+    }
+
+    // Set cloud cluster by `use @clusterName`
+    public void setCloudCluster(String cluster) {
+        this.cloudCluster = cluster;
+    }
+
+    // The returned cluster is set by `use @clusterName`
+    public String getCloudCluster() {
+        return cloudCluster;
+    }
+
     public StatsErrorEstimator getStatsErrorEstimator() {
         return statsErrorEstimator;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d4b8dc0acea..9c8546a98e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -65,6 +65,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+
 /**
  * System variable.
  **/
@@ -500,6 +501,12 @@ public class SessionVariable implements Serializable, 
Writable {
     );
 
     public static final String ENABLE_STATS = "enable_stats";
+
+    // CLOUD_VARIABLES_BEGIN
+    public static final String CLOUD_CLUSTER = "cloud_cluster";
+    public static final String DISABLE_EMPTY_PARTITION_PRUNE = 
"disable_empty_partition_prune";
+    // CLOUD_VARIABLES_END
+
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate 
any related resources.
      */
@@ -1546,6 +1553,13 @@ public class SessionVariable implements Serializable, 
Writable {
                 "use other health replica when the use_fix_replica meet error" 
})
     public boolean fallbackOtherReplicaWhenFixedCorrupt = false;
 
+    // CLOUD_VARIABLES_BEGIN
+    @VariableMgr.VarAttr(name = CLOUD_CLUSTER)
+    public String cloudCluster = "";
+    @VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
+    public boolean disableEmptyPartitionPrune = false;
+    // CLOUD_VARIABLES_END
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to 
generate some variables,
     // not the default value set in the code.
     @SuppressWarnings("checkstyle:Indentation")
@@ -3261,4 +3275,22 @@ public class SessionVariable implements Serializable, 
Writable {
     public void setIgnoreStorageDataDistribution(boolean 
ignoreStorageDataDistribution) {
         this.ignoreStorageDataDistribution = ignoreStorageDataDistribution;
     }
+
+    // CLOUD_VARIABLES_BEGIN
+    public String getCloudCluster() {
+        return cloudCluster;
+    }
+
+    public String setCloudCluster(String cloudCluster) {
+        return this.cloudCluster = cloudCluster;
+    }
+
+    public boolean getDisableEmptyPartitionPrune() {
+        return disableEmptyPartitionPrune;
+    }
+
+    public void setDisableEmptyPartitionPrune(boolean val) {
+        disableEmptyPartitionPrune = val;
+    }
+    // CLOUD_VARIABLES_END
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java 
b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
index b9ba7af89ba..9353140b9d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
@@ -66,6 +66,13 @@ public class Tag implements Writable {
     public static final String VALUE_DEFAULT_TAG = "default";
     public static final String VALUE_INVALID_TAG = "invalid";
 
+    public static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
+    public static final String CLOUD_CLUSTER_ID = "cloud_cluster_id";
+    public static final String CLOUD_UNIQUE_ID = "cloud_unique_id";
+    public static final String CLOUD_CLUSTER_PUBLIC_ENDPOINT = 
"cloud_cluster_public_endpoint";
+    public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = 
"cloud_cluster_private_endpoint";
+    public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
+
     public static final ImmutableSet<String> RESERVED_TAG_TYPE = 
ImmutableSet.of(
             TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION);
     public static final ImmutableSet<String> RESERVED_TAG_VALUES = 
ImmutableSet.of(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index fcb5e63e838..2f8faa06b1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -120,6 +120,9 @@ public class Backend implements Writable {
     @SerializedName("tagMap")
     private Map<String, String> tagMap = Maps.newHashMap();
 
+    private boolean isSmoothUpgradeSrc = false; // This BE process is the old 
process during a smooth upgrade
+    private boolean isSmoothUpgradeDst = false; // This BE process is the new 
process during a smooth upgrade
+
     // cpu cores
     @SerializedName("cpuCores")
     private int cpuCores = 1;
@@ -174,6 +177,38 @@ public class Backend implements Writable {
         this.tagMap.put(locationTag.type, locationTag.value);
     }
 
+    public String getCloudClusterStatus() {
+        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_STATUS, "");
+    }
+
+    public void setCloudClusterStatus(final String clusterStatus) {
+        tagMap.put(Tag.CLOUD_CLUSTER_STATUS, clusterStatus);
+    }
+
+    public String getCloudClusterName() {
+        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_NAME, "");
+    }
+
+    public void setCloudClusterName(final String clusterName) {
+        tagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+    }
+
+    public String getCloudClusterId() {
+        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_ID, "");
+    }
+
+    public String getCloudUniqueId() {
+        return tagMap.getOrDefault(Tag.CLOUD_UNIQUE_ID, "");
+    }
+
+    public String getCloudPublicEndpoint() {
+        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, "");
+    }
+
+    public String getCloudPrivateEndpoint() {
+        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, "");
+    }
+
     public long getId() {
         return id;
     }
@@ -369,6 +404,22 @@ public class Backend implements Writable {
         return backendStatus;
     }
 
+    public void setSmoothUpgradeSrc(boolean is) {
+        this.isSmoothUpgradeSrc = is;
+    }
+
+    public boolean isSmoothUpgradeSrc() {
+        return this.isSmoothUpgradeSrc;
+    }
+
+    public void setSmoothUpgradeDst(boolean is) {
+        this.isSmoothUpgradeDst = is;
+    }
+
+    public boolean isSmoothUpgradeDst() {
+        return this.isSmoothUpgradeDst;
+    }
+
     public int getHeartbeatFailureCounter() {
         return heartbeatFailureCounter;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 9c1e196923f..78297aae83a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -23,6 +23,10 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.ClusterPB;
+import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -32,9 +36,11 @@ import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.CountingDataOutputStream;
 import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TNodeInfo;
 import org.apache.doris.thrift.TPaloNodesInfo;
 import org.apache.doris.thrift.TStatusCode;
@@ -63,7 +69,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 public class SystemInfoService {
@@ -77,9 +85,22 @@ public class SystemInfoService {
 
     private volatile ImmutableMap<Long, Backend> idToBackendRef = 
ImmutableMap.of();
     private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = 
ImmutableMap.of();
+    // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk 
of inconsistency
+    // use exclusive lock to make sure only one thread can change 
clusterIdToBackend and clusterNameToId
+    private ReentrantLock lock = new ReentrantLock();
+
+    // for show cluster and cache user owned cluster
+    // mysqlUserName -> List of ClusterPB
+    private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB = 
ImmutableMap.of();
+    // clusterId -> List<Backend>
+    private Map<String, List<Backend>> clusterIdToBackend = new 
ConcurrentHashMap<>();
+    // clusterName -> clusterId
+    private Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
 
     private volatile ImmutableMap<Long, DiskInfo> pathHashToDiskInfoRef = 
ImmutableMap.of();
 
+    private InstanceInfoPB.Status instanceStatus;
+
     public static class HostInfo implements Comparable<HostInfo> {
         public String host;
         public int port;
@@ -159,6 +180,114 @@ public class SystemInfoService {
         }
     };
 
+    public boolean availableBackendsExists() {
+        if (FeConstants.runningUnitTest) {
+            return true;
+        }
+        if (null == clusterNameToId || clusterNameToId.isEmpty()) {
+            return false;
+        }
+        return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
+               && clusterIdToBackend.values().stream().anyMatch(list -> list 
!= null && !list.isEmpty());
+    }
+
+    public boolean containClusterName(String clusterName) {
+        return clusterNameToId.containsKey(clusterName);
+    }
+
+    public List<Backend> getBackendsByClusterName(final String clusterName) {
+        String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+        if (clusterId.isEmpty()) {
+            return new ArrayList<>();
+        }
+        return clusterIdToBackend.get(clusterId);
+    }
+
+    public List<Backend> getBackendsByClusterId(final String clusterId) {
+        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
+    }
+
+    public List<String> getCloudClusterIds() {
+        return new ArrayList<>(clusterIdToBackend.keySet());
+    }
+
+    public String getCloudStatusByName(final String clusterName) {
+        String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+        if (Strings.isNullOrEmpty(clusterId)) {
+            // for rename cluster or dropped cluster
+            LOG.warn("cant find clusterId by clusterName {}", clusterName);
+            return "";
+        }
+        return getCloudStatusById(clusterId);
+    }
+
+    public String getCloudStatusById(final String clusterId) {
+        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
+            
.stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
+    }
+
+    public void updateClusterNameToId(final String newName,
+            final String originalName, final String clusterId) {
+        lock.lock();
+        clusterNameToId.remove(originalName);
+        clusterNameToId.put(newName, clusterId);
+        lock.unlock();
+    }
+
+    public String getClusterNameByClusterId(final String clusterId) {
+        String clusterName = "";
+        for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
+            if (entry.getValue().equals(clusterId)) {
+                clusterName = entry.getKey();
+                break;
+            }
+        }
+        return clusterName;
+    }
+
+    public void dropCluster(final String clusterId, final String clusterName) {
+        lock.lock();
+        clusterNameToId.remove(clusterName, clusterId);
+        clusterIdToBackend.remove(clusterId);
+        lock.unlock();
+    }
+
+    public List<String> getCloudClusterNames() {
+        return new ArrayList<>(clusterNameToId.keySet());
+    }
+
+    // Return the ref of concurrentMap clusterIdToBackend
+    // It should be thread-safe to iterate.
+    // reference: 
https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
+    public Map<String, List<Backend>> getCloudClusterIdToBackend() {
+        return clusterIdToBackend;
+    }
+
+    public String getCloudClusterIdByName(String clusterName) {
+        return clusterNameToId.get(clusterName);
+    }
+
+    public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName) 
{
+        String clusterId = clusterNameToId.get(clusterName);
+        if (Strings.isNullOrEmpty(clusterId)) {
+            LOG.warn("cant find clusterId, this cluster may be has been 
dropped, clusterName={}", clusterName);
+            return ImmutableMap.of();
+        }
+        List<Backend> backends = clusterIdToBackend.get(clusterId);
+        Map<Long, Backend> idToBackend = Maps.newHashMap();
+        for (Backend be : backends) {
+            idToBackend.put(be.getId(), be);
+        }
+        return ImmutableMap.copyOf(idToBackend);
+    }
+
+    // Return the ref of concurrentMap clusterNameToId
+    // It should be thread-safe to iterate.
+    // reference: 
https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
+    public Map<String, String> getCloudClusterNameToId() {
+        return clusterNameToId;
+    }
+
     public static TPaloNodesInfo createAliveNodesInfo() {
         TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
         SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
@@ -169,6 +298,23 @@ public class SystemInfoService {
         return nodesInfo;
     }
 
+    public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() {
+        return mysqlUserNameToClusterPB;
+    }
+
+    public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m) 
{
+        mysqlUserNameToClusterPB = m;
+    }
+
+    public List<Pair<String, Integer>> getCurrentObFrontends() {
+        List<Frontend> frontends = 
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+        List<Pair<String, Integer>> frontendsPair = new ArrayList<>();
+        for (Frontend frontend : frontends) {
+            frontendsPair.add(Pair.of(frontend.getHost(), 
frontend.getEditLogPort()));
+        }
+        return frontendsPair;
+    }
+
     // for deploy manager
     public void addBackends(List<HostInfo> hostInfos, boolean isFree)
             throws UserException {
@@ -979,4 +1125,31 @@ public class SystemInfoService {
     public long aliveBECount() {
         return 
idToBackendRef.values().stream().filter(Backend::isAlive).count();
     }
+
+    public Cloud.GetInstanceResponse getCloudInstance() {
+        Cloud.GetInstanceRequest.Builder builder =
+                Cloud.GetInstanceRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+        final Cloud.GetInstanceRequest pRequest = builder.build();
+        Cloud.GetInstanceResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().getInstance(pRequest);
+            return response;
+        } catch (RpcException e) {
+            LOG.warn("rpcToGetInstance exception: {}", e.getMessage());
+        }
+        return null;
+    }
+
+    public InstanceInfoPB.Status getInstanceStatus() {
+        return this.instanceStatus;
+    }
+
+    public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) {
+        LOG.debug("fe set instance status {}", instanceStatus);
+        if (this.instanceStatus != instanceStatus) {
+            LOG.info("fe change instance status from {} to {}", 
this.instanceStatus, instanceStatus);
+        }
+        this.instanceStatus = instanceStatus;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to