This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3dc00b991a6 [feature](merge-cloud) Add fe cloud partition and replica
definition (#29855)
3dc00b991a6 is described below
commit 3dc00b991a6e1a55b5a31429d194cf03b72e5c2f
Author: walter <[email protected]>
AuthorDate: Thu Jan 11 20:19:49 2024 +0800
[feature](merge-cloud) Add fe cloud partition and replica definition
(#29855)
Co-authored-by: Luwei <[email protected]>
Co-authored-by: deardeng <[email protected]>
Co-authored-by: Gavin Chou <[email protected]>
Co-authored-by: Lightman <[email protected]>
Co-authored-by: zhengyu <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: AlexYue <[email protected]>
Co-authored-by: Xiaocc <[email protected]>
Co-authored-by: panDing19 <[email protected]>
Co-authored-by: plat1ko <[email protected]>
Co-authored-by: zhangdong <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 15 +
.../apache/doris/analysis/ResourceTypeEnum.java | 25 ++
.../main/java/org/apache/doris/catalog/Env.java | 22 ++
.../java/org/apache/doris/catalog/Partition.java | 6 +-
.../cloud/catalog/CloudInstanceStatusChecker.java | 52 +++
.../apache/doris/cloud/catalog/CloudPartition.java | 422 +++++++++++++++++++++
.../apache/doris/cloud/catalog/CloudReplica.java | 394 +++++++++++++++++++
.../java/org/apache/doris/common/ErrorCode.java | 5 +-
.../doris/common/profile/SummaryProfile.java | 66 +++-
.../org/apache/doris/mysql/privilege/Auth.java | 8 +
.../java/org/apache/doris/qe/ConnectContext.java | 44 ++-
.../java/org/apache/doris/qe/SessionVariable.java | 32 ++
.../main/java/org/apache/doris/resource/Tag.java | 7 +
.../main/java/org/apache/doris/system/Backend.java | 51 +++
.../org/apache/doris/system/SystemInfoService.java | 173 +++++++++
15 files changed, 1305 insertions(+), 17 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a84185b673b..60ae7ec1463 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2450,6 +2450,21 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int meta_service_connection_age_base_minutes = 5;
+ @ConfField
+ public static int cloud_meta_service_rpc_failed_retry_times = 200;
+
+ @ConfField
+ public static int default_get_version_from_ms_timeout_second = 3;
+
+ @ConfField(mutable = true)
+ public static boolean enable_cloud_multi_replica = false;
+
+ @ConfField(mutable = true)
+ public static int cloud_replica_num = 3;
+
+ @ConfField(mutable = true)
+ public static int cloud_cold_read_percent = 10; // 10%
+
//==========================================================================
// end of cloud config
//==========================================================================
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceTypeEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceTypeEnum.java
new file mode 100644
index 00000000000..cb6972b6d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceTypeEnum.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+/**
+ * Resource type enum
+ **/
+public enum ResourceTypeEnum {
+ GENERAL, CLUSTER, STAGE
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 2f33e12da47..28c74c749a8 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -69,6 +69,7 @@ import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
import org.apache.doris.analysis.ReplacePartitionClause;
+import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.RollupRenameClause;
import org.apache.doris.analysis.SetType;
@@ -4970,6 +4971,27 @@ public class Env {
this.alter.getClusterHandler().cancel(stmt);
}
+ public void checkCloudClusterPriv(String clusterName) throws DdlException {
+ // check resource usage privilege
+ if
(!Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
+ clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
+ throw new DdlException("USAGE denied to user "
+ + ConnectContext.get().getQualifiedUser() + "'@'" +
ConnectContext.get().getRemoteIP()
+ + "' for cloud cluster '" + clusterName + "'",
ErrorCode.ERR_CLUSTER_NO_PERMISSIONS);
+ }
+
+ if
(!Env.getCurrentSystemInfo().getCloudClusterNames().contains(clusterName)) {
+ LOG.debug("current instance does not have a cluster name :{}",
clusterName);
+ throw new DdlException(String.format("Cluster %s not exist",
clusterName),
+ ErrorCode.ERR_CLOUD_CLUSTER_ERROR);
+ }
+ }
+
+ public static void waitForAutoStart(final String clusterName) throws
DdlException {
+ // TODO: merge from cloud.
+ throw new DdlException("Env.waitForAutoStart unimplemented");
+ }
+
// Switch catalog of this session.
public void changeCatalog(ConnectContext ctx, String catalogName) throws
DdlException {
CatalogIf catalogIf = catalogMgr.getCatalogNullable(catalogName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 76c3097a033..755e5f14a29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -95,14 +95,14 @@ public class Partition extends MetaObject implements
Writable {
@SerializedName(value = "visibleVersionHash")
private long visibleVersionHash;
@SerializedName(value = "nextVersion")
- private long nextVersion;
+ protected long nextVersion;
@Deprecated
@SerializedName(value = "nextVersionHash")
private long nextVersionHash;
@SerializedName(value = "distributionInfo")
private DistributionInfo distributionInfo;
- private Partition() {
+ protected Partition() {
}
public Partition(long id, String name,
@@ -181,7 +181,7 @@ public class Partition extends MetaObject implements
Writable {
}
// The method updateVisibleVersionAndVersionHash is called when fe
restart, the visibleVersionTime is updated
- private void setVisibleVersion(long visibleVersion) {
+ protected void setVisibleVersion(long visibleVersion) {
this.visibleVersion = visibleVersion;
this.visibleVersionTime = System.currentTimeMillis();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
new file mode 100644
index 00000000000..b8329deffbb
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CloudInstanceStatusChecker extends MasterDaemon {
+ private static final Logger LOG =
LogManager.getLogger(CloudInstanceStatusChecker.class);
+
+ public CloudInstanceStatusChecker() {
+ super("cloud instance check");
+ }
+
+ @Override
+ protected void runAfterCatalogReady() {
+ try {
+ Cloud.GetInstanceResponse response =
+ Env.getCurrentSystemInfo().getCloudInstance();
+ LOG.debug("get from ms response {}", response);
+ if (!response.hasStatus() || !response.getStatus().hasCode()
+ || response.getStatus().getCode() !=
Cloud.MetaServiceCode.OK) {
+ LOG.warn("failed to get cloud instance due to incomplete
response, "
+ + "cloud_unique_id={}, response={}",
Config.cloud_unique_id, response);
+ } else {
+
Env.getCurrentSystemInfo().setInstanceStatus(response.getInstance().getStatus());
+ }
+ } catch (Exception e) {
+ LOG.warn("get instance from ms exception", e);
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
new file mode 100644
index 00000000000..fba1daa512f
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -0,0 +1,422 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.rpc.RpcException;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Internal representation of partition-related metadata.
+ */
+public class CloudPartition extends Partition {
+ // Every partition starts from version 1, version 1 has no data
+ public static long EMPTY_VERSION = 1;
+
+ private static final Logger LOG =
LogManager.getLogger(CloudPartition.class);
+
+ // not Serialized
+ @SerializedName(value = "dbId")
+ private long dbId;
+ @SerializedName(value = "tableId")
+ private long tableId;
+
+ public CloudPartition(long id, String name, MaterializedIndex baseIndex,
+ DistributionInfo distributionInfo, long dbId, long
tableId) {
+ super(id, name, baseIndex, distributionInfo);
+ super.nextVersion = -1;
+ this.dbId = dbId;
+ this.tableId = tableId;
+ }
+
+ public CloudPartition() {
+ super();
+ }
+
+ public long getDbId() {
+ return this.dbId;
+ }
+
+ public void setDbId(long dbId) {
+ this.dbId = dbId;
+ }
+
+ public long getTableId() {
+ return this.tableId;
+ }
+
+ public void setTableId(long tableId) {
+ this.tableId = tableId;
+ }
+
+ protected void setVisibleVersion(long visibleVersion) {
+ LOG.debug("setVisibleVersion use CloudPartition {}", super.getName());
+ return;
+ }
+
+ protected void setCachedVisibleVersion(long version) {
+ super.setVisibleVersion(version);
+ }
+
+ @Override
+ public long getVisibleVersion() {
+ LOG.debug("getVisibleVersion use CloudPartition {}", super.getName());
+
+ Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
+ .setDbId(this.dbId)
+ .setTableId(this.tableId)
+ .setPartitionId(super.getId())
+ .setBatchMode(false)
+ .build();
+
+ try {
+ Cloud.GetVersionResponse resp = getVersionFromMeta(request);
+ long version = -1;
+ if (resp.getStatus().getCode() == MetaServiceCode.OK) {
+ version = resp.getVersion();
+ } else {
+ assert resp.getStatus().getCode() ==
MetaServiceCode.VERSION_NOT_FOUND;
+ version = 0;
+ }
+ LOG.debug("get version from meta service, version: {}, partition:
{}", version, super.getId());
+ // Cache visible version, see hasData() for details.
+ super.setVisibleVersion(version);
+ if (version == 0 && isEmptyPartitionPruneDisabled()) {
+ version = 1;
+ }
+ return version;
+ } catch (RpcException e) {
+ throw new RuntimeException("get version from meta service failed");
+ }
+ }
+
+ // Select the non-empty partitions and return the ids.
+ public static List<Long> selectNonEmptyPartitionIds(List<CloudPartition>
partitions) {
+ List<Long> nonEmptyPartitionIds = partitions.stream()
+ .filter(CloudPartition::hasDataCached)
+ .map(CloudPartition::getId)
+ .collect(Collectors.toList());
+ if (nonEmptyPartitionIds.size() == partitions.size()) {
+ return nonEmptyPartitionIds;
+ }
+
+ List<CloudPartition> unknowns = partitions.stream()
+ .filter(p -> !p.hasDataCached())
+ .collect(Collectors.toList());
+
+ SummaryProfile profile = getSummaryProfile();
+ if (profile != null) {
+ profile.incGetPartitionVersionByHasDataCount();
+ }
+
+ try {
+ List<Long> versions =
CloudPartition.getSnapshotVisibleVersion(unknowns);
+
+ int size = versions.size();
+ for (int i = 0; i < size; i++) {
+ if (versions.get(i) > CloudPartition.EMPTY_VERSION) {
+ nonEmptyPartitionIds.add(unknowns.get(i).getId());
+ }
+ }
+
+ return nonEmptyPartitionIds;
+ } catch (RpcException e) {
+ throw new RuntimeException("get version from meta service failed");
+ }
+ }
+
+ // Get visible version from the specified partitions;
+ //
+ // Return the visible version in order of the specified partition ids, -1
means version NOT FOUND.
+ public static List<Long> getSnapshotVisibleVersion(List<CloudPartition>
partitions) throws RpcException {
+ if (partitions.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ List<Long> dbIds = new ArrayList<>();
+ List<Long> tableIds = new ArrayList<>();
+ List<Long> partitionIds = new ArrayList<>();
+ for (CloudPartition partition : partitions) {
+ dbIds.add(partition.getDbId());
+ tableIds.add(partition.getTableId());
+ partitionIds.add(partition.getId());
+ }
+
+ List<Long> versions = getSnapshotVisibleVersion(dbIds, tableIds,
partitionIds);
+
+ // Cache visible version, see hasData() for details.
+ int size = versions.size();
+ for (int i = 0; i < size; ++i) {
+ Long version = versions.get(i);
+ if (version > EMPTY_VERSION) {
+ partitions.get(i).setCachedVisibleVersion(versions.get(i));
+ }
+ }
+
+ return versions;
+ }
+
+ // Get visible versions for the specified partitions.
+ //
+ // Return the visible version in order of the specified partition ids, -1
means version NOT FOUND.
+ public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds,
List<Long> tableIds, List<Long> partitionIds)
+ throws RpcException {
+ assert dbIds.size() == partitionIds.size() :
+ "partition ids size: " + partitionIds.size() + " should equals
to db ids size: " + dbIds.size();
+ assert tableIds.size() == partitionIds.size() :
+ "partition ids size: " + partitionIds.size() + " should equals
to tablet ids size: " + tableIds.size();
+
+ Cloud.GetVersionRequest req = Cloud.GetVersionRequest.newBuilder()
+ .setDbId(-1)
+ .setTableId(-1)
+ .setPartitionId(-1)
+ .setBatchMode(true)
+ .addAllDbIds(dbIds)
+ .addAllTableIds(tableIds)
+ .addAllPartitionIds(partitionIds)
+ .build();
+
+ LOG.debug("getVisibleVersion use CloudPartition {}",
partitionIds.toString());
+ Cloud.GetVersionResponse resp = getVersionFromMeta(req);
+ if (resp.getStatus().getCode() != MetaServiceCode.OK) {
+ throw new RpcException("get visible version", "unexpected status "
+ resp.getStatus());
+ }
+
+ List<Long> versions = resp.getVersionsList();
+ if (versions.size() != partitionIds.size()) {
+ throw new RpcException("get visible version",
+ "wrong number of versions, required " +
partitionIds.size() + ", but got " + versions.size());
+ }
+
+ LOG.debug("get version from meta service, partitions: {}, versions:
{}", partitionIds, versions);
+
+ if (isEmptyPartitionPruneDisabled()) {
+ ArrayList<Long> news = new ArrayList<>();
+ for (Long v : versions) {
+ news.add(v == -1 ? 1 : v);
+ }
+ return news;
+ }
+
+ return versions;
+ }
+
+ @Override
+ public long getNextVersion() {
+ // use meta service visibleVersion
+ LOG.debug("getNextVersion use CloudPartition {}", super.getName());
+ return -1;
+ }
+
+ @Override
+ public void setNextVersion(long nextVersion) {
+ // use meta service visibleVersion
+ LOG.debug("setNextVersion use CloudPartition {} Version {}",
super.getName(), nextVersion);
+ return;
+ }
+
+ @Override
+ public void updateVersionForRestore(long visibleVersion) {
+ LOG.debug("updateVersionForRestore use CloudPartition {} version for
restore: visible: {}",
+ super.getName(), visibleVersion);
+ return;
+ }
+
+ @Override
+ public void updateVisibleVersion(long visibleVersion) {
+ // use meta service visibleVersion
+ LOG.debug("updateVisibleVersion use CloudPartition {} version for
restore: visible: {}",
+ super.getName(), visibleVersion);
+
+ return;
+ }
+
+ @Override
+ public void updateVisibleVersionAndTime(long visibleVersion, long
visibleVersionTime) {
+ }
+
+ // Determine whether data this partition has, according to the cached
visible version.
+ public boolean hasDataCached() {
+ // In order to determine whether a partition is empty, a get_version
RPC is issued to
+ // the meta service. The pruning process will be very slow when there
are lots of empty
+ // partitions. This option disables the empty partition prune
optimization to speed SQL
+ // analysis/plan phase.
+ if (isEmptyPartitionPruneDisabled()) {
+ return true;
+ }
+
+ // Every partition starts from version 1, version 1 has no data.
+ // So as long as version is greater than 1, it can be determined that
there is data here.
+ return super.getVisibleVersion() > EMPTY_VERSION;
+ }
+
+ /**
+     * Check whether this partition has data, consulting the cached visible version first and falling back to the meta service.
+ */
+ @Override
+ public boolean hasData() {
+ // To avoid sending an RPC request, see the cached visible version
here first.
+ if (hasDataCached()) {
+ return true;
+ }
+
+ SummaryProfile profile = getSummaryProfile();
+ if (profile != null) {
+ profile.incGetPartitionVersionByHasDataCount();
+ }
+
+ return getVisibleVersion() > EMPTY_VERSION;
+ }
+
+ private static Cloud.GetVersionResponse
getVersionFromMeta(Cloud.GetVersionRequest req)
+ throws RpcException {
+ long startAt = System.nanoTime();
+ try {
+ return getVersionFromMetaInner(req);
+ } finally {
+ SummaryProfile profile = getSummaryProfile();
+ if (profile != null) {
+ profile.addGetPartitionVersionTime(System.nanoTime() -
startAt);
+ }
+ }
+ }
+
+ private static Cloud.GetVersionResponse
getVersionFromMetaInner(Cloud.GetVersionRequest req)
+ throws RpcException {
+ for (int retryTime = 0; retryTime <
Config.cloud_meta_service_rpc_failed_retry_times; retryTime++) {
+ try {
+ long deadline = System.currentTimeMillis() +
Config.default_get_version_from_ms_timeout_second * 1000L;
+ Future<Cloud.GetVersionResponse> future =
+
MetaServiceProxy.getInstance().getVisibleVersionAsync(req);
+
+ Cloud.GetVersionResponse resp = null;
+ while (resp == null) {
+ try {
+ resp = future.get(Math.max(0, deadline -
System.currentTimeMillis()), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("get version from meta service: future get
interrupted exception");
+ }
+ }
+
+ if (resp.hasStatus() && (resp.getStatus().getCode() ==
MetaServiceCode.OK
+ || resp.getStatus().getCode() ==
MetaServiceCode.VERSION_NOT_FOUND)) {
+ LOG.debug("get version from meta service, code: {}",
resp.getStatus().getCode());
+ return resp;
+ }
+
+ LOG.warn("get version from meta service failed, status: {},
retry time: {}",
+ resp.getStatus(), retryTime);
+ } catch (RpcException | ExecutionException | TimeoutException |
RuntimeException e) {
+ LOG.warn("get version from meta service failed, retry times:
{} exception: ", retryTime, e);
+ }
+
+ // sleep random millis [20, 200] ms, retry rpc failed
+ int randomMillis = 20 + (int) (Math.random() * (200 - 20));
+ if (retryTime > Config.cloud_meta_service_rpc_failed_retry_times /
2) {
+ // sleep random millis [500, 1000] ms, retry rpc failed
+ randomMillis = 500 + (int) (Math.random() * (1000 - 500));
+ }
+ try {
+ Thread.sleep(randomMillis);
+ } catch (InterruptedException e) {
+ LOG.warn("get version from meta service: sleep get interrupted
exception");
+ }
+ }
+
+ LOG.warn("get version from meta service failed after retry {} times",
+ Config.cloud_meta_service_rpc_failed_retry_times);
+ throw new RpcException("get version from meta service", "failed after
retry n times");
+ }
+
+ private static boolean isEmptyPartitionPruneDisabled() {
+ ConnectContext ctx = ConnectContext.get();
+ if (ctx != null &&
ctx.getSessionVariable().getDisableEmptyPartitionPrune()) {
+ return true;
+ }
+ return false;
+ }
+
+ private static SummaryProfile getSummaryProfile() {
+ ConnectContext ctx = ConnectContext.get();
+ if (ctx != null) {
+ StmtExecutor executor = ctx.getExecutor();
+ if (executor != null) {
+ return executor.getSummaryProfile();
+ }
+ }
+ return null;
+ }
+
+ public static CloudPartition read(DataInput in) throws IOException {
+ CloudPartition partition = new CloudPartition();
+ partition.readFields(in);
+ partition.setDbId(in.readLong());
+ partition.setTableId(in.readLong());
+ return partition;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(this.dbId);
+ out.writeLong(this.tableId);
+ }
+
+ public boolean equals(Object obj) {
+ if (!super.equals(obj)) {
+ return false;
+ }
+
+ if (!(obj instanceof CloudPartition)) {
+ return false;
+ }
+ CloudPartition cloudPartition = (CloudPartition) obj;
+ return (dbId == cloudPartition.dbId) && (tableId ==
cloudPartition.tableId);
+ }
+
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(super.toString());
+ buffer.append("dbId: ").append(this.dbId).append("; ");
+ buffer.append("tableId: ").append(this.tableId).append("; ");
+ return buffer.toString();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
new file mode 100644
index 00000000000..cf503d9bc40
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+
+import com.google.common.base.Strings;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CloudReplica extends Replica {
+ private static final Logger LOG = LogManager.getLogger(CloudReplica.class);
+
+ // In the future, a replica may be mapped to multiple BEs in a cluster,
+ // so this value is be list
+ private Map<String, List<Long>> clusterToBackends = new
ConcurrentHashMap<String, List<Long>>();
+ @SerializedName(value = "dbId")
+ private long dbId = -1;
+ @SerializedName(value = "tableId")
+ private long tableId = -1;
+ @SerializedName(value = "partitionId")
+ private long partitionId = -1;
+ @SerializedName(value = "indexId")
+ private long indexId = -1;
+ @SerializedName(value = "idx")
+ private long idx = -1;
+
+ private Random rand = new Random();
+
+ private Map<String, List<Long>> memClusterToBackends = new
ConcurrentHashMap<String, List<Long>>();
+
+ public CloudReplica() {
+ }
+
+ public CloudReplica(long replicaId, List<Long> backendIds, ReplicaState
state, long version, int schemaHash,
+ long dbId, long tableId, long partitionId, long indexId, long idx)
{
+ super(replicaId, -1, state, version, schemaHash);
+ this.dbId = dbId;
+ this.tableId = tableId;
+ this.partitionId = partitionId;
+ this.indexId = indexId;
+ this.idx = idx;
+ }
+
+ private boolean isColocated() {
+ return Env.getCurrentColocateIndex().isColocateTable(tableId);
+ }
+
+ private long getColocatedBeId(String cluster) {
+ List<Backend> bes =
Env.getCurrentSystemInfo().getBackendsByClusterId(cluster);
+ List<Backend> availableBes = new ArrayList<>();
+ for (Backend be : bes) {
+ if (be.isAlive()) {
+ availableBes.add(be);
+ }
+ }
+ if (availableBes == null || availableBes.size() == 0) {
+ LOG.warn("failed to get available be, clusterId: {}", cluster);
+ return -1;
+ }
+
+ // Tablets with the same idx will be hashed to the same BE, which
+        // meets the requirements of a colocated table.
+ long index = idx % availableBes.size();
+ long pickedBeId = availableBes.get((int) index).getId();
+
+ return pickedBeId;
+ }
+
+ @Override
+ public long getBackendId() {
+ String cluster = null;
+ // Not in a connect session
+ ConnectContext context = ConnectContext.get();
+ if (context != null) {
+ if
(!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
+ cluster = context.getSessionVariable().getCloudCluster();
+ try {
+ Env.getCurrentEnv().checkCloudClusterPriv(cluster);
+ } catch (Exception e) {
+ LOG.warn("get cluster by session context exception");
+ return -1;
+ }
+ LOG.debug("get cluster by session context cluster: {}",
cluster);
+ } else {
+ cluster = context.getCloudCluster();
+ LOG.debug("get cluster by context {}", cluster);
+ }
+ } else {
+ LOG.debug("connect context is null in getBackendId");
+ return -1;
+ }
+
+ // check default cluster valid.
+ if (!Strings.isNullOrEmpty(cluster)) {
+ boolean exist =
Env.getCurrentSystemInfo().getCloudClusterNames().contains(cluster);
+ if (!exist) {
+                // can't use this default cluster, please choose another one
+ LOG.warn("cluster: {} is not existed", cluster);
+ return -1;
+ }
+ } else {
+ LOG.warn("failed to get available be, clusterName: {}", cluster);
+ return -1;
+ }
+
+ // if cluster is SUSPENDED, wait
+ try {
+ Env.waitForAutoStart(cluster);
+ } catch (DdlException e) {
+            // this function can't throw an exception, so just log it
+ LOG.warn("cant resume cluster {}", cluster);
+ }
+ String clusterId =
Env.getCurrentSystemInfo().getCloudClusterIdByName(cluster);
+
+ if (isColocated()) {
+ return getColocatedBeId(clusterId);
+ }
+
+ if (Config.enable_cloud_multi_replica) {
+ int indexRand = rand.nextInt(Config.cloud_replica_num);
+ int coldReadRand = rand.nextInt(100);
+ boolean allowColdRead = coldReadRand <
Config.cloud_cold_read_percent;
+ boolean replicaEnough = memClusterToBackends.get(clusterId) != null
+ && memClusterToBackends.get(clusterId).size() > indexRand;
+
+ long backendId = -1;
+ if (replicaEnough) {
+ backendId = memClusterToBackends.get(clusterId).get(indexRand);
+ }
+
+ if (!replicaEnough && !allowColdRead &&
clusterToBackends.containsKey(clusterId)) {
+ backendId = clusterToBackends.get(clusterId).get(0);
+ }
+
+ if (backendId > 0) {
+ Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+ if (be != null && be.isQueryAvailable()) {
+ LOG.debug("backendId={} ", backendId);
+ return backendId;
+ }
+ }
+
+ List<Long> res = hashReplicaToBes(clusterId, false,
Config.cloud_replica_num);
+ if (res.size() < indexRand + 1) {
+ if (res.isEmpty()) {
+ return -1;
+ } else {
+ return res.get(0);
+ }
+ } else {
+ return res.get(indexRand);
+ }
+ }
+
+ if (clusterToBackends.containsKey(clusterId)) {
+ long backendId = clusterToBackends.get(clusterId).get(0);
+ Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+ if (be != null && be.isQueryAvailable()) {
+ LOG.debug("backendId={} ", backendId);
+ return backendId;
+ }
+ }
+
+ return hashReplicaToBe(clusterId, false);
+ }
+
+ public long hashReplicaToBe(String clusterId, boolean isBackGround) {
+ // TODO(luwei) list should be sorted
+ List<Backend> clusterBes =
Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId);
+ // use alive be to exec sql
+ List<Backend> availableBes = new ArrayList<>();
+ for (Backend be : clusterBes) {
+ long lastUpdateMs = be.getLastUpdateMs();
+ long missTimeMs = Math.abs(lastUpdateMs -
System.currentTimeMillis());
+            // a BE coredump or restart must be detected within heartbeat_interval_second
+ if ((be.isAlive() || missTimeMs <=
Config.heartbeat_interval_second * 1000L)
+ && !be.isSmoothUpgradeSrc()) {
+ availableBes.add(be);
+ }
+ }
+ if (availableBes == null || availableBes.size() == 0) {
+ if (!isBackGround) {
+ LOG.warn("failed to get available be, clusterId: {}",
clusterId);
+ }
+ return -1;
+ }
+ LOG.debug("availableBes={}", availableBes);
+ long index = -1;
+ HashCode hashCode = null;
+ if (idx == -1) {
+ index = getId() % availableBes.size();
+ } else {
+ hashCode = Hashing.murmur3_128().hashLong(partitionId);
+ int beNum = availableBes.size();
+ // (hashCode.asLong() + idx) % beNum may be a negative value, so we
+ // need to take the modulus of beNum again to ensure that index is
+ // a positive value
+ index = ((hashCode.asLong() + idx) % beNum + beNum) % beNum;
+ }
+ long pickedBeId = availableBes.get((int) index).getId();
+ LOG.info("picked beId {}, replicaId {}, partitionId {}, beNum {},
replicaIdx {}, picked Index {}, hashVal {}",
+ pickedBeId, getId(), partitionId, availableBes.size(), idx,
index,
+ hashCode == null ? -1 : hashCode.asLong());
+
+ // save to clusterToBackends map
+ List<Long> bes = new ArrayList<Long>();
+ bes.add(pickedBeId);
+ clusterToBackends.put(clusterId, bes);
+
+ return pickedBeId;
+ }
+
+ public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround,
int replicaNum) {
+ // TODO(luwei) list should be sorted
+ List<Backend> clusterBes =
Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId);
+ // use alive be to exec sql
+ List<Backend> availableBes = new ArrayList<>();
+ for (Backend be : clusterBes) {
+ long lastUpdateMs = be.getLastUpdateMs();
+ long missTimeMs = Math.abs(lastUpdateMs -
System.currentTimeMillis());
+ // be core or restart must in heartbeat_interval_second
+ if ((be.isAlive() || missTimeMs <=
Config.heartbeat_interval_second * 1000L)
+ && !be.isSmoothUpgradeSrc()) {
+ availableBes.add(be);
+ }
+ }
+ if (availableBes == null || availableBes.size() == 0) {
+ if (!isBackGround) {
+ LOG.warn("failed to get available be, clusterId: {}",
clusterId);
+ }
+ return new ArrayList<Long>();
+ }
+ LOG.debug("availableBes={}", availableBes);
+
+ int realReplicaNum = replicaNum > availableBes.size() ?
availableBes.size() : replicaNum;
+ List<Long> bes = new ArrayList<Long>();
+ for (int i = 0; i < realReplicaNum; ++i) {
+ long index = -1;
+ HashCode hashCode = null;
+ if (idx == -1) {
+ index = getId() % availableBes.size();
+ } else {
+ hashCode = Hashing.murmur3_128().hashLong(partitionId + i);
+ int beNum = availableBes.size();
+ // (hashCode.asLong() + idx) % beNum may be a negative value,
so we
+ // need to take the modulus of beNum again to ensure that
index is
+ // a positive value
+ index = ((hashCode.asLong() + idx) % beNum + beNum) % beNum;
+ }
+ long pickedBeId = availableBes.get((int) index).getId();
+ availableBes.remove((int) index);
+ LOG.info("picked beId {}, replicaId {}, partId {}, beNum {},
replicaIdx {}, picked Index {}, hashVal {}",
+ pickedBeId, getId(), partitionId, availableBes.size(),
idx, index,
+ hashCode == null ? -1 : hashCode.asLong());
+ // save to memClusterToBackends map
+ bes.add(pickedBeId);
+ }
+
+ memClusterToBackends.put(clusterId, bes);
+
+ return bes;
+ }
+
+ @Override
+ public boolean checkVersionCatchUp(long expectedVersion, boolean
ignoreAlter) {
+ // ATTN: expectedVersion is not used here, and
OlapScanNode.addScanRangeLocations
+ // depends this feature to implement snapshot partition version. See
comments in
+ // OlapScanNode.addScanRangeLocations for details.
+ if (ignoreAlter && getState() == ReplicaState.ALTER
+ && getVersion() == Partition.PARTITION_INIT_VERSION) {
+ return true;
+ }
+
+ if (expectedVersion == Partition.PARTITION_INIT_VERSION) {
+ // no data is loaded into this replica, just return true
+ return true;
+ }
+
+ return true;
+ }
+
    /**
     * Deserializes the cloud-specific replica fields.
     * The read order must stay exactly in sync with {@link #write}.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        dbId = in.readLong();
        tableId = in.readLong();
        partitionId = in.readLong();
        indexId = in.readLong();
        idx = in.readLong();
        int count = in.readInt();
        for (int i = 0; i < count; ++i) {
            // The persisted key is looked up as a cluster *name*; presumably
            // older images stored names — translate to the current cluster id
            // when one exists. TODO confirm against the writer's key semantics.
            String clusterId = Text.readString(in);
            String realClusterId = Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterId);
            LOG.debug("cluster Id {}, real cluster Id {}", clusterId, realClusterId);

            if (!Strings.isNullOrEmpty(realClusterId)) {
                clusterId = realClusterId;
            }

            // Only a single backend id per cluster is persisted (see write()).
            long beId = in.readLong();
            List<Long> bes = new ArrayList<Long>();
            bes.add(beId);
            clusterToBackends.put(clusterId, bes);
        }
    }
+
    /**
     * Serializes the cloud-specific replica fields.
     * The write order must stay exactly in sync with {@link #readFields}.
     * Note: only the FIRST backend id of each cluster's list is persisted.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        super.write(out);
        out.writeLong(dbId);
        out.writeLong(tableId);
        out.writeLong(partitionId);
        out.writeLong(indexId);
        out.writeLong(idx);
        out.writeInt(clusterToBackends.size());
        for (Map.Entry<String, List<Long>> entry : clusterToBackends.entrySet()) {
            Text.writeString(out, entry.getKey());
            out.writeLong(entry.getValue().get(0));
        }
    }
+
    /** Deserializes a CloudReplica previously written by {@link #write}. */
    public static CloudReplica read(DataInput in) throws IOException {
        CloudReplica replica = new CloudReplica();
        replica.readFields(in);
        // TODO(luwei): persist and fill-up clusterToBackends to take full advantage of data cache
        return replica;
    }
+
    /** Database id this replica belongs to. */
    public long getDbId() {
        return dbId;
    }

    /** Table id this replica belongs to. */
    public long getTableId() {
        return tableId;
    }

    /** Partition id this replica belongs to. */
    public long getPartitionId() {
        return partitionId;
    }

    /** Index id this replica belongs to. */
    public long getIndexId() {
        return indexId;
    }

    /** Replica index; -1 selects the replica-id-based path in hashReplicaToBe. */
    public long getIdx() {
        return idx;
    }

    /** Cluster id -> cached backend ids chosen for this replica. */
    public Map<String, List<Long>> getClusterToBackends() {
        return clusterToBackends;
    }
+
    /** Rebinds the given cluster to a single backend id. */
    public void updateClusterToBe(String cluster, long beId) {
        // NOTE(review): the original "write lock" comment suggests callers are
        // expected to hold a write lock — no lock is taken here; confirm the
        // calling convention before relying on concurrent use.
        List<Long> bes = new ArrayList<Long>();
        bes.add(beId);
        clusterToBackends.put(cluster, bes);
    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index c36821fd996..750da42235d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1208,7 +1208,10 @@ public enum ErrorCode {
"the auto increment is only supported in duplicate table and
unique table."),
ERR_ARROW_FLIGHT_SQL_MUST_ONLY_RESULT_STMT(5097, new byte[]{'4', '2', '0',
'0', '0'},
- "There can only be one stmt that returns the result and it is at
the end.");
+ "There can only be one stmt that returns the result and it is at
the end."),
+
+ ERR_CLOUD_CLUSTER_ERROR(5093, new byte[]{'4', '2', '0', '0', '0'},
+ "Cluster %s not exist, use SQL 'SHOW CLUSTERS' to get a valid
cluster");
// This is error code
private final int code;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index c478ea0fd29..01fc64e0747 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -70,6 +70,9 @@ public class SummaryProfile {
public static final String FETCH_RESULT_TIME = "Fetch Result Time";
public static final String WRITE_RESULT_TIME = "Write Result Time";
public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result
Time";
+ public static final String GET_PARTITION_VERSION_TIME = "Get Partition
Version Time";
+ public static final String GET_PARTITION_VERSION_COUNT = "Get Partition
Version Count";
+ public static final String GET_PARTITION_VERSION_BY_HAS_DATA_COUNT = "Get
Partition Version Count (hasData)";
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
@@ -87,23 +90,31 @@ public class SummaryProfile {
WORKLOAD_GROUP, ANALYSIS_TIME,
PLAN_TIME, JOIN_REORDER_TIME, CREATE_SINGLE_NODE_TIME,
QUERY_DISTRIBUTED_TIME,
INIT_SCAN_NODE_TIME, FINALIZE_SCAN_NODE_TIME, GET_SPLITS_TIME,
GET_PARTITIONS_TIME,
- GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME, SCHEDULE_TIME,
FETCH_RESULT_TIME,
+ GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME,
GET_PARTITION_VERSION_TIME,
+ GET_PARTITION_VERSION_BY_HAS_DATA_COUNT,
GET_PARTITION_VERSION_COUNT, SCHEDULE_TIME, FETCH_RESULT_TIME,
WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME, DORIS_VERSION,
IS_NEREIDS, IS_PIPELINE,
IS_CACHED, TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE,
PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID);
// Ident of each item. Default is 0, which doesn't need to present in this
Map.
// Please set this map for new profile items if they need ident.
- public static ImmutableMap<String, Integer>
EXECUTION_SUMMARY_KEYS_IDENTATION = ImmutableMap.of(
- JOIN_REORDER_TIME, 1,
- CREATE_SINGLE_NODE_TIME, 1,
- QUERY_DISTRIBUTED_TIME, 1,
- INIT_SCAN_NODE_TIME, 1,
- FINALIZE_SCAN_NODE_TIME, 1,
- GET_SPLITS_TIME, 2,
- GET_PARTITIONS_TIME, 3,
- GET_PARTITION_FILES_TIME, 3,
- CREATE_SCAN_RANGE_TIME, 2
- );
+ public static ImmutableMap<String, Integer>
EXECUTION_SUMMARY_KEYS_IDENTATION = ImmutableMap.of();
+
+ {
+ ImmutableMap.Builder<String, Integer> builder = new
ImmutableMap.Builder<>();
+ builder.put(JOIN_REORDER_TIME, 1);
+ builder.put(CREATE_SINGLE_NODE_TIME, 1);
+ builder.put(QUERY_DISTRIBUTED_TIME, 1);
+ builder.put(INIT_SCAN_NODE_TIME, 1);
+ builder.put(FINALIZE_SCAN_NODE_TIME, 1);
+ builder.put(GET_SPLITS_TIME, 2);
+ builder.put(GET_PARTITIONS_TIME, 3);
+ builder.put(GET_PARTITION_FILES_TIME, 3);
+ builder.put(CREATE_SCAN_RANGE_TIME, 2);
+ builder.put(GET_PARTITION_VERSION_TIME, 1);
+ builder.put(GET_PARTITION_VERSION_COUNT, 1);
+ builder.put(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 1);
+ EXECUTION_SUMMARY_KEYS_IDENTATION = builder.build();
+ }
private RuntimeProfile summaryProfile;
private RuntimeProfile executionSummaryProfile;
@@ -140,6 +151,9 @@ public class SummaryProfile {
private long tempStarTime = -1;
private long queryFetchResultConsumeTime = 0;
private long queryWriteResultConsumeTime = 0;
+ private long getPartitionVersionTime = 0;
+ private long getPartitionVersionCount = 0;
+ private long getPartitionVersionByHasDataCount = 0;
public SummaryProfile(RuntimeProfile rootProfile) {
summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
@@ -210,6 +224,10 @@ public class SummaryProfile {
executionSummaryProfile.addInfoString(WRITE_RESULT_TIME,
RuntimeProfile.printCounter(queryWriteResultConsumeTime,
TUnit.TIME_MS));
executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME,
getPrettyQueryFetchResultFinishTime());
+ executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_TIME,
getPrettyGetPartitionVersionTime());
+ executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_COUNT,
getPrettyGetPartitionVersionCount());
+
executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT,
+ getPrettyGetPartitionVersionByHasDataCount());
}
public void setNereidsAnalysisTime() {
@@ -308,6 +326,15 @@ public class SummaryProfile {
this.queryWriteResultConsumeTime += TimeUtils.getStartTimeMs() -
tempStarTime;
}
    /** Accumulates one get-partition-version call: {@code ns} elapsed nanoseconds, count incremented. */
    public void addGetPartitionVersionTime(long ns) {
        this.getPartitionVersionTime += ns;
        this.getPartitionVersionCount += 1;
    }

    /** Increments the counter reported as GET_PARTITION_VERSION_BY_HAS_DATA_COUNT. */
    public void incGetPartitionVersionByHasDataCount() {
        this.getPartitionVersionByHasDataCount += 1;
    }
+
public long getQueryBeginTime() {
return queryBeginTime;
}
@@ -528,4 +555,19 @@ public class SummaryProfile {
}
return RuntimeProfile.printCounter(queryFetchResultFinishTime -
queryScheduleFinishTime, TUnit.TIME_MS);
}
+
    /** Formats the accumulated partition-version time; "N/A" when no call was recorded. */
    private String getPrettyGetPartitionVersionTime() {
        if (getPartitionVersionTime == 0) {
            return "N/A";
        }
        return RuntimeProfile.printCounter(getPartitionVersionTime, TUnit.TIME_NS);
    }

    /** Formats the hasData counter as a plain unit count. */
    private String getPrettyGetPartitionVersionByHasDataCount() {
        return RuntimeProfile.printCounter(getPartitionVersionByHasDataCount, TUnit.UNIT);
    }

    /** Formats the get-partition-version call count as a plain unit count. */
    private String getPrettyGetPartitionVersionCount() {
        return RuntimeProfile.printCounter(getPartitionVersionCount, TUnit.UNIT);
    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
index c9b0dfcf1ae..2a097c38e2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.PasswordOptions;
import org.apache.doris.analysis.RefreshLdapStmt;
import org.apache.doris.analysis.ResourcePattern;
+import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.RevokeStmt;
import org.apache.doris.analysis.SetLdapPassVar;
import org.apache.doris.analysis.SetPassVar;
@@ -83,6 +84,7 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
public class Auth implements Writable {
private static final Logger LOG = LogManager.getLogger(Auth.class);
@@ -389,6 +391,12 @@ public class Auth implements Writable {
}
}
    // ==== cloud ====
    /**
     * Checks whether {@code currentUser} holds the wanted privilege on the
     * named cloud resource of the given type.
     *
     * <p>Placeholder: not implemented in this class — a cloud-aware
     * implementation is expected to supply it.
     *
     * @throws RuntimeException always, until implemented
     */
    public boolean checkCloudPriv(UserIdentity currentUser, String cloudName,
                                  PrivPredicate wanted, ResourceTypeEnum type) {
        throw new RuntimeException("Auth.checkCloudPriv is not implemented");
    }
+
// ==== Other ====
/*
* Check if current user has certain privilege.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 8a1a16999b6..59833d0ea5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -135,7 +135,10 @@ public class ConnectContext {
protected volatile long currentDbId = -1;
// Transaction
protected volatile TransactionEntry txnEntry = null;
-
+ // cluster name
+ protected volatile String clusterName = "";
+ // used for ShowSqlAction which don't allow a user account
+ protected volatile boolean noAuth = false;
// username@host of current login user
protected volatile String qualifiedUser;
// LDAP authenticated but the Doris account does not exist,
@@ -176,6 +179,9 @@ public class ConnectContext {
// So in the query planning stage, do not use any value in this attribute.
protected QueryDetail queryDetail = null;
+ // cloud cluster name
+ protected volatile String cloudCluster = null;
+
// If set to true, the nondeterministic function will not be rewrote to
constant.
private boolean notEvalNondeterministicFunction = false;
// The resource tag is used to limit the node resources that the user can
use for query.
@@ -525,6 +531,14 @@ public class ConnectContext {
return env;
}
    /** True when this connection bypasses authentication (used by ShowSqlAction, which has no user account). */
    public boolean getNoAuth() {
        return noAuth;
    }

    public void setNoAuth(boolean noAuth) {
        this.noAuth = noAuth;
    }
+
public String getQualifiedUser() {
return qualifiedUser;
}
@@ -798,6 +812,14 @@ public class ConnectContext {
return queryId;
}
    /** Name of the cluster bound to this connection ("" when unset). */
    public String getClusterName() {
        return clusterName;
    }

    // NOTE(review): setter is named setCluster while the getter is
    // getClusterName; renaming would break existing callers, so the
    // asymmetry is only flagged here.
    public void setCluster(String clusterName) {
        this.clusterName = clusterName;
    }
+
public String getSqlHash() {
return sqlHash;
}
@@ -1002,6 +1024,26 @@ public class ConnectContext {
return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
}
    /**
     * Resolves the effective cloud cluster for this connection.
     * The session variable {@code cloud_cluster} (settable via SQL hint)
     * takes precedence; otherwise the cluster chosen with
     * {@code use @clusterName} is returned.
     */
    public String getCurrentCloudCluster() {
        String cluster = getSessionVariable().getCloudCluster();
        if (Strings.isNullOrEmpty(cluster)) {
            cluster = getCloudCluster();
        }
        return cluster;
    }

    // Set cloud cluster by `use @clusterName`
    public void setCloudCluster(String cluster) {
        this.cloudCluster = cluster;
    }

    // The returned cluster is set by `use @clusterName`; may be null when never set.
    public String getCloudCluster() {
        return cloudCluster;
    }
+
public StatsErrorEstimator getStatsErrorEstimator() {
return statsErrorEstimator;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d4b8dc0acea..9c8546a98e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -65,6 +65,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
/**
* System variable.
**/
@@ -500,6 +501,12 @@ public class SessionVariable implements Serializable,
Writable {
);
public static final String ENABLE_STATS = "enable_stats";
+
    // CLOUD_VARIABLES_BEGIN
    public static final String CLOUD_CLUSTER = "cloud_cluster";
    public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune";
    // CLOUD_VARIABLES_END
+
/**
* If set false, user couldn't submit analyze SQL and FE won't allocate
any related resources.
*/
@@ -1546,6 +1553,13 @@ public class SessionVariable implements Serializable,
Writable {
"use other health replica when the use_fix_replica meet error"
})
public boolean fallbackOtherReplicaWhenFixedCorrupt = false;
+ // CLOUD_VARIABLES_BEGIN
+ @VariableMgr.VarAttr(name = CLOUD_CLUSTER)
+ public String cloudCluster = "";
+ @VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
+ public boolean disableEmptyPartitionPrune = false;
+ // CLOUD_VARIABLES_END
+
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to
generate some variables,
// not the default value set in the code.
@SuppressWarnings("checkstyle:Indentation")
@@ -3261,4 +3275,22 @@ public class SessionVariable implements Serializable,
Writable {
public void setIgnoreStorageDataDistribution(boolean
ignoreStorageDataDistribution) {
this.ignoreStorageDataDistribution = ignoreStorageDataDistribution;
}
+
    // CLOUD_VARIABLES_BEGIN
    /** Cloud cluster chosen via the cloud_cluster session variable ("" when unset). */
    public String getCloudCluster() {
        return cloudCluster;
    }

    // NOTE(review): non-void setter returning the value just assigned is an
    // unusual signature; kept as-is for caller compatibility.
    public String setCloudCluster(String cloudCluster) {
        return this.cloudCluster = cloudCluster;
    }

    public boolean getDisableEmptyPartitionPrune() {
        return disableEmptyPartitionPrune;
    }

    public void setDisableEmptyPartitionPrune(boolean val) {
        disableEmptyPartitionPrune = val;
    }
    // CLOUD_VARIABLES_END
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
index b9ba7af89ba..9353140b9d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
@@ -66,6 +66,13 @@ public class Tag implements Writable {
public static final String VALUE_DEFAULT_TAG = "default";
public static final String VALUE_INVALID_TAG = "invalid";
+ public static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
+ public static final String CLOUD_CLUSTER_ID = "cloud_cluster_id";
+ public static final String CLOUD_UNIQUE_ID = "cloud_unique_id";
+ public static final String CLOUD_CLUSTER_PUBLIC_ENDPOINT =
"cloud_cluster_public_endpoint";
+ public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT =
"cloud_cluster_private_endpoint";
+ public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status";
+
public static final ImmutableSet<String> RESERVED_TAG_TYPE =
ImmutableSet.of(
TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION);
public static final ImmutableSet<String> RESERVED_TAG_VALUES =
ImmutableSet.of(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index fcb5e63e838..2f8faa06b1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -120,6 +120,9 @@ public class Backend implements Writable {
@SerializedName("tagMap")
private Map<String, String> tagMap = Maps.newHashMap();
+ private boolean isSmoothUpgradeSrc = false; // This be process is old
process when doing smooth upgrade
+ private boolean isSmoothUpgradeDst = false; // This be process is new
process when doing smooth upgrade
+
// cpu cores
@SerializedName("cpuCores")
private int cpuCores = 1;
@@ -174,6 +177,38 @@ public class Backend implements Writable {
this.tagMap.put(locationTag.type, locationTag.value);
}
    /** Cloud cluster status stored in the tag map ("" when the tag is absent). */
    public String getCloudClusterStatus() {
        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_STATUS, "");
    }

    public void setCloudClusterStatus(final String clusterStatus) {
        tagMap.put(Tag.CLOUD_CLUSTER_STATUS, clusterStatus);
    }

    /** Cloud cluster name tag ("" when absent). */
    public String getCloudClusterName() {
        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_NAME, "");
    }

    public void setCloudClusterName(final String clusterName) {
        tagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
    }

    /** Cloud cluster id tag ("" when absent). */
    public String getCloudClusterId() {
        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_ID, "");
    }

    /** Cloud unique id tag ("" when absent). */
    public String getCloudUniqueId() {
        return tagMap.getOrDefault(Tag.CLOUD_UNIQUE_ID, "");
    }

    /** Public endpoint tag ("" when absent). */
    public String getCloudPublicEndpoint() {
        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, "");
    }

    /** Private endpoint tag ("" when absent). */
    public String getCloudPrivateEndpoint() {
        return tagMap.getOrDefault(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, "");
    }
+
public long getId() {
return id;
}
@@ -369,6 +404,22 @@ public class Backend implements Writable {
return backendStatus;
}
+ public void setSmoothUpgradeSrc(boolean is) {
+ this.isSmoothUpgradeSrc = is;
+ }
+
+ public boolean isSmoothUpgradeSrc() {
+ return this.isSmoothUpgradeSrc;
+ }
+
+ public void setSmoothUpgradeDst(boolean is) {
+ this.isSmoothUpgradeDst = is;
+ }
+
+ public boolean isSmoothUpgradeDst() {
+ return this.isSmoothUpgradeDst;
+ }
+
public int getHeartbeatFailureCounter() {
return heartbeatFailureCounter;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 9c1e196923f..78297aae83a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -23,6 +23,10 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.proto.Cloud.ClusterPB;
+import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -32,9 +36,11 @@ import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
+import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TNodeInfo;
import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TStatusCode;
@@ -63,7 +69,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class SystemInfoService {
@@ -77,9 +85,22 @@ public class SystemInfoService {
private volatile ImmutableMap<Long, Backend> idToBackendRef =
ImmutableMap.of();
private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef =
ImmutableMap.of();
+ // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk
of inconsistency
+ // use exclusive lock to make sure only one thread can change
clusterIdToBackend and clusterNameToId
+ private ReentrantLock lock = new ReentrantLock();
+
+ // for show cluster and cache user owned cluster
+ // mysqlUserName -> List of ClusterPB
+ private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB =
ImmutableMap.of();
+ // clusterId -> List<Backend>
+ private Map<String, List<Backend>> clusterIdToBackend = new
ConcurrentHashMap<>();
+ // clusterName -> clusterId
+ private Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
private volatile ImmutableMap<Long, DiskInfo> pathHashToDiskInfoRef =
ImmutableMap.of();
+ private InstanceInfoPB.Status instanceStatus;
+
public static class HostInfo implements Comparable<HostInfo> {
public String host;
public int port;
@@ -159,6 +180,114 @@ public class SystemInfoService {
}
};
    /**
     * True when at least one cloud cluster has at least one backend
     * registered. Always true when running unit tests.
     */
    public boolean availableBackendsExists() {
        if (FeConstants.runningUnitTest) {
            return true;
        }
        if (null == clusterNameToId || clusterNameToId.isEmpty()) {
            return false;
        }
        return clusterIdToBackend != null && !clusterIdToBackend.isEmpty()
                && clusterIdToBackend.values().stream().anyMatch(list -> list != null && !list.isEmpty());
    }
+
    /** True when a cloud cluster with the given name is known. */
    public boolean containClusterName(String clusterName) {
        return clusterNameToId.containsKey(clusterName);
    }
+
+ public List<Backend> getBackendsByClusterName(final String clusterName) {
+ String clusterId = clusterNameToId.getOrDefault(clusterName, "");
+ if (clusterId.isEmpty()) {
+ return new ArrayList<>();
+ }
+ return clusterIdToBackend.get(clusterId);
+ }
+
    /** Backends of the cluster with the given id; empty list when unknown. Never null. */
    public List<Backend> getBackendsByClusterId(final String clusterId) {
        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
    }

    /** Snapshot of all known cloud cluster ids. */
    public List<String> getCloudClusterIds() {
        return new ArrayList<>(clusterIdToBackend.keySet());
    }
+
    /**
     * Cluster status resolved by cluster name; "" when the name is unknown
     * (e.g. the cluster was renamed or dropped).
     */
    public String getCloudStatusByName(final String clusterName) {
        String clusterId = clusterNameToId.getOrDefault(clusterName, "");
        if (Strings.isNullOrEmpty(clusterId)) {
            // for rename cluster or dropped cluster
            LOG.warn("cant find clusterId by clusterName {}", clusterName);
            return "";
        }
        return getCloudStatusById(clusterId);
    }

    /** Status tag of the first backend of the cluster; "" when the cluster has no backends. */
    public String getCloudStatusById(final String clusterId) {
        return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
                .stream().map(Backend::getCloudClusterStatus).findFirst().orElse("");
    }
+
+ public void updateClusterNameToId(final String newName,
+ final String originalName, final String clusterId) {
+ lock.lock();
+ clusterNameToId.remove(originalName);
+ clusterNameToId.put(newName, clusterId);
+ lock.unlock();
+ }
+
+ public String getClusterNameByClusterId(final String clusterId) {
+ String clusterName = "";
+ for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
+ if (entry.getValue().equals(clusterId)) {
+ clusterName = entry.getKey();
+ break;
+ }
+ }
+ return clusterName;
+ }
+
+ public void dropCluster(final String clusterId, final String clusterName) {
+ lock.lock();
+ clusterNameToId.remove(clusterName, clusterId);
+ clusterIdToBackend.remove(clusterId);
+ lock.unlock();
+ }
+
    /** Snapshot of all known cloud cluster names. */
    public List<String> getCloudClusterNames() {
        return new ArrayList<>(clusterNameToId.keySet());
    }

    // Return the ref of concurrentMap clusterIdToBackend
    // It should be thread-safe to iterate.
    // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
    public Map<String, List<Backend>> getCloudClusterIdToBackend() {
        return clusterIdToBackend;
    }

    /** Cluster id for the given name, or null when the name is unknown. */
    public String getCloudClusterIdByName(String clusterName) {
        return clusterNameToId.get(clusterName);
    }
+
+ public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName)
{
+ String clusterId = clusterNameToId.get(clusterName);
+ if (Strings.isNullOrEmpty(clusterId)) {
+ LOG.warn("cant find clusterId, this cluster may be has been
dropped, clusterName={}", clusterName);
+ return ImmutableMap.of();
+ }
+ List<Backend> backends = clusterIdToBackend.get(clusterId);
+ Map<Long, Backend> idToBackend = Maps.newHashMap();
+ for (Backend be : backends) {
+ idToBackend.put(be.getId(), be);
+ }
+ return ImmutableMap.copyOf(idToBackend);
+ }
+
    // Return the ref of concurrentMap clusterNameToId
    // It should be thread-safe to iterate.
    // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe
    /** Live view of the cluster name -> id map (a ConcurrentHashMap reference, not a copy). */
    public Map<String, String> getCloudClusterNameToId() {
        return clusterNameToId;
    }
+
public static TPaloNodesInfo createAliveNodesInfo() {
TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
@@ -169,6 +298,23 @@ public class SystemInfoService {
return nodesInfo;
}
    /** Cached mysql user name -> clusters owned by that user (serves SHOW CLUSTERS). */
    public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() {
        return mysqlUserNameToClusterPB;
    }

    /** Replaces the whole cached user -> clusters map in one shot. */
    public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m) {
        mysqlUserNameToClusterPB = m;
    }
+
+ public List<Pair<String, Integer>> getCurrentObFrontends() {
+ List<Frontend> frontends =
Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER);
+ List<Pair<String, Integer>> frontendsPair = new ArrayList<>();
+ for (Frontend frontend : frontends) {
+ frontendsPair.add(Pair.of(frontend.getHost(),
frontend.getEditLogPort()));
+ }
+ return frontendsPair;
+ }
+
// for deploy manager
public void addBackends(List<HostInfo> hostInfos, boolean isFree)
throws UserException {
@@ -979,4 +1125,31 @@ public class SystemInfoService {
public long aliveBECount() {
return
idToBackendRef.values().stream().filter(Backend::isAlive).count();
}
+
+ public Cloud.GetInstanceResponse getCloudInstance() {
+ Cloud.GetInstanceRequest.Builder builder =
+ Cloud.GetInstanceRequest.newBuilder();
+ builder.setCloudUniqueId(Config.cloud_unique_id);
+ final Cloud.GetInstanceRequest pRequest = builder.build();
+ Cloud.GetInstanceResponse response;
+ try {
+ response = MetaServiceProxy.getInstance().getInstance(pRequest);
+ return response;
+ } catch (RpcException e) {
+ LOG.warn("rpcToGetInstance exception: {}", e.getMessage());
+ }
+ return null;
+ }
+
    /** Last instance status recorded via setInstanceStatus(); null before the first set. */
    public InstanceInfoPB.Status getInstanceStatus() {
        return this.instanceStatus;
    }

    /** Records the instance status; logs at INFO only when the status actually changes. */
    public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) {
        LOG.debug("fe set instance status {}", instanceStatus);
        if (this.instanceStatus != instanceStatus) {
            LOG.info("fe change instance status from {} to {}", this.instanceStatus, instanceStatus);
        }
        this.instanceStatus = instanceStatus;
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]