This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2c4bb243956 Fix that config changes rejected by DN will be written to
CN's config (#14551)
2c4bb243956 is described below
commit 2c4bb243956f2b2b8c08d3efdd7f71fa6e4bc4e6
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Dec 27 12:29:04 2024 +0800
Fix that config changes rejected by DN will be written to CN's config
(#14551)
* Fix that config changes rejected by DN will be written to CN's config file
* fix test
(cherry picked from commit e15012c1dbfeae180b1ff17999d730e6912deabd)
* fix test
* Revert "fix test"
This reverts commit f6bf884aa68abadeb8647506f5b57b9ac5e6251f.
* fix not forwarded config
---
.../iotdb/db/it/IoTDBSetConfigurationIT.java | 17 +++--
.../iotdb/confignode/manager/ConfigManager.java | 76 ++++++++++++++--------
.../iotdb/db/storageengine/StorageEngine.java | 2 +
.../iotdb/commons/conf/ConfigurationFileUtils.java | 16 +++--
4 files changed, 76 insertions(+), 35 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
index 4df8f79af15..7f5f234859b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java
@@ -50,6 +50,7 @@ import static org.junit.Assert.assertTrue;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class})
public class IoTDBSetConfigurationIT {
+
@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().initClusterEnvironment();
@@ -70,6 +71,8 @@ public class IoTDBSetConfigurationIT {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
+ // set configuration "enable_seq_space_compaction"="false"
+ // set configuration "enable_unseq_space_compaction"="false" on 0
Assert.assertTrue(
EnvFactory.getEnv().getConfigNodeWrapperList().stream()
.allMatch(
@@ -78,14 +81,17 @@ public class IoTDBSetConfigurationIT {
nodeWrapper,
"enable_seq_space_compaction=false",
"enable_unseq_space_compaction=false")));
+ // set configuration "enable_seq_space_compaction"="false"
Assert.assertTrue(
EnvFactory.getEnv().getDataNodeWrapperList().stream()
.allMatch(
nodeWrapper ->
- checkConfigFileContains(
- nodeWrapper,
- "enable_seq_space_compaction=false",
- "enable_cross_space_compaction=false")));
+ checkConfigFileContains(nodeWrapper,
"enable_seq_space_compaction=false")));
+ // set configuration "enable_cross_space_compaction"="false" on 1
+ assertTrue(
+ checkConfigFileContains(
+ EnvFactory.getEnv().getDataNodeWrapperList().get(0),
+ "enable_cross_space_compaction=false"));
}
@Test
@@ -180,6 +186,9 @@ public class IoTDBSetConfigurationIT {
assertFalse(
checkConfigFileContains(
EnvFactory.getEnv().getDataNodeWrapper(0),
"default_storage_group_level=-1"));
+ assertFalse(
+ checkConfigFileContains(
+ EnvFactory.getEnv().getConfigNodeWrapper(0),
"default_storage_group_level=-1"));
assertTrue(
checkConfigFileContains(
EnvFactory.getEnv().getDataNodeWrapper(0),
"default_storage_group_level=3"));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index d1006aaba6f..fc147ed411b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -261,6 +261,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
@@ -1647,41 +1648,62 @@ public class ConfigManager implements IManager {
return tsStatus;
}
}
- if (currentNodeId == req.getNodeId() || req.getNodeId() < 0) {
- URL url =
ConfigNodeDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME);
- boolean configurationFileFound = (url != null && new
File(url.getFile()).exists());
- TrimProperties properties = new TrimProperties();
- properties.putAll(req.getConfigs());
- if (configurationFileFound) {
- File file = new File(url.getFile());
- try {
- ConfigurationFileUtils.updateConfiguration(
- file,
- properties,
- mergedProps -> {
-
ConfigNodeDescriptor.getInstance().loadHotModifiedProps(mergedProps);
- });
- } catch (Exception e) {
- tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
- }
- } else {
- String msg =
- "Unable to find the configuration file. Some modifications are
made only in memory.";
- tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
msg);
- LOGGER.warn(msg);
- }
- if (currentNodeId == req.getNodeId()) {
- return tsStatus;
- }
+ if (currentNodeId == req.getNodeId()) {
+ return setConfigLocally(req, null);
+ } else if (req.getNodeId() < 0) {
+ // re-config CN in memory -> re-config DN in memory -> re-config DN in
file -> re-config CN in
+ // file
+ TSStatus finalTsStatus = tsStatus;
+ return setConfigLocally(req, () -> broadcastSetConfig(finalTsStatus,
req));
+ } else {
+ // not for this node, ignore it
+ return broadcastSetConfig(tsStatus, req);
}
+ }
+
+ private TSStatus broadcastSetConfig(TSStatus thisNodeResult,
TSetConfigurationReq req) {
List<TSStatus> statusListOfOtherNodes = nodeManager.setConfiguration(req);
List<TSStatus> statusList = new ArrayList<>(statusListOfOtherNodes.size()
+ 1);
- statusList.add(tsStatus);
+ statusList.add(thisNodeResult);
statusList.addAll(statusListOfOtherNodes);
return RpcUtils.squashResponseStatusList(statusList);
}
+ private TSStatus setConfigLocally(
+ TSetConfigurationReq req, Supplier<TSStatus> beforeWriteFileAction) {
+ // re-config this node only
+ TSStatus tsStatus;
+ URL url =
ConfigNodeDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME);
+ boolean configurationFileFound = (url != null && new
File(url.getFile()).exists());
+ TrimProperties newProperties = new TrimProperties();
+ newProperties.putAll(req.getConfigs());
+
+ if (configurationFileFound) {
+ File file = new File(url.getFile());
+ try {
+ tsStatus =
+ ConfigurationFileUtils.updateConfiguration(
+ file,
+ newProperties,
+ mergedProps -> {
+
ConfigNodeDescriptor.getInstance().loadHotModifiedProps(mergedProps);
+ return beforeWriteFileAction != null
+ ? beforeWriteFileAction.get()
+ : StatusUtils.OK;
+ });
+ } catch (Exception e) {
+ tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
+ }
+ } else {
+ String msg =
+ "Unable to find the configuration file. Some modifications are made
only in memory.";
+ tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, msg);
+ LOGGER.warn(msg);
+ }
+ return tsStatus;
+ }
+
@Override
public TSStatus startRepairData() {
TSStatus status = confirmLeader();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 6496aaf256c..cadae97d144 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -686,6 +687,7 @@ public class StorageEngine implements IService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
+ return StatusUtils.OK;
});
} catch (Exception e) {
if (e instanceof InterruptedException) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
index f5702143dd9..bd7d561f449 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/ConfigurationFileUtils.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.commons.conf;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,7 +252,7 @@ public class ConfigurationFileUtils {
return ignoredConfigItems;
}
- public static void updateConfiguration(
+ public static TSStatus updateConfiguration(
File file, Properties newConfigItems, LoadHotModifiedPropsFunc
loadHotModifiedPropertiesFunc)
throws IOException, InterruptedException {
File lockFile = new File(file.getPath() + lockFileSuffix);
@@ -271,7 +275,10 @@ public class ConfigurationFileUtils {
// load hot modified properties
if (loadHotModifiedPropertiesFunc != null) {
- loadHotModifiedPropertiesFunc.loadHotModifiedProperties(mergedProps);
+ TSStatus status =
loadHotModifiedPropertiesFunc.loadHotModifiedProperties(mergedProps);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
}
// generate new configuration file content in memory
@@ -300,7 +307,7 @@ public class ConfigurationFileUtils {
}
if (newConfigItems.isEmpty()) {
// No configuration needs to be modified
- return;
+ return StatusUtils.OK;
}
logger.info("Updating configuration file {}", file.getAbsolutePath());
try (BufferedWriter writer = new BufferedWriter(new
FileWriter(lockFile))) {
@@ -316,6 +323,7 @@ public class ConfigurationFileUtils {
} finally {
releaseFileLock(lockFile);
}
+ return StatusUtils.OK;
}
private static String readConfigLinesWithoutLicense(File file) throws
IOException {
@@ -358,7 +366,7 @@ public class ConfigurationFileUtils {
@FunctionalInterface
public interface LoadHotModifiedPropsFunc {
- void loadHotModifiedProperties(TrimProperties properties)
+ TSStatus loadHotModifiedProperties(TrimProperties properties)
throws IOException, InterruptedException;
}
}