This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fc1cf2b043d [Fix](compress) Fix occasional crushes when serializing
blocks (#32672)
fc1cf2b043d is described below
commit fc1cf2b043da9de5108d2e99ad3dba66e041100b
Author: HappenLee <[email protected]>
AuthorDate: Sat Mar 23 00:36:34 2024 +0800
[Fix](compress) Fix occasional crushes when serializing blocks (#32672)
---
be/src/vec/core/block.cpp | 4 +-
.../main/java/org/apache/doris/catalog/Env.java | 22 ++++---
.../java/org/apache/doris/qe/SessionVariable.java | 13 ++++
.../main/java/org/apache/doris/qe/VariableMgr.java | 71 ++--------------------
4 files changed, 33 insertions(+), 77 deletions(-)
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 047c7029a2e..c93bfb11f09 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -868,9 +868,9 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
}
*uncompressed_bytes = content_uncompressed_size;
- const size_t serialize_bytes = buf - column_values.data();
+ const size_t serialize_bytes = buf - column_values.data() +
STREAMVBYTE_PADDING;
*compressed_bytes = serialize_bytes;
- column_values.resize(serialize_bytes + STREAMVBYTE_PADDING);
+ column_values.resize(serialize_bytes);
// compress
if (compression_type != segment_v2::NO_COMPRESSION &&
content_uncompressed_size > 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 50c8b239193..3241a318715 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -229,6 +229,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.QueryCancelWorker;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
@@ -1433,6 +1434,7 @@ public class Env {
}
}
+ @SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
private void transferToMaster() {
// stop replayer
if (replayer != null) {
@@ -1500,24 +1502,24 @@ public class Env {
// because the default parallelism of pipeline engine is
higher than previous version.
// so set parallel_pipeline_task_num to
parallel_fragment_exec_instance_num
int newVal =
VariableMgr.newSessionVariable().parallelExecInstanceNum;
- VariableMgr.setGlobalPipelineTask(newVal);
- LOG.info("upgrade FE from 1.x to 2.0, set
parallel_pipeline_task_num "
- + "to parallel_fragment_exec_instance_num: {}",
newVal);
+ VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
+ String.valueOf(newVal));
// similar reason as above, need to upgrade broadcast scale
factor during 1.2 to 2.x
// if the default value has been upgraded
double newBcFactorVal =
VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
- VariableMgr.setGlobalBroadcastScaleFactor(newBcFactorVal);
- LOG.info("upgrade FE from 1.x to 2.x, set
broadcast_right_table_scale_factor "
- + "to new default value: {}", newBcFactorVal);
+ VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
+ SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
+ String.valueOf(newBcFactorVal));
// similar reason as above, need to upgrade
enable_nereids_planner to true
- VariableMgr.enableNereidsPlanner();
- LOG.info("upgrade FE from 1.x to 2.x, set
enable_nereids_planner to new default value: true");
+ VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.ENABLE_NEREIDS_PLANNER,
+ "true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
- VariableMgr.enableNereidsDml();
- LOG.info("upgrade FE from 2.0 to 2.1, set enable_nereids_dml
to new default value: true");
+ VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.ENABLE_NEREIDS_DML, "true");
+ VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
+
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 25076e803f6..0ab3658ad89 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1757,6 +1757,7 @@ public class SessionVariable implements Serializable,
Writable {
this.enableFunctionPushdown = true;
this.enableDeleteSubPredicateV2 = true;
}
+
/*
switch (randomInt) {
case 0:
@@ -1807,6 +1808,18 @@ public class SessionVariable implements Serializable,
Writable {
} else {
this.enableFoldConstantByBe = true;
}
+
+ switch (Config.pull_request_id % 3) {
+ case 0:
+ this.fragmentTransmissionCompressionCodec = "snappy";
+ break;
+ case 1:
+ this.fragmentTransmissionCompressionCodec = "lz4";
+ break;
+ default:
+ this.fragmentTransmissionCompressionCodec = "none";
+ }
+
this.runtimeFilterType = 1 << randomInt;
this.enableParallelScan = Config.pull_request_id % 2 == 0 ?
randomInt % 2 == 0 : randomInt % 1 == 0;
switch (randomInt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index e83fd474daf..6e75f17a042 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -389,84 +389,25 @@ public class VariableMgr {
}
}
- public static void setGlobalPipelineTask(int instance) {
+ public static void refreshDefaultSessionVariables(String versionMsg,
String sessionVar, String value) {
wlock.lock();
try {
- VarContext ctx =
ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM);
+ VarContext ctx = ctxByVarName.get(sessionVar);
try {
- setValue(ctx.getObj(), new
SessionVariableField(ctx.getField()), String.valueOf(instance));
+ setValue(ctx.getObj(), new
SessionVariableField(ctx.getField()), value);
} catch (DdlException e) {
- LOG.warn("failed to set global variable: {}",
SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e);
+ LOG.warn("failed to set global variable: {}", sessionVar, e);
return;
}
// write edit log
GlobalVarPersistInfo info = new
GlobalVarPersistInfo(defaultSessionVariable,
-
Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM));
- Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
- } finally {
- wlock.unlock();
- }
- }
-
- public static void setGlobalBroadcastScaleFactor(double factor) {
- wlock.lock();
- try {
- VarContext ctx =
ctxByVarName.get(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR);
- try {
- setValue(ctx.getObj(), new
SessionVariableField(ctx.getField()), String.valueOf(factor));
- } catch (DdlException e) {
- LOG.warn("failed to set global variable: {}",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, e);
- return;
- }
-
- // write edit log
- GlobalVarPersistInfo info = new
GlobalVarPersistInfo(defaultSessionVariable,
-
Lists.newArrayList(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR));
- Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
- } finally {
- wlock.unlock();
- }
- }
-
- public static void enableNereidsPlanner() {
- wlock.lock();
- try {
- VarContext ctx =
ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_PLANNER);
- try {
- setValue(ctx.getObj(), new
SessionVariableField(ctx.getField()), String.valueOf(true));
- } catch (DdlException e) {
- LOG.warn("failed to set global variable: {}",
SessionVariable.ENABLE_NEREIDS_PLANNER, e);
- return;
- }
-
- // write edit log
- GlobalVarPersistInfo info = new
GlobalVarPersistInfo(defaultSessionVariable,
-
Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_PLANNER));
- Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
- } finally {
- wlock.unlock();
- }
- }
-
- public static void enableNereidsDml() {
- wlock.lock();
- try {
- VarContext ctx =
ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_DML);
- try {
- setValue(ctx.getObj(), new
SessionVariableField(ctx.getField()), String.valueOf(true));
- } catch (DdlException e) {
- LOG.warn("failed to set global variable: {}",
SessionVariable.ENABLE_NEREIDS_DML, e);
- return;
- }
-
- // write edit log
- GlobalVarPersistInfo info = new
GlobalVarPersistInfo(defaultSessionVariable,
- Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_DML));
+ Lists.newArrayList(sessionVar));
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
} finally {
wlock.unlock();
}
+ LOG.info("upgrade FE from {}, set {} to new default value: {}",
versionMsg, sessionVar, value);
}
public static void setLowerCaseTableNames(int mode) throws DdlException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]