This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new bc4d4c00e55 Pipe & IT : Refactor pipe / IT and improved IT behaviour
(#13530)
bc4d4c00e55 is described below
commit bc4d4c00e55366fdcf4b5f50866a3aefee9f5ad9
Author: Caideyipi <[email protected]>
AuthorDate: Wed Sep 18 15:46:52 2024 +0800
Pipe & IT : Refactor pipe / IT and improved IT behaviour (#13530)
---
.../org/apache/iotdb/it/env/MultiEnvFactory.java | 11 +-
.../iotdb/it/env/cluster/ClusterConstant.java | 10 -
.../org/apache/iotdb/it/env/cluster/EnvUtils.java | 74 +++---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 284 +++++++++++----------
.../iotdb/it/env/cluster/env/MultiClusterEnv.java | 8 +-
.../it/env/cluster/node/AbstractNodeWrapper.java | 2 +-
.../it/env/cluster/node/ConfigNodeWrapper.java | 35 ++-
.../iotdb/it/env/cluster/node/DataNodeWrapper.java | 32 +--
.../apache/iotdb/it/framework/IoTDBTestRunner.java | 21 +-
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 6 +
.../PipeHistoricalDataRegionTsFileExtractor.java | 1 -
.../tools/schema/SchemaRegionSnapshotParser.java | 68 +----
.../org/apache/iotdb/db/utils/DateTimeUtils.java | 2 +
13 files changed, 251 insertions(+), 303 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
index f8c28567be1..5832f1c485b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
@@ -38,24 +38,25 @@ public class MultiEnvFactory {
// Empty constructor
}
- public static void setTestMethodName(String testMethodName) {
+ public static void setTestMethodName(final String testMethodName) {
currentMethodName = testMethodName;
+ envList.forEach(baseEnv -> baseEnv.setTestMethodName(testMethodName));
}
/** Get an environment with the specific index. */
- public static BaseEnv getEnv(int index) throws IndexOutOfBoundsException {
+ public static BaseEnv getEnv(final int index) throws
IndexOutOfBoundsException {
return envList.get(index);
}
/** Create several environments according to the specific number. */
- public static void createEnv(int num) {
+ public static void createEnv(final int num) {
// Not judge EnvType for individual test convenience
- long startTime = System.currentTimeMillis();
+ final long startTime = System.currentTimeMillis();
for (int i = 0; i < num; ++i) {
try {
Class.forName(Config.JDBC_DRIVER_NAME);
envList.add(new MultiClusterEnv(startTime, i, currentMethodName));
- } catch (ClassNotFoundException e) {
+ } catch (final ClassNotFoundException e) {
logger.error("Create env error", e);
System.exit(-1);
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index b3802f3ea8d..147b57f2f67 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -115,9 +115,6 @@ public class ClusterConstant {
"strongConsistencyClusterMode.dataRegionReplicaNumber";
// Property file names
- public static final String CONFIG_NODE_PROPERTIES_FILE =
"iotdb-confignode.properties";
- public static final String DATA_NODE_PROPERTIES_FILE =
"iotdb-datanode.properties";
- public static final String COMMON_PROPERTIES_FILE =
"iotdb-common.properties";
public static final String IOTDB_SYSTEM_PROPERTIES_FILE =
"iotdb-system.properties";
// Properties' keys
@@ -142,10 +139,7 @@ public class ClusterConstant {
// ConfigNode
public static final String CN_SYSTEM_DIR = "cn_system_dir";
public static final String CN_CONSENSUS_DIR = "cn_consensus_dir";
- public static final String CN_METRIC_PROMETHEUS_REPORTER_PORT =
- "cn_metric_prometheus_reporter_port";
public static final String CN_METRIC_IOTDB_REPORTER_HOST =
"cn_metric_iotdb_reporter_host";
- public static final String CN_METRIC_IOTDB_REPORTER_PORT =
"cn_metric_iotdb_reporter_port";
public static final String CN_CONNECTION_TIMEOUT_MS =
"cn_connection_timeout_ms";
@@ -157,13 +151,10 @@ public class ClusterConstant {
public static final String DN_TRACING_DIR = "dn_tracing_dir";
public static final String DN_SYNC_DIR = "dn_sync_dir";
public static final String DN_METRIC_IOTDB_REPORTER_HOST =
"dn_metric_iotdb_reporter_host";
- public static final String DN_METRIC_PROMETHEUS_REPORTER_PORT =
- "dn_metric_prometheus_reporter_port";
public static final String DN_MPP_DATA_EXCHANGE_PORT =
"dn_mpp_data_exchange_port";
public static final String DN_DATA_REGION_CONSENSUS_PORT =
"dn_data_region_consensus_port";
public static final String DN_SCHEMA_REGION_CONSENSUS_PORT =
"dn_schema_region_consensus_port";
- public static final String PIPE_AIR_GAP_RECEIVER_ENABLED =
"pipe_air_gap_receiver_enabled";
public static final String PIPE_AIR_GAP_RECEIVER_PORT =
"pipe_air_gap_receiver_port";
public static final String MAX_TSBLOCK_SIZE_IN_BYTES =
"max_tsblock_size_in_bytes";
public static final String PAGE_SIZE_IN_BYTE = "page_size_in_byte";
@@ -205,7 +196,6 @@ public class ClusterConstant {
// Env Constant
public static final int NODE_START_TIMEOUT = 100;
- public static final int PROBE_TIMEOUT_MS = 2000;
public static final int NODE_NETWORK_TIMEOUT_MS = 0;
public static final String ZERO_TIME_ZONE = "GMT+0";
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
index f3c9527e595..9663fa371e9 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
@@ -70,22 +70,22 @@ public class EnvUtils {
while (true) {
int randomPortStart = 1000 + (int) (Math.random() * (1999 - 1000));
randomPortStart = randomPortStart * (length + 1) + 1;
- String lockFilePath = getLockFilePath(randomPortStart);
- File lockFile = new File(lockFilePath);
+ final String lockFilePath = getLockFilePath(randomPortStart);
+ final File lockFile = new File(lockFilePath);
try {
// Lock the ports first to avoid to be occupied by other ForkedBooters
during ports
// available detecting
if (!lockFile.createNewFile()) {
continue;
}
- List<Integer> requiredPorts =
+ final List<Integer> requiredPorts =
IntStream.rangeClosed(randomPortStart, randomPortStart + length)
.boxed()
.collect(Collectors.toList());
if (checkPortsAvailable(requiredPorts)) {
return requiredPorts.stream().mapToInt(Integer::intValue).toArray();
}
- } catch (IOException e) {
+ } catch (final IOException ignore) {
// ignore
}
// Delete the lock file if the ports can't be used or some error happens
@@ -95,39 +95,35 @@ public class EnvUtils {
}
}
- private static boolean checkPortsAvailable(List<Integer> ports) {
- String cmd = getSearchAvailablePortCmd(ports);
+ private static boolean checkPortsAvailable(final List<Integer> ports) {
+ final String cmd = getSearchAvailablePortCmd(ports);
try {
- Process proc = Runtime.getRuntime().exec(cmd);
- return proc.waitFor() == 1;
- } catch (IOException e) {
+ return Runtime.getRuntime().exec(cmd).waitFor() == 1;
+ } catch (final IOException ignore) {
// ignore
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
- private static String getSearchAvailablePortCmd(List<Integer> ports) {
- if (SystemUtils.IS_OS_WINDOWS) {
- return getWindowsSearchPortCmd(ports);
- }
- return getUnixSearchPortCmd(ports);
+ private static String getSearchAvailablePortCmd(final List<Integer> ports) {
+ return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) :
getUnixSearchPortCmd(ports);
}
- private static String getWindowsSearchPortCmd(List<Integer> ports) {
- String cmd = "netstat -aon -p tcp | findStr ";
- return cmd
+ private static String getWindowsSearchPortCmd(final List<Integer> ports) {
+ return "netstat -aon -p tcp | findStr "
+ ports.stream().map(v -> "/C:'127.0.0.1:" + v +
"'").collect(Collectors.joining(" "));
}
- private static String getUnixSearchPortCmd(List<Integer> ports) {
- String cmd = "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E ";
- return cmd +
ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\"";
+ private static String getUnixSearchPortCmd(final List<Integer> ports) {
+ return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "
+ + ports.stream().map(String::valueOf).collect(Collectors.joining("|"))
+ + "\"";
}
- private static Pair<Integer, Integer> getClusterNodesNum(int index) {
- String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
+ private static Pair<Integer, Integer> getClusterNodesNum(final int index) {
+ final String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
if (valueStr == null) {
return null;
}
@@ -154,17 +150,17 @@ public class EnvUtils {
// Print nothing to avoid polluting test outputs
return null;
}
- } catch (NumberFormatException ignore) {
+ } catch (final NumberFormatException ignore) {
return null;
}
}
- public static String getLockFilePath(int port) {
+ public static String getLockFilePath(final int port) {
return LOCK_FILE_PATH + port;
}
public static Pair<Integer, Integer> getNodeNum() {
- Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
+ final Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
if (nodesNum != null) {
return nodesNum;
}
@@ -173,8 +169,8 @@ public class EnvUtils {
getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, 0));
}
- public static Pair<Integer, Integer> getNodeNum(int index) {
- Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
+ public static Pair<Integer, Integer> getNodeNum(final int index) {
+ final Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
if (nodesNum != null) {
return nodesNum;
}
@@ -183,38 +179,38 @@ public class EnvUtils {
getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, index));
}
- public static String getFilePathFromSysVar(String key, int index) {
- String valueStr = System.getProperty(key);
+ public static String getFilePathFromSysVar(final String key, final int
index) {
+ final String valueStr = System.getProperty(key);
if (valueStr == null) {
return null;
}
return System.getProperty(USER_DIR) + getValueOfIndex(valueStr, index);
}
- public static int getIntFromSysVar(String key, int defaultValue, int index) {
- String valueStr = System.getProperty(key);
+ public static int getIntFromSysVar(final String key, final int defaultValue,
final int index) {
+ final String valueStr = System.getProperty(key);
if (valueStr == null) {
return defaultValue;
}
- String value = getValueOfIndex(valueStr, index);
+ final String value = getValueOfIndex(valueStr, index);
try {
return Integer.parseInt(value);
- } catch (NumberFormatException e) {
+ } catch (final NumberFormatException e) {
throw new IllegalArgumentException("Invalid property value: " + value +
" of key " + key);
}
}
- public static String getValueOfIndex(String valueStr, int index) {
- String[] values = valueStr.split(DELIMITER);
+ public static String getValueOfIndex(final String valueStr, final int index)
{
+ final String[] values = valueStr.split(DELIMITER);
return index <= values.length - 1 ? values[index] : values[values.length -
1];
}
- public static String getTimeForLogDirectory(long startTime) {
+ public static String getTimeForLogDirectory(final long startTime) {
return convertLongToDate(startTime, "ms").replace(":",
DIR_TIME_REPLACEMENT);
}
- public static String fromConsensusFullNameToAbbr(String consensus) {
+ public static String fromConsensusFullNameToAbbr(final String consensus) {
switch (consensus) {
case SIMPLE_CONSENSUS:
return SIMPLE_CONSENSUS_STR;
@@ -233,7 +229,7 @@ public class EnvUtils {
}
}
- public static String fromConsensusAbbrToFullName(String consensus) {
+ public static String fromConsensusAbbrToFullName(final String consensus) {
switch (consensus) {
case SIMPLE_CONSENSUS_STR:
return SIMPLE_CONSENSUS;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 7deb7620974..92cb39765c6 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.it.env.cluster.env;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
@@ -103,7 +101,7 @@ public abstract class AbstractEnv implements BaseEnv {
}
// For multiple environment ITs, time must be consistent across environments.
- protected AbstractEnv(long startTime) {
+ protected AbstractEnv(final long startTime) {
this.startTime = startTime;
this.clusterConfig = new MppClusterConfig();
}
@@ -115,10 +113,10 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public List<String> getMetricPrometheusReporterContents() {
- List<String> result = new ArrayList<>();
+ final List<String> result = new ArrayList<>();
// get all report content of confignodes
- for (ConfigNodeWrapper configNode : this.configNodeWrapperList) {
- String configNodeMetricContent =
+ for (final ConfigNodeWrapper configNode : this.configNodeWrapperList) {
+ final String configNodeMetricContent =
getUrlContent(
Config.IOTDB_HTTP_URL_PREFIX
+ configNode.getIp()
@@ -128,8 +126,8 @@ public abstract class AbstractEnv implements BaseEnv {
result.add(configNodeMetricContent);
}
// get all report content of datanodes
- for (DataNodeWrapper dataNode : this.dataNodeWrapperList) {
- String dataNodeMetricContent =
+ for (final DataNodeWrapper dataNode : this.dataNodeWrapperList) {
+ final String dataNodeMetricContent =
getUrlContent(
Config.IOTDB_HTTP_URL_PREFIX
+ dataNode.getIp()
@@ -141,16 +139,20 @@ public abstract class AbstractEnv implements BaseEnv {
return result;
}
- protected void initEnvironment(int configNodesNum, int dataNodesNum) {
+ protected void initEnvironment(final int configNodesNum, final int
dataNodesNum) {
initEnvironment(configNodesNum, dataNodesNum, retryCount);
}
- protected void initEnvironment(int configNodesNum, int dataNodesNum, int
testWorkingRetryCount) {
+ protected void initEnvironment(
+ final int configNodesNum, final int dataNodesNum, final int
testWorkingRetryCount) {
initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount,
false);
}
protected void initEnvironment(
- int configNodesNum, int dataNodesNum, int retryCount, boolean addAINode)
{
+ final int configNodesNum,
+ final int dataNodesNum,
+ final int retryCount,
+ final boolean addAINode) {
this.retryCount = retryCount;
this.configNodeWrapperList = new ArrayList<>();
this.dataNodeWrapperList = new ArrayList<>();
@@ -161,7 +163,7 @@ public abstract class AbstractEnv implements BaseEnv {
final String testClassName = getTestClassName();
- ConfigNodeWrapper seedConfigNodeWrapper =
+ final ConfigNodeWrapper seedConfigNodeWrapper =
new ConfigNodeWrapper(
true,
"",
@@ -179,22 +181,23 @@ public abstract class AbstractEnv implements BaseEnv {
seedConfigNodeWrapper.createLogDir();
seedConfigNodeWrapper.setKillPoints(configNodeKillPoints);
seedConfigNodeWrapper.start();
- String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString();
+ final String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString();
this.configNodeWrapperList.add(seedConfigNodeWrapper);
// Check if the Seed-ConfigNode started successfully
- try (SyncConfigNodeIServiceClient ignored =
+ try (final SyncConfigNodeIServiceClient ignored =
(SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
// Do nothing
logger.info("The Seed-ConfigNode started successfully!");
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error("Failed to get connection to the Seed-ConfigNode", e);
}
- List<String> configNodeEndpoints = new ArrayList<>();
- RequestDelegate<Void> configNodesDelegate = new
SerialRequestDelegate<>(configNodeEndpoints);
+ final List<String> configNodeEndpoints = new ArrayList<>();
+ final RequestDelegate<Void> configNodesDelegate =
+ new SerialRequestDelegate<>(configNodeEndpoints);
for (int i = 1; i < configNodesNum; i++) {
- ConfigNodeWrapper configNodeWrapper =
+ final ConfigNodeWrapper configNodeWrapper =
new ConfigNodeWrapper(
false,
seedConfigNode,
@@ -221,16 +224,16 @@ public abstract class AbstractEnv implements BaseEnv {
}
try {
configNodesDelegate.requestAll();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
logger.error("Start configNodes failed", e);
throw new AssertionError();
}
- List<String> dataNodeEndpoints = new ArrayList<>();
- RequestDelegate<Void> dataNodesDelegate =
+ final List<String> dataNodeEndpoints = new ArrayList<>();
+ final RequestDelegate<Void> dataNodesDelegate =
new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
for (int i = 0; i < dataNodesNum; i++) {
- DataNodeWrapper dataNodeWrapper =
+ final DataNodeWrapper dataNodeWrapper =
new DataNodeWrapper(
seedConfigNode,
testClassName,
@@ -257,7 +260,7 @@ public abstract class AbstractEnv implements BaseEnv {
try {
dataNodesDelegate.requestAll();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
logger.error("Start dataNodes failed", e);
throw new AssertionError();
}
@@ -270,9 +273,9 @@ public abstract class AbstractEnv implements BaseEnv {
checkClusterStatusWithoutUnknown();
}
- private void startAINode(String seedConfigNode, String testClassName) {
- String aiNodeEndPoint;
- AINodeWrapper aiNodeWrapper =
+ private void startAINode(final String seedConfigNode, final String
testClassName) {
+ final String aiNodeEndPoint;
+ final AINodeWrapper aiNodeWrapper =
new AINodeWrapper(
seedConfigNode,
testClassName,
@@ -284,29 +287,29 @@ public abstract class AbstractEnv implements BaseEnv {
aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
aiNodeWrapper.createNodeDir();
aiNodeWrapper.createLogDir();
- RequestDelegate<Void> AINodesDelegate =
+ final RequestDelegate<Void> aiNodesDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT);
- AINodesDelegate.addRequest(
+ aiNodesDelegate.addRequest(
() -> {
aiNodeWrapper.start();
return null;
});
try {
- AINodesDelegate.requestAll();
- } catch (SQLException e) {
+ aiNodesDelegate.requestAll();
+ } catch (final SQLException e) {
logger.error("Start aiNodes failed", e);
}
}
public String getTestClassName() {
- StackTraceElement[] stack = Thread.currentThread().getStackTrace();
- for (StackTraceElement stackTraceElement : stack) {
- String className = stackTraceElement.getClassName();
+ final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ for (final StackTraceElement stackTraceElement : stack) {
+ final String className = stackTraceElement.getClassName();
if (className.endsWith("IT")) {
- String result = className.substring(className.lastIndexOf(".") + 1);
+ final String result = className.substring(className.lastIndexOf(".") +
1);
if (!result.startsWith("Abstract")) {
return result;
}
@@ -315,8 +318,8 @@ public abstract class AbstractEnv implements BaseEnv {
return "UNKNOWN-IT";
}
- private Map<String, Integer> countNodeStatus(Map<Integer, String>
nodeStatus) {
- Map<String, Integer> result = new HashMap<>();
+ private Map<String, Integer> countNodeStatus(final Map<Integer, String>
nodeStatus) {
+ final Map<String, Integer> result = new HashMap<>();
nodeStatus.values().forEach(status -> result.put(status,
result.getOrDefault(status, 0) + 1));
return result;
}
@@ -343,13 +346,13 @@ public abstract class AbstractEnv implements BaseEnv {
*
* @param statusCheck the predicate to test the status of nodes
*/
- public void checkClusterStatus(Predicate<Map<Integer, String>> statusCheck) {
+ public void checkClusterStatus(final Predicate<Map<Integer, String>>
statusCheck) {
logger.info("Testing cluster environment...");
TShowClusterResp showClusterResp;
Exception lastException = null;
boolean flag;
for (int i = 0; i < retryCount; i++) {
- try (SyncConfigNodeIServiceClient client =
+ try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
flag = true;
showClusterResp = client.showCluster();
@@ -369,20 +372,19 @@ public abstract class AbstractEnv implements BaseEnv {
// Check the status of nodes
if (flag) {
- Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
- flag = statusCheck.test(nodeStatus);
+ flag = statusCheck.test(showClusterResp.getNodeStatus());
}
if (flag) {
logger.info("The cluster is now ready for testing!");
return;
}
- } catch (Exception e) {
+ } catch (final Exception e) {
lastException = e;
}
try {
TimeUnit.SECONDS.sleep(1L);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
lastException = e;
Thread.currentThread().interrupt();
}
@@ -399,7 +401,7 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public void cleanClusterEnvironment() {
- List<AbstractNodeWrapper> allNodeWrappers =
+ final List<AbstractNodeWrapper> allNodeWrappers =
Stream.concat(
dataNodeWrapperList.stream(),
Stream.concat(configNodeWrapperList.stream(),
aiNodeWrapperList.stream()))
@@ -408,10 +410,10 @@ public abstract class AbstractEnv implements BaseEnv {
.findAny()
.ifPresent(
nodeWrapper -> logger.info("You can find logs at {}",
nodeWrapper.getLogDirPath()));
- for (AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
+ for (final AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
nodeWrapper.stopForcibly();
nodeWrapper.destroyDir();
- String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
+ final String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
if (!new File(lockPath).delete()) {
logger.error("Delete lock file {} failed", lockPath);
}
@@ -431,7 +433,8 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
- DataNodeWrapper dataNode, String username, String password) throws
SQLException {
+ final DataNodeWrapper dataNode, final String username, final String
password)
+ throws SQLException {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(dataNode, null, username,
password),
Collections.emptyList());
@@ -439,7 +442,8 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public Connection getConnectionWithSpecifiedDataNode(
- DataNodeWrapper dataNode, String username, String password) throws
SQLException {
+ final DataNodeWrapper dataNode, final String username, final String
password)
+ throws SQLException {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(dataNode, null, username,
password),
getReadConnections(null, username, password));
@@ -469,7 +473,7 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public ISession getSessionConnection(String userName, String password)
throws IoTDBConnectionException {
- DataNodeWrapper dataNode =
+ final DataNodeWrapper dataNode =
this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size()));
Session session = new Session(dataNode.getIp(), dataNode.getPort(),
userName, password);
session.open();
@@ -523,8 +527,8 @@ public abstract class AbstractEnv implements BaseEnv {
protected NodeConnection getWriteConnectionWithSpecifiedDataNode(
DataNodeWrapper dataNode, Constant.Version version, String username,
String password)
throws SQLException {
- String endpoint = dataNode.getIp() + ":" + dataNode.getPort();
- Connection writeConnection =
+ final String endpoint = dataNode.getIp() + ":" + dataNode.getPort();
+ final Connection writeConnection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX
+ endpoint
@@ -544,10 +548,10 @@ public abstract class AbstractEnv implements BaseEnv {
String username,
String password)
throws SQLException {
- List<DataNodeWrapper> dataNodeWrapperListCopy = new
ArrayList<>(dataNodeList);
+ final List<DataNodeWrapper> dataNodeWrapperListCopy = new
ArrayList<>(dataNodeList);
Collections.shuffle(dataNodeWrapperListCopy);
SQLException lastException = null;
- for (DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
+ for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
try {
return getWriteConnectionWithSpecifiedDataNode(dataNode, version,
username, password);
} catch (SQLException e) {
@@ -593,19 +597,19 @@ public abstract class AbstractEnv implements BaseEnv {
// AssertionError.
protected void testJDBCConnection() {
logger.info("Testing JDBC connection...");
- List<String> endpoints =
+ final List<String> endpoints =
dataNodeWrapperList.stream()
.map(DataNodeWrapper::getIpAndPortString)
.collect(Collectors.toList());
- RequestDelegate<Void> testDelegate =
+ final RequestDelegate<Void> testDelegate =
new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
- for (DataNodeWrapper dataNode : dataNodeWrapperList) {
+ for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
final String dataNodeEndpoint = dataNode.getIpAndPortString();
testDelegate.addRequest(
() -> {
Exception lastException = null;
for (int i = 0; i < retryCount; i++) {
- try (IoTDBConnection ignored =
+ try (final IoTDBConnection ignored =
(IoTDBConnection)
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX
@@ -615,7 +619,7 @@ public abstract class AbstractEnv implements BaseEnv {
System.getProperty("Password", "root"))) {
logger.info("Successfully connecting to DataNode: {}.",
dataNodeEndpoint);
return null;
- } catch (Exception e) {
+ } catch (final Exception e) {
lastException = e;
TimeUnit.SECONDS.sleep(1L);
}
@@ -628,15 +632,16 @@ public abstract class AbstractEnv implements BaseEnv {
}
try {
testDelegate.requestAll();
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error("exception in test Cluster with RPC, message: {}",
e.getMessage(), e);
throw new AssertionError(
String.format("After %d times retry, the cluster can't work!",
retryCount));
}
}
- private String getParam(Constant.Version version, int timeout, String
timeZone) {
- StringBuilder sb = new StringBuilder("?");
+ private String getParam(
+ final Constant.Version version, final int timeout, final String
timeZone) {
+ final StringBuilder sb = new StringBuilder("?");
sb.append(Config.NETWORK_TIMEOUT).append("=").append(timeout);
if (version != null) {
sb.append("&").append(VERSION).append("=").append(version);
@@ -652,23 +657,20 @@ public abstract class AbstractEnv implements BaseEnv {
}
@Override
- public void setTestMethodName(String testMethodName) {
+ public void setTestMethodName(final String testMethodName) {
this.testMethodName = testMethodName;
}
@Override
public void dumpTestJVMSnapshot() {
- for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
- configNodeWrapper.executeJstack(testMethodName);
- }
- for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
- dataNodeWrapper.executeJstack(testMethodName);
- }
+ configNodeWrapperList.forEach(
+ configNodeWrapper -> configNodeWrapper.executeJstack(testMethodName));
+ dataNodeWrapperList.forEach(dataNodeWrapper ->
dataNodeWrapper.executeJstack(testMethodName));
}
@Override
public List<AbstractNodeWrapper> getNodeWrapperList() {
- List<AbstractNodeWrapper> result = new ArrayList<>(configNodeWrapperList);
+ final List<AbstractNodeWrapper> result = new
ArrayList<>(configNodeWrapperList);
result.addAll(dataNodeWrapperList);
return result;
}
@@ -697,13 +699,13 @@ public abstract class AbstractEnv implements BaseEnv {
Exception lastException = null;
ConfigNodeWrapper lastErrorNode = null;
for (int i = 0; i < retryCount; i++) {
- for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
+ for (final ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
try {
lastErrorNode = configNodeWrapper;
- SyncConfigNodeIServiceClient client =
+ final SyncConfigNodeIServiceClient client =
clientManager.borrowClient(
new TEndPoint(configNodeWrapper.getIp(),
configNodeWrapper.getPort()));
- TShowClusterResp resp = client.showCluster();
+ final TShowClusterResp resp = client.showCluster();
if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Only the ConfigNodeClient who connects to the ConfigNode-leader
@@ -718,7 +720,7 @@ public abstract class AbstractEnv implements BaseEnv {
+ " message: "
+ resp.getStatus().getMessage());
}
- } catch (Exception e) {
+ } catch (final Exception e) {
lastException = e;
}
@@ -739,12 +741,12 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws
Exception {
Exception lastException = null;
- ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(index);
+ final ConfigNodeWrapper configNodeWrapper =
configNodeWrapperList.get(index);
for (int i = 0; i < 30; i++) {
try {
return clientManager.borrowClient(
new TEndPoint(configNodeWrapper.getIp(),
configNodeWrapper.getPort()));
- } catch (Exception e) {
+ } catch (final Exception e) {
lastException = e;
}
// Sleep 1s before next retry
@@ -760,9 +762,9 @@ public abstract class AbstractEnv implements BaseEnv {
ConfigNodeWrapper lastErrorNode = null;
for (int retry = 0; retry < 30; retry++) {
for (int configNodeId = 0; configNodeId < configNodeWrapperList.size();
configNodeId++) {
- ConfigNodeWrapper configNodeWrapper =
configNodeWrapperList.get(configNodeId);
+ final ConfigNodeWrapper configNodeWrapper =
configNodeWrapperList.get(configNodeId);
lastErrorNode = configNodeWrapper;
- try (SyncConfigNodeIServiceClient client =
+ try (final SyncConfigNodeIServiceClient client =
clientManager.borrowClient(
new TEndPoint(configNodeWrapper.getIp(),
configNodeWrapper.getPort()))) {
TShowRegionResp resp =
@@ -775,12 +777,12 @@ public abstract class AbstractEnv implements BaseEnv {
int port;
if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- for (TRegionInfo tRegionInfo : resp.getRegionInfoList()) {
+ for (final TRegionInfo tRegionInfo : resp.getRegionInfoList()) {
if (tRegionInfo.getRoleType().equals("Leader")) {
ip = tRegionInfo.getClientRpcIp();
port = tRegionInfo.getClientRpcPort();
for (int dataNodeId = 0; dataNodeId <
dataNodeWrapperList.size(); ++dataNodeId) {
- DataNodeWrapper dataNodeWrapper =
dataNodeWrapperList.get(dataNodeId);
+ final DataNodeWrapper dataNodeWrapper =
dataNodeWrapperList.get(dataNodeId);
if (dataNodeWrapper.getIp().equals(ip) &&
dataNodeWrapper.getPort() == port) {
return dataNodeId;
}
@@ -796,7 +798,7 @@ public abstract class AbstractEnv implements BaseEnv {
+ " message: "
+ resp.getStatus().getMessage());
}
- } catch (Exception e) {
+ } catch (final Exception e) {
lastException = e;
}
@@ -820,12 +822,12 @@ public abstract class AbstractEnv implements BaseEnv {
ConfigNodeWrapper lastErrorNode = null;
for (int retry = 0; retry < retryCount; retry++) {
for (int configNodeId = 0; configNodeId < configNodeWrapperList.size();
configNodeId++) {
- ConfigNodeWrapper configNodeWrapper =
configNodeWrapperList.get(configNodeId);
+ final ConfigNodeWrapper configNodeWrapper =
configNodeWrapperList.get(configNodeId);
lastErrorNode = configNodeWrapper;
- try (SyncConfigNodeIServiceClient client =
+ try (final SyncConfigNodeIServiceClient client =
clientManager.borrowClient(
new TEndPoint(configNodeWrapper.getIp(),
configNodeWrapper.getPort()))) {
- TShowClusterResp resp = client.showCluster();
+ final TShowClusterResp resp = client.showCluster();
// Only the ConfigNodeClient who connects to the ConfigNode-leader
// will respond the SUCCESS_STATUS
if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -837,7 +839,7 @@ public abstract class AbstractEnv implements BaseEnv {
+ " message: "
+ resp.getStatus().getMessage());
}
- } catch (Exception e) {
+ } catch (final Exception e) {
lastException = e;
}
@@ -857,15 +859,13 @@ public abstract class AbstractEnv implements BaseEnv {
}
@Override
- public void startConfigNode(int index) {
+ public void startConfigNode(final int index) {
configNodeWrapperList.get(index).start();
}
@Override
public void startAllConfigNodes() {
- for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
- configNodeWrapper.start();
- }
+ configNodeWrapperList.forEach(AbstractNodeWrapper::start);
}
@Override
@@ -875,24 +875,22 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public void shutdownAllConfigNodes() {
- for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
- configNodeWrapper.stop();
- }
+ configNodeWrapperList.forEach(AbstractNodeWrapper::stop);
}
@Override
- public ConfigNodeWrapper getConfigNodeWrapper(int index) {
+ public ConfigNodeWrapper getConfigNodeWrapper(final int index) {
return configNodeWrapperList.get(index);
}
@Override
- public DataNodeWrapper getDataNodeWrapper(int index) {
+ public DataNodeWrapper getDataNodeWrapper(final int index) {
return dataNodeWrapperList.get(index);
}
@Override
public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
- ConfigNodeWrapper newConfigNodeWrapper =
+ final ConfigNodeWrapper newConfigNodeWrapper =
new ConfigNodeWrapper(
false,
configNodeWrapperList.get(0).getIpAndPortString(),
@@ -914,7 +912,7 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public DataNodeWrapper generateRandomDataNodeWrapper() {
- DataNodeWrapper newDataNodeWrapper =
+ final DataNodeWrapper newDataNodeWrapper =
new DataNodeWrapper(
configNodeWrapperList.get(0).getIpAndPortString(),
getTestClassName(),
@@ -934,19 +932,20 @@ public abstract class AbstractEnv implements BaseEnv {
}
@Override
- public void registerNewDataNode(boolean isNeedVerify) {
+ public void registerNewDataNode(final boolean isNeedVerify) {
registerNewDataNode(generateRandomDataNodeWrapper(), isNeedVerify);
}
@Override
- public void registerNewConfigNode(boolean isNeedVerify) {
+ public void registerNewConfigNode(final boolean isNeedVerify) {
registerNewConfigNode(generateRandomConfigNodeWrapper(), isNeedVerify);
}
@Override
- public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper,
boolean isNeedVerify) {
+ public void registerNewConfigNode(
+ final ConfigNodeWrapper newConfigNodeWrapper, final boolean
isNeedVerify) {
// Start new ConfigNode
- RequestDelegate<Void> configNodeDelegate =
+ final RequestDelegate<Void> configNodeDelegate =
new ParallelRequestDelegate<>(
Collections.singletonList(newConfigNodeWrapper.getIpAndPortString()),
NODE_START_TIMEOUT);
@@ -958,7 +957,7 @@ public abstract class AbstractEnv implements BaseEnv {
try {
configNodeDelegate.requestAll();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
logger.error("Start configNode failed", e);
throw new AssertionError();
}
@@ -970,11 +969,12 @@ public abstract class AbstractEnv implements BaseEnv {
}
@Override
- public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean
isNeedVerify) {
+ public void registerNewDataNode(
+ final DataNodeWrapper newDataNodeWrapper, final boolean isNeedVerify) {
// Start new DataNode
- List<String> dataNodeEndpoints =
+ final List<String> dataNodeEndpoints =
Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
- RequestDelegate<Void> dataNodesDelegate =
+ final RequestDelegate<Void> dataNodesDelegate =
new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
dataNodesDelegate.addRequest(
() -> {
@@ -983,7 +983,7 @@ public abstract class AbstractEnv implements BaseEnv {
});
try {
dataNodesDelegate.requestAll();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
logger.error("Start dataNodes failed", e);
throw new AssertionError();
}
@@ -995,58 +995,63 @@ public abstract class AbstractEnv implements BaseEnv {
}
@Override
- public void startDataNode(int index) {
+ public void startDataNode(final int index) {
dataNodeWrapperList.get(index).start();
}
@Override
public void startAllDataNodes() {
- for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
- dataNodeWrapper.start();
- }
+ dataNodeWrapperList.forEach(AbstractNodeWrapper::start);
}
@Override
- public void shutdownDataNode(int index) {
+ public void shutdownDataNode(final int index) {
dataNodeWrapperList.get(index).stop();
}
@Override
public void shutdownAllDataNodes() {
- for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
- dataNodeWrapper.stop();
- }
+ dataNodeWrapperList.forEach(AbstractNodeWrapper::stop);
}
@Override
- public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus>
targetStatus)
+ public void ensureNodeStatus(
+ final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
throws IllegalStateException {
Throwable lastException = null;
for (int i = 0; i < retryCount; i++) {
- try (SyncConfigNodeIServiceClient client =
+ try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- List<String> errorMessages = new ArrayList<>(nodes.size());
- Map<String, Integer> nodeIds = new HashMap<>(nodes.size());
- TShowClusterResp showClusterResp = client.showCluster();
- for (TConfigNodeLocation node : showClusterResp.getConfigNodeList()) {
- nodeIds.put(
- node.getInternalEndPoint().getIp() + ":" +
node.getInternalEndPoint().getPort(),
- node.getConfigNodeId());
- }
- for (TDataNodeLocation node : showClusterResp.getDataNodeList()) {
- nodeIds.put(
- node.getClientRpcEndPoint().getIp() + ":" +
node.getClientRpcEndPoint().getPort(),
- node.getDataNodeId());
- }
+ final List<String> errorMessages = new ArrayList<>(nodes.size());
+ final Map<String, Integer> nodeIds = new HashMap<>(nodes.size());
+ final TShowClusterResp showClusterResp = client.showCluster();
+ showClusterResp
+ .getConfigNodeList()
+ .forEach(
+ node ->
+ nodeIds.put(
+ node.getInternalEndPoint().getIp()
+ + ":"
+ + node.getInternalEndPoint().getPort(),
+ node.getConfigNodeId()));
+ showClusterResp
+ .getDataNodeList()
+ .forEach(
+ node ->
+ nodeIds.put(
+ node.getClientRpcEndPoint().getIp()
+ + ":"
+ + node.getClientRpcEndPoint().getPort(),
+ node.getDataNodeId()));
for (int j = 0; j < nodes.size(); j++) {
- String endpoint = nodes.get(j).getIpAndPortString();
+ final String endpoint = nodes.get(j).getIpAndPortString();
if (!nodeIds.containsKey(endpoint)) {
// Node not exist
// Notice: Never modify this line, since the NodeLocation might be
modified in IT
errorMessages.add("The node " + nodes.get(j).getIpAndPortString()
+ " is not found!");
continue;
}
- String status =
showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
+ final String status =
showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
if (!targetStatus.get(j).getStatus().equals(status)) {
// Error status
errorMessages.add(
@@ -1060,12 +1065,12 @@ public abstract class AbstractEnv implements BaseEnv {
} else {
lastException = new IllegalStateException(String.join(". ",
errorMessages));
}
- } catch (TException | ClientManagerException | IOException |
InterruptedException e) {
+ } catch (final TException | ClientManagerException | IOException |
InterruptedException e) {
lastException = e;
}
try {
TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -1074,8 +1079,9 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public int getMqttPort() {
- int randomIndex = new
Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size());
- return dataNodeWrapperList.get(randomIndex).getMqttPort();
+ return dataNodeWrapperList
+ .get(new
Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()))
+ .getMqttPort();
}
@Override
@@ -1104,11 +1110,11 @@ public abstract class AbstractEnv implements BaseEnv {
}
@Override
- public Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId) {
- try (SyncConfigNodeIServiceClient leaderClient =
+ public Optional<DataNodeWrapper> dataNodeIdToWrapper(final int nodeId) {
+ try (final SyncConfigNodeIServiceClient leaderClient =
(SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
- TShowDataNodesResp resp = leaderClient.showDataNodes();
- for (TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
+ final TShowDataNodesResp resp = leaderClient.showDataNodes();
+ for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
if (dataNodeInfo.getDataNodeId() == nodeId) {
return dataNodeWrapperList.stream()
.filter(dataNodeWrapper -> dataNodeWrapper.getPort() ==
dataNodeInfo.getRpcPort())
@@ -1116,18 +1122,18 @@ public abstract class AbstractEnv implements BaseEnv {
}
}
return Optional.empty();
- } catch (Exception e) {
+ } catch (final Exception e) {
return Optional.empty();
}
}
@Override
- public void registerConfigNodeKillPoints(List<String> killPoints) {
+ public void registerConfigNodeKillPoints(final List<String> killPoints) {
this.configNodeKillPoints = killPoints;
}
@Override
- public void registerDataNodeKillPoints(List<String> killPoints) {
+ public void registerDataNodeKillPoints(final List<String> killPoints) {
this.dataNodeKillPoints = killPoints;
}
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
index 1eb05cb315f..d462b5a668b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
@@ -25,7 +25,7 @@ import org.apache.tsfile.utils.Pair;
public class MultiClusterEnv extends AbstractEnv {
- public MultiClusterEnv(long startTime, int index, String currentMethodName) {
+ public MultiClusterEnv(final long startTime, final int index, final String
currentMethodName) {
super(startTime);
this.index = index;
this.testMethodName = currentMethodName;
@@ -33,18 +33,18 @@ public class MultiClusterEnv extends AbstractEnv {
@Override
public void initClusterEnvironment() {
- Pair<Integer, Integer> nodeNum = EnvUtils.getNodeNum(index);
+ final Pair<Integer, Integer> nodeNum = EnvUtils.getNodeNum(index);
super.initEnvironment(nodeNum.getLeft(), nodeNum.getRight());
}
@Override
- public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
+ public void initClusterEnvironment(final int configNodesNum, final int
dataNodesNum) {
super.initEnvironment(configNodesNum, dataNodesNum);
}
@Override
public void initClusterEnvironment(
- int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+ final int configNodesNum, final int dataNodesNum, final int
testWorkingRetryCount) {
super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
}
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index bf28c407f72..e3f9b0b5d3e 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -649,7 +649,7 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
if (testMethodName == null) {
return testClassName;
}
- return testClassName + "_" + testMethodName;
+ return testClassName + File.separator + testMethodName;
}
public void setKillPoints(List<String> killPoints) {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
index a509790ad07..2b3211fe868 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
@@ -57,23 +57,17 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
private final String defaultCommonPropertiesFile;
public ConfigNodeWrapper(
- boolean isSeed,
- String targetCNs,
- String testClassName,
- String testMethodName,
- int[] portList,
- int clusterIndex,
- boolean isMultiCluster,
- long startTime) {
+ final boolean isSeed,
+ final String targetCNs,
+ final String testClassName,
+ final String testMethodName,
+ final int[] portList,
+ final int clusterIndex,
+ final boolean isMultiCluster,
+ final long startTime) {
super(testClassName, testMethodName, portList, clusterIndex,
isMultiCluster, startTime);
this.consensusPort = portList[1];
this.isSeed = isSeed;
- String seedConfigNodes;
- if (isSeed) {
- seedConfigNodes = getIpAndPortString();
- } else {
- seedConfigNodes = targetCNs;
- }
this.defaultNodePropertiesFile =
EnvUtils.getFilePathFromSysVar(DEFAULT_CONFIG_NODE_PROPERTIES,
clusterIndex);
this.defaultCommonPropertiesFile =
@@ -83,7 +77,8 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
reloadMutableFields();
// initialize immutable properties
- immutableNodeProperties.setProperty(IoTDBConstant.CN_SEED_CONFIG_NODE,
seedConfigNodes);
+ immutableNodeProperties.setProperty(
+ IoTDBConstant.CN_SEED_CONFIG_NODE, isSeed ? getIpAndPortString() :
targetCNs);
immutableNodeProperties.setProperty(CN_SYSTEM_DIR,
MppBaseConfig.NULL_VALUE);
immutableNodeProperties.setProperty(CN_CONSENSUS_DIR,
MppBaseConfig.NULL_VALUE);
immutableNodeProperties.setProperty(CN_METRIC_IOTDB_REPORTER_HOST,
MppBaseConfig.NULL_VALUE);
@@ -129,7 +124,7 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
}
@Override
- protected void addStartCmdParams(List<String> params) {
+ protected void addStartCmdParams(final List<String> params) {
final String workDir = getNodePath();
final String confDir = workDir + File.separator + "conf";
params.addAll(
@@ -166,14 +161,14 @@ public class ConfigNodeWrapper extends
AbstractNodeWrapper {
@Override
protected void renameFile() {
- String configNodeName = isSeed ? "SeedConfigNode" : "ConfigNode";
+ final String configNodeName = isSeed ? "SeedConfigNode" : "ConfigNode";
// rename log file
- File oldLogFile =
+ final File oldLogFile =
new File(getLogDirPath() + File.separator + configNodeName +
portList[0] + ".log");
oldLogFile.renameTo(new File(getLogDirPath() + File.separator + getId() +
".log"));
// rename node dir
- File oldNodeDir =
+ final File oldNodeDir =
new File(
System.getProperty(USER_DIR)
+ File.separator
@@ -184,7 +179,7 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
oldNodeDir.renameTo(new File(getNodePath()));
}
- public void setConsensusPort(int consensusPort) {
+ public void setConsensusPort(final int consensusPort) {
this.consensusPort = consensusPort;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index cb7e840af63..127f9b33185 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -83,13 +83,13 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
private final String defaultCommonPropertiesFile;
public DataNodeWrapper(
- String seedConfigNode,
- String testClassName,
- String testMethodName,
- int[] portList,
- int clusterIndex,
- boolean isMultiCluster,
- long startTime) {
+ final String seedConfigNode,
+ final String testClassName,
+ final String testMethodName,
+ final int[] portList,
+ final int clusterIndex,
+ final boolean isMultiCluster,
+ final long startTime) {
super(testClassName, testMethodName, portList, clusterIndex,
isMultiCluster, startTime);
this.internalAddress = super.getIp();
this.mppDataExchangePort = portList[1];
@@ -161,7 +161,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
}
@Override
- protected void addStartCmdParams(List<String> params) {
+ protected void addStartCmdParams(final List<String> params) {
final String workDir = getNodePath();
final String confDir = workDir + File.separator + "conf";
params.addAll(
@@ -214,22 +214,22 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
@Override
public void renameFile() {
// Rename log file
- String oldLogFilePath =
+ final String oldLogFilePath =
getLogDirPath() + File.separator + DATA_NODE_NAME + portList[0] +
".log";
- String newLogFilePath = getLogDirPath() + File.separator + getId() +
".log";
- File oldLogFile = new File(oldLogFilePath);
+ final String newLogFilePath = getLogDirPath() + File.separator + getId() +
".log";
+ final File oldLogFile = new File(oldLogFilePath);
oldLogFile.renameTo(new File(newLogFilePath));
// Rename node dir
- String oldNodeDirPath =
+ final String oldNodeDirPath =
System.getProperty(USER_DIR)
+ File.separator
+ TARGET
+ File.separator
+ DATA_NODE_NAME
+ portList[0];
- String newNodeDirPath = getNodePath();
- File oldNodeDir = new File(oldNodeDirPath);
+ final String newNodeDirPath = getNodePath();
+ final File oldNodeDir = new File(oldNodeDirPath);
oldNodeDir.renameTo(new File(newNodeDirPath));
}
@@ -237,7 +237,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
return mppDataExchangePort;
}
- public void setMppDataExchangePort(int mppDataExchangePort) {
+ public void setMppDataExchangePort(final int mppDataExchangePort) {
this.mppDataExchangePort = mppDataExchangePort;
}
@@ -249,7 +249,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
return internalPort;
}
- public void setInternalPort(int internalPort) {
+ public void setInternalPort(final int internalPort) {
this.internalPort = internalPort;
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
b/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
index 07e04514688..17e20d91e9a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
+++
b/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
@@ -37,33 +37,30 @@ public class IoTDBTestRunner extends BlockJUnit4ClassRunner
{
private static final Logger logger = IoTDBTestLogger.logger;
private IoTDBTestListener listener;
- public IoTDBTestRunner(Class<?> testClass) throws InitializationError {
+ public IoTDBTestRunner(final Class<?> testClass) throws InitializationError {
super(testClass);
}
@Override
- public void run(RunNotifier notifier) {
- TimeZone.setDefault(TimeZone.getTimeZone("Bejing"));
+ public void run(final RunNotifier notifier) {
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC+08:00"));
listener = new IoTDBTestListener(this.getName());
notifier.addListener(listener);
super.run(notifier);
}
@Override
- protected void runChild(final FrameworkMethod method, RunNotifier notifier) {
- Description description = describeChild(method);
+ protected void runChild(final FrameworkMethod method, final RunNotifier
notifier) {
+ final Description description = describeChild(method);
logger.info("Run {}", description.getMethodName());
- long currentTime = System.currentTimeMillis();
+ final long currentTime = System.currentTimeMillis();
if (EnvType.getSystemEnvType() != EnvType.MultiCluster) {
EnvFactory.getEnv().setTestMethodName(description.getMethodName());
- } else {
- // TestMethodName must be set globally in MultiEnvFactory, since the
- // cluster environments are not created now
- MultiEnvFactory.setTestMethodName(description.getMethodName());
}
+ MultiEnvFactory.setTestMethodName(description.getMethodName());
super.runChild(method, notifier);
- double timeCost = (System.currentTimeMillis() - currentTime) / 1000.0;
- String testName = description.getClassName() + "." +
description.getMethodName();
+ final double timeCost = (System.currentTimeMillis() - currentTime) /
1000.0;
+ final String testName = description.getClassName() + "." +
description.getMethodName();
logger.info("Done {}. Cost: {}s", description.getMethodName(), timeCost);
listener.addTestStat(new IoTDBTestStat(testName, timeCost));
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 14567b01f11..8caabaf6f62 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -22,15 +22,21 @@ package org.apache.iotdb.pipe.it.single;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT1;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT1.class})
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
@Test
public void testOPCUASink() throws Exception {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index bfd57fe6e24..310ed2640d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -536,7 +536,6 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
return deviceSet.stream()
.anyMatch(
- // TODO: use IDeviceID
deviceID -> pipePattern.mayOverlapWithDevice(((PlainDeviceID)
deviceID).toStringID()));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
index 7f6a719070b..a9ad880bebb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
@@ -34,7 +34,6 @@ import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -49,7 +48,8 @@ public class SchemaRegionSnapshotParser {
// Empty constructor
}
- private static Path getLatestSnapshotPath(List<Path> snapshotPathList,
boolean includingTmp) {
+ private static Path getLatestSnapshotPath(
+ final List<Path> snapshotPathList, final boolean includingTmp) {
if (snapshotPathList.isEmpty()) {
return null;
}
@@ -68,69 +68,28 @@ public class SchemaRegionSnapshotParser {
return pathArray[pathArray.length - 1];
}
- // Return all schema region's latest snapshot units in this datanode.
- public static List<Pair<Path, Path>> getSnapshotPaths() {
- final String snapshotPath = CONFIG.getSchemaRegionConsensusDir();
- final File snapshotDir = new File(snapshotPath);
- final ArrayList<Pair<Path, Path>> snapshotUnits = new ArrayList<>();
-
- // Get schema region path
- try (DirectoryStream<Path> stream =
- Files.newDirectoryStream(snapshotDir.toPath(),
"[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*")) {
- for (Path path : stream) {
- try (DirectoryStream<Path> filestream =
- Files.newDirectoryStream(Paths.get(path.toString() +
File.separator + "sm"))) {
- // Find the latest snapshots
- final ArrayList<Path> snapshotList = new ArrayList<>();
- for (Path snapshotFolder : filestream) {
- if (snapshotFolder.toFile().isDirectory()) {
- snapshotList.add(snapshotFolder);
- }
- }
- final Path latestSnapshotPath = getLatestSnapshotPath(snapshotList,
false);
- if (latestSnapshotPath != null) {
- // Get metadata from the latest snapshot folder.
- final File mTreeSnapshot =
- SystemFileFactory.INSTANCE.getFile(
- latestSnapshotPath + File.separator +
SchemaConstant.MTREE_SNAPSHOT);
- final File tagSnapshot =
- SystemFileFactory.INSTANCE.getFile(
- latestSnapshotPath + File.separator +
SchemaConstant.TAG_LOG_SNAPSHOT);
- if (mTreeSnapshot.exists()) {
- snapshotUnits.add(
- new Pair<>(
- mTreeSnapshot.toPath(), tagSnapshot.exists() ?
tagSnapshot.toPath() : null));
- }
- }
- }
- }
- } catch (IOException exception) {
- LOGGER.warn("Cannot construct snapshot directory stream", exception);
- }
- return snapshotUnits;
- }
-
// In schema snapshot path:
datanode/consensus/schema_region/47474747-4747-4747-4747-000200000000
// this func will get schema region id =
47474747-4747-4747-4747-000200000000's latest snapshot.
// In one schema region, there is only one snapshot unit.
- public static Pair<Path, Path> getSnapshotPaths(String schemaRegionId,
boolean isTmp) {
+ public static Pair<Path, Path> getSnapshotPaths(
+ final String schemaRegionId, final boolean isTmp) {
final String snapshotPath = CONFIG.getSchemaRegionConsensusDir();
final File snapshotDir =
new File(snapshotPath + File.separator + schemaRegionId +
File.separator + "sm");
// Get the latest snapshot file
final ArrayList<Path> snapshotList = new ArrayList<>();
- try (DirectoryStream<Path> stream =
+ try (final DirectoryStream<Path> stream =
Files.newDirectoryStream(
snapshotDir.toPath(), isTmp ? ".tmp.[0-9]*_[0-9]*" :
"[0-9]*_[0-9]*")) {
- for (Path path : stream) {
+ for (final Path path : stream) {
snapshotList.add(path);
}
- } catch (IOException ioException) {
+ } catch (final IOException ioException) {
LOGGER.warn("ioexception when get {}'s folder", schemaRegionId,
ioException);
return null;
}
- Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, isTmp);
+ final Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, isTmp);
if (latestSnapshotPath != null) {
// Get metadata from the latest snapshot folder.
final File mTreeSnapshot =
@@ -148,17 +107,14 @@ public class SchemaRegionSnapshotParser {
}
public static SRStatementGenerator translate2Statements(
- Path mtreePath, Path tagFilePath, PartialPath databasePath) throws
IOException {
+ final Path mtreePath, final Path tagFilePath, final PartialPath
databasePath)
+ throws IOException {
if (mtreePath == null) {
return null;
}
final File mtreefile = mtreePath.toFile();
- final File tagfile;
- if (tagFilePath != null && tagFilePath.toFile().exists()) {
- tagfile = tagFilePath.toFile();
- } else {
- tagfile = null;
- }
+ final File tagfile =
+ tagFilePath != null && tagFilePath.toFile().exists() ?
tagFilePath.toFile() : null;
if (!mtreefile.exists()) {
return null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
index 1a3c394af7d..59867861ed9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
@@ -711,6 +711,8 @@ public class DateTimeUtils {
case "us":
timestamp /= 1000;
break;
+ default:
+ break;
}
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
ZoneId.systemDefault())
.toString();