This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4637756254e3c546566b4415fb069cafe3553e18 Author: Ring-k <[email protected]> AuthorDate: Mon Jul 20 16:45:05 2020 +0800 separate read and write config --- .../resources/conf/iotdb-cluster.properties | 8 +++++- .../cluster/client/sync/SyncClientAdaptor.java | 2 +- .../apache/iotdb/cluster/config/ClusterConfig.java | 20 ++++++++++---- .../iotdb/cluster/config/ClusterDescriptor.java | 32 +++++++++++++--------- .../apache/iotdb/cluster/server/RaftServer.java | 8 +++--- .../cluster/server/member/MetaGroupMember.java | 4 +-- 6 files changed, 48 insertions(+), 26 deletions(-) diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties index b656542..216af2e 100644 --- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties +++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties @@ -53,7 +53,13 @@ max_concurrent_client_num=10000 default_replica_num=2 # connection time out (ms) among raft nodes -connection_time_out_ms=20000 +connection_timeout_ms=20000 + +# write operation timeout threshold (ms) +write_operation_timeout_ms=30000 + +# read operation timeout threshold (ms) +read_operation_timeout_ms=30000 # when the logs size larger than this, we actually delete snapshoted logs, the unit is bytes max_unsnapshoted_log_size=134217728 diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java index 6a4631e..42e2a2b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java @@ -302,7 +302,7 @@ public class SyncClientAdaptor { GenericHandler<ByteBuffer> nodeHandler = new GenericHandler<>(client.getNode(), resultRef); synchronized (resultRef) { client.previousFill(request, nodeHandler); - resultRef.wait(RaftServer.getQueryTimeoutInSec() * 
1000L); + resultRef.wait(RaftServer.getReadOperationTimeoutMS()); } return resultRef.get(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index ca77d5a..ab47959 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -56,7 +56,9 @@ public class ClusterConfig { */ private long maxUnsnapshotedLogSize = 1024 * 1024 * 128L; - private int queryTimeoutInSec = 30; + private int readOperationTimeoutMS = 30 * 1000; + + private int writeOperationTimeoutMS = 30 * 1000; private boolean useBatchInLogCatchUp = true; @@ -203,12 +205,20 @@ public class ClusterConfig { this.connectionTimeoutInMS = connectionTimeoutInMS; } - public int getQueryTimeoutInSec() { - return queryTimeoutInSec; + public int getReadOperationTimeoutMS() { + return readOperationTimeoutMS; + } + + public void setReadOperationTimeoutMS(int readOperationTimeoutMS) { + this.readOperationTimeoutMS = readOperationTimeoutMS; + } + + public int getWriteOperationTimeoutMS() { + return writeOperationTimeoutMS; } - public void setQueryTimeoutInSec(int queryTimeoutInSec) { - this.queryTimeoutInSec = queryTimeoutInSec; + public void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) { + this.writeOperationTimeoutMS = writeOperationTimeoutMS; } public int getMaxNumberOfLogs() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java index d0fb38e..4ed4bf5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java @@ -91,7 +91,8 @@ public class ClusterDescriptor { public void replaceProps(String[] params) { Options options = new Options(); - Option metaPort = new 
Option(OPTION_META_PORT, OPTION_META_PORT, true, "port for metadata service"); + Option metaPort = new Option(OPTION_META_PORT, OPTION_META_PORT, true, + "port for metadata service"); metaPort.setRequired(false); options.addOption(metaPort); @@ -99,7 +100,8 @@ public class ClusterDescriptor { metaPort.setRequired(false); options.addOption(dataPort); - Option clientPort = new Option(OPTION_CLIENT_PORT, OPTION_CLIENT_PORT, true, "port for client service"); + Option clientPort = new Option(OPTION_CLIENT_PORT, OPTION_CLIENT_PORT, true, + "port for client service"); metaPort.setRequired(false); options.addOption(clientPort); @@ -208,16 +210,19 @@ public class ClusterDescriptor { config.setRpcThriftCompressionEnabled(Boolean.parseBoolean(properties.getProperty( "rpc_thrift_compression_enable", String.valueOf(config.isRpcThriftCompressionEnabled())))); - config - .setConnectionTimeoutInMS(Integer.parseInt(properties.getProperty("connection_time_out_ms", - String.valueOf(config.getConnectionTimeoutInMS())))); + config.setConnectionTimeoutInMS(Integer.parseInt(properties + .getProperty("connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS())))); + + config.setReadOperationTimeoutMS(Integer.parseInt(properties + .getProperty("read_operation_timeout_ms", + String.valueOf(config.getReadOperationTimeoutMS())))); - config - .setQueryTimeoutInSec(Integer.parseInt(properties.getProperty("QUERY_TIME_OUT_SEC", - String.valueOf(config.getQueryTimeoutInSec())))); + config.setWriteOperationTimeoutMS(Integer.parseInt(properties + .getProperty("write_operation_timeout_ms", + String.valueOf(config.getWriteOperationTimeoutMS())))); - config - .setMaxUnsnapshotedLogSize(Long.parseLong(properties.getProperty("max_unsnapshoted_log_size", + config.setMaxUnsnapshotedLogSize(Long.parseLong(properties + .getProperty("max_unsnapshoted_log_size", String.valueOf(config.getMaxUnsnapshotedLogSize())))); config.setUseBatchInLogCatchUp(Boolean.parseBoolean(properties.getProperty( @@ 
-288,7 +293,7 @@ public class ClusterDescriptor { /** * This method is for setting hot modified properties of the cluster. Currently, we support - * max_concurrent_client_num, connection_time_out_ms, max_resolved_log_size + * max_concurrent_client_num, connection_timeout_ms, max_resolved_log_size * * @param properties * @throws QueryProcessException @@ -300,10 +305,11 @@ public class ClusterDescriptor { String.valueOf(config.getMaxConcurrentClientNum())))); config.setConnectionTimeoutInMS(Integer.parseInt(properties - .getProperty("connection_time_out_ms", String.valueOf(config.getConnectionTimeoutInMS())))); + .getProperty("connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS())))); config.setMaxUnsnapshotedLogSize(Long.parseLong(properties - .getProperty("max_unsnapshoted_log_size", String.valueOf(config.getMaxUnsnapshotedLogSize())))); + .getProperty("max_unsnapshoted_log_size", + String.valueOf(config.getMaxUnsnapshotedLogSize())))); logger.info("Set cluster configuration {}", properties); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java index 35cb914..42986b8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java @@ -56,8 +56,8 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. 
private static final Logger logger = LoggerFactory.getLogger(RaftServer.class); private static int connectionTimeoutInMS = ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS(); - private static int queryTimeoutInSec = - ClusterDescriptor.getInstance().getConfig().getQueryTimeoutInSec(); + private static int readOperationTimeoutMS = + ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS(); private static int syncLeaderMaxWaitMs = 20 * 1000; private static long heartBeatIntervalMs = 1000L; @@ -93,8 +93,8 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. RaftServer.connectionTimeoutInMS = connectionTimeoutInMS; } - public static int getQueryTimeoutInSec() { - return queryTimeoutInSec; + public static int getReadOperationTimeoutMS() { + return readOperationTimeoutMS; } public static int getSyncLeaderMaxWaitMs() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index fa0abff..4a6cf34 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -2688,7 +2688,7 @@ public class MetaGroupMember extends RaftMember { } getAllPathsService.shutdown(); try { - getAllPathsService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS); + getAllPathsService.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for get all paths services to stop", e); @@ -3391,7 +3391,7 @@ public class MetaGroupMember extends RaftMember { } fillService.shutdown(); try { - fillService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS); + 
fillService.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for fill pool to stop", e);
