This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f2b5893b74d [To dev/1.3] Fix that config changes rejected by DN will
be written to CN's config file (#14552)
f2b5893b74d is described below
commit f2b5893b74dec770934f71d500212474d5f4c191
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Dec 27 12:29:10 2024 +0800
[To dev/1.3] Fix that config changes rejected by DN will be written to CN's
config file (#14552)
* Fix that config changes rejected by DN will be written to CN's config file
(cherry picked from commit 9f4c2269f7f3d539beb9bfd193f43ff21131485a)
* fix test
* fix not forwarded config
(cherry picked from commit 4870e52237488cdfb9946d371ad3bc8b75360085)
---
.../iotdb/db/it/IoTDBSetConfigurationIT.java | 17 +++--
.../iotdb/confignode/manager/ConfigManager.java | 76 ++++++++++++++--------
.../iotdb/db/storageengine/StorageEngine.java | 2 +
.../iotdb/commons/conf/ConfigurationFileUtils.java | 16 +++--
4 files changed, 76 insertions(+), 35 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
index 8a792aacc38..5fc44c47d8a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
@@ -52,6 +52,7 @@ import static org.junit.Assert.assertTrue;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class})
public class IoTDBSetConfigurationIT {
+
@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().initClusterEnvironment();
@@ -108,6 +109,8 @@ public class IoTDBSetConfigurationIT {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
+ // set configuration "enable_seq_space_compaction"="false"
+ // set configuration "enable_unseq_space_compaction"="false" on 0
Assert.assertTrue(
EnvFactory.getEnv().getConfigNodeWrapperList().stream()
.allMatch(
@@ -116,14 +119,17 @@ public class IoTDBSetConfigurationIT {
nodeWrapper,
"enable_seq_space_compaction=false",
"enable_unseq_space_compaction=false")));
+ // set configuration "enable_seq_space_compaction"="false"
Assert.assertTrue(
EnvFactory.getEnv().getDataNodeWrapperList().stream()
.allMatch(
nodeWrapper ->
- checkConfigFileContains(
- nodeWrapper,
- "enable_seq_space_compaction=false",
- "enable_cross_space_compaction=false")));
+ checkConfigFileContains(nodeWrapper,
"enable_seq_space_compaction=false")));
+ // set configuration "enable_cross_space_compaction"="false" on 1
+ assertTrue(
+ checkConfigFileContains(
+ EnvFactory.getEnv().getDataNodeWrapperList().get(0),
+ "enable_cross_space_compaction=false"));
}
@Test
@@ -240,6 +246,9 @@ public class IoTDBSetConfigurationIT {
assertFalse(
checkConfigFileContains(
EnvFactory.getEnv().getDataNodeWrapper(0),
"default_storage_group_level=-1"));
+ assertFalse(
+ checkConfigFileContains(
+ EnvFactory.getEnv().getConfigNodeWrapper(0),
"default_storage_group_level=-1"));
assertTrue(
checkConfigFileContains(
EnvFactory.getEnv().getDataNodeWrapper(0),
"default_storage_group_level=3"));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 87ceac0acac..a0b43ddc7a3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -249,6 +249,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
@@ -1579,41 +1580,62 @@ public class ConfigManager implements IManager {
return tsStatus;
}
}
- if (currentNodeId == req.getNodeId() || req.getNodeId() < 0) {
- URL url =
ConfigNodeDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME);
- boolean configurationFileFound = (url != null && new
File(url.getFile()).exists());
- TrimProperties properties = new TrimProperties();
- properties.putAll(req.getConfigs());
- if (configurationFileFound) {
- File file = new File(url.getFile());
- try {
- ConfigurationFileUtils.updateConfiguration(
- file,
- properties,
- mergedProps -> {
-
ConfigNodeDescriptor.getInstance().loadHotModifiedProps(mergedProps);
- });
- } catch (Exception e) {
- tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
- }
- } else {
- String msg =
- "Unable to find the configuration file. Some modifications are
made only in memory.";
- tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
msg);
- LOGGER.warn(msg);
- }
- if (currentNodeId == req.getNodeId()) {
- return tsStatus;
- }
+ if (currentNodeId == req.getNodeId()) {
+ return setConfigLocally(req, null);
+ } else if (req.getNodeId() < 0) {
+ // re-config CN in memory -> re-config DN in memory -> re-config DN in
file -> re-config CN in
+ // file
+ TSStatus finalTsStatus = tsStatus;
+ return setConfigLocally(req, () -> broadcastSetConfig(finalTsStatus,
req));
+ } else {
+ // not for this node, ignore it
+ return broadcastSetConfig(tsStatus, req);
}
+ }
+
+ private TSStatus broadcastSetConfig(TSStatus thisNodeResult,
TSetConfigurationReq req) {
List<TSStatus> statusListOfOtherNodes = nodeManager.setConfiguration(req);
List<TSStatus> statusList = new ArrayList<>(statusListOfOtherNodes.size()
+ 1);
- statusList.add(tsStatus);
+ statusList.add(thisNodeResult);
statusList.addAll(statusListOfOtherNodes);
return RpcUtils.squashResponseStatusList(statusList);
}
+ private TSStatus setConfigLocally(
+ TSetConfigurationReq req, Supplier<TSStatus> beforeWriteFileAction) {
+ // re-config this node only
+ TSStatus tsStatus;
+ URL url =
ConfigNodeDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME);
+ boolean configurationFileFound = (url != null && new
File(url.getFile()).exists());
+ TrimProperties newProperties = new TrimProperties();
+ newProperties.putAll(req.getConfigs());
+
+ if (configurationFileFound) {
+ File file = new File(url.getFile());
+ try {
+ tsStatus =
+ ConfigurationFileUtils.updateConfiguration(
+ file,
+ newProperties,
+ mergedProps -> {
+
ConfigNodeDescriptor.getInstance().loadHotModifiedProps(mergedProps);
+ return beforeWriteFileAction != null
+ ? beforeWriteFileAction.get()
+ : StatusUtils.OK;
+ });
+ } catch (Exception e) {
+ tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ } else {
+ String msg =
+ "Unable to find the configuration file. Some modifications are made
only in memory.";
+ tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, msg);
+ LOGGER.warn(msg);
+ }
+ return tsStatus;
+ }
+
@Override
public TSStatus startRepairData() {
TSStatus status = confirmLeader();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index cc0b852e3b0..0e1144919e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -686,6 +687,7 @@ public class StorageEngine implements IService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
+ return StatusUtils.OK;
});
} catch (Exception e) {
if (e instanceof InterruptedException) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
index f5702143dd9..bd7d561f449 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.commons.conf;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,7 +252,7 @@ public class ConfigurationFileUtils {
return ignoredConfigItems;
}
- public static void updateConfiguration(
+ public static TSStatus updateConfiguration(
File file, Properties newConfigItems, LoadHotModifiedPropsFunc
loadHotModifiedPropertiesFunc)
throws IOException, InterruptedException {
File lockFile = new File(file.getPath() + lockFileSuffix);
@@ -271,7 +275,10 @@ public class ConfigurationFileUtils {
// load hot modified properties
if (loadHotModifiedPropertiesFunc != null) {
- loadHotModifiedPropertiesFunc.loadHotModifiedProperties(mergedProps);
+ TSStatus status =
loadHotModifiedPropertiesFunc.loadHotModifiedProperties(mergedProps);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
}
// generate new configuration file content in memory
@@ -300,7 +307,7 @@ public class ConfigurationFileUtils {
}
if (newConfigItems.isEmpty()) {
// No configuration needs to be modified
- return;
+ return StatusUtils.OK;
}
logger.info("Updating configuration file {}", file.getAbsolutePath());
try (BufferedWriter writer = new BufferedWriter(new
FileWriter(lockFile))) {
@@ -316,6 +323,7 @@ public class ConfigurationFileUtils {
} finally {
releaseFileLock(lockFile);
}
+ return StatusUtils.OK;
}
private static String readConfigLinesWithoutLicense(File file) throws
IOException {
@@ -358,7 +366,7 @@ public class ConfigurationFileUtils {
@FunctionalInterface
public interface LoadHotModifiedPropsFunc {
- void loadHotModifiedProperties(TrimProperties properties)
+ TSStatus loadHotModifiedProperties(TrimProperties properties)
throws IOException, InterruptedException;
}
}