This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a1b0264544c [improvement](statistics)Async drop stats while truncating 
table. (#37715)
a1b0264544c is described below

commit a1b0264544ce14245c4198f2e84399d400dfdf9a
Author: Jibing-Li <[email protected]>
AuthorDate: Tue Jul 16 18:59:23 2024 +0800

    [improvement](statistics)Async drop stats while truncating table. (#37715)
    
    Dropping stats for a table with many partitions may be slow, because
    invalidating the partition stats cache is time consuming. The truncate
    table operation drops stats synchronously, so truncating a table with
    many partitions may be very slow.
    This PR improves the performance of truncate table by dropping the
    stats asynchronously.
    
    The time to truncate a table with 10000 partitions and 10 columns is
    reduced from 10s to 2.5s.
---
 .../apache/doris/service/FrontendServiceImpl.java  |  9 +++-
 .../apache/doris/statistics/AnalysisManager.java   | 51 +++++++++++++++++++---
 .../doris/statistics/InvalidateStatsTarget.java    |  7 ++-
 .../doris/statistics/PartitionColumnStatistic.java |  4 +-
 .../doris/statistics/AnalysisManagerTest.java      | 27 ++++++++++++
 5 files changed, 88 insertions(+), 10 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 17bc984b2cc..8f34fed5e5f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3264,8 +3264,13 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         if (target.partitions != null) {
             partitionNames = new PartitionNames(false, new 
ArrayList<>(target.partitions));
         }
-        analysisManager.invalidateLocalStats(target.catalogId, target.dbId, 
target.tableId,
-                target.columns, tableStats, partitionNames);
+        if (target.isTruncate) {
+            analysisManager.submitAsyncDropStatsTask(target.catalogId, 
target.dbId,
+                    target.tableId, tableStats, partitionNames);
+        } else {
+            analysisManager.invalidateLocalStats(target.catalogId, 
target.dbId, target.tableId,
+                    target.columns, tableStats, partitionNames);
+        }
         return new TStatus(TStatusCode.OK);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 1fdd09d661f..39d483e5657 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -134,6 +134,7 @@ public class AnalysisManager implements Writable {
     private StatisticsCache statisticsCache;
 
     private AnalysisTaskExecutor taskExecutor;
+    private ThreadPoolExecutor dropStatsExecutors;
 
     // Store task information in metadata.
     protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
@@ -157,6 +158,11 @@ public class AnalysisManager implements Writable {
             this.taskExecutor = new 
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
                     Integer.MAX_VALUE);
             this.statisticsCache = new StatisticsCache();
+            this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
+                1, 1, 0,
+                TimeUnit.DAYS, new LinkedBlockingQueue<>(10),
+                new ThreadPoolExecutor.AbortPolicy(),
+                "Drop stats executor", true);
         }
     }
 
@@ -656,7 +662,7 @@ public class AnalysisManager implements Writable {
         if (partitionNames != null && !partitionNames.isStar() && 
partitionNames.getPartitionNames() != null) {
             partitions = new HashSet<>(partitionNames.getPartitionNames());
         }
-        invalidateRemoteStats(catalogId, dbId, tblId, cols, partitions);
+        invalidateRemoteStats(catalogId, dbId, tblId, cols, partitions, false);
         StatisticsRepository.dropStatistics(catalogId, dbId, tblId, cols, 
partitions);
     }
 
@@ -668,17 +674,51 @@ public class AnalysisManager implements Writable {
         long catalogId = table.getDatabase().getCatalog().getId();
         long dbId = table.getDatabase().getId();
         long tableId = table.getId();
-        invalidateLocalStats(catalogId, dbId, tableId, null, tableStats, 
partitionNames);
+        submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats, 
partitionNames);
         // Drop stats ddl is master only operation.
         Set<String> partitions = null;
         if (partitionNames != null && !partitionNames.isStar() && 
partitionNames.getPartitionNames() != null) {
             partitions = new HashSet<>(partitionNames.getPartitionNames());
         }
         // Drop stats ddl is master only operation.
-        invalidateRemoteStats(catalogId, dbId, tableId, null, partitions);
+        invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, 
true);
         StatisticsRepository.dropStatistics(catalogId, dbId, table.getId(), 
null, partitions);
     }
 
+    class DropStatsTask implements Runnable {
+        private final long catalogId;
+        private final long dbId;
+        private final long tableId;
+        private final Set<String> columns;
+        private final TableStatsMeta tableStats;
+        private final PartitionNames partitionNames;
+
+        public DropStatsTask(long catalogId, long dbId, long tableId, 
Set<String> columns,
+                             TableStatsMeta tableStats, PartitionNames 
partitionNames) {
+            this.catalogId = catalogId;
+            this.dbId = dbId;
+            this.tableId = tableId;
+            this.columns = columns;
+            this.tableStats = tableStats;
+            this.partitionNames = partitionNames;
+        }
+
+        @Override
+        public void run() {
+            invalidateLocalStats(catalogId, dbId, tableId, columns, 
tableStats, partitionNames);
+        }
+    }
+
+    public void submitAsyncDropStatsTask(long catalogId, long dbId, long 
tableId,
+                                         TableStatsMeta tableStats, 
PartitionNames partitionNames) {
+        try {
+            dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, 
tableId, null, tableStats, partitionNames));
+        } catch (Throwable t) {
+            LOG.info("Failed to drop stats for truncate table {}.{}.{}. 
Reason:{}",
+                    catalogId, dbId, tableId, t.getMessage());
+        }
+    }
+
     public void invalidateLocalStats(long catalogId, long dbId, long tableId, 
Set<String> columns,
                                      TableStatsMeta tableStats, PartitionNames 
partitionNames) {
         if (tableStats == null) {
@@ -743,8 +783,9 @@ public class AnalysisManager implements Writable {
     }
 
     public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
-                                      Set<String> columns, Set<String> 
partitions) {
-        InvalidateStatsTarget target = new InvalidateStatsTarget(catalogId, 
dbId, tableId, columns, partitions);
+                                      Set<String> columns, Set<String> 
partitions, boolean isTruncate) {
+        InvalidateStatsTarget target = new InvalidateStatsTarget(
+                catalogId, dbId, tableId, columns, partitions, isTruncate);
         TInvalidateFollowerStatsCacheRequest request = new 
TInvalidateFollowerStatsCacheRequest();
         request.key = GsonUtils.GSON.toJson(target);
         StatisticsCache statisticsCache = 
Env.getCurrentEnv().getStatisticsCache();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
index c45db7553dc..dd29093e164 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
@@ -38,11 +38,16 @@ public class InvalidateStatsTarget {
     @SerializedName("partitions")
     public final Set<String> partitions;
 
-    public InvalidateStatsTarget(long catalogId, long dbId, long tableId, 
Set<String> columns, Set<String> partitions) {
+    @SerializedName("it")
+    public final boolean isTruncate;
+
+    public InvalidateStatsTarget(long catalogId, long dbId, long tableId, 
Set<String> columns,
+                                 Set<String> partitions, boolean isTruncate) {
         this.catalogId = catalogId;
         this.dbId = dbId;
         this.tableId = tableId;
         this.columns = columns;
         this.partitions = partitions;
+        this.isTruncate = isTruncate;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java
index 24f2946cbc4..d755392e79f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java
@@ -137,7 +137,7 @@ public class PartitionColumnStatistic {
                     / partitionStatisticBuilder.getCount());
             String min = row.get(9);
             String max = row.get(10);
-            if (!"NULL".equalsIgnoreCase(min)) {
+            if (min != null && !"NULL".equalsIgnoreCase(min)) {
                 try {
                     
partitionStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(),
 min));
                     
partitionStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(),
 min));
@@ -148,7 +148,7 @@ public class PartitionColumnStatistic {
             } else {
                 
partitionStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
             }
-            if (!"NULL".equalsIgnoreCase(max)) {
+            if (max != null && !"NULL".equalsIgnoreCase(max)) {
                 try {
                     
partitionStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(),
 max));
                     
partitionStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(),
 max));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index b8be6794a09..898c3af0fdf 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -63,6 +63,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 // CHECKSTYLE OFF
 public class AnalysisManagerTest {
@@ -639,4 +640,30 @@ public class AnalysisManagerTest {
         Assertions.assertTrue(job.columns.contains(Pair.of("index1", "col7")));
         Assertions.assertEquals(JobPriority.LOW, job.priority);
     }
+
+    @Test
+    public void testAsyncDropStats() throws InterruptedException {
+        AtomicInteger count = new AtomicInteger(0);
+        new MockUp<AnalysisManager>() {
+            @Mock
+            public void invalidateLocalStats(long catalogId, long dbId, long 
tableId, Set<String> columns,
+                                             TableStatsMeta tableStats, 
PartitionNames partitionNames) {
+                try {
+                    Thread.sleep(1000);
+                    count.incrementAndGet();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        AnalysisManager analysisManager = new AnalysisManager();
+        for (int i = 0; i < 20; i++) {
+            System.out.println("Submit " + i);
+            analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, null);
+        }
+        Thread.sleep(25000);
+        System.out.println(count.get());
+        Assertions.assertTrue(count.get() > 10);
+        Assertions.assertTrue(count.get() < 20);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to