This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 902ab93  [fix](session-variable) fix bug that checkpoint may overwrite 
the global variables (#7526)
902ab93 is described below

commit 902ab9304357b65a9e085b340b661da869a6833a
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Jan 14 09:25:10 2022 +0800

    [fix](session-variable) fix bug that checkpoint may overwrite the global 
variables (#7526)
    
    We should create temporary objects for some static fields when doing a checkpoint,
    to prevent these variables from being overwritten by the checkpoint process.
---
 .../java/org/apache/doris/catalog/Catalog.java     |  10 +-
 .../org/apache/doris/journal/JournalEntity.java    |   9 +-
 .../java/org/apache/doris/master/Checkpoint.java   |  34 +++-
 .../java/org/apache/doris/persist/EditLog.java     |  20 +-
 .../main/java/org/apache/doris/qe/VariableMgr.java | 138 ++++++++------
 .../doris/transaction/GlobalTransactionMgr.java    |  44 ++---
 .../java/org/apache/doris/qe/VariableMgrTest.java  | 211 +++++++++++----------
 7 files changed, 245 insertions(+), 221 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index a96306f..4bb7c3a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -217,7 +217,6 @@ import org.apache.doris.qe.AuditEventProcessor;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.GlobalVariable;
 import org.apache.doris.qe.JournalObservable;
-import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.service.FrontendOptions;
@@ -717,6 +716,11 @@ public class Catalog {
         return getCurrentCatalog().getAuditEventProcessor();
     }
 
+    // For unit test only
+    public Checkpoint getCheckpointer() {
+        return checkpointer;
+    }
+
     public StatisticsManager getStatisticsManager() {
         return statisticsManager;
     }
@@ -2238,10 +2242,6 @@ public class Catalog {
         return checksum;
     }
 
-    public void replayGlobalVariable(SessionVariable variable) throws 
IOException, DdlException {
-        VariableMgr.replayGlobalVariable(variable);
-    }
-
     public void replayGlobalVariableV2(GlobalVarPersistInfo info) throws 
IOException, DdlException {
         VariableMgr.replayGlobalVariableV2(info);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index cc7d89d..69af8b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -90,7 +90,6 @@ import org.apache.doris.persist.TableInfo;
 import org.apache.doris.persist.TablePropertyInfo;
 import org.apache.doris.persist.TruncateTableInfo;
 import org.apache.doris.plugin.PluginInfo;
-import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.transaction.TransactionState;
@@ -380,12 +379,6 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
-            case OperationType.OP_GLOBAL_VARIABLE: {
-                data = new SessionVariable();
-                ((SessionVariable) data).readFields(in);
-                isRead = true;
-                break;
-            }
             case OperationType.OP_CREATE_CLUSTER: {
                 data = Cluster.read(in);
                 isRead = true;
@@ -612,7 +605,7 @@ public class JournalEntity implements Writable {
                 data = PluginInfo.read(in);
                 isRead = true;
                 break;
-            }              
+            }
             case OperationType.OP_REMOVE_ALTER_JOB_V2: {
                 data = RemoveAlterJobV2OperationLog.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
index 56e30b7..69efd61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
@@ -29,6 +29,7 @@ import org.apache.doris.monitor.jvm.JvmStats.MemoryPool;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.MetaCleaner;
 import org.apache.doris.persist.Storage;
+import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.system.Frontend;
 
 import org.apache.logging.log4j.LogManager;
@@ -49,7 +50,7 @@ public class Checkpoint extends MasterDaemon {
     private static final int PUT_TIMEOUT_SECOND = 3600;
     private static final int CONNECT_TIMEOUT_SECOND = 1;
     private static final int READ_TIMEOUT_SECOND = 1;
-    
+
     private Catalog catalog;
     private String imageDir;
     private EditLog editLog;
@@ -70,6 +71,12 @@ public class Checkpoint extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
+        doCheckpoint();
+    }
+
+    // public for unit test, so that we can trigger checkpoint manually.
+    // DO NOT call it manually outside the unit test.
+    public synchronized void doCheckpoint() {
         long imageVersion = 0;
         long checkPointVersion = 0;
         Storage storage = null;
@@ -103,6 +110,7 @@ public class Checkpoint extends MasterDaemon {
         LOG.info("begin to generate new image: image.{}", checkPointVersion);
         catalog = Catalog.getCurrentCatalog();
         catalog.setEditLog(editLog);
+        createStaticFieldForCkpt();
         try {
             catalog.loadImage(imageDir);
             catalog.replayJournal(checkPointVersion);
@@ -128,8 +136,9 @@ public class Checkpoint extends MasterDaemon {
             // destroy checkpoint catalog, reclaim memory
             catalog = null;
             Catalog.destroyCheckpoint();
+            destroyStaticFieldForCkpt();
         }
-        
+
         // push image file to all the other non master nodes
         // DO NOT get other nodes from HaProtocol, because node may not in 
bdbje replication group yet.
         List<Frontend> allFrontends = 
Catalog.getServingCatalog().getFrontends(null);
@@ -144,7 +153,7 @@ public class Checkpoint extends MasterDaemon {
                     continue;
                 }
                 int port = Config.http_port;
-                
+
                 String url = "http://" + host + ":" + port + "/put?version=" + 
replayedJournalId
                         + "&port=" + port;
                 LOG.info("Put image:{}", url);
@@ -213,7 +222,7 @@ public class Checkpoint extends MasterDaemon {
                     }
                     deleteVersion = Math.min(minOtherNodesJournalId, 
checkPointVersion);
                 }
-                
+
                 editLog.deleteJournals(deleteVersion + 1);
                 if (MetricRepo.isInit) {
                     MetricRepo.COUNTER_EDIT_LOG_CLEAN_SUCCESS.increase(1L);
@@ -242,7 +251,19 @@ public class Checkpoint extends MasterDaemon {
             }
         }
     }
-    
+
+    // Some classes use static variables to store information,
+    // and we need to generate new temporary objects for these static variables
+    // during the checkpoint process to cope with changes made to these 
variables
+    // during the checkpoint process
+    private void createStaticFieldForCkpt() {
+        VariableMgr.createDefaultSessionVariableForCkpt();
+    }
+
+    private void destroyStaticFieldForCkpt() {
+        VariableMgr.destroyDefaultSessionVariableForCkpt();
+    }
+
     /*
      * Check whether can we do the checkpoint due to the memory used percent.
      */
@@ -255,7 +276,7 @@ public class Checkpoint extends MasterDaemon {
                     memUsedPercent, 
Config.metadata_checkpoint_memory_threshold);
             return false;
         }
-       
+
         return true;
     }
 
@@ -287,5 +308,4 @@ public class Checkpoint extends MasterDaemon {
             return used * 100 / max;
         }
     }
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 24871e0..169e89e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -66,7 +66,6 @@ import org.apache.doris.meta.MetaContext;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
 import org.apache.doris.plugin.PluginInfo;
-import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.transaction.TransactionState;
@@ -226,7 +225,7 @@ public class EditLog {
                 }
                 case OperationType.OP_BATCH_MODIFY_PARTITION: {
                     BatchModifyPartitionsInfo info = 
(BatchModifyPartitionsInfo) journal.getData();
-                    for(ModifyPartitionInfo modifyPartitionInfo : 
info.getModifyPartitionInfos()) {
+                    for (ModifyPartitionInfo modifyPartitionInfo : 
info.getModifyPartitionInfos()) {
                         
catalog.getAlterInstance().replayModifyPartition(modifyPartitionInfo);
                     }
                     break;
@@ -531,11 +530,6 @@ public class EditLog {
                     MetaContext.get().setMetaVersion(version);
                     break;
                 }
-                case OperationType.OP_GLOBAL_VARIABLE: {
-                    SessionVariable variable = (SessionVariable) 
journal.getData();
-                    catalog.replayGlobalVariable(variable);
-                    break;
-                }
                 case OperationType.OP_CREATE_CLUSTER: {
                     final Cluster value = (Cluster) journal.getData();
                     catalog.replayCreateCluster(value);
@@ -769,14 +763,14 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_BATCH_ADD_ROLLUP: {
-                    BatchAlterJobPersistInfo batchAlterJobV2 = 
(BatchAlterJobPersistInfo)journal.getData();
+                    BatchAlterJobPersistInfo batchAlterJobV2 = 
(BatchAlterJobPersistInfo) journal.getData();
                     for (AlterJobV2 alterJobV2 : 
batchAlterJobV2.getAlterJobV2List()) {
                         
catalog.getRollupHandler().replayAlterJobV2(alterJobV2);
                     }
                     break;
                 }
                 case OperationType.OP_MODIFY_DISTRIBUTION_TYPE: {
-                    TableInfo tableInfo = (TableInfo)journal.getData();
+                    TableInfo tableInfo = (TableInfo) journal.getData();
                     catalog.replayConvertDistributionType(tableInfo);
                     break;
                 }
@@ -1090,7 +1084,7 @@ public class EditLog {
         logEdit(OperationType.OP_DROP_ROLLUP, info);
     }
 
-    public void logBatchDropRollup (BatchDropInfo batchDropInfo) {
+    public void logBatchDropRollup(BatchDropInfo batchDropInfo) {
         logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo);
     }
 
@@ -1234,10 +1228,6 @@ public class EditLog {
         logEdit(OperationType.OP_RENAME_PARTITION, tableInfo);
     }
 
-    public void logGlobalVariable(SessionVariable variable) {
-        logEdit(OperationType.OP_GLOBAL_VARIABLE, variable);
-    }
-
     public void logCreateCluster(Cluster cluster) {
         logEdit(OperationType.OP_CREATE_CLUSTER, cluster);
     }
@@ -1295,7 +1285,7 @@ public class EditLog {
     public void logInsertTransactionState(TransactionState transactionState) {
         logEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState);
     }
-    
+
     public void logBackupJob(BackupJob job) {
         logEdit(OperationType.OP_BACKUP_JOB, job);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index 102951e..46e73d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.PatternMatcher;
-import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.GlobalVarPersistInfo;
 
 import com.google.common.collect.ImmutableMap;
@@ -38,6 +37,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
 import org.json.JSONObject;
 
 import java.io.DataInputStream;
@@ -111,6 +111,13 @@ public class VariableMgr {
     // Whenever a new session is established, the value in this object is 
copied to the session-level variable.
     private static SessionVariable defaultSessionVariable;
 
+    // The following 2 static fields are for checkpoint.
+    // Because ctxByVarName and defaultSessionVariable are static variables, and during the checkpoint process
+    // we cannot modify any values in the Serving Catalog, including these static variables,
+    // we provide two additional fields for the checkpoint thread to use.
+    private static SessionVariable defaultSessionVariableForCkpt;
+    private static ImmutableMap<String, VarContext> ctxByVarNameForCkpt;
+
     // Global read/write lock to protect access of globalSessionVariable.
     private static final ReadWriteLock rwlock = new ReentrantReadWriteLock();
     private static final Lock rlock = rwlock.readLock();
@@ -120,32 +127,7 @@ public class VariableMgr {
     static {
         // Session value
         defaultSessionVariable = new SessionVariable();
-        ImmutableSortedMap.Builder<String, VarContext> builder =
-                ImmutableSortedMap.orderedBy(String.CASE_INSENSITIVE_ORDER);
-        for (Field field : SessionVariable.class.getDeclaredFields()) {
-            VarAttr attr = field.getAnnotation(VarAttr.class);
-            if (attr == null) {
-                continue;
-            }
-
-            field.setAccessible(true);
-            builder.put(attr.name(),
-                    new VarContext(field, defaultSessionVariable, SESSION | 
attr.flag(),
-                            getValue(defaultSessionVariable, field)));
-        }
-
-        // Variables only exist in global environment.
-        for (Field field : GlobalVariable.class.getDeclaredFields()) {
-            VarAttr attr = field.getAnnotation(VarAttr.class);
-            if (attr == null) {
-                continue;
-            }
-
-            field.setAccessible(true);
-            builder.put(attr.name(),
-                    new VarContext(field, null, GLOBAL | attr.flag(), 
getValue(null, field)));
-        }
-
+        ImmutableSortedMap.Builder<String, VarContext> builder = 
getStringVarContextBuilder(defaultSessionVariable);
         ctxByVarName = builder.build();
     }
 
@@ -211,7 +193,7 @@ public class VariableMgr {
     // revert the operator[set_var] on select/*+ SET_VAR()*/  sql;
     public static void revertSessionValue(SessionVariable obj) throws 
DdlException {
         Map<Field, String> sessionOriginValue = obj.getSessionOriginValue();
-        if(!sessionOriginValue.isEmpty()) {
+        if (!sessionOriginValue.isEmpty()) {
             for (Field field : sessionOriginValue.keySet()) {
                 // revert session value
                 setValue(obj, field, sessionOriginValue.get(field));
@@ -272,7 +254,7 @@ public class VariableMgr {
             // set session variable
             Field field = ctx.getField();
             // if stmt is "Select /*+ SET_VAR(...)*/"
-            if(sessionVariable.getIsSingleSetVar()) {
+            if (sessionVariable.getIsSingleSetVar()) {
                 try {
                     sessionVariable.addSessionOriginValue(field, 
field.get(sessionVariable).toString());
                 } catch (Exception e) {
@@ -304,17 +286,30 @@ public class VariableMgr {
 
     // global variable persistence
     public static void write(DataOutputStream out) throws IOException {
-        defaultSessionVariable.write(out);
+        SessionVariable variablesToWrite = defaultSessionVariable;
+        if (Catalog.isCheckpointThread()) {
+            // If this is checkpoint thread, we should write value in 
`defaultSessionVariableForCkpt` to the image
+            // instead of `defaultSessionVariable`.
+            variablesToWrite = defaultSessionVariableForCkpt;
+        }
+        variablesToWrite.write(out);
         // get all global variables
         List<String> varNames = GlobalVariable.getPersistentGlobalVarNames();
-        GlobalVarPersistInfo info = new 
GlobalVarPersistInfo(defaultSessionVariable, varNames);
+        GlobalVarPersistInfo info = new GlobalVarPersistInfo(variablesToWrite, 
varNames);
         info.write(out);
     }
 
     public static void read(DataInputStream in) throws IOException, 
DdlException {
         wlock.lock();
         try {
-            defaultSessionVariable.readFields(in);
+            SessionVariable variablesToRead = defaultSessionVariable;
+            if (Catalog.isCheckpointThread()) {
+                // If this is checkpoint thread, we should read value to set 
them to `defaultSessionVariableForCkpt`
+                // instead of `defaultSessionVariable`.
+                // This approach ensures that checkpoint threads do not modify 
the values in serving catalog.
+                variablesToRead = defaultSessionVariableForCkpt;
+            }
+            variablesToRead.readFields(in);
             if (Catalog.getCurrentCatalogJournalVersion() >= 
FeMetaVersion.VERSION_90) {
                 GlobalVarPersistInfo info = GlobalVarPersistInfo.read(in);
                 replayGlobalVariableV2(info);
@@ -324,35 +319,6 @@ public class VariableMgr {
         }
     }
 
-    @Deprecated
-    private static void writeGlobalVariableUpdate(SessionVariable variable, 
String msg) {
-        EditLog editLog = Catalog.getCurrentCatalog().getEditLog();
-        editLog.logGlobalVariable(variable);
-    }
-
-    @Deprecated
-    public static void replayGlobalVariable(SessionVariable variable) throws 
DdlException {
-        wlock.lock();
-        try {
-            for (Field field : SessionVariable.class.getDeclaredFields()) {
-                VarAttr attr = field.getAnnotation(VarAttr.class);
-                if (attr == null) {
-                    continue;
-                }
-
-                field.setAccessible(true);
-
-                VarContext ctx = ctxByVarName.get(attr.name());
-                if (ctx.getFlag() == SESSION) {
-                    String value = getValue(variable, ctx.getField());
-                    setValue(ctx.getObj(), ctx.getField(), value);
-                }
-            }
-        } finally {
-            wlock.unlock();
-        }
-    }
-
     // this method is used to replace the `replayGlobalVariable()`
     public static void replayGlobalVariableV2(GlobalVarPersistInfo info) 
throws DdlException {
         wlock.lock();
@@ -361,6 +327,11 @@ public class VariableMgr {
             JSONObject root = new JSONObject(json);
             for (String varName : root.keySet()) {
                 VarContext varContext = ctxByVarName.get(varName);
+                if (Catalog.isCheckpointThread()) {
+                    // If this is checkpoint thread, we should look up the variable context in `ctxByVarNameForCkpt`
+                    // instead of `ctxByVarName`, so that replay does not go through the serving catalog's contexts.
+                    varContext = ctxByVarNameForCkpt.get(varName);
+                }
                 if (varContext == null) {
                     LOG.error("failed to get global variable {} when 
replaying", varName);
                     continue;
@@ -437,6 +408,7 @@ public class VariableMgr {
     }
 
     // Get variable value through variable name, used to satisfy statement 
like `SELECT @@comment_version`
+    // For test only
     public static String getValue(SessionVariable var, SysVariableDesc desc) 
throws AnalysisException {
         VarContext ctx = ctxByVarName.get(desc.getName());
         if (ctx == null) {
@@ -536,9 +508,12 @@ public class VariableMgr {
     public static @interface VarAttr {
         // Name in show variables and set statement;
         String name();
+
         int flag() default 0;
+
         // TODO(zhaochun): min and max is not used.
         String minValue() default "0";
+
         String maxValue() default "0";
 
         // Set to true if the variables need to be forwarded along with 
forward statement.
@@ -577,4 +552,45 @@ public class VariableMgr {
             return defaultValue;
         }
     }
+
+    public static void createDefaultSessionVariableForCkpt() {
+        defaultSessionVariableForCkpt = new SessionVariable();
+        ImmutableSortedMap.Builder<String, VarContext> builder = 
getStringVarContextBuilder(defaultSessionVariableForCkpt);
+        ctxByVarNameForCkpt = builder.build();
+    }
+
+    public static void destroyDefaultSessionVariableForCkpt() {
+        defaultSessionVariableForCkpt = null;
+        ctxByVarNameForCkpt = null;
+    }
+
+    @NotNull
+    private static ImmutableSortedMap.Builder<String, VarContext> 
getStringVarContextBuilder(SessionVariable sessionVariable) {
+        ImmutableSortedMap.Builder<String, VarContext> builder =
+                ImmutableSortedMap.orderedBy(String.CASE_INSENSITIVE_ORDER);
+        for (Field field : SessionVariable.class.getDeclaredFields()) {
+            VarAttr attr = field.getAnnotation(VarAttr.class);
+            if (attr == null) {
+                continue;
+            }
+
+            field.setAccessible(true);
+            builder.put(attr.name(),
+                    new VarContext(field, sessionVariable, SESSION | 
attr.flag(),
+                            getValue(sessionVariable, field)));
+        }
+
+        // Variables only exist in global environment.
+        for (Field field : GlobalVariable.class.getDeclaredFields()) {
+            VarAttr attr = field.getAnnotation(VarAttr.class);
+            if (attr == null) {
+                continue;
+            }
+
+            field.setAccessible(true);
+            builder.put(attr.name(),
+                    new VarContext(field, null, GLOBAL | attr.flag(), 
getValue(null, field)));
+        }
+        return builder;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index e3416d2..198b5a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -71,13 +71,13 @@ public class GlobalTransactionMgr implements Writable {
 
     private TransactionIdGenerator idGenerator = new TransactionIdGenerator();
     private TxnStateCallbackFactory callbackFactory = new 
TxnStateCallbackFactory();
-    
+
     private Catalog catalog;
 
     public GlobalTransactionMgr(Catalog catalog) {
         this.catalog = catalog;
     }
-    
+
     public TxnStateCallbackFactory getCallbackFactory() {
         return callbackFactory;
     }
@@ -103,15 +103,15 @@ public class GlobalTransactionMgr implements Writable {
     }
 
     public long beginTransaction(long dbId, List<Long> tableIdList, String 
label, TxnCoordinator coordinator, LoadJobSourceType sourceType,
-            long timeoutSecond)
+                                 long timeoutSecond)
             throws AnalysisException, LabelAlreadyUsedException, 
BeginTransactionException, DuplicatedRequestException,
             QuotaExceedException, MetaNotFoundException {
         return beginTransaction(dbId, tableIdList, label, null, coordinator, 
sourceType, -1, timeoutSecond);
     }
-    
+
     /**
      * the app could specify the transaction id
-     * 
+     *
      * requestId is used to judge that whether the request is a internal retry 
request
      * if label already exist, and requestId are equal, we return the exist 
tid, and consider this 'begin'
      * as success.
@@ -146,7 +146,7 @@ public class GlobalTransactionMgr implements Writable {
     private void checkValidTimeoutSecond(long timeoutSecond, int 
maxLoadTimeoutSecond, int minLoadTimeOutSecond) throws AnalysisException {
         if (timeoutSecond > maxLoadTimeoutSecond ||
                 timeoutSecond < minLoadTimeOutSecond) {
-            throw new AnalysisException("Invalid timeout. Timeout should 
between "
+            throw new AnalysisException("Invalid timeout: " + timeoutSecond + 
". Timeout should between "
                     + minLoadTimeOutSecond + " and " + maxLoadTimeoutSecond
                     + " seconds");
         }
@@ -177,7 +177,7 @@ public class GlobalTransactionMgr implements Writable {
             throws UserException {
         commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, 
null);
     }
-    
+
     /**
      * @param transactionId
      * @param tabletCommitInfos
@@ -193,21 +193,21 @@ public class GlobalTransactionMgr implements Writable {
         if (Config.disable_load_job) {
             throw new TransactionCommitFailedException("disable_load_job is 
set to true, all load jobs are prevented");
         }
-        
+
         LOG.debug("try to commit transaction: {}", transactionId);
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
         dbTransactionMgr.commitTransaction(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
     }
-    
+
     public boolean commitAndPublishTransaction(Database db, List<Table> 
tableList, long transactionId,
                                                List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis)
             throws UserException {
         return commitAndPublishTransaction(db, tableList, transactionId, 
tabletCommitInfos, timeoutMillis, null);
     }
-    
+
     public boolean commitAndPublishTransaction(Database db, List<Table> 
tableList, long transactionId,
-            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
-            TxnCommitAttachment txnCommitAttachment)
+                                               List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis,
+                                               TxnCommitAttachment 
txnCommitAttachment)
             throws UserException {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
@@ -217,7 +217,7 @@ public class GlobalTransactionMgr implements Writable {
         try {
             commitTransaction(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
         } finally {
-           MetaLockUtils.writeUnlockTables(tableList);
+            MetaLockUtils.writeUnlockTables(tableList);
         }
         stopWatch.stop();
         long publishTimeoutMillis = timeoutMillis - stopWatch.getTime();
@@ -228,7 +228,7 @@ public class GlobalTransactionMgr implements Writable {
             return false;
         }
         return dbTransactionMgr.publishTransaction(db, transactionId, 
publishTimeoutMillis);
-   }
+    }
 
     public void abortTransaction(long dbId, long transactionId, String reason) 
throws UserException {
         abortTransaction(dbId, transactionId, reason, null);
@@ -267,7 +267,7 @@ public class GlobalTransactionMgr implements Writable {
             if (transactionState.getTableIdList().contains(tableId)) {
                 if (partitionId == null) {
                     return true;
-                } else if 
(transactionState.getTableCommitInfo(tableId).getPartitionCommitInfo(partitionId)
 != null){
+                } else if 
(transactionState.getTableCommitInfo(tableId).getPartitionCommitInfo(partitionId)
 != null) {
                     return true;
                 }
             }
@@ -290,7 +290,7 @@ public class GlobalTransactionMgr implements Writable {
     /**
      * Check whether a load job already exists before checking all 
`TransactionId` related with this load job have finished.
      * finished
-     * 
+     *
      * @throws AnalysisException is database does not exist anymore
      */
     public boolean isPreviousTransactionsFinished(long endTransactionId, long 
dbId, List<Long> tableIdList)
@@ -330,7 +330,7 @@ public class GlobalTransactionMgr implements Writable {
             return null;
         }
     }
-    
+
     public void setEditLog(EditLog editLog) {
         this.idGenerator.setEditLog(editLog);
     }
@@ -384,7 +384,7 @@ public class GlobalTransactionMgr implements Writable {
         }
         return infos;
     }
-    
+
     public List<List<String>> getDbTransStateInfo(long dbId) {
         try {
             DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
@@ -399,7 +399,7 @@ public class GlobalTransactionMgr implements Writable {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
         return dbTransactionMgr.getTxnStateInfoList(running, limit);
     }
-    
+
     // get show info of a specified txnId
     public List<List<String>> getSingleTranInfo(long dbId, long txnId) throws 
AnalysisException {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
@@ -410,7 +410,7 @@ public class GlobalTransactionMgr implements Writable {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
         return dbTransactionMgr.getTableTransInfo(txnId);
     }
-    
+
     public List<List<Comparable>> getPartitionTransInfo(long dbId, long tid, 
long tableId)
             throws AnalysisException {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
@@ -427,7 +427,7 @@ public class GlobalTransactionMgr implements Writable {
         }
         return txnNum;
     }
-    
+
     public TransactionIdGenerator getTransactionIDGenerator() {
         return this.idGenerator;
     }
@@ -441,7 +441,7 @@ public class GlobalTransactionMgr implements Writable {
         }
         idGenerator.write(out);
     }
-    
+
     public void readFields(DataInput in) throws IOException {
         int numTransactions = in.readInt();
         for (int i = 0; i < numTransactions; ++i) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index 243832c..fc8582f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -17,162 +17,168 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.SetStmt;
 import org.apache.doris.analysis.SetType;
 import org.apache.doris.analysis.SetVar;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.SysVariableDesc;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.mysql.privilege.PaloAuth;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.persist.EditLog;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.utframe.UtFrameUtils;
 
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.List;
-
-import mockit.Expectations;
-import mockit.Mocked;
+import java.util.UUID;
 
 public class VariableMgrTest {
-    private static final Logger LOG = 
LoggerFactory.getLogger(VariableMgrTest.class);
-    @Mocked
-    private Catalog catalog;
-    @Mocked
-    private EditLog editLog;
-    @Mocked
-    private PaloAuth auth;
-
-    @Before
-    public void setUp() {
-        new Expectations() {
-            {
-                catalog.getEditLog();
-                minTimes = 0;
-                result = editLog;
-
-                editLog.logGlobalVariable((SessionVariable) any);
-                minTimes = 0;
-
-                catalog.getAuth();
-                minTimes = 0;
-                result = auth;
-
-                auth.checkGlobalPriv((ConnectContext) any, 
PrivPredicate.ADMIN);
-                minTimes = 0;
-                result = true;
-            }
-        };
+    private static String runningDir = "fe/mocked/VariableMgrTest/" + 
UUID.randomUUID().toString() + "/";
+    private static ConnectContext ctx;
 
-        new Expectations(catalog) {
-            {
-                Catalog.getCurrentCatalog();
-                minTimes = 0;
-                result = catalog;
-            }
-        };
+    @After
+    public void tearDown() throws Exception {
+        FileUtils.deleteDirectory(new File(runningDir));
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        UtFrameUtils.createDorisCluster(runningDir);
+        ctx = UtFrameUtils.createDefaultCtx();
+        String createDbStmtStr = "create database db1;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) 
UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
+        Catalog.getCurrentCatalog().createDb(createDbStmt);
     }
 
     @Test
-    public void testNormal() throws IllegalAccessException, 
NoSuchFieldException, UserException {
+    public void testNormal() throws Exception {
         SessionVariable var = VariableMgr.newSessionVariable();
-        Assert.assertEquals(2147483648L, var.getMaxExecMemByte());
-        Assert.assertEquals(300, var.getQueryTimeoutS());
-        Assert.assertEquals(false, var.enableProfile());
-        Assert.assertEquals(0L, var.getSqlMode());
+        long originExecMemLimit = var.getMaxExecMemByte();
+        boolean originEnableProfile = var.enableProfile();
+        long originQueryTimeOut = var.getQueryTimeoutS();
 
         List<List<String>> rows = VariableMgr.dump(SetType.SESSION, var, null);
         Assert.assertTrue(rows.size() > 5);
         for (List<String> row : rows) {
             if (row.get(0).equalsIgnoreCase("exec_mem_limit")) {
-                Assert.assertEquals("2147483648", row.get(1));
+                Assert.assertEquals(String.valueOf(originExecMemLimit), 
row.get(1));
             } else if (row.get(0).equalsIgnoreCase("is_report_success")) {
-                Assert.assertEquals("false", row.get(1));
+                Assert.assertEquals(String.valueOf(originEnableProfile), 
row.get(1));
             } else if (row.get(0).equalsIgnoreCase("query_timeout")) {
-                Assert.assertEquals("300", row.get(1));
+                Assert.assertEquals(String.valueOf(originQueryTimeOut), 
row.get(1));
             } else if (row.get(0).equalsIgnoreCase("sql_mode")) {
                 Assert.assertEquals("", row.get(1));
             }
         }
 
         // Set global variable
-        SetVar setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new 
IntLiteral(1234L));
-        setVar.analyze(null);
-        VariableMgr.setVar(var, setVar);
-        Assert.assertEquals(2147483648L, var.getMaxExecMemByte());
+        SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
exec_mem_limit=1234", ctx);
+        SetExecutor executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals(originExecMemLimit, var.getMaxExecMemByte());
         var = VariableMgr.newSessionVariable();
         Assert.assertEquals(1234L, var.getMaxExecMemByte());
 
-        SetVar setVar2 = new SetVar(SetType.GLOBAL, 
"parallel_fragment_exec_instance_num", new IntLiteral(5L));
-        setVar2.analyze(null);
-        VariableMgr.setVar(var, setVar2);
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
parallel_fragment_exec_instance_num=5", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
         Assert.assertEquals(1L, var.getParallelExecInstanceNum());
         var = VariableMgr.newSessionVariable();
         Assert.assertEquals(5L, var.getParallelExecInstanceNum());
 
-        SetVar setVar3 = new SetVar(SetType.GLOBAL, "time_zone", new 
StringLiteral("Asia/Shanghai"));
-        setVar3.analyze(null);
-        VariableMgr.setVar(var, setVar3);
+        // Test checkTimeZoneValidAndStandardize
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
time_zone='+8:00'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals("+08:00", 
VariableMgr.newSessionVariable().getTimeZone());
+
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
time_zone='Asia/Shanghai'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
         Assert.assertEquals("Asia/Shanghai", var.getTimeZone());
         var = VariableMgr.newSessionVariable();
         Assert.assertEquals("Asia/Shanghai", var.getTimeZone());
 
-        setVar3 = new SetVar(SetType.GLOBAL, "time_zone", new 
StringLiteral("CST"));
-        setVar3.analyze(null);
-        VariableMgr.setVar(var, setVar3);
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
time_zone='CST'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
         Assert.assertEquals("Asia/Shanghai", var.getTimeZone());
         var = VariableMgr.newSessionVariable();
         Assert.assertEquals("CST", var.getTimeZone());
 
-        // Set session variable
-        setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new 
IntLiteral(1234L));
-        setVar.analyze(null);
-        VariableMgr.setVar(var, setVar);
-        Assert.assertEquals(1234L, var.getMaxExecMemByte());
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
time_zone='8:00'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals("+08:00", 
VariableMgr.newSessionVariable().getTimeZone());
+
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
time_zone='-8:00'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals("-08:00", 
VariableMgr.newSessionVariable().getTimeZone());
 
-        setVar3 = new SetVar(SetType.SESSION, "time_zone", new 
StringLiteral("Asia/Jakarta"));
-        setVar3.analyze(null);
-        VariableMgr.setVar(var, setVar3);
-        Assert.assertEquals("Asia/Jakarta", var.getTimeZone());
+        // Set session variable
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set 
exec_mem_limit=1234", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals(1234L, 
ctx.getSessionVariable().getMaxExecMemByte());
+
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set 
time_zone='Asia/Jakarta'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals("Asia/Jakarta", 
ctx.getSessionVariable().getTimeZone());
+
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set 
sql_mode='PIPES_AS_CONCAT'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals(2L, ctx.getSessionVariable().getSqlMode());
+
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set 
runtime_filter_type ='BLOOM_FILTER'", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals(2L, 
ctx.getSessionVariable().getRuntimeFilterType());
 
         // Get from name
         SysVariableDesc desc = new SysVariableDesc("exec_mem_limit");
         Assert.assertEquals(var.getMaxExecMemByte() + "", 
VariableMgr.getValue(var, desc));
+    }
 
-        SetVar setVar4 = new SetVar(SetType.SESSION, "sql_mode", new 
StringLiteral(
-                SqlModeHelper.encode("PIPES_AS_CONCAT").toString()));
-        setVar4.analyze(null);
-        VariableMgr.setVar(var, setVar4);
-        Assert.assertEquals(2L, var.getSqlMode());
-
-        // Test checkTimeZoneValidAndStandardize
-        SetVar setVar5 = new SetVar(SetType.GLOBAL, "time_zone", new 
StringLiteral("+8:00"));
-        setVar5.analyze(null);
-        VariableMgr.setVar(var, setVar5);
-        Assert.assertEquals("+08:00", 
VariableMgr.newSessionVariable().getTimeZone());
-
-        SetVar setVar6 = new SetVar(SetType.GLOBAL, "time_zone", new 
StringLiteral("8:00"));
-        setVar6.analyze(null);
-        VariableMgr.setVar(var, setVar6);
-        Assert.assertEquals("+08:00", 
VariableMgr.newSessionVariable().getTimeZone());
-
-        SetVar setVar7 = new SetVar(SetType.GLOBAL, "time_zone", new 
StringLiteral("-8:00"));
-        setVar7.analyze(null);
-        VariableMgr.setVar(var, setVar7);
-        Assert.assertEquals("-08:00", 
VariableMgr.newSessionVariable().getTimeZone());
-
-        SetVar setVar8 = new SetVar(SetType.SESSION, "runtime_filter_type", 
new StringLiteral(
-                RuntimeFilterTypeHelper.encode("BLOOM_FILTER").toString()));
-        setVar8.analyze(null);
-        VariableMgr.setVar(var, setVar8);
-        Assert.assertEquals(2L, var.getRuntimeFilterType());
+    @Test
+    public void testGlobalVariablePersist() throws Exception {
+        Config.edit_log_roll_num = 1;
+        SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
exec_mem_limit=5678", ctx);
+        SetExecutor executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals(5678, 
VariableMgr.newSessionVariable().getMaxExecMemByte());
+
+        Config.edit_log_roll_num = 100;
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global 
exec_mem_limit=7890", ctx);
+        executor = new SetExecutor(ctx, stmt);
+        executor.execute();
+        Assert.assertEquals(7890, 
VariableMgr.newSessionVariable().getMaxExecMemByte());
+
+        // Get currentCatalog first
+        Catalog currentCatalog = Catalog.getCurrentCatalog();
+        // Save real ckptThreadId
+        long ckptThreadId = currentCatalog.getCheckpointer().getId();
+        try {
+            // Set checkpointThreadId to the current thread id so that, when the 
checkpoint is done manually here,
+            // Catalog.isCheckpointThread() will return true.
+            Deencapsulation.setField(Catalog.class, "checkpointThreadId", 
Thread.currentThread().getId());
+            currentCatalog.getCheckpointer().doCheckpoint();
+        } finally {
+            // Restore the ckptThreadId
+            Deencapsulation.setField(Catalog.class, "checkpointThreadId", 
ckptThreadId);
+        }
+        Assert.assertEquals(7890, 
VariableMgr.newSessionVariable().getMaxExecMemByte());
     }
 
     @Test(expected = UserException.class)
@@ -214,7 +220,6 @@ public class VariableMgrTest {
     @Test(expected = DdlException.class)
     public void testReadOnly() throws AnalysisException, DdlException {
         SysVariableDesc desc = new SysVariableDesc("version_comment");
-        LOG.info(VariableMgr.getValue(null, desc));
 
         // Set global variable
         SetVar setVar = new SetVar(SetType.SESSION, "version_comment", null);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to