This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dc9d092e4fc [feature](stats)Get partition update rows. (#34908)
dc9d092e4fc is described below
commit dc9d092e4fcb2b49ab8f6904a3da290b429f4788
Author: Jibing-Li <[email protected]>
AuthorDate: Fri May 17 11:21:59 2024 +0800
[feature](stats)Get partition update rows. (#34908)
To support partition-level stats collection, we need to record the
partition-level update rows for each load transaction. This PR adds that
support. BE now returns a tableId -> (tabletId -> updateRows) map to FE. FE
sums the row counts of the tablets belonging to each partition and stores
the result in TableStatsMeta.partitionUpdateRows.
Previously, BE returned a tableId -> updateRows map to FE, which did not
contain any partition-level information.
---
be/src/agent/task_worker_pool.cpp | 9 +-
be/src/olap/task/engine_publish_version_task.cpp | 8 +-
be/src/olap/task/engine_publish_version_task.h | 6 +-
.../apache/doris/datasource/InternalCatalog.java | 10 +-
.../java/org/apache/doris/master/MasterImpl.java | 4 +-
.../apache/doris/statistics/AnalysisManager.java | 123 ++++++++++++++++++---
.../apache/doris/statistics/TableStatsMeta.java | 3 +
.../apache/doris/statistics/UpdateRowsEvent.java | 52 ++++++++-
.../org/apache/doris/task/PublishVersionTask.java | 12 +-
.../doris/transaction/DatabaseTransactionMgr.java | 17 +--
.../doris/transaction/PublishVersionDaemon.java | 36 +++---
.../apache/doris/transaction/TransactionState.java | 14 +--
gensrc/thrift/MasterService.thrift | 1 +
13 files changed, 219 insertions(+), 76 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 3fb5f3dfcf2..37feaf18dcd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1721,17 +1721,17 @@ void
PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>>
discontinuous_version_tablets;
- std::map<TTableId, int64_t> table_id_to_num_delta_rows;
+ std::map<TTableId, std::map<TTabletId, int64_t>>
table_id_to_tablet_id_to_num_delta_rows;
uint32_t retry_time = 0;
Status status;
constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
succ_tablets.clear();
error_tablet_ids.clear();
- table_id_to_num_delta_rows.clear();
+ table_id_to_tablet_id_to_num_delta_rows.clear();
EnginePublishVersionTask engine_task(_engine, publish_version_req,
&error_tablet_ids,
&succ_tablets,
&discontinuous_version_tablets,
- &table_id_to_num_delta_rows);
+
&table_id_to_tablet_id_to_num_delta_rows);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();
if (status.ok()) {
@@ -1834,7 +1834,8 @@ void
PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
finish_task_request.__set_succ_tablets(succ_tablets);
finish_task_request.__set_error_tablet_ids(
std::vector<TTabletId>(error_tablet_ids.begin(),
error_tablet_ids.end()));
-
finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows);
+ finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
+ table_id_to_tablet_id_to_num_delta_rows);
finish_task(finish_task_request);
remove_task_info(req.task_type, req.signature);
}
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 41d28a6124b..acdcebae165 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -75,13 +75,13 @@ EnginePublishVersionTask::EnginePublishVersionTask(
StorageEngine& engine, const TPublishVersionRequest&
publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>*
succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinuous_version_tablets,
- std::map<TTableId, int64_t>* table_id_to_num_delta_rows)
+ std::map<TTableId, std::map<TTabletId, int64_t>>*
table_id_to_tablet_id_to_num_delta_rows)
: _engine(engine),
_publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets),
- _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {
+
_table_id_to_tablet_id_to_num_delta_rows(table_id_to_tablet_id_to_num_delta_rows)
{
_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"TabletPublishTxnTask");
}
@@ -340,7 +340,9 @@ void
EnginePublishVersionTask::_calculate_tbl_num_delta_rows(
continue;
}
auto table_id = tablet->get_table_id();
- (*_table_id_to_num_delta_rows)[table_id] += kv.second;
+ if (kv.second > 0) {
+ (*_table_id_to_tablet_id_to_num_delta_rows)[table_id][kv.first] +=
kv.second;
+ }
}
}
diff --git a/be/src/olap/task/engine_publish_version_task.h
b/be/src/olap/task/engine_publish_version_task.h
index e4824176368..761c9358cd9 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -92,7 +92,8 @@ public:
StorageEngine& engine, const TPublishVersionRequest&
publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId,
TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinous_version_tablets,
- std::map<TTableId, int64_t>* table_id_to_num_delta_rows);
+ std::map<TTableId, std::map<TTabletId, int64_t>>*
+ table_id_to_tablet_id_to_num_delta_rows);
~EnginePublishVersionTask() override = default;
Status execute() override;
@@ -109,7 +110,8 @@ private:
std::set<TTabletId>* _error_tablet_ids = nullptr;
std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>*
_discontinuous_version_tablets = nullptr;
- std::map<TTableId, int64_t>* _table_id_to_num_delta_rows = nullptr;
+ std::map<TTableId, std::map<TTabletId, int64_t>>*
_table_id_to_tablet_id_to_num_delta_rows =
+ nullptr;
};
class AsyncTabletPublishTask {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 2dc88d6df59..e2d84594547 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3204,7 +3204,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
Database db = (Database) getDbOrDdlException(dbTbl.getDb());
OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
- long rowsToTruncate = 0;
+ HashMap<Long, Long> updateRecords = new HashMap<>();
BinlogConfig binlogConfig;
olapTable.readLock();
@@ -3223,7 +3223,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
origPartitions.put(partName, partition.getId());
partitionsDistributionInfo.put(partition.getId(),
partition.getDistributionInfo());
- rowsToTruncate += partition.getBaseIndex().getRowCount();
+ updateRecords.put(partition.getId(),
partition.getBaseIndex().getRowCount());
}
} else {
for (Partition partition : olapTable.getPartitions()) {
@@ -3234,7 +3234,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
origPartitions.put(partition.getName(), partition.getId());
partitionsDistributionInfo.put(partition.getId(),
partition.getDistributionInfo());
- rowsToTruncate += partition.getBaseIndex().getRowCount();
+ updateRecords.put(partition.getId(),
partition.getBaseIndex().getRowCount());
}
}
// if table currently has no partitions, this sql like empty
command and do nothing, should return directly.
@@ -3390,13 +3390,11 @@ public class InternalCatalog implements
CatalogIf<Database> {
erasePartitionDropBackendReplicas(oldPartitions);
- HashMap<Long, Long> updateRecords = new HashMap<>();
- updateRecords.put(olapTable.getId(), rowsToTruncate);
if (truncateEntireTable) {
// Drop the whole table stats after truncate the entire table
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
}
-
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
+
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords,
db.getId(), olapTable.getId());
LOG.info("finished to truncate table {}, partitions: {}",
tblRef.getName().toSql(), tblRef.getPartitionNames());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index e9c850d0566..8be450404a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -511,8 +511,8 @@ public class MasterImpl {
// not remove the task from queue and be will retry
return;
}
- if (request.isSetTableIdToDeltaNumRows()) {
-
publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows());
+ if (request.isSetTableIdToTabletIdToDeltaNumRows()) {
+
publishVersionTask.setTableIdTabletsDeltaRows(request.getTableIdToTabletIdToDeltaNumRows());
}
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
publishVersionTask.getTaskType(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0d4d18bc682..57109400ee7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -30,9 +30,13 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -46,6 +50,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.Slot;
@@ -86,6 +91,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -990,17 +996,33 @@ public class AnalysisManager implements Writable {
}
// Invoke this when load transaction finished.
- public void updateUpdatedRows(Map<Long, Long> records) {
- if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() ||
records == null || records.isEmpty()) {
+ public void updateUpdatedRows(Map<Long, Map<Long, Long>> tabletRecords,
long dbId) {
+ if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
return;
}
- for (Entry<Long, Long> record : records.entrySet()) {
- TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
- if (statsStatus != null) {
- statsStatus.updatedRows.addAndGet(record.getValue());
- }
+ UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(tabletRecords,
dbId);
+ replayUpdateRowsRecord(updateRowsEvent);
+ logUpdateRowsRecord(updateRowsEvent);
+ }
+
+ // Invoke this when load truncate table finished.
+ public void updateUpdatedRows(Map<Long, Long> partitionToUpdateRows, long
dbId, long tableId) {
+ if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
+ return;
+ }
+ UpdateRowsEvent updateRowsEvent = new
UpdateRowsEvent(partitionToUpdateRows, dbId, tableId);
+ replayUpdateRowsRecord(updateRowsEvent);
+ logUpdateRowsRecord(updateRowsEvent);
+ }
+
+ // Invoke this for cloud version load.
+ public void updateUpdatedRows(Map<Long, Long> updatedRows) {
+ if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
+ return;
}
- logUpdateRowsRecord(new UpdateRowsEvent(records));
+ UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(updatedRows);
+ replayUpdateRowsRecord(updateRowsEvent);
+ logUpdateRowsRecord(updateRowsEvent);
}
// Set to true means new partition loaded data
@@ -1039,13 +1061,86 @@ public class AnalysisManager implements Writable {
}
public void replayUpdateRowsRecord(UpdateRowsEvent event) {
- if (event == null || event.getRecords() == null) {
+ // For older version compatible.
+ InternalCatalog catalog = Env.getCurrentInternalCatalog();
+ if (event.getRecords() != null) {
+ for (Entry<Long, Long> record : event.getRecords().entrySet()) {
+ TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+ if (statsStatus != null) {
+ statsStatus.updatedRows.addAndGet(record.getValue());
+ }
+ }
return;
}
- for (Entry<Long, Long> record : event.getRecords().entrySet()) {
- TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+
+ // Record : TableId -> (TabletId -> update rows)
+ if (event.getTabletRecords() != null) {
+ for (Entry<Long, Map<Long, Long>> record :
event.getTabletRecords().entrySet()) {
+ TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+ if (statsStatus != null) {
+ Table table =
catalog.getDb(event.getDbId()).get().getTable(record.getKey()).get();
+ if (!(table instanceof OlapTable)) {
+ continue;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ short replicaNum =
olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum();
+ Map<Long, Long> tabletRows = record.getValue();
+ long tableUpdateRows = 0;
+ for (Entry<Long, Long> entry : tabletRows.entrySet()) {
+ tableUpdateRows += entry.getValue() / replicaNum;
+ }
+ statsStatus.updatedRows.addAndGet(tableUpdateRows);
+ if (StatisticsUtil.enablePartitionAnalyze()) {
+ updatePartitionRows(olapTable, tabletRows,
statsStatus, replicaNum);
+ }
+ }
+ }
+ return;
+ }
+
+ // Handle truncate table
+ if (event.getPartitionToUpdateRows() != null && event.getTableId() >
0) {
+ Map<Long, Long> partRows = event.getPartitionToUpdateRows();
+ long totalRows = partRows.values().stream().mapToLong(rows ->
rows).sum();
+ TableStatsMeta statsStatus = idToTblStats.get(event.getTableId());
if (statsStatus != null) {
- statsStatus.updatedRows.addAndGet(record.getValue());
+ statsStatus.updatedRows.addAndGet(totalRows);
+ if (StatisticsUtil.enablePartitionAnalyze()) {
+ for (Entry<Long, Long> entry : partRows.entrySet()) {
+
statsStatus.partitionUpdateRows.computeIfPresent(entry.getKey(),
+ (id, rows) -> rows += entry.getValue());
+
statsStatus.partitionUpdateRows.putIfAbsent(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+ }
+
+ protected void updatePartitionRows(OlapTable table, Map<Long, Long>
originTabletToRows,
+ TableStatsMeta statsStatus, short
replicaNum) {
+ List<Partition> partitions = table.getPartitions().stream().sorted(
+
Comparator.comparing(Partition::getVisibleVersionTime).reversed()).collect(Collectors.toList());
+ Map<Long, Long> tabletToRows = new HashMap<>(originTabletToRows);
+ int tabletCount = tabletToRows.size();
+ for (Partition p : partitions) {
+ MaterializedIndex baseIndex = p.getBaseIndex();
+ Iterator<Entry<Long, Long>> iterator =
tabletToRows.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<Long, Long> entry = iterator.next();
+ long tabletId = entry.getKey();
+ Tablet tablet = baseIndex.getTablet(tabletId);
+ if (tablet == null) {
+ continue;
+ }
+ long tabletRows = entry.getValue();
+ statsStatus.partitionUpdateRows.computeIfPresent(p.getId(),
+ (id, rows) -> rows += tabletRows / replicaNum);
+ statsStatus.partitionUpdateRows.putIfAbsent(p.getId(),
tabletRows / replicaNum);
+ iterator.remove();
+ tabletCount--;
+ }
+ if (tabletCount <= 0) {
+ break;
}
}
}
@@ -1079,10 +1174,6 @@ public class AnalysisManager implements Writable {
return tableStats.findColumnStatsMeta(indexName, colName);
}
- public AnalysisJob findJob(long id) {
- return idToAnalysisJob.get(id);
- }
-
public AnalysisInfo findJobInfo(long id) {
return analysisJobInfoMap.get(id);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 02ec6abf705..3aa7decfed3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -79,6 +79,9 @@ public class TableStatsMeta implements Writable {
@SerializedName("userInjected")
public boolean userInjected;
+ @SerializedName("pur")
+ public ConcurrentMap<Long, Long> partitionUpdateRows = new
ConcurrentHashMap<>();
+
@VisibleForTesting
public TableStatsMeta() {
tblId = 0;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
index 8cce3d29391..c9a0b9d820d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
@@ -31,16 +31,66 @@ import java.util.Map;
public class UpdateRowsEvent implements Writable {
@SerializedName("records")
- private Map<Long, Long> records;
+ private final Map<Long, Long> records;
+
+ @SerializedName("tr")
+ private final Map<Long, Map<Long, Long>> tabletRecords;
+
+ @SerializedName("dbId")
+ private final long dbId;
+
+ @SerializedName("pur")
+ private final Map<Long, Long> partitionToUpdateRows;
+
+ @SerializedName("tableId")
+ private final long tableId;
public UpdateRowsEvent(Map<Long, Long> records) {
this.records = records;
+ this.tabletRecords = null;
+ this.dbId = -1;
+ this.partitionToUpdateRows = null;
+ this.tableId = -1;
+ }
+
+ public UpdateRowsEvent(Map<Long, Map<Long, Long>> tabletRecords, long
dbId) {
+ this.records = null;
+ this.tabletRecords = tabletRecords;
+ this.dbId = dbId;
+ this.partitionToUpdateRows = null;
+ this.tableId = -1;
+ }
+
+ public UpdateRowsEvent(Map<Long, Long> partitionToUpdateRows, long dbId,
long tableId) {
+ this.records = null;
+ this.tabletRecords = null;
+ this.dbId = dbId;
+ this.partitionToUpdateRows = partitionToUpdateRows;
+ this.tableId = tableId;
}
+ // TableId -> table update rows
public Map<Long, Long> getRecords() {
return records;
}
+ // TableId -> (TabletId -> tablet update rows)
+ public Map<Long, Map<Long, Long>> getTabletRecords() {
+ return tabletRecords;
+ }
+
+ public long getDbId() {
+ return dbId;
+ }
+
+ public Map<Long, Long> getPartitionToUpdateRows() {
+ return partitionToUpdateRows;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 222cf0fd78e..0cde6de539a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -50,9 +50,9 @@ public class PublishVersionTask extends AgentTask {
private Map<Long, Long> succTablets;
/**
- * To collect loaded rows for each table from each BE
+ * To collect loaded rows for each tablet from each BE
*/
- private final Map<Long, Long> tableIdToDeltaNumRows = Maps.newHashMap();
+ private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows =
Maps.newHashMap();
public PublishVersionTask(long backendId, long transactionId, long dbId,
List<TPartitionVersionInfo> partitionVersionInfos, long
createTime) {
@@ -99,11 +99,11 @@ public class PublishVersionTask extends AgentTask {
this.errorTablets.addAll(errorTablets);
}
- public void setTableIdToDeltaNumRows(Map<Long, Long>
tabletIdToDeltaNumRows) {
- this.tableIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows);
+ public void setTableIdTabletsDeltaRows(Map<Long, Map<Long, Long>>
tableIdToTabletDeltaRows) {
+ this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows);
}
- public Map<Long, Long> getTableIdToDeltaNumRows() {
- return tableIdToDeltaNumRows;
+ public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() {
+ return tableIdToTabletDeltaRows;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index e0383e6f7c0..0f709bec24f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -2263,23 +2263,8 @@ public class DatabaseTransactionMgr {
long tableVersionTime = tableCommitInfo.getVersionTime();
table.updateVisibleVersionAndTime(tableVersion, tableVersionTime);
}
- Map<Long, Long> tableIdToTotalNumDeltaRows =
transactionState.getTableIdToTotalNumDeltaRows();
- Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
- tableIdToTotalNumDeltaRows
- .forEach((tableId, numRows) -> {
- OlapTable table = (OlapTable)
db.getTableNullable(tableId);
- if (table != null) {
- short replicaNum = table.getTableProperty()
- .getReplicaAllocation()
- .getTotalReplicaNum();
- tableIdToNumDeltaRows.put(tableId, numRows /
replicaNum);
- }
- });
- if (LOG.isDebugEnabled()) {
- LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows);
- }
analysisManager.setNewPartitionLoaded(newPartitionLoadedTableIds);
- analysisManager.updateUpdatedRows(tableIdToNumDeltaRows);
+
analysisManager.updateUpdatedRows(transactionState.getTableIdToTabletDeltaRows(),
db.getId());
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index f4814965473..c9dd3dd258a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -151,24 +151,16 @@ public class PublishVersionDaemon extends MasterDaemon {
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
SystemInfoService infoService,
GlobalTransactionMgrIface globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions,
Map<Long, Set<Long>> backendPartitions) {
- Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+ Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows =
Maps.newHashMap();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
AtomicBoolean hasBackendAliveAndUnfinishedTask = new
AtomicBoolean(false);
Set<Long> notFinishTaskBe = Sets.newHashSet();
- transactionState.getPublishVersionTasks().entrySet().forEach(entry
-> {
- long beId = entry.getKey();
- List<PublishVersionTask> tasks = entry.getValue();
+ transactionState.getPublishVersionTasks().forEach((key, tasks) -> {
+ long beId = key;
for (PublishVersionTask task : tasks) {
if (task.isFinished()) {
- if (CollectionUtils.isEmpty(task.getErrorTablets())) {
- Map<Long, Long> tableIdToDeltaNumRows =
task.getTableIdToDeltaNumRows();
- tableIdToDeltaNumRows.forEach((tableId, numRows)
-> {
- tableIdToTotalDeltaNumRows
- .computeIfPresent(tableId, (id,
orgNumRows) -> orgNumRows + numRows);
-
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
- });
- }
+ calculateTaskUpdateRows(tableIdToTabletDeltaRows,
task);
} else {
if
(infoService.checkBackendAlive(task.getBackendId())) {
hasBackendAliveAndUnfinishedTask.set(true);
@@ -178,7 +170,7 @@ public class PublishVersionDaemon extends MasterDaemon {
}
});
-
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+
transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows);
if (LOG.isDebugEnabled()) {
LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe,
transactionState);
}
@@ -236,6 +228,24 @@ public class PublishVersionDaemon extends MasterDaemon {
} // end for readyTransactionStates
}
+ // Merge task tablets update rows to tableToTabletsDelta.
+ private void calculateTaskUpdateRows(Map<Long, Map<Long, Long>>
tableIdToTabletDeltaRows, PublishVersionTask task) {
+ if (CollectionUtils.isEmpty(task.getErrorTablets())) {
+ for (Entry<Long, Map<Long, Long>> tableEntry :
task.getTableIdToTabletDeltaRows().entrySet()) {
+ if (tableIdToTabletDeltaRows.containsKey(tableEntry.getKey()))
{
+ Map<Long, Long> tabletsDelta =
tableIdToTabletDeltaRows.get(tableEntry.getKey());
+ for (Entry<Long, Long> tabletEntry :
tableEntry.getValue().entrySet()) {
+ tabletsDelta.computeIfPresent(tabletEntry.getKey(),
+ (tabletId, origRows) -> origRows +
tabletEntry.getValue());
+ tabletsDelta.putIfAbsent(tabletEntry.getKey(),
tabletEntry.getValue());
+ }
+ } else {
+ tableIdToTabletDeltaRows.put(tableEntry.getKey(),
tableEntry.getValue());
+ }
+ }
+ }
+ }
+
private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState
transactionState,
TableCommitInfo tableCommitInfo) throws MetaNotFoundException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 257226b4c8a..e12eeb0c594 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -272,9 +272,9 @@ public class TransactionState implements Writable {
private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();
/**
- * the value is the num delta rows of all replicas in each table
+ * the value is the num delta rows of all replicas in each tablet
*/
- private final Map<Long, Long> tableIdToTotalNumDeltaRows =
Maps.newHashMap();
+ private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows =
Maps.newHashMap();
private String errorLogUrl = null;
@@ -785,12 +785,12 @@ public class TransactionState implements Writable {
}
}
- public Map<Long, Long> getTableIdToTotalNumDeltaRows() {
- return tableIdToTotalNumDeltaRows;
+ public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() {
+ return tableIdToTabletDeltaRows;
}
- public void setTableIdToTotalNumDeltaRows(Map<Long, Long>
tableIdToTotalNumDeltaRows) {
- this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows);
+ public void setTableIdToTabletDeltaRows(Map<Long, Map<Long, Long>>
tableIdToTabletDeltaRows) {
+ this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows);
}
public void setErrorMsg(String errMsg) {
@@ -808,7 +808,7 @@ public class TransactionState implements Writable {
// reduce memory
public void pruneAfterVisible() {
publishVersionTasks.clear();
- tableIdToTotalNumDeltaRows.clear();
+ tableIdToTabletDeltaRows.clear();
// TODO if subTransactionStates can be cleared?
}
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index ec647dbcf92..1db7a109f55 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -71,6 +71,7 @@ struct TFinishTaskRequest {
16: optional i64 copy_time_ms
17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
18: optional map<i64, i64> table_id_to_delta_num_rows
+ 19: optional map<i64, map<i64, i64>>
table_id_to_tablet_id_to_delta_num_rows
}
struct TTablet {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]