This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 94b97261382 [fix](ccr) handle large binlog and download failure
(#30429)
94b97261382 is described below
commit 94b972613821e5498fab4abc9bf430b33b983a2f
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Jan 27 16:09:32 2024 +0800
[fix](ccr) handle large binlog and download failure (#30429)
---
be/src/agent/task_worker_pool.cpp | 4 ++--
.../src/main/java/org/apache/doris/common/Config.java | 5 +++++
.../main/java/org/apache/doris/binlog/BinlogManager.java | 14 +++++++++++++-
3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 9716a65865c..1e16af1b8e9 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -850,8 +850,8 @@ void TaskWorkerPool::_download_worker_thread_callback() {
auto status = Status::OK();
if (download_request.__isset.remote_tablet_snapshots) {
SnapshotLoader loader(_env, download_request.job_id,
agent_task_req.signature);
-
loader.remote_http_download(download_request.remote_tablet_snapshots,
- &downloaded_tablet_ids);
+ status =
loader.remote_http_download(download_request.remote_tablet_snapshots,
+ &downloaded_tablet_ids);
} else {
std::unique_ptr<SnapshotLoader> loader =
std::make_unique<SnapshotLoader>(
_env, download_request.job_id, agent_task_req.signature,
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 36dcfc1f8d0..2da574eb844 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2106,6 +2106,11 @@ public class Config extends ConfigBase {
"Whether to enable binlog feature"})
public static boolean enable_feature_binlog = false;
+ @ConfField(mutable = false, masterOnly = false, expType =
ExperimentalType.EXPERIMENTAL, description = {
+ "设置 binlog 消息最字节长度",
+ "Set the maximum byte length of binlog message"})
+ public static int max_binlog_messsage_size = 1024 * 1024 * 1024;
+
@ConfField(mutable = true, masterOnly = true, description = {
"是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
"Whether to disable creating catalog with WITH RESOURCE
statement."})
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b819d8444eb..6ac3ba3b3a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -457,10 +457,22 @@ public class BinlogManager {
int length = dis.readInt();
byte[] data = new byte[length];
dis.readFully(data);
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
+ Boolean isLargeBinlog = length > 8 * 1024 * 1024;
+ if (isLargeBinlog) {
+ LOG.info("a large binlog length {}", length);
+ }
+
+ TMemoryInputTransport transport = new TMemoryInputTransport();
+
transport.getConfiguration().setMaxMessageSize(Config.max_binlog_messsage_size);
+ transport.reset(data);
+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
TBinlog binlog = new TBinlog();
binlog.read(protocol);
+
+ if (isLargeBinlog) {
+ LOG.info("a large binlog length {} type {}", length, binlog.type);
+ }
return binlog;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]