This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch opt_dict_perf in repository https://gitbox.apache.org/repos/asf/doris.git
commit d02207efca33cbcafc52bf832f9db6cd782ebfdf Author: yiguolei <[email protected]> AuthorDate: Fri Mar 17 08:48:25 2023 +0800 call close to release lock; use try write lock --- .../main/java/org/apache/doris/catalog/Table.java | 1 + .../org/apache/doris/catalog/TabletStatMgr.java | 12 ++- .../org/apache/doris/nereids/CascadesContext.java | 1 + .../java/org/apache/doris/qe/ResultReceiver.java | 11 +- .../java/org/apache/doris/qe/StmtExecutor.java | 6 ++ .../doris/statistics/StatisticsCacheLoader.java | 115 +++++++++++---------- fe/pom.xml | 3 +- 7 files changed, 84 insertions(+), 65 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 60547c659a..3026764e2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -184,6 +184,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf { LOG.warn("Failed to try table {}'s write lock. timeout {} {}. Current owner: {}", name, timeout, unit.name(), rwLock.getOwner()); } + return res; } catch (InterruptedException e) { LOG.warn("failed to try write lock at table[" + name + "]", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index f507e4ecb8..acead86893 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; /* * TabletStatMgr is for collecting tablet(replica) statistics from backends. @@ -49,7 +50,8 @@ public class TabletStatMgr extends MasterDaemon { super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); } - @Override + protected void runAfterCatalogReady2() {} + protected void runAfterCatalogReady() { ImmutableMap<Long, Backend> backends = Env.getCurrentSystemInfo().getIdToBackend(); long start = System.currentTimeMillis(); @@ -94,7 +96,13 @@ public class TabletStatMgr extends MasterDaemon { continue; } OlapTable olapTable = (OlapTable) table; - if (!table.writeLockIfExist()) { + // Use try write lock to avoid such cases + // Time1: Thread1 hold read lock for 5min + // Time2: Thread2 want to add write lock, then it will be the first element in lock queue + // Time3: Thread3 want to add read lock, but it will not, because thread 2 want to add write lock + // In this case, thread 3 has to wait more than 5min, because it has to wait thread 2 to add + // write lock and release write lock and thread 2 has to wait thread 1 to release read lock + if (!table.tryWriteLockIfExist(3000, TimeUnit.MILLISECONDS)) { continue; } try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index 2ed1123bb1..7caffbfd68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -386,6 +386,7 @@ public class CascadesContext implements ScheduleContext, PlanSource { cascadesContext.extractTables(plan); for (Table table : cascadesContext.tables) { if (!table.tryReadLock(1, TimeUnit.MINUTES)) { + close(); throw new RuntimeException(String.format("Failed to get read lock on table: %s", table.getName())); } locked.push(table); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index d7c9070421..9923a4e7d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -18,6 +18,7 @@ package org.apache.doris.qe; import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types; import org.apache.doris.rpc.BackendServiceProxy; @@ -96,7 +97,7 @@ public class ResultReceiver { rowBatch.setQueryStatistics(pResult.getQueryStatistics()); if (packetIdx != pResult.getPacketSeq()) { - LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.getPacketSeq()); + LOG.warn("finistId={}, receive packet failed, expect={}, receive={}", DebugUtil.printId(finstId), packetIdx, pResult.getPacketSeq()); status.setRpcStatus("receive error packet"); return null; } @@ -105,7 +106,7 @@ public class ResultReceiver { isDone = pResult.getEos(); if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) { - LOG.info("get first empty rowbatch"); + LOG.info("finistId={}, get first empty rowbatch", DebugUtil.printId(finstId)); rowBatch.setEos(false); return rowBatch; } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) { @@ -119,11 +120,11 @@ public class ResultReceiver { } } } catch (RpcException e) { - LOG.warn("fetch result rpc exception, finstId={}", finstId, e); + LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(finstId), e); status.setRpcStatus(e.getMessage()); SimpleScheduler.addToBlacklist(backendId, e.getMessage()); } catch (ExecutionException e) { - LOG.warn("fetch result execution exception, finstId={}", finstId, e); + LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); @@ -132,7 +133,7 @@ public class ResultReceiver { SimpleScheduler.addToBlacklist(backendId, e.getMessage()); } } catch (TimeoutException e) { - LOG.warn("fetch result timeout, finstId={}", finstId, e); + LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e); status.setStatus("query timeout"); } finally { synchronized (this) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 97532558e9..9bbbb03d7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -801,9 +801,15 @@ public class StmtExecutor implements ProfileWriter { List<TableIf> tables = Lists.newArrayList(tableMap.values()); int analyzeTimes = 2; for (int i = 1; i <= analyzeTimes; i++) { + MetaLockUtils.readLockTables(tables); try { + long startAnalysisTime = System.currentTimeMillis(); analyzeAndGenerateQueryPlan(tQueryOptions); + long endAnalysisTime = System.currentTimeMillis(); + if (endAnalysisTime - startAnalysisTime > 10000) { + LOG.info("yyyyy {}", endAnalysisTime - startAnalysisTime); + } break; } catch (MVSelectFailedException e) { /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java index 08781e5689..a50d9f0274 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java @@ -33,8 +33,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +/** + * Use to load stats cache. + */ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKey, Statistic> { private static final Logger LOG = LogManager.getLogger(StatisticsCacheLoader.class); @@ -47,72 +52,68 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe + "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE " + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')"; - private static int CUR_RUNNING_LOAD = 0; - - private static final Object LOCK = new Object(); + private final ConcurrentMap<StatisticsCacheKey, CompletableFuture<Statistic>> + inProgressing = new ConcurrentHashMap<>(); // TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists. @Override public @NonNull CompletableFuture<Statistic> asyncLoad(@NonNull StatisticsCacheKey key, @NonNull Executor executor) { - synchronized (LOCK) { - if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) { + CompletableFuture<Statistic> future = inProgressing.get(key); + if (future != null) { + return future; + } + future = CompletableFuture.supplyAsync(() -> { + Statistic statistic = new Statistic(); + long startTime = 0; + try { + LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName, + startTime); + Map<String, String> params = new HashMap<>(); + params.put("tblId", String.valueOf(key.tableId)); + params.put("idxId", String.valueOf(key.idxId)); + params.put("colId", String.valueOf(key.colName)); + + List<ColumnStatistic> columnStatistics; + List<ResultRow> columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_COLUMN_STATISTICS)); try { - LOCK.wait(); - } catch (InterruptedException e) { - LOG.warn("Ignore interruption", e); + columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult); + } catch (Exception e) { + LOG.warn("Failed to deserialize column statistics", e); + throw new CompletionException(e); + } + if (CollectionUtils.isEmpty(columnStatistics)) { + statistic.setColumnStatistic(ColumnStatistic.DEFAULT); + } else { + statistic.setColumnStatistic(columnStatistics.get(0)); } - } - CUR_RUNNING_LOAD++; - return CompletableFuture.supplyAsync(() -> { - Statistic statistic = new Statistic(); + List<Histogram> histogramStatistics; + List<ResultRow> histogramResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_HISTOGRAM_STATISTICS)); try { - Map<String, String> params = new HashMap<>(); - params.put("tblId", String.valueOf(key.tableId)); - params.put("idxId", String.valueOf(key.idxId)); - params.put("colId", String.valueOf(key.colName)); - - List<ColumnStatistic> columnStatistics; - List<ResultRow> columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_COLUMN_STATISTICS)); - try { - columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult); - } catch (Exception e) { - LOG.warn("Failed to deserialize column statistics", e); - throw new CompletionException(e); - } - if (CollectionUtils.isEmpty(columnStatistics)) { - statistic.setColumnStatistic(ColumnStatistic.DEFAULT); - } else { - statistic.setColumnStatistic(columnStatistics.get(0)); - } - - List<Histogram> histogramStatistics; - List<ResultRow> histogramResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_HISTOGRAM_STATISTICS)); - try { - histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult); - } catch (Exception e) { - LOG.warn("Failed to deserialize histogram statistics", e); - throw new CompletionException(e); - } - if (CollectionUtils.isEmpty(histogramStatistics)) { - statistic.setHistogram(Histogram.DEFAULT); - } else { - statistic.setHistogram(histogramStatistics.get(0)); - } - } finally { - synchronized (LOCK) { - CUR_RUNNING_LOAD--; - LOCK.notify(); - } + histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult); + } catch (Exception e) { + LOG.warn("Failed to deserialize histogram statistics", e); + throw new CompletionException(e); } - - return statistic; - }); - } + if (CollectionUtils.isEmpty(histogramStatistics)) { + statistic.setHistogram(Histogram.DEFAULT); + } else { + statistic.setHistogram(histogramStatistics.get(0)); + } + } finally { + long endTime = System.currentTimeMillis(); + LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName, + endTime, endTime - startTime); + inProgressing.remove(key); + } + return statistic; + }); + inProgressing.put(key, future); + return future; } } diff --git a/fe/pom.xml b/fe/pom.xml index 6164a4b2ec..a01f8d6cd9 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -69,10 +69,11 @@ under the License. <artifactId>flatten-maven-plugin</artifactId> <version>1.2.5</version> </plugin> + <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> - </plugin> + </plugin>--> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>license-maven-plugin</artifactId> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
