This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch analysis/confignode-hot-reload-configs in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6dc94edbd1c6005ca24e7a5ce6b6f589f1a92007 Author: Yongzao <[email protected]> AuthorDate: Wed Jun 17 18:55:36 2026 +0800 Support hot reload for cluster runtime configs --- .../iotdb/db/it/IoTDBSetConfigurationIT.java | 203 +++++++++++++++++++++ .../iotdb/confignode/conf/ConfigNodeConfig.java | 18 +- .../confignode/conf/ConfigNodeDescriptor.java | 202 ++++++++++++++------ .../iotdb/confignode/manager/ConfigManager.java | 37 ++-- .../confignode/manager/RetryFailedTasksThread.java | 23 ++- .../iotdb/confignode/manager/load/LoadManager.java | 8 + .../manager/load/cache/AbstractLoadCache.java | 27 +-- .../confignode/manager/load/cache/LoadCache.java | 6 + .../load/cache/region/RegionGroupCache.java | 4 + .../manager/load/service/EventService.java | 23 ++- .../manager/load/service/HeartbeatService.java | 23 ++- .../manager/load/service/StatisticsService.java | 23 ++- .../manager/load/service/TopologyService.java | 25 +-- .../manager/partition/PartitionManager.java | 8 +- .../manager/schema/ClusterSchemaManager.java | 10 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 7 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 34 ++++ .../plan/AbstractFragmentParallelPlanner.java | 4 +- .../conf/iotdb-system.properties.template | 18 +- 19 files changed, 559 insertions(+), 144 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java index 4b569bf1010..dffb0d1a025 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSetConfigurationIT.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.it; import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -117,6 +120,155 @@ public class IoTDBSetConfigurationIT { "topology_probing_timeout_ratio=0.4"))); } + @Test + public void testHotReloadHeartbeatInterval() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set configuration \"heartbeat_interval_in_ms\"=\"500\""); + assertAppliedConfiguration(0, "heartbeat_interval_in_ms", "500"); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set configuration \"heartbeat_interval_in_ms\"=\"1000\""); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + Assert.assertTrue( + EnvFactory.getEnv().getConfigNodeWrapperList().stream() + .allMatch( + nodeWrapper -> + checkConfigFileContains(nodeWrapper, "heartbeat_interval_in_ms=1000"))); + } + + @Test + public void testHotReloadContinuousQueryMinEveryInterval() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set configuration \"continuous_query_min_every_interval_in_ms\"=\"50\""); + assertAppliedConfiguration(0, "continuous_query_min_every_interval_in_ms", "50"); + assertAppliedConfiguration( + EnvFactory.getEnv().getConfigNodeWrapperList().size(), + "continuous_query_min_every_interval_in_ms", + "50"); + statement.execute( + "CREATE CQ hot_reload_cq\n" + + "RESAMPLE EVERY 50ms\n" + + "BEGIN \n" + + " SELECT count(s1) \n" + + " INTO root.sg_count.d(count_s1)\n" + + " FROM root.sg.d\n" + + " GROUP BY(30m)\n" + + "END"); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + try { + statement.execute("DROP CQ hot_reload_cq"); + } catch (SQLException ignored) { + // The CQ may not exist if the hot-reload assertion failed before creation. + } + statement.execute( + "set configuration \"continuous_query_min_every_interval_in_ms\"=\"1000\""); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + Assert.assertTrue( + EnvFactory.getEnv().getNodeWrapperList().stream() + .allMatch( + nodeWrapper -> + checkConfigFileContains( + nodeWrapper, "continuous_query_min_every_interval_in_ms=1000"))); + } + + @Test + public void testHotReloadRegionGroupExtensionConfiguration() { + String database = "root.hot_reload_region"; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set configuration \"schema_region_group_extension_policy\"=\"CUSTOM\""); + statement.execute("set configuration \"data_region_group_extension_policy\"=\"CUSTOM\""); + statement.execute("set configuration \"default_schema_region_group_num_per_database\"=\"2\""); + statement.execute("set configuration \"default_data_region_group_num_per_database\"=\"3\""); + statement.execute("set configuration \"schema_region_per_data_node\"=\"2\""); + statement.execute("set configuration \"data_region_per_data_node\"=\"2\""); + + assertAppliedConfiguration(0, "schema_region_group_extension_policy", "CUSTOM"); + assertAppliedConfiguration(0, "data_region_group_extension_policy", "CUSTOM"); + assertAppliedConfiguration(0, "default_schema_region_group_num_per_database", "2"); + assertAppliedConfiguration(0, "default_data_region_group_num_per_database", "3"); + assertShowVariable(statement, ColumnHeaderConstant.SCHEMA_REGION_PER_DATA_NODE, "2"); + assertShowVariable(statement, ColumnHeaderConstant.DATA_REGION_PER_DATA_NODE, "2"); + + statement.execute("CREATE DATABASE " + database); + statement.execute("INSERT INTO " + database + ".d(timestamp, s1) VALUES (1, 1)"); + assertRegionGroupNum(statement, database, 2, 3); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + try { + statement.execute("DELETE DATABASE " + database); + } catch (SQLException ignored) { + // The database may not exist if the hot-reload assertion failed before creation. + } + statement.execute("set configuration \"schema_region_group_extension_policy\"=\"AUTO\""); + statement.execute("set configuration \"data_region_group_extension_policy\"=\"AUTO\""); + statement.execute( + "set configuration \"default_schema_region_group_num_per_database\"=\"1\""); + statement.execute("set configuration \"default_data_region_group_num_per_database\"=\"2\""); + statement.execute("set configuration \"schema_region_per_data_node\"=\"1\""); + statement.execute("set configuration \"data_region_per_data_node\"=\"0\""); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + Assert.assertTrue( + EnvFactory.getEnv().getConfigNodeWrapperList().stream() + .allMatch( + nodeWrapper -> + checkConfigFileContains( + nodeWrapper, + "schema_region_group_extension_policy=AUTO", + "data_region_group_extension_policy=AUTO", + "default_schema_region_group_num_per_database=1", + "default_data_region_group_num_per_database=2", + "schema_region_per_data_node=1", + "data_region_per_data_node=0"))); + } + + @Test + public void testHotReloadReadConsistencyLevel() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set configuration \"read_consistency_level\"=\"weak\""); + assertAppliedConfiguration(0, "read_consistency_level", "weak"); + assertAppliedConfiguration( + EnvFactory.getEnv().getConfigNodeWrapperList().size(), "read_consistency_level", "weak"); + assertShowVariable(statement, ColumnHeaderConstant.READ_CONSISTENCY_LEVEL, "weak"); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set configuration \"read_consistency_level\"=\"strong\""); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + Assert.assertTrue( + EnvFactory.getEnv().getNodeWrapperList().stream() + .allMatch( + nodeWrapper -> + checkConfigFileContains(nodeWrapper, "read_consistency_level=strong"))); + } + @Test public void testSetClusterName() throws Exception { // set cluster name on cn and dn @@ -177,6 +329,57 @@ public class IoTDBSetConfigurationIT { } } + private static void assertAppliedConfiguration(int nodeId, String key, String value) + throws Exception { + try (ITableSession tableSessionConnection = EnvFactory.getEnv().getTableSessionConnection()) { + SessionDataSet sessionDataSet = + tableSessionConnection.executeQueryStatement("show configuration on " + nodeId); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + if (key.equals(iterator.getString(1))) { + Assert.assertEquals(value, iterator.isNull(2) ? null : iterator.getString(2)); + return; + } + } + } + Assert.fail("Cannot find applied configuration: " + key); + } + + private static void assertShowVariable(Statement statement, String key, String value) + throws SQLException { + try (ResultSet resultSet = statement.executeQuery("show variables")) { + while (resultSet.next()) { + if (key.equals(resultSet.getString(1))) { + Assert.assertEquals(value, resultSet.getString(2)); + return; + } + } + } + Assert.fail("Cannot find variable: " + key); + } + + private static void assertRegionGroupNum( + Statement statement, + String database, + int expectedSchemaRegionGroupNum, + int expectedDataRegionGroupNum) + throws SQLException { + int schemaRegionGroupNum = 0; + int dataRegionGroupNum = 0; + try (ResultSet resultSet = statement.executeQuery("show regions of database " + database)) { + while (resultSet.next()) { + String regionType = resultSet.getString(ColumnHeaderConstant.TYPE); + if ("SchemaRegion".equals(regionType)) { + schemaRegionGroupNum++; + } else if ("DataRegion".equals(regionType)) { + dataRegionGroupNum++; + } + } + } + Assert.assertEquals(expectedSchemaRegionGroupNum, schemaRegionGroupNum); + Assert.assertEquals(expectedDataRegionGroupNum, dataRegionGroupNum); + } + @Test public void testSetDefaultSGLevel() throws SQLException { try (Connection connection = EnvFactory.getEnv().getConnection(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 326d8b43ceb..3c5b6a927ec 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -95,7 +95,7 @@ public class ConfigNodeConfig { private String dataPartitionAllocationStrategy = "INHERIT"; /** The policy of extension SchemaRegionGroup for each Database. */ - private RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy = + private volatile RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy = RegionGroupExtensionPolicy.AUTO; /** @@ -103,13 +103,13 @@ public class ConfigNodeConfig { * SchemaRegionGroups for each Database. When set schema_region_group_extension_policy=AUTO, this * parameter is the default minimal number of SchemaRegionGroups for each Database. */ - private int defaultSchemaRegionGroupNumPerDatabase = 1; + private volatile int defaultSchemaRegionGroupNumPerDatabase = 1; /** The maximum number of SchemaRegions expected to be managed by each DataNode. */ - private int schemaRegionPerDataNode = 1; + private volatile int schemaRegionPerDataNode = 1; /** The policy of extension DataRegionGroup for each Database. */ - private RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy = + private volatile RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy = RegionGroupExtensionPolicy.AUTO; /** @@ -117,13 +117,13 @@ public class ConfigNodeConfig { * DataRegionGroups for each Database. When set data_region_group_extension_policy=AUTO, this * parameter is the default minimal number of DataRegionGroups for each Database. */ - private int defaultDataRegionGroupNumPerDatabase = 2; + private volatile int defaultDataRegionGroupNumPerDatabase = 2; /** * The maximum number of DataRegions expected to be managed by each DataNode. Set to 0 means that * each dataNode automatically has the number of CPU cores / 2 regions. */ - private int dataRegionPerDataNode = 0; + private volatile int dataRegionPerDataNode = 0; /** each dataNode automatically has the number of CPU cores / 2 regions. */ private final double dataRegionPerDataNodeProportion = 0.5; @@ -202,7 +202,7 @@ public class ConfigNodeConfig { Math.max(Runtime.getRuntime().availableProcessors() / 4, 16); /** The heartbeat interval in milliseconds. */ - private long heartbeatIntervalInMs = 1000; + private volatile long heartbeatIntervalInMs = 1000; /** Failure detector implementation */ private String failureDetector = IFailureDetector.PHI_ACCRUAL_DETECTOR; @@ -234,7 +234,7 @@ public class ConfigNodeConfig { /** The route priority policy of cluster read/write requests. */ private String routePriorityPolicy = IPriorityBalancer.LEADER_POLICY; - private String readConsistencyLevel = "strong"; + private volatile String readConsistencyLevel = "strong"; /** RatisConsensus protocol, Max size for a single log append request from leader. */ private long dataRegionRatisConsensusLogAppenderBufferSize = 16 * 1024 * 1024L; @@ -293,7 +293,7 @@ public class ConfigNodeConfig { /** CQ related. */ private int cqSubmitThread = 2; - private long cqMinEveryIntervalInMs = 1_000; + private volatile long cqMinEveryIntervalInMs = 1_000; /** RatisConsensus protocol, request timeout for ratis client. */ private long dataRegionRatisRequestTimeoutMs = 10000L; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 1245bef7f3d..2564ce48dbc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -214,42 +214,7 @@ public class ConfigNodeDescriptor { properties.getProperty( "data_replication_factor", String.valueOf(conf.getDataReplicationFactor())))); - conf.setSchemaRegionGroupExtensionPolicy( - RegionGroupExtensionPolicy.parse( - properties.getProperty( - "schema_region_group_extension_policy", - conf.getSchemaRegionGroupExtensionPolicy().getPolicy()))); - - conf.setDefaultSchemaRegionGroupNumPerDatabase( - Integer.parseInt( - properties.getProperty( - "default_schema_region_group_num_per_database", - String.valueOf(conf.getDefaultSchemaRegionGroupNumPerDatabase())))); - - conf.setSchemaRegionPerDataNode( - (int) - Double.parseDouble( - properties.getProperty( - "schema_region_per_data_node", - String.valueOf(conf.getSchemaRegionPerDataNode())))); - - conf.setDataRegionGroupExtensionPolicy( - RegionGroupExtensionPolicy.parse( - properties.getProperty( - "data_region_group_extension_policy", - conf.getDataRegionGroupExtensionPolicy().getPolicy()))); - - conf.setDefaultDataRegionGroupNumPerDatabase( - Integer.parseInt( - properties.getProperty( - "default_data_region_group_num_per_database", - String.valueOf(conf.getDefaultDataRegionGroupNumPerDatabase())))); - - conf.setDataRegionPerDataNode( - (int) - Double.parseDouble( - properties.getProperty( - "data_region_per_data_node", String.valueOf(conf.getDataRegionPerDataNode())))); + loadRegionGroupExtensionConfig(properties).applyTo(conf); try { conf.setRegionAllocateStrategy( @@ -307,10 +272,7 @@ public class ConfigNodeDescriptor { "pipe_receiver_file_dir", conf.getSystemDir() + File.separator + "pipe" + File.separator + "receiver"))); - conf.setHeartbeatIntervalInMs( - Long.parseLong( - properties.getProperty( - "heartbeat_interval_in_ms", String.valueOf(conf.getHeartbeatIntervalInMs())))); + conf.setHeartbeatIntervalInMs(loadHeartbeatIntervalInMs(properties, false)); String failureDetector = properties.getProperty("failure_detector", conf.getFailureDetector()); if (IFailureDetector.FIXED_DETECTOR.equals(failureDetector) @@ -397,16 +359,7 @@ public class ConfigNodeDescriptor { ConfigNodeMessages.UNKNOWN_ROUTE_PRIORITY_POLICY_PLEASE_SET_TO, routePriorityPolicy)); } - String readConsistencyLevel = - properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel()); - if (readConsistencyLevel.equals("strong") || readConsistencyLevel.equals("weak")) { - conf.setReadConsistencyLevel(readConsistencyLevel); - } else { - throw new IOException( - String.format( - ConfigNodeMessages.UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO, - readConsistencyLevel)); - } + conf.setReadConsistencyLevel(loadReadConsistencyLevel(properties)); // commons commonDescriptor.loadCommonProps(properties); @@ -768,7 +721,7 @@ public class ConfigNodeDescriptor { String.valueOf(conf.getForceWalPeriodForConfigNodeSimpleInMs())))); } - private void loadCQConfig(TrimProperties properties) { + private void loadCQConfig(TrimProperties properties) throws IOException { int cqSubmitThread = Integer.parseInt( properties.getProperty( @@ -782,12 +735,48 @@ public class ConfigNodeDescriptor { } conf.setCqSubmitThread(cqSubmitThread); + conf.setCqMinEveryIntervalInMs(loadCqMinEveryIntervalInMs(properties, false)); + } + + private long loadHeartbeatIntervalInMs(TrimProperties properties, boolean rejectInvalid) + throws IOException { + long heartbeatIntervalInMs = + Long.parseLong( + properties.getProperty( + "heartbeat_interval_in_ms", String.valueOf(conf.getHeartbeatIntervalInMs()))); + if (heartbeatIntervalInMs <= 0) { + String warning = + "heartbeat_interval_in_ms should be greater than 0, but was " + + heartbeatIntervalInMs + + ", using previous value " + + conf.getHeartbeatIntervalInMs() + + "."; + if (rejectInvalid) { + throw new IOException(warning); + } + LOGGER.warn( + "heartbeat_interval_in_ms should be greater than 0, but was {}, using previous value {}.", + heartbeatIntervalInMs, + conf.getHeartbeatIntervalInMs()); + heartbeatIntervalInMs = conf.getHeartbeatIntervalInMs(); + } + return heartbeatIntervalInMs; + } + + private long loadCqMinEveryIntervalInMs(TrimProperties properties, boolean rejectInvalid) + throws IOException { long cqMinEveryIntervalInMs = Long.parseLong( properties.getProperty( "continuous_query_min_every_interval_in_ms", String.valueOf(conf.getCqMinEveryIntervalInMs()))); if (cqMinEveryIntervalInMs <= 0) { + if (rejectInvalid) { + throw new IOException( + "continuous_query_min_every_interval_in_ms should be greater than 0, but current value is " + + cqMinEveryIntervalInMs + + "."); + } LOGGER.warn( ConfigNodeMessages.CONTINUOUS_QUERY_MIN_EVERY_INTERVAL_IN_MS_SHOULD_BE_GREATER, cqMinEveryIntervalInMs, @@ -795,7 +784,107 @@ public class ConfigNodeDescriptor { cqMinEveryIntervalInMs = conf.getCqMinEveryIntervalInMs(); } - conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs); + return cqMinEveryIntervalInMs; + } + + private RegionGroupExtensionConfig loadRegionGroupExtensionConfig(TrimProperties properties) + throws IOException { + RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy = + RegionGroupExtensionPolicy.parse( + properties.getProperty( + "schema_region_group_extension_policy", + conf.getSchemaRegionGroupExtensionPolicy().getPolicy())); + int defaultSchemaRegionGroupNumPerDatabase = + Integer.parseInt( + properties.getProperty( + "default_schema_region_group_num_per_database", + String.valueOf(conf.getDefaultSchemaRegionGroupNumPerDatabase()))); + int schemaRegionPerDataNode = + (int) + Double.parseDouble( + properties.getProperty( + "schema_region_per_data_node", + String.valueOf(conf.getSchemaRegionPerDataNode()))); + RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy = + RegionGroupExtensionPolicy.parse( + properties.getProperty( + "data_region_group_extension_policy", + conf.getDataRegionGroupExtensionPolicy().getPolicy())); + int defaultDataRegionGroupNumPerDatabase = + Integer.parseInt( + properties.getProperty( + "default_data_region_group_num_per_database", + String.valueOf(conf.getDefaultDataRegionGroupNumPerDatabase()))); + int dataRegionPerDataNode = + (int) + Double.parseDouble( + properties.getProperty( + "data_region_per_data_node", String.valueOf(conf.getDataRegionPerDataNode()))); + + if (defaultSchemaRegionGroupNumPerDatabase <= 0) { + throw new IOException("default_schema_region_group_num_per_database should be positive."); + } + if (schemaRegionPerDataNode <= 0) { + throw new IOException("schema_region_per_data_node should be positive."); + } + if (defaultDataRegionGroupNumPerDatabase <= 0) { + throw new IOException("default_data_region_group_num_per_database should be positive."); + } + if (dataRegionPerDataNode < 0) { + throw new IOException("data_region_per_data_node should be non-negative."); + } + + return new RegionGroupExtensionConfig( + schemaRegionGroupExtensionPolicy, + defaultSchemaRegionGroupNumPerDatabase, + schemaRegionPerDataNode, + dataRegionGroupExtensionPolicy, + defaultDataRegionGroupNumPerDatabase, + dataRegionPerDataNode); + } + + private String loadReadConsistencyLevel(TrimProperties properties) throws IOException { + String readConsistencyLevel = + properties.getProperty("read_consistency_level", conf.getReadConsistencyLevel()); + if (readConsistencyLevel.equals("strong") || readConsistencyLevel.equals("weak")) { + return readConsistencyLevel; + } + throw new IOException( + String.format( + ConfigNodeMessages.UNKNOWN_READ_CONSISTENCY_LEVEL_PLEASE_SET_TO, readConsistencyLevel)); + } + + private static class RegionGroupExtensionConfig { + private final RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy; + private final int defaultSchemaRegionGroupNumPerDatabase; + private final int schemaRegionPerDataNode; + private final RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy; + private final int defaultDataRegionGroupNumPerDatabase; + private final int dataRegionPerDataNode; + + private RegionGroupExtensionConfig( + RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy, + int defaultSchemaRegionGroupNumPerDatabase, + int schemaRegionPerDataNode, + RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy, + int defaultDataRegionGroupNumPerDatabase, + int dataRegionPerDataNode) { + this.schemaRegionGroupExtensionPolicy = schemaRegionGroupExtensionPolicy; + this.defaultSchemaRegionGroupNumPerDatabase = defaultSchemaRegionGroupNumPerDatabase; + this.schemaRegionPerDataNode = schemaRegionPerDataNode; + this.dataRegionGroupExtensionPolicy = dataRegionGroupExtensionPolicy; + this.defaultDataRegionGroupNumPerDatabase = defaultDataRegionGroupNumPerDatabase; + this.dataRegionPerDataNode = dataRegionPerDataNode; + } + + private void applyTo(ConfigNodeConfig conf) { + conf.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy); + conf.setDefaultSchemaRegionGroupNumPerDatabase(defaultSchemaRegionGroupNumPerDatabase); + conf.setSchemaRegionPerDataNode(schemaRegionPerDataNode); + conf.setDataRegionGroupExtensionPolicy(dataRegionGroupExtensionPolicy); + conf.setDefaultDataRegionGroupNumPerDatabase(defaultDataRegionGroupNumPerDatabase); + conf.setDataRegionPerDataNode(dataRegionPerDataNode); + } } /** @@ -826,6 +915,15 @@ public class ConfigNodeDescriptor { ConfigurationFileUtils.updateAppliedProperties(properties, true); Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME)) .ifPresent(conf::setClusterName); + long heartbeatIntervalInMs = loadHeartbeatIntervalInMs(properties, true); + long cqMinEveryIntervalInMs = loadCqMinEveryIntervalInMs(properties, true); + RegionGroupExtensionConfig regionGroupExtensionConfig = + loadRegionGroupExtensionConfig(properties); + String readConsistencyLevel = loadReadConsistencyLevel(properties); + conf.setHeartbeatIntervalInMs(heartbeatIntervalInMs); + conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs); + regionGroupExtensionConfig.applyTo(conf); + conf.setReadConsistencyLevel(readConsistencyLevel); Optional.ofNullable(properties.getProperty("enable_topology_probing")) .ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v))); loadPipeHotModifiedProp(properties); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 37dd66b1527..0af1f1201d1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1501,17 +1501,6 @@ public class ConfigManager implements IManager { return errorStatus.setMessage(errorPrefix + "data_replication_factor" + errorSuffix); } - if (clusterParameters.getSchemaRegionPerDataNode() != CONF.getSchemaRegionPerDataNode()) { - return errorStatus.setMessage(errorPrefix + "schema_region_per_data_node" + errorSuffix); - } - if (clusterParameters.getDataRegionPerDataNode() != CONF.getDataRegionPerDataNode()) { - return errorStatus.setMessage(errorPrefix + "data_region_per_data_node" + errorSuffix); - } - - if (!clusterParameters.getReadConsistencyLevel().equals(CONF.getReadConsistencyLevel())) { - return errorStatus.setMessage(errorPrefix + "read_consistency_level" + errorSuffix); - } - if (clusterParameters.getDiskSpaceWarningThreshold() != COMMON_CONF.getDiskSpaceWarningThreshold()) { return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix); @@ -1762,6 +1751,9 @@ public class ConfigManager implements IManager { TrimProperties properties = new TrimProperties(); properties.putAll(req.getConfigs()); + long previousHeartbeatIntervalInMs = CONF.getHeartbeatIntervalInMs(); + int previousSchemaRegionPerDataNode = CONF.getSchemaRegionPerDataNode(); + int previousDataRegionPerDataNode = CONF.getDataRegionPerDataNode(); boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing(); if (configurationFileFound) { File file = new File(url.getFile()); @@ -1786,6 +1778,9 @@ public class ConfigManager implements IManager { } LOGGER.warn(msg); } + handleHeartbeatIntervalHotReload(previousHeartbeatIntervalInMs); + handleRegionPerDataNodeHotReload( + previousSchemaRegionPerDataNode, previousDataRegionPerDataNode); handleTopologyProbingHotReload(wasTopologyProbingEnabled); if (currentNodeId == req.getNodeId()) { return tsStatus; @@ -1798,6 +1793,26 @@ public class ConfigManager implements IManager { return RpcUtils.squashResponseStatusList(statusList); } + private void handleHeartbeatIntervalHotReload(long previousHeartbeatIntervalInMs) { + if (previousHeartbeatIntervalInMs == CONF.getHeartbeatIntervalInMs()) { + return; + } + getLoadManager().reloadHeartbeatInterval(); + getRetryFailedTasksThread().reloadHeartbeatInterval(); + } + + private void handleRegionPerDataNodeHotReload( + int previousSchemaRegionPerDataNode, int previousDataRegionPerDataNode) { + if (previousSchemaRegionPerDataNode == CONF.getSchemaRegionPerDataNode() + && previousDataRegionPerDataNode == CONF.getDataRegionPerDataNode()) { + return; + } + if (!getConsensusManager().isLeader()) { + return; + } + getClusterSchemaManager().adjustMaxRegionGroupNum(); + } + private void handleTopologyProbingHotReload(boolean wasEnabled) { boolean isEnabled = CONF.isEnableTopologyProbing(); if (wasEnabled == isEnabled) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java index 748d6378462..2f222a93ac6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; -import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.i18n.ManagerMessages; import org.apache.iotdb.confignode.manager.load.LoadManager; @@ -53,8 +52,6 @@ public class RetryFailedTasksThread { private static final Logger LOGGER = LoggerFactory.getLogger(RetryFailedTasksThread.class); - private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs(); private final IManager configManager; private final NodeManager nodeManager; private final LoadManager loadManager; @@ -82,7 +79,7 @@ public class RetryFailedTasksThread { retryFailTasksExecutor, this::retryFailedTasks, 0, - HEARTBEAT_INTERVAL, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS); LOGGER.info(ManagerMessages.RETRYFAILMISSIONS_SERVICE_IS_STARTED_SUCCESSFULLY); } @@ -100,6 +97,24 @@ public class RetryFailedTasksThread { } } + /** Reload the retry interval without rebuilding the service instance. */ + public void reloadHeartbeatInterval() { + synchronized (scheduleMonitor) { + if (currentFailedTasksRetryThreadFuture == null) { + return; + } + currentFailedTasksRetryThreadFuture.cancel(false); + currentFailedTasksRetryThreadFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + retryFailTasksExecutor, + this::retryFailedTasks, + 0, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), + TimeUnit.MILLISECONDS); + LOGGER.info(ManagerMessages.RETRYFAILMISSIONS_SERVICE_IS_STARTED_SUCCESSFULLY); + } + } + private void retryFailedTasks() { // trigger triggerDetectTask(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 63d5ff8befd..e7565ad4b1c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -186,6 +186,14 @@ public class LoadManager { routeBalancer.clearRegionPriority(); } + public void reloadHeartbeatInterval() { + loadCache.reloadFailureDetector(); + topologyService.reloadFailureDetector(); + heartbeatService.reloadHeartbeatInterval(); + statisticsService.reloadHeartbeatInterval(); + eventService.reloadHeartbeatInterval(); + } + public boolean isLoadReady() { return loadReady.get() || tryUpdateLoadReady(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java index e5ab6445f98..81174d13941 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java @@ -43,30 +43,35 @@ public abstract class AbstractLoadCache { // The current statistics calculated by the latest heartbeat sample protected final AtomicReference<AbstractStatistics> currentStatistics; - protected final IFailureDetector failureDetector; + protected volatile IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); protected AbstractLoadCache() { this.currentStatistics = new AtomicReference<>(); this.slidingWindow = Collections.synchronizedList(new LinkedList<>()); + this.failureDetector = buildFailureDetector(); + } + + public static IFailureDetector buildFailureDetector() { switch (CONF.getFailureDetector()) { case IFailureDetector.PHI_ACCRUAL_DETECTOR: - this.failureDetector = - new PhiAccrualDetector( - CONF.getFailureDetectorPhiThreshold(), - CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L, - CONF.getHeartbeatIntervalInMs() * 200_000L, - IFailureDetector.PHI_COLD_START_THRESHOLD, - new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L)); - break; + return new PhiAccrualDetector( + CONF.getFailureDetectorPhiThreshold(), + CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L, + CONF.getHeartbeatIntervalInMs() * 200_000L, + IFailureDetector.PHI_COLD_START_THRESHOLD, + new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L)); case IFailureDetector.FIXED_DETECTOR: default: - this.failureDetector = - new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L); + return new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L); } } + public void reloadFailureDetector() { + this.failureDetector = buildFailureDetector(); + } + /** * Cache the latest heartbeat sample. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 3416246811e..1c29c3a8744 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -203,6 +203,12 @@ public class LoadCache { consensusGroupCacheMap.clear(); } + public void reloadFailureDetector() { + nodeCacheMap.values().forEach(AbstractLoadCache::reloadFailureDetector); + regionGroupCacheMap.values().forEach(RegionGroupCache::reloadFailureDetector); + consensusGroupCacheMap.values().forEach(AbstractLoadCache::reloadFailureDetector); + } + /** * Check if the specified Node is processing heartbeat. And set the processing flag to true. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java index e441054b1df..6818bff138a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java @@ -96,6 +96,10 @@ public class RegionGroupCache { regionCacheMap.remove(dataNodeId); } + public void reloadFailureDetector() { + regionCacheMap.values().forEach(RegionCache::reloadFailureDetector); + } + /** * Update currentStatistics based on the latest NodeHeartbeatSamples that cached in the * slidingWindow. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java index 762e6b1782b..5974e5ceb58 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java @@ -57,9 +57,6 @@ public class EventService { private static final Logger LOGGER = LoggerFactory.getLogger(EventService.class); - private static final long HEARTBEAT_INTERVAL = - ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(); - // Event executor service private final Object eventServiceMonitor = new Object(); @@ -101,7 +98,7 @@ public class EventService { eventServiceExecutor, this::broadcastChangeEventIfNecessary, 0, - HEARTBEAT_INTERVAL, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS); LOGGER.info(ManagerMessages.EVENT_SERVICE_IS_STARTED_SUCCESSFULLY); } @@ -124,6 +121,24 @@ public class EventService { } } + /** Reload the event-check interval without rebuilding the service instance. */ + public void reloadHeartbeatInterval() { + synchronized (eventServiceMonitor) { + if (currentEventServiceFuture == null) { + return; + } + currentEventServiceFuture.cancel(false); + currentEventServiceFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + eventServiceExecutor, + this::broadcastChangeEventIfNecessary, + 0, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), + TimeUnit.MILLISECONDS); + LOGGER.info(ManagerMessages.EVENT_SERVICE_IS_STARTED_SUCCESSFULLY); + } + } + private void broadcastChangeEventIfNecessary() { checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index a491b3960c3..239a68801e2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -66,9 +66,6 @@ public class HeartbeatService { private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class); - private static final long HEARTBEAT_INTERVAL = - ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(); - protected IManager configManager; private final LoadCache loadCache; @@ -101,7 +98,7 @@ public class HeartbeatService { heartBeatExecutor, this::heartbeatLoopBody, 0, - HEARTBEAT_INTERVAL, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS); LOGGER.info(ManagerMessages.HEARTBEAT_SERVICE_IS_STARTED_SUCCESSFULLY); } @@ -119,6 +116,24 @@ public class HeartbeatService { } } + /** Reload the heartbeat interval without rebuilding the service instance. */ + public void reloadHeartbeatInterval() { + synchronized (heartbeatScheduleMonitor) { + if (currentHeartbeatFuture == null) { + return; + } + currentHeartbeatFuture.cancel(false); + currentHeartbeatFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + heartBeatExecutor, + this::heartbeatLoopBody, + 0, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), + TimeUnit.MILLISECONDS); + LOGGER.info(ManagerMessages.HEARTBEAT_SERVICE_IS_STARTED_SUCCESSFULLY); + } + } + /** loop body of the heartbeat thread. */ private void heartbeatLoopBody() { // The consensusManager of configManager may not be fully initialized at this time diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java index 9fb235eed81..620276767f0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java @@ -38,9 +38,6 @@ public class StatisticsService { private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class); - public static final long STATISTICS_UPDATE_INTERVAL = - ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(); - private final LoadCache loadCache; public StatisticsService(LoadCache loadCache) { @@ -64,7 +61,7 @@ public class StatisticsService { loadStatisticsExecutor, this::updateLoadStatistics, 0, - STATISTICS_UPDATE_INTERVAL, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), TimeUnit.MILLISECONDS); LOGGER.info(ManagerMessages.LOADSTATISTICS_SERVICE_IS_STARTED_SUCCESSFULLY); } @@ -82,6 +79,24 @@ public class StatisticsService { } } + /** Reload the statistics update interval without rebuilding the service instance. */ + public void reloadHeartbeatInterval() { + synchronized (statisticsScheduleMonitor) { + if (currentLoadStatisticsFuture == null) { + return; + } + currentLoadStatisticsFuture.cancel(false); + currentLoadStatisticsFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + loadStatisticsExecutor, + this::updateLoadStatistics, + 0, + ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs(), + TimeUnit.MILLISECONDS); + LOGGER.info(ManagerMessages.LOADSTATISTICS_SERVICE_IS_STARTED_SUCCESSFULLY); + } + } + private void updateLoadStatistics() { loadCache.updateNodeStatistics(false); loadCache.updateRegionGroupStatistics(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index b99cbd00017..a8c02eff432 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -36,9 +36,8 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.i18n.ManagerMessages; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.cache.AbstractLoadCache; import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector; -import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector; -import org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; @@ -90,7 +89,7 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { /* (fromDataNodeId, toDataNodeId) -> heartbeat history */ private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> heartbeats; - private final IFailureDetector failureDetector; + private volatile IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private int proberRotationIndex = 0; @@ -109,21 +108,11 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { this.shouldRun = new AtomicBoolean(false); this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName()); - switch (CONF.getFailureDetector()) { - case IFailureDetector.PHI_ACCRUAL_DETECTOR: - this.failureDetector = - new PhiAccrualDetector( - CONF.getFailureDetectorPhiThreshold(), - CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L, - CONF.getHeartbeatIntervalInMs() * 200_000L, - IFailureDetector.PHI_COLD_START_THRESHOLD, - new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L)); - break; - case IFailureDetector.FIXED_DETECTOR: - default: - this.failureDetector = - new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L); - } + this.failureDetector = AbstractLoadCache.buildFailureDetector(); + } + + public void reloadFailureDetector() { + this.failureDetector = AbstractLoadCache.buildFailureDetector(); } public synchronized void startTopologyService() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index dfa3448bc0f..24b2c5d946c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -131,10 +131,6 @@ public class PartitionManager { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final RegionGroupExtensionPolicy SCHEMA_REGION_GROUP_EXTENSION_POLICY = - CONF.getSchemaRegionGroupExtensionPolicy(); - private static final RegionGroupExtensionPolicy DATA_REGION_GROUP_EXTENSION_POLICY = - CONF.getDataRegionGroupExtensionPolicy(); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); private final IManager configManager; @@ -582,7 +578,7 @@ public class PartitionManager { try { if (TConsensusGroupType.SchemaRegion.equals(consensusGroupType)) { - switch (SCHEMA_REGION_GROUP_EXTENSION_POLICY) { + switch (CONF.getSchemaRegionGroupExtensionPolicy()) { case CUSTOM: return customExtendRegionGroupIfNecessary( unassignedPartitionSlotsCountMap, consensusGroupType); @@ -592,7 +588,7 @@ public class PartitionManager { unassignedPartitionSlotsCountMap, consensusGroupType); } } else { - switch (DATA_REGION_GROUP_EXTENSION_POLICY) { + switch (CONF.getDataRegionGroupExtensionPolicy()) { case CUSTOM: return customExtendRegionGroupIfNecessary( unassignedPartitionSlotsCountMap, consensusGroupType); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java index fbf62d8f1dd..1c7ce1e74db 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java @@ -141,8 +141,6 @@ public class ClusterSchemaManager { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchemaManager.class); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final int SCHEMA_REGION_PER_DATA_NODE = CONF.getSchemaRegionPerDataNode(); - private static final int DATA_REGION_PER_DATA_NODE = CONF.getDataRegionPerDataNode(); private final IManager configManager; private final ClusterSchemaInfo clusterSchemaInfo; @@ -523,7 +521,7 @@ public class ClusterSchemaManager { final int maxSchemaRegionGroupNum = calcMaxRegionGroupNum( databaseSchema.getMinSchemaRegionGroupNum(), - SCHEMA_REGION_PER_DATA_NODE, + CONF.getSchemaRegionPerDataNode(), dataNodeNum, databaseNum, databaseSchema.getSchemaReplicationFactor(), @@ -549,10 +547,10 @@ public class ClusterSchemaManager { final int maxDataRegionGroupNum = calcMaxRegionGroupNum( databaseSchema.getMinDataRegionGroupNum(), - DATA_REGION_PER_DATA_NODE == 0 + CONF.getDataRegionPerDataNode() == 0 ? CONF.getDataRegionPerDataNodeProportion() - : DATA_REGION_PER_DATA_NODE, - DATA_REGION_PER_DATA_NODE == 0 ? totalCpuCoreNum : dataNodeNum, + : CONF.getDataRegionPerDataNode(), + CONF.getDataRegionPerDataNode() == 0 ? totalCpuCoreNum : dataNodeNum, databaseNum, databaseSchema.getDataReplicationFactor(), allocatedDataRegionGroupCount); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 7aadfebc917..c20122bcf50 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -727,7 +727,7 @@ public class IoTDBConfig { * Minimum every interval to perform continuous query. * The every interval of continuous query instances should not be lower than this limit. */ - private long continuousQueryMinimumEveryInterval = 1000; + private volatile long continuousQueryMinimumEveryInterval = 1000; /** How much memory may be used in ONE SELECT INTO operation (in Byte). */ private long intoOperationBufferSizeInByte = 100 * 1024 * 1024L; @@ -1018,7 +1018,7 @@ public class IoTDBConfig { private long detailContainerMinDegradeMemoryInBytes = 1024 * 1024L; private int schemaThreadCount = 5; - private ReadConsistencyLevel readConsistencyLevel = ReadConsistencyLevel.STRONG; + private volatile ReadConsistencyLevel readConsistencyLevel = ReadConsistencyLevel.STRONG; /** Maximum size of wal buffer used in IoTConsensus. Unit: byte */ private long throttleThreshold = 200 * 1024 * 1024 * 1024L; @@ -1055,7 +1055,7 @@ public class IoTDBConfig { private long schemaRatisConsensusLeaderElectionTimeoutMaxMs = 4000L; /** CQ related */ - private long cqMinEveryIntervalInMs = 1_000; + private volatile long cqMinEveryIntervalInMs = 1_000; private long dataRatisConsensusRequestTimeoutMs = 10000L; private long schemaRatisConsensusRequestTimeoutMs = 10000L; @@ -3795,6 +3795,7 @@ public class IoTDBConfig { public void setCqMinEveryIntervalInMs(long cqMinEveryIntervalInMs) { this.cqMinEveryIntervalInMs = cqMinEveryIntervalInMs; + this.continuousQueryMinimumEveryInterval = cqMinEveryIntervalInMs; } public double getUsableCompactionMemoryProportion() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 13352020a15..14218dc76e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2196,6 +2196,12 @@ public class IoTDBDescriptor { // update load config loadLoadTsFileHotModifiedProp(properties); + // update CQ semantic-check config pushed from ConfigNode + loadCqMinEveryIntervalInMs(properties); + + // update query routing consistency config pushed from ConfigNode + loadReadConsistencyLevel(properties); + // update pipe config loadPipeHotModifiedProp(properties); @@ -2820,6 +2826,34 @@ public class IoTDBDescriptor { false)); } + private void loadCqMinEveryIntervalInMs(TrimProperties properties) throws IOException { + long cqMinEveryIntervalInMs = + Long.parseLong( + properties.getProperty( + "continuous_query_min_every_interval_in_ms", + String.valueOf(conf.getCqMinEveryIntervalInMs()))); + if (cqMinEveryIntervalInMs <= 0) { + throw new IOException( + "continuous_query_min_every_interval_in_ms should be greater than 0, but current value is " + + cqMinEveryIntervalInMs + + "."); + } + conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs); + } + + private void loadReadConsistencyLevel(TrimProperties properties) throws IOException { + String readConsistencyLevel = + properties.getProperty( + "read_consistency_level", conf.getReadConsistencyLevel().name().toLowerCase()); + if (!"strong".equals(readConsistencyLevel) && !"weak".equals(readConsistencyLevel)) { + throw new IOException( + String.format( + "Unknown read_consistency_level: %s, please set to \"strong\" or \"weak\"", + readConsistencyLevel)); + } + conf.setReadConsistencyLevel(readConsistencyLevel); + } + public void loadClusterProps(TrimProperties properties) throws IOException { String configNodeUrls = properties.getProperty(IoTDBConstant.DN_SEED_CONFIG_NODE); if (configNodeUrls == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index 0fddb41d6e6..4fa4bbb0ac2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -54,13 +54,11 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFragmentParallelPlanner.class); private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private final ReadConsistencyLevel readConsistencyLevel; protected final MPPQueryContext queryContext; protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) { this.queryContext = queryContext; - this.readConsistencyLevel = CONFIG.getReadConsistencyLevel(); } protected void selectExecutorAndHost( @@ -117,7 +115,7 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall throw new IllegalArgumentException( String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); } - boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == this.readConsistencyLevel; + boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == CONFIG.getReadConsistencyLevel(); // When planning fragment onto specific DataNode, the DataNode whose endPoint is in // black list won't be considered because it may have connection issue now. diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 14f3773e9f7..2bd65bb3f98 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -634,7 +634,7 @@ series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash # These policies are currently supported: # 1. CUSTOM(Each Database will allocate schema_region_group_per_database RegionGroups as soon as created) # 2. AUTO(Each Database will automatically extend SchemaRegionGroups based on the data it has) -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: String schema_region_group_extension_policy=AUTO @@ -642,7 +642,7 @@ schema_region_group_extension_policy=AUTO # this parameter is the default number of SchemaRegionGroups for each Database. # When set schema_region_group_extension_policy=AUTO, # this parameter is the default minimal number of SchemaRegionGroups for each Database. -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: Integer default_schema_region_group_num_per_database=1 @@ -650,7 +650,7 @@ default_schema_region_group_num_per_database=1 # This parameter is the maximum number of SchemaRegions expected to be managed by each DataNode. # Notice: Since each Database requires at least one SchemaRegionGroup to manage its schema, # this parameter doesn't limit the upper bound of cluster SchemaRegions when there are too many Databases. -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: Integer schema_region_per_data_node=1 @@ -658,7 +658,7 @@ schema_region_per_data_node=1 # These policies are currently supported: # 1. CUSTOM(Each Database will allocate data_region_group_per_database DataRegionGroups as soon as created) # 2. AUTO(Each Database will automatically extend DataRegionGroups based on the data it has) -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: String data_region_group_extension_policy=AUTO @@ -666,7 +666,7 @@ data_region_group_extension_policy=AUTO # this parameter is the default number of DataRegionGroups for each Database. # When set data_region_group_extension_policy=AUTO, # this parameter is the default minimal number of DataRegionGroups for each Database. -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: Integer default_data_region_group_num_per_database=2 @@ -675,7 +675,7 @@ default_data_region_group_num_per_database=2 # Set to 0 means that each node automatically has the number of CPU cores / 2 regions # Notice: Since each Database requires at least two DataRegionGroups to manage its data, # this parameter doesn't limit the upper bound of cluster DataRegions when there are too many Databases. -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: Integer data_region_per_data_node=0 @@ -713,7 +713,7 @@ time_partition_origin=0 time_partition_interval=604800000 # The heartbeat interval in milliseconds, default is 1000ms -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: long heartbeat_interval_in_ms=1000 @@ -1064,7 +1064,7 @@ text_compressor=LZ4 # These consistency levels are currently supported: # 1. strong(Default, read from the leader replica) # 2. weak(Read from a random replica) -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: string read_consistency_level=strong @@ -1911,7 +1911,7 @@ into_operation_execution_thread_count=2 continuous_query_submit_thread_count=2 # The minimum value of the continuous query execution time interval -# effectiveMode: restart +# effectiveMode: hot_reload # Datatype: long(duration) continuous_query_min_every_interval_in_ms=1000
