This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e444348bd7b [improvement](statistics) Analyze partition columns when 
new partition loaded data for the first time. (#29154) (#29296)
e444348bd7b is described below

commit e444348bd7b3bdd752475cada43f8a37cee06473
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Dec 29 17:04:32 2023 +0800

    [improvement](statistics) Analyze partition columns when new partition 
loaded data for the first time. (#29154) (#29296)
    
    When data is loaded into a partition for the first time, the partition
    columns need to be analyzed even if the table's health rate is high.
    Otherwise, the columns' min/max statistics may not cover the new
    partition's values, which can lead to a bad query plan.
---
 .../apache/doris/analysis/ShowTableStatsStmt.java  |   2 +
 .../apache/doris/statistics/AnalysisManager.java   |   9 ++
 .../doris/statistics/StatisticsAutoCollector.java  |  35 ++++---
 .../apache/doris/statistics/TableStatsMeta.java    |  14 +++
 .../doris/transaction/DatabaseTransactionMgr.java  |   6 +-
 .../statistics/StatisticsAutoCollectorTest.java    |   5 +
 .../suites/statistics/analyze_stats.groovy         | 106 ++++++++++++++-------
 7 files changed, 130 insertions(+), 47 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
index 3a80daebc91..284b6248b85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
@@ -54,6 +54,7 @@ public class ShowTableStatsStmt extends ShowStmt {
                     .add("updated_time")
                     .add("columns")
                     .add("trigger")
+                    .add("new_partition")
                     .build();
 
     private final TableName tableName;
@@ -149,6 +150,7 @@ public class ShowTableStatsStmt extends ShowStmt {
         row.add(formattedDateTime);
         row.add(tableStatistic.analyzeColumns().toString());
         row.add(tableStatistic.jobType.toString());
+        row.add(String.valueOf(tableStatistic.newPartitionLoaded.get()));
         result.add(row);
         return new ShowResultSet(getMetaData(), result);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 39ae191d45a..ac4a83a6b28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -1031,6 +1031,15 @@ public class AnalysisManager implements Writable {
         }
     }
 
+    // Set to true means new partition loaded data
+    public void setNewPartitionLoaded(long tblId) {
+        TableStatsMeta statsStatus = idToTblStats.get(tblId);
+        if (statsStatus != null) {
+            statsStatus.newPartitionLoaded.set(true);
+            logCreateTableStats(statsStatus);
+        }
+    }
+
     public void updateTableStatsStatus(TableStatsMeta tableStats) {
         replayUpdateTableStatsStatus(tableStats);
         logCreateTableStats(tableStats);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 2c78a5f7f32..fc6e7fb3e2c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.Config;
@@ -39,6 +40,7 @@ import org.apache.logging.log4j.Logger;
 import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -138,8 +140,9 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
             return false;
         }
         TableStatsMeta tableStats = 
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
-        // means it never get analyzed yet
-        if (tableStats == null) {
+
+        // means it's never got analyzed or new partition loaded data.
+        if (tableStats == null || tableStats.newPartitionLoaded.get()) {
             return false;
         }
         return System.currentTimeMillis()
@@ -177,8 +180,7 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
 
     @VisibleForTesting
     protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
-        TableIf table = StatisticsUtil
-                .findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId);
+        TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, 
jobInfo.dbId, jobInfo.tblId);
         // Skip tables that are too width.
         if (table.getBaseSchema().size() > 
StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
             return null;
@@ -186,16 +188,27 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
 
         AnalysisManager analysisManager = 
Env.getServingEnv().getAnalysisManager();
         TableStatsMeta tblStats = 
analysisManager.findTableStatsStatus(table.getId());
-        if (!table.needReAnalyzeTable(tblStats)) {
-            return null;
-        }
 
-        Map<String, Set<String>> needRunPartitions = 
table.findReAnalyzeNeededPartitions();
+        Map<String, Set<String>> needRunPartitions = null;
+        String colNames = jobInfo.colName;
+        if (table.needReAnalyzeTable(tblStats)) {
+            needRunPartitions = table.findReAnalyzeNeededPartitions();
+        } else if (table instanceof OlapTable && 
tblStats.newPartitionLoaded.get()) {
+            OlapTable olapTable = (OlapTable) table;
+            needRunPartitions = new HashMap<>();
+            Set<String> partitionColumnNames = 
olapTable.getPartitionInfo().getPartitionColumns().stream()
+                    .map(Column::getName).collect(Collectors.toSet());
+            colNames = 
partitionColumnNames.stream().collect(Collectors.joining(","));
+            Set<String> partitionNames = olapTable.getAllPartitions().stream()
+                    .map(Partition::getName).collect(Collectors.toSet());
+            for (String column : partitionColumnNames) {
+                needRunPartitions.put(column, partitionNames);
+            }
+        }
 
-        if (needRunPartitions.isEmpty()) {
+        if (needRunPartitions == null || needRunPartitions.isEmpty()) {
             return null;
         }
-
-        return new 
AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build();
+        return new 
AnalysisInfoBuilder(jobInfo).setColName(colNames).setColToPartitions(needRunPartitions).build();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index eb6672ffe18..00878adcc44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -19,6 +19,7 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -37,6 +38,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -67,6 +69,9 @@ public class TableStatsMeta implements Writable {
     @SerializedName("trigger")
     public JobType jobType;
 
+    @SerializedName("newPartitionLoaded")
+    public AtomicBoolean newPartitionLoaded = new AtomicBoolean(false);
+
     @VisibleForTesting
     public TableStatsMeta() {
         tblId = 0;
@@ -154,6 +159,15 @@ public class TableStatsMeta implements Writable {
                             .filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType()))
                             
.map(Column::getName).collect(Collectors.toSet()))) {
                 updatedRows.set(0);
+                newPartitionLoaded.set(false);
+            }
+            if (tableIf instanceof OlapTable) {
+                PartitionInfo partitionInfo = ((OlapTable) 
tableIf).getPartitionInfo();
+                if (partitionInfo != null && 
analyzedJob.colToPartitions.keySet()
+                        
.containsAll(partitionInfo.getPartitionColumns().stream()
+                            
.map(Column::getName).collect(Collectors.toSet()))) {
+                    newPartitionLoaded.set(false);
+                }
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index d8918368fa9..1e2a8764505 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1748,6 +1748,7 @@ public class DatabaseTransactionMgr {
 
     private boolean updateCatalogAfterVisible(TransactionState 
transactionState, Database db) {
         Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
+        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
         for (TableCommitInfo tableCommitInfo : 
transactionState.getIdToTableCommitInfos().values()) {
             long tableId = tableCommitInfo.getTableId();
             OlapTable table = (OlapTable) db.getTableNullable(tableId);
@@ -1807,6 +1808,10 @@ public class DatabaseTransactionMgr {
                 } // end for indices
                 long version = partitionCommitInfo.getVersion();
                 long versionTime = partitionCommitInfo.getVersionTime();
+                if (partition.getVisibleVersion() == 
Partition.PARTITION_INIT_VERSION
+                        && version > Partition.PARTITION_INIT_VERSION) {
+                    analysisManager.setNewPartitionLoaded(tableId);
+                }
                 partition.updateVisibleVersionAndTime(version, versionTime);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("transaction state {} set partition {}'s version 
to [{}]",
@@ -1814,7 +1819,6 @@ public class DatabaseTransactionMgr {
                 }
             }
         }
-        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
         Map<Long, Long> tableIdToTotalNumDeltaRows = 
transactionState.getTableIdToTotalNumDeltaRows();
         Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
         tableIdToTotalNumDeltaRows
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
index cc77557c8ce..c4b2b08720f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
@@ -301,8 +301,13 @@ public class StatisticsAutoCollectorTest {
         };
         // A very huge table has been updated recently, so we should skip it 
this time
         stats.updatedTime = System.currentTimeMillis() - 1000;
+        stats.newPartitionLoaded = new AtomicBoolean();
+        stats.newPartitionLoaded.set(true);
         StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+        // Test new partition loaded data for the first time. Not skip.
         Assertions.assertFalse(autoCollector.skip(olapTable));
+        stats.newPartitionLoaded.set(false);
+        // Assertions.assertTrue(autoCollector.skip(olapTable));
         // The update of this huge table is long time ago, so we shouldn't 
skip it this time
         stats.updatedTime = System.currentTimeMillis()
                 - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 
10000;
diff --git a/regression-test/suites/statistics/analyze_stats.groovy 
b/regression-test/suites/statistics/analyze_stats.groovy
index 40aa780f283..1f26c643b0f 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -2573,43 +2573,79 @@ PARTITION `p599` VALUES IN (599)
     sql """drop stats col1100 """
     sql """DROP TABLE IF EXISTS col1100"""
 
-    // Test trigger type.
-    sql """DROP DATABASE IF EXISTS trigger"""
-    sql """CREATE DATABASE IF NOT EXISTS trigger"""
-    sql """USE trigger"""
-    sql """
-      CREATE TABLE if not exists trigger_test(
-       `id`      int NOT NULL,
-       `name`    VARCHAR(152)
-      )ENGINE=OLAP
+   // Test partititon load data for the first time.
+   sql """
+     CREATE TABLE `partition_test` (
+      `id` INT NOT NULL,
+      `name` VARCHAR(25) NOT NULL,
+      `comment` VARCHAR(152) NULL
+      ) ENGINE=OLAP
       DUPLICATE KEY(`id`)
-      COMMENT "OLAP"
+      COMMENT 'OLAP'
+      PARTITION BY RANGE(`id`)
+      (PARTITION p1 VALUES [("0"), ("100")),
+       PARTITION p2 VALUES [("100"), ("200")),
+       PARTITION p3 VALUES [("200"), ("300")))
       DISTRIBUTED BY HASH(`id`) BUCKETS 1
       PROPERTIES (
-       "replication_num" = "1"
-      );
-    """
-    sql """insert into trigger_test values(1,'name1') """
-    sql """analyze database trigger PROPERTIES("use.auto.analyzer"="true")"""
-
-    int i = 0;
-    for (0; i < 10; i++) {
-        def result = sql """show column stats trigger_test"""
-        if (result.size() != 2) {
-           Thread.sleep(1000)
-            continue;
-        }
-        assertEquals(result[0][10], "SYSTEM")
-        assertEquals(result[1][10], "SYSTEM")
-        break
-    }
-    if (i < 10) {
-        sql """analyze table trigger_test with sync"""
-        def result = sql """show column stats trigger_test"""
-        assertEquals(result.size(), 2)
-        assertEquals(result[0][10], "MANUAL")
-        assertEquals(result[1][10], "MANUAL")
-    }
-    sql """DROP DATABASE IF EXISTS trigger"""
+       "replication_num" = "1");
+     """
+
+   sql """analyze table partition_test with sync"""
+   sql """insert into partition_test values (1, '1', '1')"""
+   def partition_result = sql """show table stats partition_test"""
+   assertEquals(partition_result[0][6], "true")
+   assertEquals(partition_result[0][0], "1")
+   sql """analyze table partition_test with sync"""
+   partition_result = sql """show table stats partition_test"""
+   assertEquals(partition_result[0][6], "false")
+   sql """insert into partition_test values (101, '1', '1')"""
+   partition_result = sql """show table stats partition_test"""
+   assertEquals(partition_result[0][6], "true")
+   sql """analyze table partition_test(id) with sync"""
+   partition_result = sql """show table stats partition_test"""
+   assertEquals(partition_result[0][6], "false")
+   sql """insert into partition_test values (102, '1', '1')"""
+   partition_result = sql """show table stats partition_test"""
+   assertEquals(partition_result[0][6], "false")
+
+   // Test trigger type.
+   sql """DROP DATABASE IF EXISTS trigger"""
+   sql """CREATE DATABASE IF NOT EXISTS trigger"""
+   sql """USE trigger"""
+   sql """
+     CREATE TABLE if not exists trigger_test(
+      `id`      int NOT NULL,
+      `name`    VARCHAR(152)
+     )ENGINE=OLAP
+     DUPLICATE KEY(`id`)
+     COMMENT "OLAP"
+     DISTRIBUTED BY HASH(`id`) BUCKETS 1
+     PROPERTIES (
+      "replication_num" = "1"
+     );
+   """
+   sql """insert into trigger_test values(1,'name1') """
+   sql """analyze database trigger PROPERTIES("use.auto.analyzer"="true")"""
+
+   int i = 0;
+   for (0; i < 10; i++) {
+       def result = sql """show column stats trigger_test"""
+       if (result.size() != 2) {
+          Thread.sleep(1000)
+           continue;
+       }
+       assertEquals(result[0][10], "SYSTEM")
+       assertEquals(result[1][10], "SYSTEM")
+       break
+   }
+   if (i < 10) {
+       sql """analyze table trigger_test with sync"""
+       def result = sql """show column stats trigger_test"""
+       assertEquals(result.size(), 2)
+       assertEquals(result[0][10], "MANUAL")
+       assertEquals(result[1][10], "MANUAL")
+   }
+   sql """DROP DATABASE IF EXISTS trigger"""
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to