This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ac681e8e8c513bc3ba07a82cc1661c65e1c0b53c Author: Jack Drogon <[email protected]> AuthorDate: Fri Feb 2 21:37:44 2024 +0800 [ehmancement](binlog) Add show proc '/binlog' impl (#30770) Signed-off-by: Jack Drogon <[email protected]> --- .../org/apache/doris/binlog/BinlogConfigCache.java | 4 +- .../org/apache/doris/binlog/BinlogManager.java | 24 +++++++ .../java/org/apache/doris/binlog/BinlogUtils.java | 4 ++ .../java/org/apache/doris/binlog/DBBinlog.java | 64 +++++++++++++++++ .../java/org/apache/doris/binlog/TableBinlog.java | 82 ++++++++++++++++++++++ .../apache/doris/common/proc/BinlogProcDir.java | 45 ++++++++++++ .../org/apache/doris/common/proc/ProcService.java | 1 + 7 files changed, 223 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java index c414b853078..30641bae8c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java @@ -97,7 +97,7 @@ public class BinlogConfigCache { return null; } - Table table = db.getTableOrMetaException(tableId); + Table table = db.getTableNullable(tableId); if (table == null) { LOG.warn("fail to get table. 
db: {}, table id: {}", db.getFullName(), tableId); return null; @@ -109,6 +109,8 @@ public class BinlogConfigCache { OlapTable olapTable = (OlapTable) table; tableBinlogConfig = olapTable.getBinlogConfig(); + // get table binlog config, when table modify binlogConfig + // it create a new binlog, not update inplace, so we don't need to clone binlogConfig dbTableBinlogEnableMap.put(tableId, tableBinlogConfig); return tableBinlogConfig; } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 8c68f908ce5..8187e966561 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -22,6 +22,8 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.common.proc.ProcResult; import org.apache.doris.persist.AlterDatabasePropertyInfo; import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.BatchModifyPartitionsInfo; @@ -36,6 +38,7 @@ import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -54,6 +57,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class BinlogManager { private static final int BUFFER_SIZE = 16 * 1024; + private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("Name") + .add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime") + .add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime") + 
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds") + .build(); private static final Logger LOG = LogManager.getLogger(BinlogManager.class); @@ -545,6 +553,22 @@ public class BinlogManager { return checksum; } + public ProcResult getBinlogInfo() { + BaseProcResult result = new BaseProcResult(); + result.setNames(TITLE_NAMES); + + lock.readLock().lock(); + try { + for (DBBinlog dbBinlog : dbBinlogMap.values()) { + dbBinlog.getBinlogInfo(result); + } + } finally { + lock.readLock().unlock(); + } + + return result; + } + // remove DB // remove Table } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 4e134104b6d..0f6c2308248 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -90,4 +90,8 @@ public class BinlogUtils { long expireSeconds = currentSeconds - ttlSeconds; return expireSeconds * 1000; } + + public static String convertTimeToReadable(long time) { + return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(time)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 4ba1416cd5c..a3133bfb5c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -18,7 +18,10 @@ package org.apache.doris.binlog; import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; +import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -30,6 +33,7 @@ import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -448,4 +452,64 @@ public class DBBinlog { lock.writeLock().unlock(); } } + + public void getBinlogInfo(BaseProcResult result) { + BinlogConfig binlogConfig = binlogConfigCache.getDBBinlogConfig(dbId); + + String dbName = "(dropped)"; + String dropped = "true"; + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db != null) { + dbName = db.getFullName(); + dropped = "false"; + } + + lock.readLock().lock(); + try { + boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId); + if (dbBinlogEnable) { + List<String> info = new ArrayList<>(); + + info.add(dbName); + String type = "db"; + info.add(type); + String id = String.valueOf(dbId); + info.add(id); + info.add(dropped); + String binlogLength = String.valueOf(allBinlogs.size()); + info.add(binlogLength); + String firstBinlogCommittedTime = null; + String readableFirstBinlogCommittedTime = null; + if (!timestamps.isEmpty()) { + long timestamp = timestamps.get(0).second; + firstBinlogCommittedTime = String.valueOf(timestamp); + readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + } + info.add(firstBinlogCommittedTime); + info.add(readableFirstBinlogCommittedTime); + String lastBinlogCommittedTime = null; + String readableLastBinlogCommittedTime = null; + if (!timestamps.isEmpty()) { + long timestamp = timestamps.get(timestamps.size() - 1).second; + lastBinlogCommittedTime = String.valueOf(timestamp); + readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + } + info.add(lastBinlogCommittedTime); + info.add(readableLastBinlogCommittedTime); + String binlogTtlSeconds = null; + if (binlogConfig != null) { + binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + } + info.add(binlogTtlSeconds); + + result.addRow(info); + } else { + for (TableBinlog tableBinlog : 
tableBinlogMap.values()) { + tableBinlog.getBinlogInfo(db, result); + } + } + } finally { + lock.readLock().unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 0857ae7abb1..3dd290a07f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -18,7 +18,11 @@ package org.apache.doris.binlog; import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.common.Pair; +import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -27,8 +31,10 @@ import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -233,4 +239,80 @@ public class TableBinlog { lock.writeLock().unlock(); } } + + public void getBinlogInfo(Database db, BaseProcResult result) { + BinlogConfig binlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId); + + String tableName = null; + String dropped = null; + if (db == null) { + tableName = "(dropped).(unknown)"; + dropped = "true"; + } else { + String dbName = db.getFullName(); + Table table = db.getTableNullable(tableId); + // Default to "not dropped"; only a missing table flips it. The branches below are + // mutually exclusive, so the dropped marker and name are never overwritten. + dropped = "false"; + if (table == null) { + dropped = "true"; + tableName = dbName + ".(dropped)"; + } else if (table instanceof OlapTable) { + OlapTable olapTable = (OlapTable) table; + tableName = dbName + "." 
+ olapTable.getName(); + } else { + tableName = dbName + ".(not_olaptable)"; + } + } + + lock.readLock().lock(); + try { + List<String> info = new ArrayList<>(); + + info.add(tableName); + String type = "table"; + info.add(type); + + String id = String.valueOf(tableId); + info.add(id); + info.add(dropped); + String binlogLength = String.valueOf(binlogs.size()); + info.add(binlogLength); + String firstBinlogCommittedTime = null; + String readableFirstBinlogCommittedTime = null; + for (TBinlog binlog : binlogs) { + long timestamp = binlog.getTimestamp(); + if (timestamp != -1) { + firstBinlogCommittedTime = String.valueOf(timestamp); + readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + break; + } + } + info.add(firstBinlogCommittedTime); + info.add(readableFirstBinlogCommittedTime); + String lastBinlogCommittedTime = null; + String readableLastBinlogCommittedTime = null; + Iterator<TBinlog> iterator = binlogs.descendingIterator(); + while (iterator.hasNext()) { + TBinlog binlog = iterator.next(); + long timestamp = binlog.getTimestamp(); + if (timestamp != -1) { + lastBinlogCommittedTime = String.valueOf(timestamp); + readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + break; + } + } + info.add(lastBinlogCommittedTime); + info.add(readableLastBinlogCommittedTime); + String binlogTtlSeconds = null; + if (binlogConfig != null) { + binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + } + info.add(binlogTtlSeconds); + + result.addRow(info); + } finally { + lock.readLock().unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java new file mode 100644 index 00000000000..f72ed30d2b6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.binlog.BinlogManager; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; + + +public class BinlogProcDir implements ProcDirInterface { + @Override + public boolean register(String name, ProcNodeInterface node) { + return false; + } + + @Override + public ProcNodeInterface lookup(String name) throws AnalysisException { + throw new AnalysisException("not implemented"); + } + + @Override + public ProcResult fetchResult() throws AnalysisException { + BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager(); + if (binlogManager == null) { + throw new AnalysisException("binlog manager is not initialized"); + } + + return binlogManager.getBinlogInfo(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java index e54ee4d5d11..42010ccbd20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java @@ -58,6 +58,7 @@ public final class ProcService { root.register("colocation_group", new ColocationGroupProcDir()); root.register("bdbje", new 
BDBJEProcDir()); root.register("diagnose", new DiagnoseProcDir()); + root.register("binlog", new BinlogProcDir()); } // 通过指定的路径获得对应的PROC Node --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
