This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new fb75f1253bf [fix](planner) Fix sample partition table #25912 (#26399)
fb75f1253bf is described below

commit fb75f1253bf91b46176cd506d3bee74478f93375
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Nov 3 23:35:47 2023 +0800

    [fix](planner) Fix sample partition table #25912 (#26399)
    
    Previously, sampling a partitioned table only gave representative results 
when two conditions were met: 1. data is evenly distributed across partitions; 
2. data is evenly distributed across buckets. This is because the same number 
of rows was sampled from every partition and every bucket.
    
    Now, the number of rows sampled from each partition and bucket is 
proportional to the number of rows it contains.
---
 .../glue/translator/PhysicalPlanTranslator.java    |   2 +-
 .../org/apache/doris/planner/OlapScanNode.java     | 130 ++++++++++++---------
 .../org/apache/doris/analysis/SelectStmtTest.java  |   9 +-
 3 files changed, 83 insertions(+), 58 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index e2bdf6b92fd..3da1e9917bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -611,7 +611,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         BaseTableRef tableRef = new BaseTableRef(ref, olapTable, tableName);
         tupleDescriptor.setRef(tableRef);
         
olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds());
-        olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds());
+        olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds()); // 
TODO
         if (olapScan.getTableSample().isPresent()) {
             olapScanNode.setTableSample(new 
TableSample(olapScan.getTableSample().get().isPercent,
                     olapScan.getTableSample().get().sampleValue, 
olapScan.getTableSample().get().seek));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 98490369f03..5c85baca318 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -59,6 +59,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
@@ -94,9 +95,11 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -219,7 +222,7 @@ public class OlapScanNode extends ScanNode {
     public void setIsPreAggregation(boolean isPreAggregation, String reason) {
         this.isPreAggregation = isPreAggregation;
         this.reasonOfPreAggregation = this.reasonOfPreAggregation == null ? 
reason :
-                                      this.reasonOfPreAggregation + " " + 
reason;
+                this.reasonOfPreAggregation + " " + reason;
     }
 
 
@@ -401,7 +404,8 @@ public class OlapScanNode extends ScanNode {
         String scanRangeInfo = stringBuilder.toString();
         String situation;
         boolean update;
-        CHECK: { // CHECKSTYLE IGNORE THIS LINE
+        CHECK:
+        { // CHECKSTYLE IGNORE THIS LINE
             if (olapTable.getKeysType() == KeysType.DUP_KEYS || 
(olapTable.getKeysType() == KeysType.UNIQUE_KEYS
                     && olapTable.getEnableUniqueKeyMergeOnWrite())) {
                 situation = "The key type of table is duplicate, or unique key 
with merge-on-write.";
@@ -773,7 +777,7 @@ public class OlapScanNode extends ScanNode {
             // but it means we will do 3 S3 IO to get the data which will 
bring 3 slow query
             if (-1L != coolDownReplicaId) {
                 final Optional<Replica> replicaOptional = replicas.stream()
-                                .filter(r -> r.getId() == 
coolDownReplicaId).findAny();
+                        .filter(r -> r.getId() == coolDownReplicaId).findAny();
                 replicaOptional.ifPresent(
                         r -> {
                             Backend backend = Env.getCurrentSystemInfo()
@@ -925,75 +929,91 @@ public class OlapScanNode extends ScanNode {
     }
 
     /**
-     * First, determine how many rows to sample from each partition according 
to the number of partitions.
-     * Then determine the number of Tablets to be selected for each partition 
according to the average number
-     * of rows of Tablet,
-     * If seek is not specified, the specified number of Tablets are 
pseudo-randomly selected from each partition.
-     * If seek is specified, it will be selected sequentially from the seek 
tablet of the partition.
-     * And add the manually specified Tablet id to the selected Tablet.
-     * simpleTabletNums = simpleRows / partitionNums / (partitionRows / 
partitionTabletNums)
+     * Sample some tablets in the selected partition.
+     * If Seek is specified, the tablets sampled each time are the same.
      */
     public void computeSampleTabletIds() {
         if (tableSample == null) {
             return;
         }
         OlapTable olapTable = (OlapTable) desc.getTable();
-        long sampleRows; // The total number of sample rows
-        long hitRows = 1; // The total number of rows hit by the tablet
-        long totalRows = 0; // The total number of partition rows hit
-        long totalTablet = 0; // The total number of tablets in the hit 
partition
+
+        // 1. Calculate the total number of rows in the selected partition, 
and sort partition list.
+        long selectedRows = 0;
+        long totalSampleRows = 0;
+        List<Long> selectedPartitionList = new ArrayList<>();
+        if (FeConstants.runningUnitTest && selectedIndexId == -1) {
+            selectedIndexId = olapTable.getBaseIndexId();
+        }
+        for (Long partitionId : selectedPartitionIds) {
+            final Partition partition = olapTable.getPartition(partitionId);
+            final MaterializedIndex selectedTable = 
partition.getIndex(selectedIndexId);
+            selectedRows += selectedTable.getRowCount();
+            selectedPartitionList.add(partitionId);
+        }
+        selectedPartitionList.sort(Comparator.naturalOrder());
+
+        // 2.Sampling is not required in some cases, will not take effect 
after clear sampleTabletIds.
         if (tableSample.isPercent()) {
-            sampleRows = (long) Math.max(olapTable.getRowCount() * 
(tableSample.getSampleValue() / 100.0), 1);
+            if (tableSample.getSampleValue() >= 100) {
+                return;
+            }
+            totalSampleRows = (long) Math.max(selectedRows * 
(tableSample.getSampleValue() / 100.0), 1);
         } else {
-            sampleRows = Math.max(tableSample.getSampleValue(), 1);
+            if (tableSample.getSampleValue() > selectedRows) {
+                return;
+            }
+            totalSampleRows = tableSample.getSampleValue();
         }
 
-        // calculate the number of tablets by each partition
-        long avgRowsPerPartition = sampleRows / 
Math.max(olapTable.getPartitions().size(), 1);
-
-        for (Partition p : olapTable.getPartitions()) {
-            List<Long> ids = p.getBaseIndex().getTabletIdsInOrder();
-
-            if (ids.isEmpty()) {
+        // 3. Sampling partition. If Seek is specified, the partition will be 
the same for each sampling.
+        long hitRows = 0; // The number of rows hit by the tablet
+        long partitionSeek = tableSample.getSeek() != -1
+                ? tableSample.getSeek() : (long) (new 
SecureRandom().nextDouble() * selectedPartitionIds.size());
+        for (int i = 0; i < selectedPartitionList.size(); i++) {
+            int seekPid = (int) ((i + partitionSeek) % 
selectedPartitionList.size());
+            final Partition partition = 
olapTable.getPartition(selectedPartitionList.get(seekPid));
+            final MaterializedIndex selectedTable = 
partition.getIndex(selectedIndexId);
+            List<Tablet> tablets = selectedTable.getTablets();
+            if (tablets.isEmpty()) {
                 continue;
             }
 
-            // Skip partitions with row count < row count / 2 expected to be 
sampled per partition.
-            // It can be expected to sample a smaller number of partitions to 
avoid uneven distribution
-            // of sampling results.
-            if (p.getBaseIndex().getRowCount() < (avgRowsPerPartition / 2)) {
-                continue;
+            // 4. Calculate the number of rows that need to be sampled in the 
current partition.
+            long sampleRows = 0; // The number of sample rows in partition
+            if (tableSample.isPercent()) {
+                sampleRows = (long) Math.max(selectedTable.getRowCount() * 
(tableSample.getSampleValue() / 100.0), 1);
+            } else {
+                sampleRows = (long) Math.max(
+                        tableSample.getSampleValue() * 
(selectedTable.getRowCount() / selectedRows), 1);
             }
 
-            // It is assumed here that all tablets row count is uniformly 
distributed
-            // TODO Use `p.getBaseIndex().getTablet(n).getRowCount()` to get 
each tablet row count to compute sample.
-            long avgRowsPerTablet = Math.max(p.getBaseIndex().getRowCount() / 
ids.size(), 1);
-            long tabletCounts = Math.max(
-                    avgRowsPerPartition / avgRowsPerTablet + 
(avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
-            tabletCounts = Math.min(tabletCounts, ids.size());
-
-            long seek = tableSample.getSeek() != -1
-                    ? tableSample.getSeek() : (long) (Math.random() * 
ids.size());
-            for (int i = 0; i < tabletCounts; i++) {
-                int seekTid = (int) ((i + seek) % ids.size());
-                sampleTabletIds.add(ids.get(seekTid));
+            // 5. Sampling tablets. If Seek is specified, the same tablet will 
be sampled each time.
+            long tabletSeek = tableSample.getSeek() != -1
+                    ? tableSample.getSeek() : (long) (new 
SecureRandom().nextDouble() * tablets.size());
+            for (int j = 0; j < tablets.size(); j++) {
+                int seekTid = (int) ((j + tabletSeek) % tablets.size());
+                long tabletRowCount;
+                if (!FeConstants.runningUnitTest) {
+                    tabletRowCount = tablets.get(seekTid).getRowCount(true);
+                } else {
+                    tabletRowCount = selectedTable.getRowCount() / 
tablets.size();
+                }
+                if (tabletRowCount == 0) {
+                    continue;
+                }
+                sampleTabletIds.add(tablets.get(seekTid).getId());
+                sampleRows -= tabletRowCount;
+                hitRows += tabletRowCount;
+                if (sampleRows <= 0) {
+                    break;
+                }
+            }
+            if (hitRows > totalSampleRows) {
+                break;
             }
-
-            hitRows += avgRowsPerTablet * tabletCounts;
-            totalRows += p.getBaseIndex().getRowCount();
-            totalTablet += ids.size();
-        }
-
-        // all hit, direct full
-        if (totalRows < sampleRows) {
-            // can't fill full sample rows
-            sampleTabletIds.clear();
-        } else if (sampleTabletIds.size() == totalTablet) {
-            // TODO add limit
-            sampleTabletIds.clear();
-        } else if (!sampleTabletIds.isEmpty()) {
-            // TODO add limit
         }
+        LOG.debug("after computeSampleTabletIds, hitRows {}, selectedRows {}", 
hitRows, selectedRows);
     }
 
     public boolean isFromPrepareStmt() {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
index 33c47f6bf41..78f330eef6d 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
@@ -862,6 +863,7 @@ public class SelectStmtTest {
 
     @Test
     public void testSelectSampleHashBucketTable() throws Exception {
+        FeConstants.runningUnitTest = true;
         Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
         OlapTable tbl = (OlapTable) db.getTableOrMetaException("table1");
         long tabletId = 10031L;
@@ -894,7 +896,7 @@ public class SelectStmtTest {
         String sql4 = "SELECT * FROM db1.table1 TABLESAMPLE(9500 ROWS)";
         OriginalPlanner planner4 = (OriginalPlanner) 
dorisAssert.query(sql4).internalExecuteOneAndGetPlan();
         Set<Long> sampleTabletIds4 = ((OlapScanNode) 
planner4.getScanNodes().get(0)).getSampleTabletIds();
-        Assert.assertEquals(0, sampleTabletIds4.size()); // no sample, all 
tablet
+        Assert.assertEquals(10, sampleTabletIds4.size());
 
         String sql5 = "SELECT * FROM db1.table1 TABLESAMPLE(11000 ROWS)";
         OriginalPlanner planner5 = (OriginalPlanner) 
dorisAssert.query(sql5).internalExecuteOneAndGetPlan();
@@ -963,10 +965,12 @@ public class SelectStmtTest {
         OriginalPlanner planner16 = (OriginalPlanner) 
dorisAssert.query(sql16).internalExecuteOneAndGetPlan();
         Set<Long> sampleTabletIds16 = ((OlapScanNode) 
planner16.getScanNodes().get(0)).getSampleTabletIds();
         Assert.assertEquals(1, sampleTabletIds16.size());
+        FeConstants.runningUnitTest = false;
     }
 
     @Test
     public void testSelectSampleRandomBucketTable() throws Exception {
+        FeConstants.runningUnitTest = true;
         Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
         OlapTable tbl = (OlapTable) db.getTableOrMetaException("table3");
         long tabletId = 10031L;
@@ -999,7 +1003,7 @@ public class SelectStmtTest {
         String sql4 = "SELECT * FROM db1.table3 TABLESAMPLE(9500 ROWS)";
         OriginalPlanner planner4 = (OriginalPlanner) 
dorisAssert.query(sql4).internalExecuteOneAndGetPlan();
         Set<Long> sampleTabletIds4 = ((OlapScanNode) 
planner4.getScanNodes().get(0)).getSampleTabletIds();
-        Assert.assertEquals(0, sampleTabletIds4.size()); // no sample, all 
tablet
+        Assert.assertEquals(10, sampleTabletIds4.size());
 
         String sql5 = "SELECT * FROM db1.table3 TABLESAMPLE(11000 ROWS)";
         OriginalPlanner planner5 = (OriginalPlanner) 
dorisAssert.query(sql5).internalExecuteOneAndGetPlan();
@@ -1068,6 +1072,7 @@ public class SelectStmtTest {
         OriginalPlanner planner16 = (OriginalPlanner) 
dorisAssert.query(sql16).internalExecuteOneAndGetPlan();
         Set<Long> sampleTabletIds16 = ((OlapScanNode) 
planner16.getScanNodes().get(0)).getSampleTabletIds();
         Assert.assertEquals(1, sampleTabletIds16.size());
+        FeConstants.runningUnitTest = false;
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to