This is an automated email from the ASF dual-hosted git repository. ycycse pushed a commit to branch addCIForAINode in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6b80a02f178c785587dc18d14b47c0a0ce0cfc87 Author: Caiyin Yang <[email protected]> AuthorDate: Fri Nov 17 11:53:22 2023 +0800 Support IT Framework for AINode (cherry picked from commit 5636ca793c0f0d02e01f90ba346a8e93c61971e5) --- integration-test/pom.xml | 14 ++ integration-test/src/assembly/mpp-test.xml | 14 ++ .../java/org/apache/iotdb/it/env/EnvFactory.java | 4 + .../main/java/org/apache/iotdb/it/env/EnvType.java | 1 + .../iotdb/it/env/cluster/ClusterConstant.java | 2 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 52 +++- .../org/apache/iotdb/it/env/cluster/env/MLEnv.java | 19 ++ .../iotdb/it/env/cluster/node/MLNodeWrapper.java | 270 +++++++++++++++++++++ .../category/MLClusterIT.java} | 16 +- .../java/org/apache/iotdb/mlnode/it/MLEnvIT.java | 80 ++++++ 10 files changed, 454 insertions(+), 18 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 5fe4a48628f..a34f517dc71 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -534,6 +534,20 @@ <integrationTest.testEnv>Cluster1</integrationTest.testEnv> </properties> </profile> + <profile> + <id>AIClusterIT</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <properties> + <integrationTest.excludedGroups/> + <integrationTest.includedGroups>org.apache.iotdb.itbase.category.AIClusterIT</integrationTest.includedGroups> + <integrationTest.launchNodeInSameJVM>false</integrationTest.launchNodeInSameJVM> + <integrationTest.randomSelectWriteNode>false</integrationTest.randomSelectWriteNode> + <integrationTest.readAndVerifyWithMultiNode>false</integrationTest.readAndVerifyWithMultiNode> + <integrationTest.testEnv>AI</integrationTest.testEnv> + </properties> + </profile> <profile> <id>DailyIT</id> <activation> diff --git a/integration-test/src/assembly/mpp-test.xml b/integration-test/src/assembly/mpp-test.xml index 3dc443c8d03..656429c6153 100644 --- a/integration-test/src/assembly/mpp-test.xml +++ b/integration-test/src/assembly/mpp-test.xml @@ -42,6 +42,10 @@ <outputDirectory>conf</outputDirectory> <directory>${project.basedir}/../iotdb-core/metrics/interface/src/main/assembly/resources/conf</directory> </fileSet> + <fileSet> + <outputDirectory>conf</outputDirectory> + <directory>${project.basedir}/../iotdb-core/ainode/resources/conf</directory> + </fileSet> <fileSet> <outputDirectory>sbin</outputDirectory> <directory>${project.basedir}/../iotdb-core/datanode/src/assembly/resources/sbin</directory> @@ -52,6 +56,11 @@ <directory>${project.basedir}/../iotdb-core/confignode/src/assembly/resources/sbin</directory> <fileMode>0755</fileMode> </fileSet> + <fileSet> + <outputDirectory>sbin</outputDirectory> + <directory>${{project.basedir}/../iotdb-core/ainode/resources/sbin</directory> + <fileMode>0755</fileMode> + </fileSet> <fileSet> <outputDirectory>tools</outputDirectory> <directory>${project.basedir}/../iotdb-core/datanode/src/assembly/resources/tools</directory> @@ -67,6 +76,11 @@ <directory>${project.basedir}/../iotdb-client/cli/src/assembly/resources/tools</directory> <fileMode>0755</fileMode> </fileSet> + <fileSet> + <outputDirectory>lib</outputDirectory> + <directory>${project.basedir}/../iotdb-core/ainode/dist/</directory> + <fileMode>0755</fileMode> + </fileSet> </fileSets> <files> <file> diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java index 054127da6f9..88ac6f46f6e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java @@ -20,6 +20,7 @@ package org.apache.iotdb.it.env; import org.apache.iotdb.it.env.cluster.env.Cluster1Env; +import org.apache.iotdb.it.env.cluster.env.AIEnv; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.remote.env.RemoteServerEnv; import org.apache.iotdb.it.framework.IoTDBTestLogger; @@ -54,6 +55,9 @@ public class EnvFactory { case Remote: env = new RemoteServerEnv(); break; + case AI: + env = new AIEnv(); + break; case MultiCluster: logger.warn( "EnvFactory only supports EnvType Simple, Cluster1 and Remote, please use MultiEnvFactory instead."); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java index 974af1fc22c..7c2ee415cf1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java @@ -24,6 +24,7 @@ public enum EnvType { Simple, Cluster1, MultiCluster, + AI, TABLE_SIMPLE, TABLE_CLUSTER1; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index e685dd74ed2..cc231260d0f 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -185,6 +185,8 @@ public class ClusterConstant { public static final String DATA_NODE_NAME = "DataNode"; + public static final String AI_NODE_NAME = "AINode"; + public static final String LOCK_FILE_PATH = System.getProperty(USER_DIR) + File.separator + TARGET + File.separator + "lock-"; public static final String TEMPLATE_NODE_PATH = diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 33b49f94032..cb7a50c44c9 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -44,6 +44,7 @@ import org.apache.iotdb.it.env.cluster.config.*; import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper; import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.env.cluster.node.AINodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestLogger; import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.iotdb.itbase.env.BaseNodeWrapper; @@ -81,6 +82,7 @@ public abstract class AbstractEnv implements BaseEnv { private final Random rand = new Random(); protected List<ConfigNodeWrapper> configNodeWrapperList = Collections.emptyList(); protected List<DataNodeWrapper> dataNodeWrapperList = Collections.emptyList(); + protected List<MLNodeWrapper> aiNodeWrapperList = Collections.emptyList(); protected String testMethodName = null; protected int index = 0; protected long startTime; @@ -144,8 +146,13 @@ public abstract class AbstractEnv implements BaseEnv { initEnvironment(configNodesNum, dataNodesNum, retryCount); } - protected void initEnvironment(int configNodesNum, int dataNodesNum, int retryCount) { - this.retryCount = retryCount; + protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { + initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, false); + } + + protected void initEnvironment( + int configNodesNum, int dataNodesNum, int testWorkingRetryCount, boolean addAINode) { + this.testWorkingRetryCount = testWorkingRetryCount; this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); @@ -256,9 +263,44 @@ public abstract class AbstractEnv implements BaseEnv { throw new AssertionError(); } + if (addAINode) { + this.aiNodeWrapperList = new ArrayList<>(); + startAINode(seedConfigNode, testClassName); + } + checkClusterStatusWithoutUnknown(); } + private void startAINode(String seedConfigNode, String testClassName) { + String aiNodeEndPoint; + AINodeWrapper aiNodeWrapper = + new AINodeWrapper( + seedConfigNode, + testClassName, + testMethodName, + EnvUtils.searchAvailablePorts(), + startTime); + aiNodeWrapperList.add(aiNodeWrapper); + aiNodeEndPoint = aiNodeWrapper.getIpAndPortString(); + aiNodeWrapper.createNodeDir(); + aiNodeWrapper.createLogDir(); + RequestDelegate<Void> AINodesDelegate = + new ParallelRequestDelegate<>( + Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT); + + AINodesDelegate.addRequest( + () -> { + aiNodeWrapper.start(); + return null; + }); + + try { + AINodesDelegate.requestAll(); + } catch (SQLException e) { + logger.error("Start aiNodes failed", e); + } + } + public String getTestClassName() { StackTraceElement[] stack = Thread.currentThread().getStackTrace(); for (StackTraceElement stackTraceElement : stack) { @@ -319,7 +361,9 @@ public abstract class AbstractEnv implements BaseEnv { // Check the number of nodes if (showClusterResp.getNodeStatus().size() - != configNodeWrapperList.size() + dataNodeWrapperList.size()) { + != configNodeWrapperList.size() + + dataNodeWrapperList.size() + + aiNodeWrapperList.size()) { flag = false; } @@ -356,7 +400,7 @@ public abstract class AbstractEnv implements BaseEnv { @Override public void cleanClusterEnvironment() { List<AbstractNodeWrapper> allNodeWrappers = - Stream.concat(this.dataNodeWrapperList.stream(), this.configNodeWrapperList.stream()) + Stream.concat(this.dataNodeWrapperList.stream(), this.configNodeWrapperList.stream(), this.aiNodeWrapperList.strea()) .collect(Collectors.toList()); allNodeWrappers.stream() .findAny() diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MLEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MLEnv.java new file mode 100644 index 00000000000..b01d9b5735d --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MLEnv.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.it.env.cluster.env; + +public class AIEnv extends AbstractEnv { + @Override + public void initClusterEnvironment() { + initClusterEnvironment(1, 1); + } + + @Override + public void initClusterEnvironment(int configNodesNum, int dataNodesNum) { + super.initEnvironment(configNodesNum, dataNodesNum, 100, true); + } + + @Override + public void initClusterEnvironment( + int configNodesNum, int dataNodesNum, int testWorkingRetryCount) { + super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, true); + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/MLNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/MLNodeWrapper.java new file mode 100644 index 00000000000..5d713cd0dbd --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/MLNodeWrapper.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.it.env.cluster.node; + +import org.apache.iotdb.it.framework.IoTDBTestLogger; +import org.apache.iotdb.itbase.env.BaseNodeWrapper; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.file.PathUtils; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.nio.file.*; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.apache.iotdb.it.env.cluster.ClusterConstant.*; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.ML_NODE_NAME; +import static org.apache.iotdb.it.env.cluster.EnvUtils.getTimeForLogDirectory; + +public class AINodeWrapper implements BaseNodeWrapper { + + private static final Logger logger = IoTDBTestLogger.logger; + private final String testClassName; + private final String testMethodName; + private final String nodeAddress; + private Process instance; + private final int nodePort; + private final long startTime; + private final String seedConfigNode; + + private static final String SCRIPT_FILE = "start-ainode.sh"; + + private static final String SHELL_COMMAND = "bash"; + + private static final String PROPERTIES_FILE = "iotdb-ainode.properties"; + public static final String CONFIG_PATH = "conf"; + public static final String SCRIPT_PATH = "sbin"; + private static final String INTERPRETER_PATH = "/root/.venv/bin/python3"; + + private String[] getReplaceCmd(String key, String value, String filePath) { + String s = String.format("sed -i s/^%s=.*/%s=%s/ %s", key, key, value, filePath); + return s.split("\\s+"); + } + + private void replaceAttribute(String key, String value, String filePath, File stdoutFile) + throws IOException, InterruptedException { + ProcessBuilder prepareProcessBuilder = + new ProcessBuilder(getReplaceCmd(key, value, filePath)) + .redirectOutput(ProcessBuilder.Redirect.appendTo(stdoutFile)) + .redirectError(ProcessBuilder.Redirect.appendTo(stdoutFile)); + Process prepareProcess = prepareProcessBuilder.start(); + prepareProcess.waitFor(); + } + + public MLNodeWrapper( + String seedConfigNode, + String testClassName, + String testMethodName, + int[] port, + long startTime) { + this.seedConfigNode = seedConfigNode; + this.testClassName = testClassName; + this.testMethodName = testMethodName; + this.nodeAddress = "127.0.0.1"; + this.nodePort = port[0]; + this.startTime = startTime; + } + + @Override + public void createNodeDir() { + // Copy templateNodePath to nodePath + String destPath = getNodePath(); + try { + try { + if (new File(destPath).exists()) { + PathUtils.deleteDirectory(Paths.get(destPath)); + } + } catch (NoSuchFileException e) { + // ignored + } + // Here we need to copy without follow symbolic links, so we can't use FileUtils directly. + try (Stream<Path> s = Files.walk(Paths.get(TEMPLATE_NODE_PATH))) { + s.forEach( + source -> { + Path destination = + Paths.get(destPath, source.toString().substring(TEMPLATE_NODE_PATH.length())); + try { + Files.copy( + source, + destination, + LinkOption.NOFOLLOW_LINKS, + StandardCopyOption.COPY_ATTRIBUTES); + } catch (IOException e) { + logger.error("Got error copying files to node dest dir", e); + throw new RuntimeException(e); + } + }); + } + } catch (IOException ex) { + logger.error("Copy node dir failed", ex); + throw new AssertionError(); + } + } + + @Override + public void createLogDir() { + try { + // Make sure the log dir exist, as the first file is output by starting script directly. + FileUtils.createParentDirectories(new File(getLogPath())); + } catch (IOException ex) { + logger.error("Copy node dir failed", ex); + throw new AssertionError(); + } + } + + private String getLogPath() { + return getLogDirPath() + File.separator + getId() + ".log"; + } + + private String getLogDirPath() { + return System.getProperty(USER_DIR) + + File.separator + + TARGET + + File.separator + + "mlnode-logs" + + File.separator + + getTestLogDirName() + + File.separator + + getTimeForLogDirectory(startTime); + } + + private String getTestLogDirName() { + if (testMethodName == null) { + return testClassName; + } + return testClassName + "_" + testMethodName; + } + + @Override + public void destroyDir() { + Exception lastException = null; + for (int i = 0; i < 10; i++) { + try { + PathUtils.deleteDirectory(Paths.get(getNodePath())); + return; + } catch (IOException ex) { + lastException = ex; + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Delete node dir failed. " + e); + } + } + } + logger.error(lastException.getMessage()); + throw new AssertionError("Delete node dir failed."); + } + + @Override + public void start() { + try { + File stdoutFile = new File(getLogPath()); + String filePrefix = + System.getProperty(USER_DIR) + + File.separator + + TARGET + + File.separator + + ML_NODE_NAME + + getPort(); + String propertiesFile = + filePrefix + File.separator + CONFIG_PATH + File.separator + PROPERTIES_FILE; + + // set attribute + replaceAttribute( + "mln_target_config_node_list", this.seedConfigNode, propertiesFile, stdoutFile); + replaceAttribute( + "mln_inference_rpc_port", Integer.toString(getPort()), propertiesFile, stdoutFile); + + // start MLNode + List<String> start_command = new ArrayList<>(); + start_command.add(SHELL_COMMAND); + start_command.add(filePrefix + File.separator + SCRIPT_PATH + File.separator + SCRIPT_FILE); + start_command.add("-i"); + start_command.add(INTERPRETER_PATH); + start_command.add("-r"); + start_command.add("-n"); + ProcessBuilder processBuilder = + new ProcessBuilder(start_command) + .redirectOutput(ProcessBuilder.Redirect.appendTo(stdoutFile)) + .redirectError(ProcessBuilder.Redirect.appendTo(stdoutFile)); + this.instance = processBuilder.start(); + logger.info("In test {} {} started.", getTestLogDirName(), getId()); + } catch (IOException | InterruptedException e) { + throw new AssertionError("Start ML Node failed. " + e + Paths.get("")); + } + } + + @Override + public void stop() { + if (this.instance == null) { + return; + } + this.instance.destroy(); + try { + if (!this.instance.waitFor(20, TimeUnit.SECONDS)) { + this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Waiting ML Node to shutdown error. %s", e); + } + } + + @Override + public String getIp() { + return this.nodeAddress; + } + + @Override + public int getPort() { + return this.nodePort; + } + + @Override + public int getMetricPort() { + // no metric currently + return -1; + } + + @Override + public String getId() { + return ML_NODE_NAME + getPort(); + } + + @Override + public String getIpAndPortString() { + return this.getIp() + ":" + this.getPort(); + } + + @Override + public void dumpJVMSnapshot(String testCaseName) { + // there is no JVM to dump for MLNode + } + + private String getNodePath() { + return System.getProperty(USER_DIR) + File.separator + TARGET + File.separator + getId(); + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java b/integration-test/src/main/java/org/apache/iotdb/itbase/category/MLClusterIT.java similarity index 73% copy from integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java copy to integration-test/src/main/java/org/apache/iotdb/itbase/category/MLClusterIT.java index 974af1fc22c..ab3c458cd79 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/category/MLClusterIT.java @@ -17,18 +17,6 @@ * under the License. */ -package org.apache.iotdb.it.env; +package org.apache.iotdb.itbase.category; -public enum EnvType { - Remote, - Simple, - Cluster1, - MultiCluster, - TABLE_SIMPLE, - TABLE_CLUSTER1; - - public static EnvType getSystemEnvType() { - String envValue = System.getProperty("TestEnv", Simple.name()); - return EnvType.valueOf(envValue); - } -} +public interface AIClusterIT {} diff --git a/integration-test/src/test/java/org/apache/iotdb/mlnode/it/MLEnvIT.java b/integration-test/src/test/java/org/apache/iotdb/mlnode/it/MLEnvIT.java new file mode 100644 index 00000000000..3f1aad95a32 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/mlnode/it/MLEnvIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.ainode.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.AIClusterIT; + +import org.junit.*; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({AIClusterIT.class}) +public class AIEnvIT { + @Before + public void setUp() throws Exception { + // Init 1C1D1M cluster environment + EnvFactory.getEnv().initClusterEnvironment(1, 1); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + private static void checkHeader(ResultSetMetaData resultSetMetaData, String title) + throws SQLException { + String[] headers = title.split(","); + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + assertEquals(headers[i - 1], resultSetMetaData.getColumnName(i)); + } + } + + @Test + public void aiNodeConnectionTest() { + String sql = "SHOW AINODES"; + String title = "NodeID,Status,RpcAddress,RpcPort"; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + try (ResultSet resultSet = statement.executeQuery(sql)) { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + checkHeader(resultSetMetaData, title); + int count = 0; + while (resultSet.next()) { + assertEquals("2", resultSet.getString(1)); + assertEquals("Running", resultSet.getString(2)); + count++; + } + assertEquals(1, count); + } + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +}
