This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ab7ecb7740e [pick](statistics) pick loaded rows statistics to 2.0
(#25531)
ab7ecb7740e is described below
commit ab7ecb7740eb19f88a99dcd16263b4a68c54ae3b
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Oct 20 13:35:55 2023 +0800
[pick](statistics) pick loaded rows statistics to 2.0 (#25531)
* [feature](load) collect loaded rows at table level after the txn is published
(#24346)
As the title says.
Example: after a stream load of 20 rows, the FE logs the per-table loaded row count:
```
2023-09-14 11:40:04,186 DEBUG (PUBLISH_VERSION|23) [DatabaseTransactionMgr.updateCatalogAfterVisible():1769] table id to loaded rows:{51016=20}
```
```
mysql> select count(*) from dup_tbl_basic;
+----------+
| count(*) |
+----------+
| 20 |
+----------+
1 row in set (0.05 sec)
```
* [enhancement](statistics) collect table-level loaded rows on BE to make
the RPC lightweight (#24609)
* [fix](statistics) use replication_num as replica num (#25325)
---
be/src/agent/task_worker_pool.cpp | 7 ++++--
be/src/olap/task/engine_publish_version_task.cpp | 25 ++++++++++++++++++----
be/src/olap/task/engine_publish_version_task.h | 11 +++++++---
.../java/org/apache/doris/master/MasterImpl.java | 3 +++
.../org/apache/doris/task/PublishVersionTask.java | 14 ++++++++++++
.../doris/transaction/DatabaseTransactionMgr.java | 16 ++++++++++++++
.../doris/transaction/PublishVersionDaemon.java | 25 ++++++++++++++++++++--
.../apache/doris/transaction/TransactionState.java | 13 +++++++++++
gensrc/thrift/MasterService.thrift | 1 +
9 files changed, 104 insertions(+), 11 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index b288731d09a..09c46461aca 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1528,14 +1528,17 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>>
discontinuous_version_tablets;
+ std::map<TTableId, int64_t> table_id_to_num_delta_rows;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
succ_tablets.clear();
error_tablet_ids.clear();
+ table_id_to_num_delta_rows.clear();
EnginePublishVersionTask engine_task(publish_version_req,
&error_tablet_ids,
- &succ_tablets,
&discontinuous_version_tablets);
+ &succ_tablets,
&discontinuous_version_tablets,
+ &table_id_to_num_delta_rows);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
@@ -1620,7 +1623,7 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
finish_task_request.__set_succ_tablets(succ_tablets);
finish_task_request.__set_error_tablet_ids(
std::vector<TTabletId>(error_tablet_ids.begin(),
error_tablet_ids.end()));
-
+
finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 24593c40bad..702c4386f11 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -29,6 +29,7 @@
#include <set>
#include <shared_mutex>
#include <string>
+#include <unordered_map>
#include <utility>
#include "common/logging.h"
@@ -72,11 +73,13 @@ void TabletPublishStatistics::record_in_bvar() {
EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids,
std::map<TTabletId, TVersion>* succ_tablets,
- std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinuous_version_tablets)
+ std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinuous_version_tablets,
+ std::map<TTableId, int64_t>* table_id_to_num_delta_rows)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablets(succ_tablets),
- _discontinuous_version_tablets(discontinuous_version_tablets) {}
+ _discontinuous_version_tablets(discontinuous_version_tablets),
+ _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {}
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
@@ -91,7 +94,7 @@ Status EnginePublishVersionTask::finish() {
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
-
+ std::unordered_map<int64_t, int64_t> tablet_id_to_num_delta_rows;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
@@ -188,6 +191,11 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}
+
+ auto rowset_meta_ptr = rowset->rowset_meta();
+ tablet_id_to_num_delta_rows.insert(
+ {rowset_meta_ptr->tablet_id(),
rowset_meta_ptr->num_rows()});
+
auto tablet_publish_txn_ptr =
std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id,
version, tablet_info);
auto submit_st = token->submit_func([=]() {
tablet_publish_txn_ptr->handle(); });
@@ -204,7 +212,6 @@ Status EnginePublishVersionTask::finish() {
std::set<TabletInfo> partition_related_tablet_infos;
StorageEngine::instance()->tablet_manager()->get_partition_related_tablets(
partition_id, &partition_related_tablet_infos);
-
Version version(par_ver_info.version, par_ver_info.version);
for (auto& tablet_info : partition_related_tablet_infos) {
TabletSharedPtr tablet =
@@ -241,6 +248,7 @@ Status EnginePublishVersionTask::finish() {
}
}
}
+ _calculate_tbl_num_delta_rows(tablet_id_to_num_delta_rows);
if (!res.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
LOG(INFO) << "finish to publish version on transaction."
@@ -252,6 +260,15 @@ Status EnginePublishVersionTask::finish() {
return res;
}
+// Folds the per-tablet delta row counts gathered while publishing this
+// transaction into per-table totals, accumulated into
+// `*_table_id_to_num_delta_rows` (which is reported back to the FE).
+// A tablet id that can no longer be looked up is skipped with a warning
+// instead of being dereferenced as a null pointer.
+void EnginePublishVersionTask::_calculate_tbl_num_delta_rows(
+        const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows) {
+    for (const auto& [tablet_id, num_delta_rows] : tablet_id_to_num_delta_rows) {
+        TabletSharedPtr tablet =
+                StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+        if (tablet == nullptr) {
+            LOG(WARNING) << "tablet " << tablet_id
+                         << " not found when counting loaded rows, skip it";
+            continue;
+        }
+        (*_table_id_to_num_delta_rows)[tablet->get_table_id()] += num_delta_rows;
+    }
+}
+
TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask*
engine_task,
TabletSharedPtr tablet,
RowsetSharedPtr rowset,
int64_t partition_id, int64_t
transaction_id,
diff --git a/be/src/olap/task/engine_publish_version_task.h
b/be/src/olap/task/engine_publish_version_task.h
index 0a270c93d2a..8f3790574a2 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -87,21 +87,26 @@ public:
EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId,
TVersion>* succ_tablets,
- std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinous_version_tablets);
- ~EnginePublishVersionTask() {}
+ std::vector<std::tuple<int64_t, int64_t, int64_t>>*
discontinous_version_tablets,
+ std::map<TTableId, int64_t>* table_id_to_num_delta_rows);
+ ~EnginePublishVersionTask() override = default;
- virtual Status finish() override;
+ Status finish() override;
void add_error_tablet_id(int64_t tablet_id);
int64_t finish_task();
private:
+ void _calculate_tbl_num_delta_rows(
+ const std::unordered_map<int64_t, int64_t>&
tablet_id_to_num_delta_rows);
+
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
std::set<TTabletId>* _error_tablet_ids;
std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>*
_discontinuous_version_tablets;
+ std::map<TTableId, int64_t>* _table_id_to_num_delta_rows;
};
class AsyncTabletPublishTask {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 23118647cca..2833eff5f3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -491,6 +491,9 @@ public class MasterImpl {
// not remove the task from queue and be will retry
return;
}
+ if (request.isSetTableIdToDeltaNumRows()) {
+
publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows());
+ }
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
publishVersionTask.getTaskType(),
publishVersionTask.getSignature());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 8461b1db4f5..74cff551b4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -21,6 +21,7 @@ import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TPublishVersionRequest;
import org.apache.doris.thrift.TTaskType;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,6 +39,11 @@ public class PublishVersionTask extends AgentTask {
// tabletId => version, current version = 0
private Map<Long, Long> succTablets;
+ /**
+ * To collect loaded rows for each table from each BE
+ */
+ private final Map<Long, Long> tableIdToDeltaNumRows = Maps.newHashMap();
+
public PublishVersionTask(long backendId, long transactionId, long dbId,
List<TPartitionVersionInfo> partitionVersionInfos, long
createTime) {
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L,
-1L, transactionId, createTime);
@@ -81,4 +87,12 @@ public class PublishVersionTask extends AgentTask {
}
this.errorTablets.addAll(errorTablets);
}
+
+    /**
+     * Records the per-table delta row counts reported by the BE that ran this task.
+     * Entries are merged into the existing map; a table id already present is overwritten.
+     *
+     * @param tableIdToDeltaNumRows table id -> number of rows loaded for that table on this BE
+     */
+    public void setTableIdToDeltaNumRows(Map<Long, Long> tableIdToDeltaNumRows) {
+        this.tableIdToDeltaNumRows.putAll(tableIdToDeltaNumRows);
+    }
+
+    /**
+     * @return the live (mutable) map of table id -> delta row count reported for this task
+     */
+    public Map<Long, Long> getTableIdToDeltaNumRows() {
+        return tableIdToDeltaNumRows;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 7677c1684f6..b74f902358f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -51,6 +51,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.ClearTransactionTask;
@@ -1791,6 +1792,21 @@ public class DatabaseTransactionMgr {
}
}
}
+ AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
+ Map<Long, Long> tableIdToTotalNumDeltaRows =
transactionState.getTableIdToTotalNumDeltaRows();
+ Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
+ tableIdToTotalNumDeltaRows
+ .forEach((tableId, numRows) -> {
+ OlapTable table = (OlapTable)
db.getTableNullable(tableId);
+ if (table != null) {
+ short replicaNum = table.getTableProperty()
+ .getReplicaAllocation()
+ .getTotalReplicaNum();
+ tableIdToNumDeltaRows.put(tableId, numRows /
replicaNum);
+ }
+ });
+ LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows);
+ tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows);
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 33ea8de07eb..747508d2118 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -29,13 +29,17 @@ import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TTaskType;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
public class PublishVersionDaemon extends MasterDaemon {
@@ -121,12 +125,29 @@ public class PublishVersionDaemon extends MasterDaemon {
AgentTaskExecutor.submit(batchTask);
}
+ Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
- boolean hasBackendAliveAndUnfinishTask =
transactionState.getPublishVersionTasks().values().stream()
+ Stream<PublishVersionTask> publishVersionTaskStream =
transactionState
+ .getPublishVersionTasks()
+ .values()
+ .stream()
+ .peek(task -> {
+ if (task.isFinished() &&
CollectionUtils.isEmpty(task.getErrorTablets())) {
+ Map<Long, Long> tableIdToDeltaNumRows =
+ task.getTableIdToDeltaNumRows();
+ tableIdToDeltaNumRows.forEach((tableId, numRows)
-> {
+ tableIdToDeltaNumRows
+ .computeIfPresent(tableId, (id,
orgNumRows) -> orgNumRows + numRows);
+ tableIdToNumDeltaRows.putIfAbsent(tableId,
numRows);
+ });
+ }
+ });
+ boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
.anyMatch(task -> !task.isFinished() &&
infoService.checkBackendAlive(task.getBackendId()));
+
transactionState.setTableIdToTotalNumDeltaRows(tableIdToNumDeltaRows);
- boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask ||
transactionState.isPublishTimeout();
+ boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask ||
transactionState.isPublishTimeout();
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other
transaction
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 897bc3b63b8..5d95917e58d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -253,6 +253,11 @@ public class TransactionState implements Writable {
// tbl id -> (index ids)
private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();
+ /**
+ * the value is the num delta rows of all replicas in each table
+ */
+ private final Map<Long, Long> tableIdToTotalNumDeltaRows =
Maps.newHashMap();
+
private String errorLogUrl = null;
// record some error msgs during the transaction operation.
@@ -701,6 +706,14 @@ public class TransactionState implements Writable {
}
}
+    /**
+     * @return the live (mutable) map of table id -> delta row count summed over all
+     *         replicas of each table in this transaction
+     */
+    public Map<Long, Long> getTableIdToTotalNumDeltaRows() {
+        return tableIdToTotalNumDeltaRows;
+    }
+
+    /**
+     * Replaces the recorded per-table totals with {@code tableIdToTotalNumDeltaRows}.
+     * The publish daemon recomputes the totals and calls this again on each retry round,
+     * so entries from an earlier round must not survive (merging would keep stale table ids).
+     *
+     * @param tableIdToTotalNumDeltaRows table id -> delta row count summed over all replicas
+     */
+    public void setTableIdToTotalNumDeltaRows(Map<Long, Long> tableIdToTotalNumDeltaRows) {
+        if (tableIdToTotalNumDeltaRows == this.tableIdToTotalNumDeltaRows) {
+            return; // passing our own map back in would otherwise clear it
+        }
+        this.tableIdToTotalNumDeltaRows.clear();
+        this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows);
+    }
+
public void setErrorMsg(String errMsg) {
this.errMsg = errMsg;
}
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index dedc454d33f..9acd3f85f7b 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -66,6 +66,7 @@ struct TFinishTaskRequest {
15: optional i64 copy_size
16: optional i64 copy_time_ms
17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
+ 18: optional map<i64, i64> table_id_to_delta_num_rows
}
struct TTablet {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org