This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 902ab93 [fix](session-variable) fix bug that checkpoint may overwrite
the global variables (#7526)
902ab93 is described below
commit 902ab9304357b65a9e085b340b661da869a6833a
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Jan 14 09:25:10 2022 +0800
[fix](session-variable) fix bug that checkpoint may overwrite the global
variables (#7526)
We should create temporary objects for some static fields when doing a
checkpoint,
to avoid these variables being overwritten by the checkpoint process.
---
.../java/org/apache/doris/catalog/Catalog.java | 10 +-
.../org/apache/doris/journal/JournalEntity.java | 9 +-
.../java/org/apache/doris/master/Checkpoint.java | 34 +++-
.../java/org/apache/doris/persist/EditLog.java | 20 +-
.../main/java/org/apache/doris/qe/VariableMgr.java | 138 ++++++++------
.../doris/transaction/GlobalTransactionMgr.java | 44 ++---
.../java/org/apache/doris/qe/VariableMgrTest.java | 211 +++++++++++----------
7 files changed, 245 insertions(+), 221 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index a96306f..4bb7c3a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -217,7 +217,6 @@ import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.JournalObservable;
-import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.FrontendOptions;
@@ -717,6 +716,11 @@ public class Catalog {
return getCurrentCatalog().getAuditEventProcessor();
}
+ // For unit test only
+ public Checkpoint getCheckpointer() {
+ return checkpointer;
+ }
+
public StatisticsManager getStatisticsManager() {
return statisticsManager;
}
@@ -2238,10 +2242,6 @@ public class Catalog {
return checksum;
}
- public void replayGlobalVariable(SessionVariable variable) throws
IOException, DdlException {
- VariableMgr.replayGlobalVariable(variable);
- }
-
public void replayGlobalVariableV2(GlobalVarPersistInfo info) throws
IOException, DdlException {
VariableMgr.replayGlobalVariableV2(info);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index cc7d89d..69af8b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -90,7 +90,6 @@ import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.plugin.PluginInfo;
-import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -380,12 +379,6 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
- case OperationType.OP_GLOBAL_VARIABLE: {
- data = new SessionVariable();
- ((SessionVariable) data).readFields(in);
- isRead = true;
- break;
- }
case OperationType.OP_CREATE_CLUSTER: {
data = Cluster.read(in);
isRead = true;
@@ -612,7 +605,7 @@ public class JournalEntity implements Writable {
data = PluginInfo.read(in);
isRead = true;
break;
- }
+ }
case OperationType.OP_REMOVE_ALTER_JOB_V2: {
data = RemoveAlterJobV2OperationLog.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
index 56e30b7..69efd61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
@@ -29,6 +29,7 @@ import org.apache.doris.monitor.jvm.JvmStats.MemoryPool;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.MetaCleaner;
import org.apache.doris.persist.Storage;
+import org.apache.doris.qe.VariableMgr;
import org.apache.doris.system.Frontend;
import org.apache.logging.log4j.LogManager;
@@ -49,7 +50,7 @@ public class Checkpoint extends MasterDaemon {
private static final int PUT_TIMEOUT_SECOND = 3600;
private static final int CONNECT_TIMEOUT_SECOND = 1;
private static final int READ_TIMEOUT_SECOND = 1;
-
+
private Catalog catalog;
private String imageDir;
private EditLog editLog;
@@ -70,6 +71,12 @@ public class Checkpoint extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
+ doCheckpoint();
+ }
+
+ // public for unit test, so that we can trigger checkpoint manually.
+ // DO NOT call it manually outside the unit test.
+ public synchronized void doCheckpoint() {
long imageVersion = 0;
long checkPointVersion = 0;
Storage storage = null;
@@ -103,6 +110,7 @@ public class Checkpoint extends MasterDaemon {
LOG.info("begin to generate new image: image.{}", checkPointVersion);
catalog = Catalog.getCurrentCatalog();
catalog.setEditLog(editLog);
+ createStaticFieldForCkpt();
try {
catalog.loadImage(imageDir);
catalog.replayJournal(checkPointVersion);
@@ -128,8 +136,9 @@ public class Checkpoint extends MasterDaemon {
// destroy checkpoint catalog, reclaim memory
catalog = null;
Catalog.destroyCheckpoint();
+ destroyStaticFieldForCkpt();
}
-
+
// push image file to all the other non master nodes
// DO NOT get other nodes from HaProtocol, because node may not in
bdbje replication group yet.
List<Frontend> allFrontends =
Catalog.getServingCatalog().getFrontends(null);
@@ -144,7 +153,7 @@ public class Checkpoint extends MasterDaemon {
continue;
}
int port = Config.http_port;
-
+
String url = "http://" + host + ":" + port + "/put?version=" +
replayedJournalId
+ "&port=" + port;
LOG.info("Put image:{}", url);
@@ -213,7 +222,7 @@ public class Checkpoint extends MasterDaemon {
}
deleteVersion = Math.min(minOtherNodesJournalId,
checkPointVersion);
}
-
+
editLog.deleteJournals(deleteVersion + 1);
if (MetricRepo.isInit) {
MetricRepo.COUNTER_EDIT_LOG_CLEAN_SUCCESS.increase(1L);
@@ -242,7 +251,19 @@ public class Checkpoint extends MasterDaemon {
}
}
}
-
+
+ // Some classes use static variables to store information,
+ // and we need to generate new temporary objects for these static variables
+ // during the checkpoint process to cope with changes made to these
variables
+ // during the checkpoint process
+ private void createStaticFieldForCkpt() {
+ VariableMgr.createDefaultSessionVariableForCkpt();
+ }
+
+ private void destroyStaticFieldForCkpt() {
+ VariableMgr.destroyDefaultSessionVariableForCkpt();
+ }
+
/*
* Check whether can we do the checkpoint due to the memory used percent.
*/
@@ -255,7 +276,7 @@ public class Checkpoint extends MasterDaemon {
memUsedPercent,
Config.metadata_checkpoint_memory_threshold);
return false;
}
-
+
return true;
}
@@ -287,5 +308,4 @@ public class Checkpoint extends MasterDaemon {
return used * 100 / max;
}
}
-
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 24871e0..169e89e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -66,7 +66,6 @@ import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.plugin.PluginInfo;
-import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -226,7 +225,7 @@ public class EditLog {
}
case OperationType.OP_BATCH_MODIFY_PARTITION: {
BatchModifyPartitionsInfo info =
(BatchModifyPartitionsInfo) journal.getData();
- for(ModifyPartitionInfo modifyPartitionInfo :
info.getModifyPartitionInfos()) {
+ for (ModifyPartitionInfo modifyPartitionInfo :
info.getModifyPartitionInfos()) {
catalog.getAlterInstance().replayModifyPartition(modifyPartitionInfo);
}
break;
@@ -531,11 +530,6 @@ public class EditLog {
MetaContext.get().setMetaVersion(version);
break;
}
- case OperationType.OP_GLOBAL_VARIABLE: {
- SessionVariable variable = (SessionVariable)
journal.getData();
- catalog.replayGlobalVariable(variable);
- break;
- }
case OperationType.OP_CREATE_CLUSTER: {
final Cluster value = (Cluster) journal.getData();
catalog.replayCreateCluster(value);
@@ -769,14 +763,14 @@ public class EditLog {
break;
}
case OperationType.OP_BATCH_ADD_ROLLUP: {
- BatchAlterJobPersistInfo batchAlterJobV2 =
(BatchAlterJobPersistInfo)journal.getData();
+ BatchAlterJobPersistInfo batchAlterJobV2 =
(BatchAlterJobPersistInfo) journal.getData();
for (AlterJobV2 alterJobV2 :
batchAlterJobV2.getAlterJobV2List()) {
catalog.getRollupHandler().replayAlterJobV2(alterJobV2);
}
break;
}
case OperationType.OP_MODIFY_DISTRIBUTION_TYPE: {
- TableInfo tableInfo = (TableInfo)journal.getData();
+ TableInfo tableInfo = (TableInfo) journal.getData();
catalog.replayConvertDistributionType(tableInfo);
break;
}
@@ -1090,7 +1084,7 @@ public class EditLog {
logEdit(OperationType.OP_DROP_ROLLUP, info);
}
- public void logBatchDropRollup (BatchDropInfo batchDropInfo) {
+ public void logBatchDropRollup(BatchDropInfo batchDropInfo) {
logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo);
}
@@ -1234,10 +1228,6 @@ public class EditLog {
logEdit(OperationType.OP_RENAME_PARTITION, tableInfo);
}
- public void logGlobalVariable(SessionVariable variable) {
- logEdit(OperationType.OP_GLOBAL_VARIABLE, variable);
- }
-
public void logCreateCluster(Cluster cluster) {
logEdit(OperationType.OP_CREATE_CLUSTER, cluster);
}
@@ -1295,7 +1285,7 @@ public class EditLog {
public void logInsertTransactionState(TransactionState transactionState) {
logEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState);
}
-
+
public void logBackupJob(BackupJob job) {
logEdit(OperationType.OP_BACKUP_JOB, job);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index 102951e..46e73d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.PatternMatcher;
-import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.GlobalVarPersistInfo;
import com.google.common.collect.ImmutableMap;
@@ -38,6 +37,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang.SerializationUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import org.json.JSONObject;
import java.io.DataInputStream;
@@ -111,6 +111,13 @@ public class VariableMgr {
// Whenever a new session is established, the value in this object is
copied to the session-level variable.
private static SessionVariable defaultSessionVariable;
+ // The following 2 static fields are for checkpoint.
+ // ctxByVarName and defaultSessionVariable are static variables,
and during the checkpoint process
+ // we must not modify any values in the Serving Catalog, including these static
variables.
+ // So two additional fields are provided for the checkpoint thread to use.
+ private static SessionVariable defaultSessionVariableForCkpt;
+ private static ImmutableMap<String, VarContext> ctxByVarNameForCkpt;
+
// Global read/write lock to protect access of globalSessionVariable.
private static final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private static final Lock rlock = rwlock.readLock();
@@ -120,32 +127,7 @@ public class VariableMgr {
static {
// Session value
defaultSessionVariable = new SessionVariable();
- ImmutableSortedMap.Builder<String, VarContext> builder =
- ImmutableSortedMap.orderedBy(String.CASE_INSENSITIVE_ORDER);
- for (Field field : SessionVariable.class.getDeclaredFields()) {
- VarAttr attr = field.getAnnotation(VarAttr.class);
- if (attr == null) {
- continue;
- }
-
- field.setAccessible(true);
- builder.put(attr.name(),
- new VarContext(field, defaultSessionVariable, SESSION |
attr.flag(),
- getValue(defaultSessionVariable, field)));
- }
-
- // Variables only exist in global environment.
- for (Field field : GlobalVariable.class.getDeclaredFields()) {
- VarAttr attr = field.getAnnotation(VarAttr.class);
- if (attr == null) {
- continue;
- }
-
- field.setAccessible(true);
- builder.put(attr.name(),
- new VarContext(field, null, GLOBAL | attr.flag(),
getValue(null, field)));
- }
-
+ ImmutableSortedMap.Builder<String, VarContext> builder =
getStringVarContextBuilder(defaultSessionVariable);
ctxByVarName = builder.build();
}
@@ -211,7 +193,7 @@ public class VariableMgr {
// revert the operator[set_var] on select/*+ SET_VAR()*/ sql;
public static void revertSessionValue(SessionVariable obj) throws
DdlException {
Map<Field, String> sessionOriginValue = obj.getSessionOriginValue();
- if(!sessionOriginValue.isEmpty()) {
+ if (!sessionOriginValue.isEmpty()) {
for (Field field : sessionOriginValue.keySet()) {
// revert session value
setValue(obj, field, sessionOriginValue.get(field));
@@ -272,7 +254,7 @@ public class VariableMgr {
// set session variable
Field field = ctx.getField();
// if stmt is "Select /*+ SET_VAR(...)*/"
- if(sessionVariable.getIsSingleSetVar()) {
+ if (sessionVariable.getIsSingleSetVar()) {
try {
sessionVariable.addSessionOriginValue(field,
field.get(sessionVariable).toString());
} catch (Exception e) {
@@ -304,17 +286,30 @@ public class VariableMgr {
// global variable persistence
public static void write(DataOutputStream out) throws IOException {
- defaultSessionVariable.write(out);
+ SessionVariable variablesToWrite = defaultSessionVariable;
+ if (Catalog.isCheckpointThread()) {
+ // If this is checkpoint thread, we should write value in
`defaultSessionVariableForCkpt` to the image
+ // instead of `defaultSessionVariable`.
+ variablesToWrite = defaultSessionVariableForCkpt;
+ }
+ variablesToWrite.write(out);
// get all global variables
List<String> varNames = GlobalVariable.getPersistentGlobalVarNames();
- GlobalVarPersistInfo info = new
GlobalVarPersistInfo(defaultSessionVariable, varNames);
+ GlobalVarPersistInfo info = new GlobalVarPersistInfo(variablesToWrite,
varNames);
info.write(out);
}
public static void read(DataInputStream in) throws IOException,
DdlException {
wlock.lock();
try {
- defaultSessionVariable.readFields(in);
+ SessionVariable variablesToRead = defaultSessionVariable;
+ if (Catalog.isCheckpointThread()) {
+ // If this is checkpoint thread, we should read value to set
them to `defaultSessionVariableForCkpt`
+ // instead of `defaultSessionVariable`.
+ // This approach ensures that checkpoint threads do not modify
the values in serving catalog.
+ variablesToRead = defaultSessionVariableForCkpt;
+ }
+ variablesToRead.readFields(in);
if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_90) {
GlobalVarPersistInfo info = GlobalVarPersistInfo.read(in);
replayGlobalVariableV2(info);
@@ -324,35 +319,6 @@ public class VariableMgr {
}
}
- @Deprecated
- private static void writeGlobalVariableUpdate(SessionVariable variable,
String msg) {
- EditLog editLog = Catalog.getCurrentCatalog().getEditLog();
- editLog.logGlobalVariable(variable);
- }
-
- @Deprecated
- public static void replayGlobalVariable(SessionVariable variable) throws
DdlException {
- wlock.lock();
- try {
- for (Field field : SessionVariable.class.getDeclaredFields()) {
- VarAttr attr = field.getAnnotation(VarAttr.class);
- if (attr == null) {
- continue;
- }
-
- field.setAccessible(true);
-
- VarContext ctx = ctxByVarName.get(attr.name());
- if (ctx.getFlag() == SESSION) {
- String value = getValue(variable, ctx.getField());
- setValue(ctx.getObj(), ctx.getField(), value);
- }
- }
- } finally {
- wlock.unlock();
- }
- }
-
// this method is used to replace the `replayGlobalVariable()`
public static void replayGlobalVariableV2(GlobalVarPersistInfo info)
throws DdlException {
wlock.lock();
@@ -361,6 +327,11 @@ public class VariableMgr {
JSONObject root = new JSONObject(json);
for (String varName : root.keySet()) {
VarContext varContext = ctxByVarName.get(varName);
+ if (Catalog.isCheckpointThread()) {
+ // If this is the checkpoint thread, we should look up the variable in
`ctxByVarNameForCkpt`
+ // instead of `ctxByVarName`.
+ varContext = ctxByVarNameForCkpt.get(varName);
+ }
if (varContext == null) {
LOG.error("failed to get global variable {} when
replaying", varName);
continue;
@@ -437,6 +408,7 @@ public class VariableMgr {
}
// Get variable value through variable name, used to satisfy statement
like `SELECT @@comment_version`
+ // For test only
public static String getValue(SessionVariable var, SysVariableDesc desc)
throws AnalysisException {
VarContext ctx = ctxByVarName.get(desc.getName());
if (ctx == null) {
@@ -536,9 +508,12 @@ public class VariableMgr {
public static @interface VarAttr {
// Name in show variables and set statement;
String name();
+
int flag() default 0;
+
// TODO(zhaochun): min and max is not used.
String minValue() default "0";
+
String maxValue() default "0";
// Set to true if the variables need to be forwarded along with
forward statement.
@@ -577,4 +552,45 @@ public class VariableMgr {
return defaultValue;
}
}
+
+ public static void createDefaultSessionVariableForCkpt() {
+ defaultSessionVariableForCkpt = new SessionVariable();
+ ImmutableSortedMap.Builder<String, VarContext> builder =
getStringVarContextBuilder(defaultSessionVariableForCkpt);
+ ctxByVarNameForCkpt = builder.build();
+ }
+
+ public static void destroyDefaultSessionVariableForCkpt() {
+ defaultSessionVariableForCkpt = null;
+ ctxByVarNameForCkpt = null;
+ }
+
+ @NotNull
+ private static ImmutableSortedMap.Builder<String, VarContext>
getStringVarContextBuilder(SessionVariable sessionVariable) {
+ ImmutableSortedMap.Builder<String, VarContext> builder =
+ ImmutableSortedMap.orderedBy(String.CASE_INSENSITIVE_ORDER);
+ for (Field field : SessionVariable.class.getDeclaredFields()) {
+ VarAttr attr = field.getAnnotation(VarAttr.class);
+ if (attr == null) {
+ continue;
+ }
+
+ field.setAccessible(true);
+ builder.put(attr.name(),
+ new VarContext(field, sessionVariable, SESSION |
attr.flag(),
+ getValue(sessionVariable, field)));
+ }
+
+ // Variables only exist in global environment.
+ for (Field field : GlobalVariable.class.getDeclaredFields()) {
+ VarAttr attr = field.getAnnotation(VarAttr.class);
+ if (attr == null) {
+ continue;
+ }
+
+ field.setAccessible(true);
+ builder.put(attr.name(),
+ new VarContext(field, null, GLOBAL | attr.flag(),
getValue(null, field)));
+ }
+ return builder;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index e3416d2..198b5a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -71,13 +71,13 @@ public class GlobalTransactionMgr implements Writable {
private TransactionIdGenerator idGenerator = new TransactionIdGenerator();
private TxnStateCallbackFactory callbackFactory = new
TxnStateCallbackFactory();
-
+
private Catalog catalog;
public GlobalTransactionMgr(Catalog catalog) {
this.catalog = catalog;
}
-
+
public TxnStateCallbackFactory getCallbackFactory() {
return callbackFactory;
}
@@ -103,15 +103,15 @@ public class GlobalTransactionMgr implements Writable {
}
public long beginTransaction(long dbId, List<Long> tableIdList, String
label, TxnCoordinator coordinator, LoadJobSourceType sourceType,
- long timeoutSecond)
+ long timeoutSecond)
throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException, DuplicatedRequestException,
QuotaExceedException, MetaNotFoundException {
return beginTransaction(dbId, tableIdList, label, null, coordinator,
sourceType, -1, timeoutSecond);
}
-
+
/**
* the app could specify the transaction id
- *
+ *
* requestId is used to judge that whether the request is a internal retry
request
* if label already exist, and requestId are equal, we return the exist
tid, and consider this 'begin'
* as success.
@@ -146,7 +146,7 @@ public class GlobalTransactionMgr implements Writable {
private void checkValidTimeoutSecond(long timeoutSecond, int
maxLoadTimeoutSecond, int minLoadTimeOutSecond) throws AnalysisException {
if (timeoutSecond > maxLoadTimeoutSecond ||
timeoutSecond < minLoadTimeOutSecond) {
- throw new AnalysisException("Invalid timeout. Timeout should
between "
+ throw new AnalysisException("Invalid timeout: " + timeoutSecond +
". Timeout should between "
+ minLoadTimeOutSecond + " and " + maxLoadTimeoutSecond
+ " seconds");
}
@@ -177,7 +177,7 @@ public class GlobalTransactionMgr implements Writable {
throws UserException {
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos,
null);
}
-
+
/**
* @param transactionId
* @param tabletCommitInfos
@@ -193,21 +193,21 @@ public class GlobalTransactionMgr implements Writable {
if (Config.disable_load_job) {
throw new TransactionCommitFailedException("disable_load_job is
set to true, all load jobs are prevented");
}
-
+
LOG.debug("try to commit transaction: {}", transactionId);
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
dbTransactionMgr.commitTransaction(tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
}
-
+
public boolean commitAndPublishTransaction(Database db, List<Table>
tableList, long transactionId,
List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis)
throws UserException {
return commitAndPublishTransaction(db, tableList, transactionId,
tabletCommitInfos, timeoutMillis, null);
}
-
+
public boolean commitAndPublishTransaction(Database db, List<Table>
tableList, long transactionId,
- List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
- TxnCommitAttachment txnCommitAttachment)
+ List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis,
+ TxnCommitAttachment
txnCommitAttachment)
throws UserException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
@@ -217,7 +217,7 @@ public class GlobalTransactionMgr implements Writable {
try {
commitTransaction(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
} finally {
- MetaLockUtils.writeUnlockTables(tableList);
+ MetaLockUtils.writeUnlockTables(tableList);
}
stopWatch.stop();
long publishTimeoutMillis = timeoutMillis - stopWatch.getTime();
@@ -228,7 +228,7 @@ public class GlobalTransactionMgr implements Writable {
return false;
}
return dbTransactionMgr.publishTransaction(db, transactionId,
publishTimeoutMillis);
- }
+ }
public void abortTransaction(long dbId, long transactionId, String reason)
throws UserException {
abortTransaction(dbId, transactionId, reason, null);
@@ -267,7 +267,7 @@ public class GlobalTransactionMgr implements Writable {
if (transactionState.getTableIdList().contains(tableId)) {
if (partitionId == null) {
return true;
- } else if
(transactionState.getTableCommitInfo(tableId).getPartitionCommitInfo(partitionId)
!= null){
+ } else if
(transactionState.getTableCommitInfo(tableId).getPartitionCommitInfo(partitionId)
!= null) {
return true;
}
}
@@ -290,7 +290,7 @@ public class GlobalTransactionMgr implements Writable {
/**
* Check whether a load job already exists before checking all
`TransactionId` related with this load job have finished.
* finished
- *
+ *
* @throws AnalysisException is database does not exist anymore
*/
public boolean isPreviousTransactionsFinished(long endTransactionId, long
dbId, List<Long> tableIdList)
@@ -330,7 +330,7 @@ public class GlobalTransactionMgr implements Writable {
return null;
}
}
-
+
public void setEditLog(EditLog editLog) {
this.idGenerator.setEditLog(editLog);
}
@@ -384,7 +384,7 @@ public class GlobalTransactionMgr implements Writable {
}
return infos;
}
-
+
public List<List<String>> getDbTransStateInfo(long dbId) {
try {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
@@ -399,7 +399,7 @@ public class GlobalTransactionMgr implements Writable {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
return dbTransactionMgr.getTxnStateInfoList(running, limit);
}
-
+
// get show info of a specified txnId
public List<List<String>> getSingleTranInfo(long dbId, long txnId) throws
AnalysisException {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
@@ -410,7 +410,7 @@ public class GlobalTransactionMgr implements Writable {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
return dbTransactionMgr.getTableTransInfo(txnId);
}
-
+
public List<List<Comparable>> getPartitionTransInfo(long dbId, long tid,
long tableId)
throws AnalysisException {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
@@ -427,7 +427,7 @@ public class GlobalTransactionMgr implements Writable {
}
return txnNum;
}
-
+
public TransactionIdGenerator getTransactionIDGenerator() {
return this.idGenerator;
}
@@ -441,7 +441,7 @@ public class GlobalTransactionMgr implements Writable {
}
idGenerator.write(out);
}
-
+
public void readFields(DataInput in) throws IOException {
int numTransactions = in.readInt();
for (int i = 0; i < numTransactions; ++i) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index 243832c..fc8582f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -17,162 +17,168 @@
package org.apache.doris.qe;
-import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.SetStmt;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.SysVariableDesc;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
-import org.apache.doris.mysql.privilege.PaloAuth;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.persist.EditLog;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.utframe.UtFrameUtils;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.List;
-
-import mockit.Expectations;
-import mockit.Mocked;
+import java.util.UUID;
public class VariableMgrTest {
- private static final Logger LOG =
LoggerFactory.getLogger(VariableMgrTest.class);
- @Mocked
- private Catalog catalog;
- @Mocked
- private EditLog editLog;
- @Mocked
- private PaloAuth auth;
-
- @Before
- public void setUp() {
- new Expectations() {
- {
- catalog.getEditLog();
- minTimes = 0;
- result = editLog;
-
- editLog.logGlobalVariable((SessionVariable) any);
- minTimes = 0;
-
- catalog.getAuth();
- minTimes = 0;
- result = auth;
-
- auth.checkGlobalPriv((ConnectContext) any,
PrivPredicate.ADMIN);
- minTimes = 0;
- result = true;
- }
- };
+ private static String runningDir = "fe/mocked/VariableMgrTest/" +
UUID.randomUUID().toString() + "/";
+ private static ConnectContext ctx;
- new Expectations(catalog) {
- {
- Catalog.getCurrentCatalog();
- minTimes = 0;
- result = catalog;
- }
- };
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(new File(runningDir));
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UtFrameUtils.createDorisCluster(runningDir);
+ ctx = UtFrameUtils.createDefaultCtx();
+ String createDbStmtStr = "create database db1;";
+ CreateDbStmt createDbStmt = (CreateDbStmt)
UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
+ Catalog.getCurrentCatalog().createDb(createDbStmt);
}
@Test
- public void testNormal() throws IllegalAccessException,
NoSuchFieldException, UserException {
+ public void testNormal() throws Exception {
SessionVariable var = VariableMgr.newSessionVariable();
- Assert.assertEquals(2147483648L, var.getMaxExecMemByte());
- Assert.assertEquals(300, var.getQueryTimeoutS());
- Assert.assertEquals(false, var.enableProfile());
- Assert.assertEquals(0L, var.getSqlMode());
+ long originExecMemLimit = var.getMaxExecMemByte();
+ boolean originEnableProfile = var.enableProfile();
+ long originQueryTimeOut = var.getQueryTimeoutS();
List<List<String>> rows = VariableMgr.dump(SetType.SESSION, var, null);
Assert.assertTrue(rows.size() > 5);
for (List<String> row : rows) {
if (row.get(0).equalsIgnoreCase("exec_mem_limit")) {
- Assert.assertEquals("2147483648", row.get(1));
+ Assert.assertEquals(String.valueOf(originExecMemLimit),
row.get(1));
} else if (row.get(0).equalsIgnoreCase("is_report_success")) {
- Assert.assertEquals("false", row.get(1));
+ Assert.assertEquals(String.valueOf(originEnableProfile),
row.get(1));
} else if (row.get(0).equalsIgnoreCase("query_timeout")) {
- Assert.assertEquals("300", row.get(1));
+ Assert.assertEquals(String.valueOf(originQueryTimeOut),
row.get(1));
} else if (row.get(0).equalsIgnoreCase("sql_mode")) {
Assert.assertEquals("", row.get(1));
}
}
// Set global variable
- SetVar setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new
IntLiteral(1234L));
- setVar.analyze(null);
- VariableMgr.setVar(var, setVar);
- Assert.assertEquals(2147483648L, var.getMaxExecMemByte());
+ SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
exec_mem_limit=1234", ctx);
+ SetExecutor executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals(originExecMemLimit, var.getMaxExecMemByte());
var = VariableMgr.newSessionVariable();
Assert.assertEquals(1234L, var.getMaxExecMemByte());
- SetVar setVar2 = new SetVar(SetType.GLOBAL,
"parallel_fragment_exec_instance_num", new IntLiteral(5L));
- setVar2.analyze(null);
- VariableMgr.setVar(var, setVar2);
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
parallel_fragment_exec_instance_num=5", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
Assert.assertEquals(1L, var.getParallelExecInstanceNum());
var = VariableMgr.newSessionVariable();
Assert.assertEquals(5L, var.getParallelExecInstanceNum());
- SetVar setVar3 = new SetVar(SetType.GLOBAL, "time_zone", new
StringLiteral("Asia/Shanghai"));
- setVar3.analyze(null);
- VariableMgr.setVar(var, setVar3);
+ // Test checkTimeZoneValidAndStandardize
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
time_zone='+8:00'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals("+08:00",
VariableMgr.newSessionVariable().getTimeZone());
+
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
time_zone='Asia/Shanghai'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
Assert.assertEquals("Asia/Shanghai", var.getTimeZone());
var = VariableMgr.newSessionVariable();
Assert.assertEquals("Asia/Shanghai", var.getTimeZone());
- setVar3 = new SetVar(SetType.GLOBAL, "time_zone", new
StringLiteral("CST"));
- setVar3.analyze(null);
- VariableMgr.setVar(var, setVar3);
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
time_zone='CST'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
Assert.assertEquals("Asia/Shanghai", var.getTimeZone());
var = VariableMgr.newSessionVariable();
Assert.assertEquals("CST", var.getTimeZone());
- // Set session variable
- setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new
IntLiteral(1234L));
- setVar.analyze(null);
- VariableMgr.setVar(var, setVar);
- Assert.assertEquals(1234L, var.getMaxExecMemByte());
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
time_zone='8:00'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals("+08:00",
VariableMgr.newSessionVariable().getTimeZone());
+
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
time_zone='-8:00'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals("-08:00",
VariableMgr.newSessionVariable().getTimeZone());
- setVar3 = new SetVar(SetType.SESSION, "time_zone", new
StringLiteral("Asia/Jakarta"));
- setVar3.analyze(null);
- VariableMgr.setVar(var, setVar3);
- Assert.assertEquals("Asia/Jakarta", var.getTimeZone());
+ // Set session variable
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set
exec_mem_limit=1234", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals(1234L,
ctx.getSessionVariable().getMaxExecMemByte());
+
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set
time_zone='Asia/Jakarta'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals("Asia/Jakarta",
ctx.getSessionVariable().getTimeZone());
+
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set
sql_mode='PIPES_AS_CONCAT'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals(2L, ctx.getSessionVariable().getSqlMode());
+
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set
runtime_filter_type ='BLOOM_FILTER'", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals(2L,
ctx.getSessionVariable().getRuntimeFilterType());
// Get from name
SysVariableDesc desc = new SysVariableDesc("exec_mem_limit");
Assert.assertEquals(var.getMaxExecMemByte() + "",
VariableMgr.getValue(var, desc));
+ }
- SetVar setVar4 = new SetVar(SetType.SESSION, "sql_mode", new
StringLiteral(
- SqlModeHelper.encode("PIPES_AS_CONCAT").toString()));
- setVar4.analyze(null);
- VariableMgr.setVar(var, setVar4);
- Assert.assertEquals(2L, var.getSqlMode());
-
- // Test checkTimeZoneValidAndStandardize
- SetVar setVar5 = new SetVar(SetType.GLOBAL, "time_zone", new
StringLiteral("+8:00"));
- setVar5.analyze(null);
- VariableMgr.setVar(var, setVar5);
- Assert.assertEquals("+08:00",
VariableMgr.newSessionVariable().getTimeZone());
-
- SetVar setVar6 = new SetVar(SetType.GLOBAL, "time_zone", new
StringLiteral("8:00"));
- setVar6.analyze(null);
- VariableMgr.setVar(var, setVar6);
- Assert.assertEquals("+08:00",
VariableMgr.newSessionVariable().getTimeZone());
-
- SetVar setVar7 = new SetVar(SetType.GLOBAL, "time_zone", new
StringLiteral("-8:00"));
- setVar7.analyze(null);
- VariableMgr.setVar(var, setVar7);
- Assert.assertEquals("-08:00",
VariableMgr.newSessionVariable().getTimeZone());
-
- SetVar setVar8 = new SetVar(SetType.SESSION, "runtime_filter_type",
new StringLiteral(
- RuntimeFilterTypeHelper.encode("BLOOM_FILTER").toString()));
- setVar8.analyze(null);
- VariableMgr.setVar(var, setVar8);
- Assert.assertEquals(2L, var.getRuntimeFilterType());
+ @Test
+ public void testGlobalVariablePersist() throws Exception {
+ Config.edit_log_roll_num = 1;
+ SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
exec_mem_limit=5678", ctx);
+ SetExecutor executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals(5678,
VariableMgr.newSessionVariable().getMaxExecMemByte());
+
+ Config.edit_log_roll_num = 100;
+ stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global
exec_mem_limit=7890", ctx);
+ executor = new SetExecutor(ctx, stmt);
+ executor.execute();
+ Assert.assertEquals(7890,
VariableMgr.newSessionVariable().getMaxExecMemByte());
+
+ // Get currentCatalog first
+ Catalog currentCatalog = Catalog.getCurrentCatalog();
+ // Save real ckptThreadId
+ long ckptThreadId = currentCatalog.getCheckpointer().getId();
+ try {
+ // Set checkpointThreadId to the current thread id, so that when we do the
checkpoint manually here,
+ // Catalog.isCheckpointThread() will return true.
+ Deencapsulation.setField(Catalog.class, "checkpointThreadId",
Thread.currentThread().getId());
+ currentCatalog.getCheckpointer().doCheckpoint();
+ } finally {
+ // Restore the ckptThreadId
+ Deencapsulation.setField(Catalog.class, "checkpointThreadId",
ckptThreadId);
+ }
+ Assert.assertEquals(7890,
VariableMgr.newSessionVariable().getMaxExecMemByte());
}
@Test(expected = UserException.class)
@@ -214,7 +220,6 @@ public class VariableMgrTest {
@Test(expected = DdlException.class)
public void testReadOnly() throws AnalysisException, DdlException {
SysVariableDesc desc = new SysVariableDesc("version_comment");
- LOG.info(VariableMgr.getValue(null, desc));
// Set global variable
SetVar setVar = new SetVar(SetType.SESSION, "version_comment", null);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]