This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 461e6494860 [fix](ccr) handle large binlog (#30435)
461e6494860 is described below
commit 461e649486018755d145f156941bd802f0af7fa5
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sun Jan 28 17:52:39 2024 +0800
[fix](ccr) handle large binlog (#30435)
---
.../src/main/java/org/apache/doris/common/Config.java | 5 +++++
.../main/java/org/apache/doris/binlog/BinlogManager.java | 14 +++++++++++++-
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bc849e24bd5..1231312cf5d 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2145,6 +2145,11 @@ public class Config extends ConfigBase {
"Whether to enable binlog feature"})
public static boolean enable_feature_binlog = false;
+ @ConfField(mutable = false, masterOnly = false, varType =
VariableAnnotation.EXPERIMENTAL, description = {
+ "设置 binlog 消息最字节长度",
+ "Set the maximum byte length of binlog message"})
+ public static int max_binlog_messsage_size = 1024 * 1024 * 1024;
+
@ConfField(mutable = true, masterOnly = true, description = {
"是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
"Whether to disable creating catalog with WITH RESOURCE
statement."})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b819d8444eb..6ac3ba3b3a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -457,10 +457,22 @@ public class BinlogManager {
int length = dis.readInt();
byte[] data = new byte[length];
dis.readFully(data);
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
+ Boolean isLargeBinlog = length > 8 * 1024 * 1024;
+ if (isLargeBinlog) {
+ LOG.info("a large binlog length {}", length);
+ }
+
+ TMemoryInputTransport transport = new TMemoryInputTransport();
+
transport.getConfiguration().setMaxMessageSize(Config.max_binlog_messsage_size);
+ transport.reset(data);
+
TBinaryProtocol protocol = new TBinaryProtocol(transport);
TBinlog binlog = new TBinlog();
binlog.read(protocol);
+
+ if (isLargeBinlog) {
+ LOG.info("a large binlog length {} type {}", length, binlog.type);
+ }
return binlog;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]