This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f2b5893b74d [To dev/1.3] Fix that config changes rejected by DN will be written to CN's config… (#14552)
f2b5893b74d is described below

commit f2b5893b74dec770934f71d500212474d5f4c191
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Dec 27 12:29:10 2024 +0800

    [To dev/1.3] Fix that config changes rejected by DN will be written to CN's config… (#14552)
    
    * Fix that config changes rejected by DN will be written to CN's config file
    
    (cherry picked from commit 9f4c2269f7f3d539beb9bfd193f43ff21131485a)
    
    * fix test
    
    * fix not forwarded config
    
    (cherry picked from commit 4870e52237488cdfb9946d371ad3bc8b75360085)
---
 .../iotdb/db/it/IoTDBSetConfigurationIT.java       | 17 +++--
 .../iotdb/confignode/manager/ConfigManager.java    | 76 ++++++++++++++--------
 .../iotdb/db/storageengine/StorageEngine.java      |  2 +
 .../iotdb/commons/conf/ConfigurationFileUtils.java | 16 +++--
 4 files changed, 76 insertions(+), 35 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
index 8a792aacc38..5fc44c47d8a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
@@ -52,6 +52,7 @@ import static org.junit.Assert.assertTrue;
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class})
 public class IoTDBSetConfigurationIT {
+
   @BeforeClass
   public static void setUp() throws Exception {
     EnvFactory.getEnv().initClusterEnvironment();
@@ -108,6 +109,8 @@ public class IoTDBSetConfigurationIT {
     } catch (Exception e) {
       Assert.fail(e.getMessage());
     }
+    // set configuration "enable_seq_space_compaction"="false"
+    // set configuration "enable_unseq_space_compaction"="false" on 0
     Assert.assertTrue(
         EnvFactory.getEnv().getConfigNodeWrapperList().stream()
             .allMatch(
@@ -116,14 +119,17 @@ public class IoTDBSetConfigurationIT {
                         nodeWrapper,
                         "enable_seq_space_compaction=false",
                         "enable_unseq_space_compaction=false")));
+    // set configuration "enable_seq_space_compaction"="false"
     Assert.assertTrue(
         EnvFactory.getEnv().getDataNodeWrapperList().stream()
             .allMatch(
                 nodeWrapper ->
-                    checkConfigFileContains(
-                        nodeWrapper,
-                        "enable_seq_space_compaction=false",
-                        "enable_cross_space_compaction=false")));
+                    checkConfigFileContains(nodeWrapper, 
"enable_seq_space_compaction=false")));
+    // set configuration "enable_cross_space_compaction"="false" on 1
+    assertTrue(
+        checkConfigFileContains(
+            EnvFactory.getEnv().getDataNodeWrapperList().get(0),
+            "enable_cross_space_compaction=false"));
   }
 
   @Test
@@ -240,6 +246,9 @@ public class IoTDBSetConfigurationIT {
       assertFalse(
           checkConfigFileContains(
               EnvFactory.getEnv().getDataNodeWrapper(0), 
"default_storage_group_level=-1"));
+      assertFalse(
+          checkConfigFileContains(
+              EnvFactory.getEnv().getConfigNodeWrapper(0), 
"default_storage_group_level=-1"));
       assertTrue(
           checkConfigFileContains(
               EnvFactory.getEnv().getDataNodeWrapper(0), 
"default_storage_group_level=3"));
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 87ceac0acac..a0b43ddc7a3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -249,6 +249,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
@@ -1579,41 +1580,62 @@ public class ConfigManager implements IManager {
         return tsStatus;
       }
     }
-    if (currentNodeId == req.getNodeId() || req.getNodeId() < 0) {
-      URL url = 
ConfigNodeDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME);
-      boolean configurationFileFound = (url != null && new 
File(url.getFile()).exists());
-      TrimProperties properties = new TrimProperties();
-      properties.putAll(req.getConfigs());
 
-      if (configurationFileFound) {
-        File file = new File(url.getFile());
-        try {
-          ConfigurationFileUtils.updateConfiguration(
-              file,
-              properties,
-              mergedProps -> {
-                
ConfigNodeDescriptor.getInstance().loadHotModifiedProps(mergedProps);
-              });
-        } catch (Exception e) {
-          tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
-        }
-      } else {
-        String msg =
-            "Unable to find the configuration file. Some modifications are 
made only in memory.";
-        tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
msg);
-        LOGGER.warn(msg);
-      }
-      if (currentNodeId == req.getNodeId()) {
-        return tsStatus;
-      }
+    if (currentNodeId == req.getNodeId()) {
+      return setConfigLocally(req, null);
+    } else if (req.getNodeId() < 0) {
+      // re-config CN in memory -> re-config DN in memory -> re-config DN in file -> re-config CN in
+      // file
+      TSStatus finalTsStatus = tsStatus;
+      return setConfigLocally(req, () -> broadcastSetConfig(finalTsStatus, 
req));
+    } else {
+      // not for this node, ignore it
+      return broadcastSetConfig(tsStatus, req);
     }
+  }
+
+  private TSStatus broadcastSetConfig(TSStatus thisNodeResult, 
TSetConfigurationReq req) {
     List<TSStatus> statusListOfOtherNodes = nodeManager.setConfiguration(req);
     List<TSStatus> statusList = new ArrayList<>(statusListOfOtherNodes.size() 
+ 1);
-    statusList.add(tsStatus);
+    statusList.add(thisNodeResult);
     statusList.addAll(statusListOfOtherNodes);
     return RpcUtils.squashResponseStatusList(statusList);
   }
 
+  private TSStatus setConfigLocally(
+      TSetConfigurationReq req, Supplier<TSStatus> beforeWriteFileAction) {
+    // re-config this node only
+    TSStatus tsStatus;
+    URL url = 
ConfigNodeDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME);
+    boolean configurationFileFound = (url != null && new 
File(url.getFile()).exists());
+    TrimProperties newProperties = new TrimProperties();
+    newProperties.putAll(req.getConfigs());
+
+    if (configurationFileFound) {
+      File file = new File(url.getFile());
+      try {
+        tsStatus =
+            ConfigurationFileUtils.updateConfiguration(
+                file,
+                newProperties,
+                mergedProps -> {
+                  
ConfigNodeDescriptor.getInstance().loadHotModifiedProps(mergedProps);
+                  return beforeWriteFileAction != null
+                      ? beforeWriteFileAction.get()
+                      : StatusUtils.OK;
+                });
+      } catch (Exception e) {
+        tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+      }
+    } else {
+      String msg =
+          "Unable to find the configuration file. Some modifications are made 
only in memory.";
+      tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, msg);
+      LOGGER.warn(msg);
+    }
+    return tsStatus;
+  }
+
   @Override
   public TSStatus startRepairData() {
     TSStatus status = confirmLeader();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index cc0b852e3b0..0e1144919e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.commons.schema.ttl.TTLCache;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -686,6 +687,7 @@ public class StorageEngine implements IService {
             } catch (Exception e) {
               throw new IllegalArgumentException(e);
             }
+            return StatusUtils.OK;
           });
     } catch (Exception e) {
       if (e instanceof InterruptedException) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
index f5702143dd9..bd7d561f449 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.commons.conf;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -248,7 +252,7 @@ public class ConfigurationFileUtils {
     return ignoredConfigItems;
   }
 
-  public static void updateConfiguration(
+  public static TSStatus updateConfiguration(
       File file, Properties newConfigItems, LoadHotModifiedPropsFunc 
loadHotModifiedPropertiesFunc)
       throws IOException, InterruptedException {
     File lockFile = new File(file.getPath() + lockFileSuffix);
@@ -271,7 +275,10 @@ public class ConfigurationFileUtils {
 
       // load hot modified properties
       if (loadHotModifiedPropertiesFunc != null) {
-        loadHotModifiedPropertiesFunc.loadHotModifiedProperties(mergedProps);
+        TSStatus status = 
loadHotModifiedPropertiesFunc.loadHotModifiedProperties(mergedProps);
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return status;
+        }
       }
 
       // generate new configuration file content in memory
@@ -300,7 +307,7 @@ public class ConfigurationFileUtils {
       }
       if (newConfigItems.isEmpty()) {
         // No configuration needs to be modified
-        return;
+        return StatusUtils.OK;
       }
       logger.info("Updating configuration file {}", file.getAbsolutePath());
       try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(lockFile))) {
@@ -316,6 +323,7 @@ public class ConfigurationFileUtils {
     } finally {
       releaseFileLock(lockFile);
     }
+    return StatusUtils.OK;
   }
 
   private static String readConfigLinesWithoutLicense(File file) throws 
IOException {
@@ -358,7 +366,7 @@ public class ConfigurationFileUtils {
 
   @FunctionalInterface
   public interface LoadHotModifiedPropsFunc {
-    void loadHotModifiedProperties(TrimProperties properties)
+    TSStatus loadHotModifiedProperties(TrimProperties properties)
         throws IOException, InterruptedException;
   }
 }

Reply via email to