This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e1cb568d119 [Optimize] Add session variable `max_fetch_remote_schema_tablet_count`… (#37505)
e1cb568d119 is described below
commit e1cb568d119482d3d47ee7d59c614ae24f3c37e4
Author: lihangyu <[email protected]>
AuthorDate: Thu Jul 11 10:04:20 2024 +0800
[Optimize] Add session variable `max_fetch_remote_schema_tablet_count`… (#37505)
Cherry-picked from #37217.
---
.../java/org/apache/doris/catalog/OlapTable.java | 50 ++++++++++++++++++++++
.../common/proc/RemoteIndexSchemaProcDir.java | 4 +-
.../common/proc/RemoteIndexSchemaProcNode.java | 9 ++++
.../common/util/FetchRemoteTabletSchemaUtil.java | 2 +
.../java/org/apache/doris/qe/SessionVariable.java | 5 +++
regression-test/data/variant_p0/desc.out | 8 ++++
regression-test/suites/variant_p0/desc.groovy | 20 +++++++++
7 files changed, 97 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0164c311974..abd8a3b6baa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -92,6 +92,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -2650,6 +2651,55 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
return tablets;
}
+ // Get sample tablets for remote desc schema
+ // 1. Estimate tablets for a partition, 1 at least
+ // 2. Pick the partition sorted with id in desc order, greater id with the
newest partition
+ // 3. Truncate to sampleSize
+ public List<Tablet> getSampleTablets(int sampleSize) {
+ List<Tablet> sampleTablets = new ArrayList<>();
+ // Filter partition with empty data
+ Collection<Partition> partitions = getPartitions()
+ .stream()
+ .filter(partition -> partition.getVisibleVersion() >
Partition.PARTITION_INIT_VERSION)
+ .collect(Collectors.toList());
+ if (partitions.isEmpty()) {
+ return sampleTablets;
+ }
+ // 1. Estimate tablets for a partition, 1 at least
+ int estimatePartitionTablets = Math.max(sampleSize /
partitions.size(), 1);
+
+ // 2. Sort the partitions by id in descending order (greater id means
the newest partition)
+ List<Partition> sortedPartitions = partitions.stream().sorted(new
Comparator<Partition>() {
+ @Override
+ public int compare(Partition p1, Partition p2) {
+ // compare with desc order
+ return Long.compare(p2.getId(), p1.getId());
+ }
+ }).collect(Collectors.toList());
+
+ // 3. Collect tablets from partitions
+ for (Partition partition : sortedPartitions) {
+ List<Tablet> targetTablets = new
ArrayList<>(partition.getBaseIndex().getTablets());
+ Collections.shuffle(targetTablets);
+ if (!targetTablets.isEmpty()) {
+ // Ensure we do not exceed the available number of tablets
+ int tabletsToFetch = Math.min(targetTablets.size(),
estimatePartitionTablets);
+ sampleTablets.addAll(targetTablets.subList(0, tabletsToFetch));
+ }
+
+ if (sampleTablets.size() >= sampleSize) {
+ break;
+ }
+ }
+
+ // 4. Truncate to sample size if needed
+ if (sampleTablets.size() > sampleSize) {
+ sampleTablets = sampleTablets.subList(0, sampleSize);
+ }
+
+ return sampleTablets;
+ }
+
// During `getNextVersion` and `updateVisibleVersionAndTime` period,
// the write lock on the table should be held continuously
public void updateVisibleVersionAndTime(long visibleVersion, long
visibleVersionTime) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
index 1af9ebab0f2..c21f64c5d38 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -63,7 +64,8 @@ public class RemoteIndexSchemaProcDir implements
ProcDirInterface {
table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
- tablets = olapTable.getAllTablets();
+ // Get sample tablets for remote desc schema
+ tablets =
olapTable.getSampleTablets(ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount);
} finally {
table.readUnlock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
index 8176b09bbf7..cdb1bbc133e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
@@ -23,11 +23,13 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -62,6 +64,13 @@ public class RemoteIndexSchemaProcNode implements
ProcNodeInterface {
tablets.add(tablet);
}
}
+ // Get the maximum number of Remote Tablets that can be fetched
+ int maxFetchCount =
ConnectContext.get().getSessionVariable().maxFetchRemoteTabletCount;
+ // If the number of tablets is greater than the maximum fetch count,
randomly select maxFetchCount tablets
+ if (tablets.size() > maxFetchCount) {
+ Collections.shuffle(tablets);
+ tablets = tablets.subList(0, maxFetchCount);
+ }
List<Column> remoteSchema = new
FetchRemoteTabletSchemaUtil(tablets).fetch();
this.schema.addAll(remoteSchema);
return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
index db9700f7448..0e96dc8c593 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
@@ -92,6 +92,8 @@ public class FetchRemoteTabletSchemaUtil {
Long backendId = entry.getKey();
Set<Long> tabletIds = entry.getValue();
Backend backend =
Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
+ LOG.debug("fetch schema from coord backend {}, sample tablets
count {}",
+ backend.getId(), tabletIds.size());
// only need alive be
if (!backend.isAlive()) {
continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8234b2cd5eb..429cf1f03b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -588,6 +588,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS =
"fetch_remote_schema_timeout_seconds";
+ public static final String MAX_FETCH_REMOTE_TABLET_COUNT =
"max_fetch_remote_schema_tablet_count";
+
// CLOUD_VARIABLES_BEGIN
public static final String CLOUD_CLUSTER = "cloud_cluster";
public static final String DISABLE_EMPTY_PARTITION_PRUNE =
"disable_empty_partition_prune";
@@ -1839,6 +1841,9 @@ public class SessionVariable implements Serializable,
Writable {
// fetch remote schema rpc timeout
@VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy =
true)
public long fetchRemoteSchemaTimeoutSeconds = 120;
+ // max tablet count for fetch remote schema
+ @VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
+ public int maxFetchRemoteTabletCount = 512;
@VariableMgr.VarAttr(
name = ENABLE_JOIN_SPILL,
diff --git a/regression-test/data/variant_p0/desc.out
b/regression-test/data/variant_p0/desc.out
index b46b5f9b4b0..b3ebce2b887 100644
--- a/regression-test/data/variant_p0/desc.out
+++ b/regression-test/data/variant_p0/desc.out
@@ -198,3 +198,11 @@ v.金额 SMALLINT Yes false \N NONE
k BIGINT Yes true \N
v VARIANT Yes false \N NONE
+-- !sql15 --
+k BIGINT Yes true \N
+v VARIANT Yes false \N NONE
+v.a TINYINT Yes false \N NONE
+v.b TINYINT Yes false \N NONE
+v.c TINYINT Yes false \N NONE
+v.d TINYINT Yes false \N NONE
+
diff --git a/regression-test/suites/variant_p0/desc.groovy
b/regression-test/suites/variant_p0/desc.groovy
index 78c9f078c9f..ee2607756b8 100644
--- a/regression-test/suites/variant_p0/desc.groovy
+++ b/regression-test/suites/variant_p0/desc.groovy
@@ -235,6 +235,26 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """ insert into ${table_name} values (0, '100')"""
sql """set describe_extend_variant_column = true"""
qt_sql_12 """desc ${table_name}"""
+
+
+ // desc with large tablets
+ table_name = "large_tablets"
+ create_table_partition.call(table_name, "200")
+ sql """insert into large_tablets values (1, '{"a" : 10}')"""
+ sql """insert into large_tablets values (3001, '{"b" : 10}')"""
+ sql """insert into large_tablets values (50001, '{"c" : 10}')"""
+ sql """insert into large_tablets values (99999, '{"d" : 10}')"""
+ sql """set max_fetch_remote_schema_tablet_count = 2"""
+ sql "desc large_tablets"
+ sql """set max_fetch_remote_schema_tablet_count = 128"""
+ sql "desc large_tablets"
+ sql """set max_fetch_remote_schema_tablet_count = 512"""
+ sql "desc large_tablets"
+ sql """set max_fetch_remote_schema_tablet_count = 2048"""
+ qt_sql15 "desc large_tablets"
+
+ sql "truncate table large_tablets"
+ sql "desc large_tablets"
} finally {
// reset flags
set_be_config.call("variant_ratio_of_defaults_as_sparse_column",
"0.95")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org