This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 145683ccdbc [improvement](group commit) make get column function more
reliable when replaying wal (#28900)
145683ccdbc is described below
commit 145683ccdbc9ad37f23b2a8b91888f7b24aaa688
Author: huanghaibin <[email protected]>
AuthorDate: Sun Dec 24 21:17:39 2023 +0800
[improvement](group commit) make get column function more reliable when
replaying wal (#28900)
---
be/src/olap/wal_table.cpp | 46 +++++++++++++---------
be/src/olap/wal_table.h | 1 +
.../apache/doris/service/FrontendServiceImpl.java | 38 +++++++++---------
gensrc/thrift/FrontendService.thrift | 7 +++-
4 files changed, 53 insertions(+), 39 deletions(-)
diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp
index bde0e8dd69d..7f98c410b07 100644
--- a/be/src/olap/wal_table.cpp
+++ b/be/src/olap/wal_table.cpp
@@ -204,7 +204,17 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
if (!st.ok()) {
LOG(WARNING) << "abort txn " << wal_id << " fail";
}
- RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
+ auto get_st = _get_column_info(_db_id, _table_id);
+ if (!get_st.ok()) {
+ if (get_st.is<ErrorCode::NOT_FOUND>()) {
+ {
+ std::lock_guard<std::mutex> lock(_replay_wal_lock);
+ _replay_wal_map.erase(wal);
+ }
+ RETURN_IF_ERROR(_delete_wal(wal_id));
+ }
+ return get_st;
+ }
#endif
RETURN_IF_ERROR(_send_request(wal_id, wal, label));
return Status::OK();
@@ -354,8 +364,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
}
} else {
LOG(INFO) << "success to replay wal =" << wal;
- RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
- RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
+ RETURN_IF_ERROR(_delete_wal(wal_id));
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (_replay_wal_map.erase(wal)) {
LOG(INFO) << "erase " << wal << " from _replay_wal_map";
@@ -414,26 +423,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
[&request, &result](FrontendServiceConnection& client) {
client->getColumnInfo(result, request);
}));
- std::string columns_str = result.column_info;
- std::vector<std::string> column_element;
- doris::vectorized::WalReader::string_split(columns_str, ",", column_element);
+ status = Status::create(result.status);
+ if (!status.ok()) {
+ return status;
+ }
+ std::vector<TColumnInfo> column_element = result.columns;
int64_t column_index = 1;
_column_id_name_map.clear();
_column_id_index_map.clear();
for (auto column : column_element) {
- auto pos = column.find(":");
- try {
- auto column_name = column.substr(0, pos);
- int64_t column_id = std::strtoll(column.substr(pos + 1).c_str(), NULL, 10);
- _column_id_name_map.emplace(column_id, column_name);
- _column_id_index_map.emplace(column_id, column_index);
- column_index++;
- } catch (const std::invalid_argument& e) {
- return Status::InvalidArgument("Invalid format, {}", e.what());
- }
+ auto column_name = column.columnName;
+ auto column_id = column.columnId;
+ _column_id_name_map.emplace(column_id, column_name);
+ _column_id_index_map.emplace(column_id, column_index);
+ column_index++;
}
-
- status = Status::create(result.status);
}
return status;
}
@@ -447,4 +451,10 @@ Status WalTable::_read_wal_header(const std::string& wal_path, std::string& colu
return Status::OK();
}
+Status WalTable::_delete_wal(int64_t wal_id) {
+ RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
+ RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
+ return Status::OK();
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h
index 354f4f16b05..e3d66d577a2 100644
--- a/be/src/olap/wal_table.h
+++ b/be/src/olap/wal_table.h
@@ -50,6 +50,7 @@ private:
Status _read_wal_header(const std::string& wal, std::string& columns);
bool _need_replay(const replay_wal_info& info);
Status _replay_wal_internal(const std::string& wal);
+ Status _delete_wal(int64_t wal_id);
private:
ExecEnv* _exec_env;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4f88d22c836..75405854cab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -114,6 +114,7 @@ import org.apache.doris.thrift.TCheckAuthRequest;
import org.apache.doris.thrift.TCheckAuthResult;
import org.apache.doris.thrift.TColumnDef;
import org.apache.doris.thrift.TColumnDesc;
+import org.apache.doris.thrift.TColumnInfo;
import org.apache.doris.thrift.TCommitTxnRequest;
import org.apache.doris.thrift.TCommitTxnResult;
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
@@ -240,7 +241,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -3280,40 +3280,38 @@ public class FrontendServiceImpl implements FrontendService.Iface {
@Override
public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
TGetColumnInfoResult result = new TGetColumnInfoResult();
- TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
long dbId = request.getDbId();
long tableId = request.getTableId();
if (!Env.getCurrentEnv().isMaster()) {
- errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
- errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
+ status.setStatusCode(TStatusCode.NOT_MASTER);
+ status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG);
return result;
}
- Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
- errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
- result.setStatus(errorStatus);
+ status.setStatusCode(TStatusCode.NOT_FOUND);
+ status.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
return result;
}
-
- Table table;
- try {
- table = db.getTable(tableId).get();
- } catch (NoSuchElementException e) {
- errorStatus.setErrorMsgs(
+ Table table = db.getTableNullable(tableId);
+ if (table == null) {
+ status.setStatusCode(TStatusCode.NOT_FOUND);
+ status.setErrorMsgs(
(Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId))));
- result.setStatus(errorStatus);
return result;
}
- StringBuilder sb = new StringBuilder();
+ List<TColumnInfo> columnsResult = Lists.newArrayList();
for (Column column : table.getBaseSchema(true)) {
- sb.append(column.getName() + ":" + column.getUniqueId() + ",");
+ final TColumnInfo info = new TColumnInfo();
+ info.setColumnName(column.getName());
+ info.setColumnId(column.getUniqueId());
+ columnsResult.add(info);
}
- String columnInfo = sb.toString();
- columnInfo = columnInfo.substring(0, columnInfo.length() - 1);
- result.setStatus(new TStatus(TStatusCode.OK));
- result.setColumnInfo(columnInfo);
+ result.setColumns(columnsResult);
return result;
}
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index d8a53d35662..d672597a0db 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1302,6 +1302,11 @@ struct TGetBackendMetaResult {
3: optional Types.TNetworkAddress master_address
}
+struct TColumnInfo {
+ 1: optional string columnName
+ 2: optional i64 columnId
+}
+
struct TGetColumnInfoRequest {
1: optional i64 db_id
2: optional i64 table_id
@@ -1309,7 +1314,7 @@ struct TGetColumnInfoRequest {
struct TGetColumnInfoResult {
1: optional Status.TStatus status
- 2: optional string column_info
+ 2: optional list<TColumnInfo> columns
}
service FrontendService {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]