This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new fb75f1253bf [fix](planner) Fix sample partition table #25912 (#26399)
fb75f1253bf is described below
commit fb75f1253bf91b46176cd506d3bee74478f93375
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Nov 3 23:35:47 2023 +0800
[fix](planner) Fix sample partition table #25912 (#26399)
In the past, two conditions needed to be met when sampling a partitioned
table: 1. Data is evenly distributed between partitions; 2. Data is evenly
distributed between buckets. As a result, the number of sampled rows in each
partition and each bucket was the same.
Now, sampling is proportional to the number of rows in each partition and
each bucket.
---
.../glue/translator/PhysicalPlanTranslator.java | 2 +-
.../org/apache/doris/planner/OlapScanNode.java | 130 ++++++++++++---------
.../org/apache/doris/analysis/SelectStmtTest.java | 9 +-
3 files changed, 83 insertions(+), 58 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index e2bdf6b92fd..3da1e9917bf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -611,7 +611,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
BaseTableRef tableRef = new BaseTableRef(ref, olapTable, tableName);
tupleDescriptor.setRef(tableRef);
olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds());
- olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds());
+ olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds()); //
TODO
if (olapScan.getTableSample().isPresent()) {
olapScanNode.setTableSample(new
TableSample(olapScan.getTableSample().get().isPercent,
olapScan.getTableSample().get().sampleValue,
olapScan.getTableSample().get().seek));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 98490369f03..5c85baca318 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -59,6 +59,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
@@ -94,9 +95,11 @@ import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -219,7 +222,7 @@ public class OlapScanNode extends ScanNode {
public void setIsPreAggregation(boolean isPreAggregation, String reason) {
this.isPreAggregation = isPreAggregation;
this.reasonOfPreAggregation = this.reasonOfPreAggregation == null ?
reason :
- this.reasonOfPreAggregation + " " +
reason;
+ this.reasonOfPreAggregation + " " + reason;
}
@@ -401,7 +404,8 @@ public class OlapScanNode extends ScanNode {
String scanRangeInfo = stringBuilder.toString();
String situation;
boolean update;
- CHECK: { // CHECKSTYLE IGNORE THIS LINE
+ CHECK:
+ { // CHECKSTYLE IGNORE THIS LINE
if (olapTable.getKeysType() == KeysType.DUP_KEYS ||
(olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
situation = "The key type of table is duplicate, or unique key
with merge-on-write.";
@@ -773,7 +777,7 @@ public class OlapScanNode extends ScanNode {
// but it means we will do 3 S3 IO to get the data which will
bring 3 slow query
if (-1L != coolDownReplicaId) {
final Optional<Replica> replicaOptional = replicas.stream()
- .filter(r -> r.getId() ==
coolDownReplicaId).findAny();
+ .filter(r -> r.getId() == coolDownReplicaId).findAny();
replicaOptional.ifPresent(
r -> {
Backend backend = Env.getCurrentSystemInfo()
@@ -925,75 +929,91 @@ public class OlapScanNode extends ScanNode {
}
/**
- * First, determine how many rows to sample from each partition according
to the number of partitions.
- * Then determine the number of Tablets to be selected for each partition
according to the average number
- * of rows of Tablet,
- * If seek is not specified, the specified number of Tablets are
pseudo-randomly selected from each partition.
- * If seek is specified, it will be selected sequentially from the seek
tablet of the partition.
- * And add the manually specified Tablet id to the selected Tablet.
- * simpleTabletNums = simpleRows / partitionNums / (partitionRows /
partitionTabletNums)
+ * Sample some tablets in the selected partition.
+ * If Seek is specified, the tablets sampled each time are the same.
*/
public void computeSampleTabletIds() {
if (tableSample == null) {
return;
}
OlapTable olapTable = (OlapTable) desc.getTable();
- long sampleRows; // The total number of sample rows
- long hitRows = 1; // The total number of rows hit by the tablet
- long totalRows = 0; // The total number of partition rows hit
- long totalTablet = 0; // The total number of tablets in the hit
partition
+
+ // 1. Calculate the total number of rows in the selected partition,
and sort partition list.
+ long selectedRows = 0;
+ long totalSampleRows = 0;
+ List<Long> selectedPartitionList = new ArrayList<>();
+ if (FeConstants.runningUnitTest && selectedIndexId == -1) {
+ selectedIndexId = olapTable.getBaseIndexId();
+ }
+ for (Long partitionId : selectedPartitionIds) {
+ final Partition partition = olapTable.getPartition(partitionId);
+ final MaterializedIndex selectedTable =
partition.getIndex(selectedIndexId);
+ selectedRows += selectedTable.getRowCount();
+ selectedPartitionList.add(partitionId);
+ }
+ selectedPartitionList.sort(Comparator.naturalOrder());
+
+ // 2.Sampling is not required in some cases, will not take effect
after clear sampleTabletIds.
if (tableSample.isPercent()) {
- sampleRows = (long) Math.max(olapTable.getRowCount() *
(tableSample.getSampleValue() / 100.0), 1);
+ if (tableSample.getSampleValue() >= 100) {
+ return;
+ }
+ totalSampleRows = (long) Math.max(selectedRows *
(tableSample.getSampleValue() / 100.0), 1);
} else {
- sampleRows = Math.max(tableSample.getSampleValue(), 1);
+ if (tableSample.getSampleValue() > selectedRows) {
+ return;
+ }
+ totalSampleRows = tableSample.getSampleValue();
}
- // calculate the number of tablets by each partition
- long avgRowsPerPartition = sampleRows /
Math.max(olapTable.getPartitions().size(), 1);
-
- for (Partition p : olapTable.getPartitions()) {
- List<Long> ids = p.getBaseIndex().getTabletIdsInOrder();
-
- if (ids.isEmpty()) {
+ // 3. Sampling partition. If Seek is specified, the partition will be
the same for each sampling.
+ long hitRows = 0; // The number of rows hit by the tablet
+ long partitionSeek = tableSample.getSeek() != -1
+ ? tableSample.getSeek() : (long) (new
SecureRandom().nextDouble() * selectedPartitionIds.size());
+ for (int i = 0; i < selectedPartitionList.size(); i++) {
+ int seekPid = (int) ((i + partitionSeek) %
selectedPartitionList.size());
+ final Partition partition =
olapTable.getPartition(selectedPartitionList.get(seekPid));
+ final MaterializedIndex selectedTable =
partition.getIndex(selectedIndexId);
+ List<Tablet> tablets = selectedTable.getTablets();
+ if (tablets.isEmpty()) {
continue;
}
- // Skip partitions with row count < row count / 2 expected to be
sampled per partition.
- // It can be expected to sample a smaller number of partitions to
avoid uneven distribution
- // of sampling results.
- if (p.getBaseIndex().getRowCount() < (avgRowsPerPartition / 2)) {
- continue;
+ // 4. Calculate the number of rows that need to be sampled in the
current partition.
+ long sampleRows = 0; // The number of sample rows in partition
+ if (tableSample.isPercent()) {
+ sampleRows = (long) Math.max(selectedTable.getRowCount() *
(tableSample.getSampleValue() / 100.0), 1);
+ } else {
+ sampleRows = (long) Math.max(
+ tableSample.getSampleValue() *
(selectedTable.getRowCount() / selectedRows), 1);
}
- // It is assumed here that all tablets row count is uniformly
distributed
- // TODO Use `p.getBaseIndex().getTablet(n).getRowCount()` to get
each tablet row count to compute sample.
- long avgRowsPerTablet = Math.max(p.getBaseIndex().getRowCount() /
ids.size(), 1);
- long tabletCounts = Math.max(
- avgRowsPerPartition / avgRowsPerTablet +
(avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
- tabletCounts = Math.min(tabletCounts, ids.size());
-
- long seek = tableSample.getSeek() != -1
- ? tableSample.getSeek() : (long) (Math.random() *
ids.size());
- for (int i = 0; i < tabletCounts; i++) {
- int seekTid = (int) ((i + seek) % ids.size());
- sampleTabletIds.add(ids.get(seekTid));
+ // 5. Sampling tablets. If Seek is specified, the same tablet will
be sampled each time.
+ long tabletSeek = tableSample.getSeek() != -1
+ ? tableSample.getSeek() : (long) (new
SecureRandom().nextDouble() * tablets.size());
+ for (int j = 0; j < tablets.size(); j++) {
+ int seekTid = (int) ((j + tabletSeek) % tablets.size());
+ long tabletRowCount;
+ if (!FeConstants.runningUnitTest) {
+ tabletRowCount = tablets.get(seekTid).getRowCount(true);
+ } else {
+ tabletRowCount = selectedTable.getRowCount() /
tablets.size();
+ }
+ if (tabletRowCount == 0) {
+ continue;
+ }
+ sampleTabletIds.add(tablets.get(seekTid).getId());
+ sampleRows -= tabletRowCount;
+ hitRows += tabletRowCount;
+ if (sampleRows <= 0) {
+ break;
+ }
+ }
+ if (hitRows > totalSampleRows) {
+ break;
}
-
- hitRows += avgRowsPerTablet * tabletCounts;
- totalRows += p.getBaseIndex().getRowCount();
- totalTablet += ids.size();
- }
-
- // all hit, direct full
- if (totalRows < sampleRows) {
- // can't fill full sample rows
- sampleTabletIds.clear();
- } else if (sampleTabletIds.size() == totalTablet) {
- // TODO add limit
- sampleTabletIds.clear();
- } else if (!sampleTabletIds.isEmpty()) {
- // TODO add limit
}
+ LOG.debug("after computeSampleTabletIds, hitRows {}, selectedRows {}",
hitRows, selectedRows);
}
public boolean isFromPrepareStmt() {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 33c47f6bf41..78f330eef6d 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
@@ -862,6 +863,7 @@ public class SelectStmtTest {
@Test
public void testSelectSampleHashBucketTable() throws Exception {
+ FeConstants.runningUnitTest = true;
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
OlapTable tbl = (OlapTable) db.getTableOrMetaException("table1");
long tabletId = 10031L;
@@ -894,7 +896,7 @@ public class SelectStmtTest {
String sql4 = "SELECT * FROM db1.table1 TABLESAMPLE(9500 ROWS)";
OriginalPlanner planner4 = (OriginalPlanner)
dorisAssert.query(sql4).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds4 = ((OlapScanNode)
planner4.getScanNodes().get(0)).getSampleTabletIds();
- Assert.assertEquals(0, sampleTabletIds4.size()); // no sample, all
tablet
+ Assert.assertEquals(10, sampleTabletIds4.size());
String sql5 = "SELECT * FROM db1.table1 TABLESAMPLE(11000 ROWS)";
OriginalPlanner planner5 = (OriginalPlanner)
dorisAssert.query(sql5).internalExecuteOneAndGetPlan();
@@ -963,10 +965,12 @@ public class SelectStmtTest {
OriginalPlanner planner16 = (OriginalPlanner)
dorisAssert.query(sql16).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds16 = ((OlapScanNode)
planner16.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds16.size());
+ FeConstants.runningUnitTest = false;
}
@Test
public void testSelectSampleRandomBucketTable() throws Exception {
+ FeConstants.runningUnitTest = true;
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
OlapTable tbl = (OlapTable) db.getTableOrMetaException("table3");
long tabletId = 10031L;
@@ -999,7 +1003,7 @@ public class SelectStmtTest {
String sql4 = "SELECT * FROM db1.table3 TABLESAMPLE(9500 ROWS)";
OriginalPlanner planner4 = (OriginalPlanner)
dorisAssert.query(sql4).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds4 = ((OlapScanNode)
planner4.getScanNodes().get(0)).getSampleTabletIds();
- Assert.assertEquals(0, sampleTabletIds4.size()); // no sample, all
tablet
+ Assert.assertEquals(10, sampleTabletIds4.size());
String sql5 = "SELECT * FROM db1.table3 TABLESAMPLE(11000 ROWS)";
OriginalPlanner planner5 = (OriginalPlanner)
dorisAssert.query(sql5).internalExecuteOneAndGetPlan();
@@ -1068,6 +1072,7 @@ public class SelectStmtTest {
OriginalPlanner planner16 = (OriginalPlanner)
dorisAssert.query(sql16).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds16 = ((OlapScanNode)
planner16.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds16.size());
+ FeConstants.runningUnitTest = false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]