This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch patch-2094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 54e6bb8b3b51f00ad8903e1318dbcce69e899614 Author: 罗振羽 <[email protected]> AuthorDate: Thu May 14 02:35:46 2026 +0000 [TIMECHODB] Split library-pipe package (cherry picked from commit 8aa2a7d246bff96fe42d3e0fc3460eb389e8f419) --- .gitlab-ci.yml | 18 + distribution/pom.xml | 18 +- distribution/src/assembly/all.xml | 2 - distribution/src/assembly/extension.xml | 134 ---- distribution/src/assembly/library-pipe.xml | 51 ++ integration-test/pom.xml | 14 + .../iotdb/itbase/category/TsFileBackupIT.java | 22 + .../iotdb/tools/it/TsFileBackupScriptIT.java | 781 +++++++++++++++++++++ .../org/apache/iotdb/tool/pipe/TsFileBackup.java | 106 ++- .../apache/iotdb/tool/pipe/TsFileBackupTest.java | 100 +++ scripts/tools/tsfile-backup.sh | 17 +- scripts/tools/windows/tsfile-backup.bat | 7 - 12 files changed, 1072 insertions(+), 198 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3bbe7e1e950..e51559b3f17 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -203,6 +203,24 @@ external-service-impl-it-1c3d: - integration-test/target/cluster-logs expire_in: 7 day +tsfile-backup-it-scp: + stage: test + tags: + - timechodb + script: + - 'mvn verify -P with-integration-tests -DskipUTs -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=1024 -DDataNodeMaxHeapSize=1024 -pl integration-test -am -PTsFileBackupIT -s $MAVEN_SETTINGS_XML' + interruptible: true + rules: + - if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /^rel/ || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /^rc/ + + artifacts: + when: on_failure + name: tsfile-backup-it-log + paths: + - integration-test/target/cluster-logs + - integration-test/target/tsfile-backup-it + expire_in: 7 day + ainode-test-1c1d1a: stage: test image: timechodb:AICluster diff --git a/distribution/pom.xml b/distribution/pom.xml index dc27b682e68..f6734d3f2c2 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -501,20 +501,7 @@ <configuration> <descriptors> <descriptor>src/assembly/all.xml</descriptor> - <descriptor>src/assembly/external-service-impl.xml</descriptor> - </descriptors> - <finalName>timechodb-${project.version}</finalName> - </configuration> - </execution> - <execution> - <id>extension</id> - <goals> - <goal>single</goal> - </goals> - <phase>package</phase> - <configuration> - <descriptors> - <descriptor>src/assembly/extension.xml</descriptor> + <descriptor>src/assembly/library-pipe.xml</descriptor> </descriptors> <finalName>timechodb-${project.version}</finalName> </configuration> @@ -557,8 +544,7 @@ <directory>${project.build.directory}</directory> <includes> <include>timechodb-${project.version}-bin.zip</include> - <include>apache-iotdb-${project.version}-external-service-impl-bin.zip</include> - <include>timechodb-${project.version}-extension.zip</include> + <include>timechodb-${project.version}-library-pipe-bin.zip</include> </includes> </fileSet> <!--security-file--> diff --git a/distribution/src/assembly/all.xml b/distribution/src/assembly/all.xml index 00174c28865..ddcc19f4aee 100644 --- a/distribution/src/assembly/all.xml +++ b/distribution/src/assembly/all.xml @@ -113,8 +113,6 @@ <excludes> <exclude>*ainode.*</exclude> <exclude>**/*ainode.*</exclude> - <exclude>tsfile-backup.sh</exclude> - <exclude>windows/tsfile-backup.bat</exclude> </excludes> <fileMode>0755</fileMode> </fileSet> diff --git a/distribution/src/assembly/extension.xml b/distribution/src/assembly/extension.xml deleted file mode 100644 index 71e6af65901..00000000000 --- a/distribution/src/assembly/extension.xml +++ /dev/null @@ -1,134 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly> - <id>extension</id> - <formats> - <format>dir</format> - <format>zip</format> - </formats> - <baseDirectory>timechodb-${project.version}-extension</baseDirectory> - <fileSets> - <fileSet> - <outputDirectory>lib</outputDirectory> - <directory>${project.build.directory}</directory> - <includes> - <include>common-dependency.jar</include> - </includes> - </fileSet> - <fileSet> - <outputDirectory>lib</outputDirectory> - <directory>${project.build.directory}</directory> - <includes> - <include>timechodb-server-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <outputDirectory>conf</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/conf</directory> - </fileSet> - <fileSet> - <outputDirectory>conf</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/iotdb-core/confignode/src/assembly/resources/conf</directory> - </fileSet> - <fileSet> - <outputDirectory>conf</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf</directory> - <excludes> - <exclude>safe/**</exclude> - <exclude>**/ainode-env.*</exclude> - </excludes> - </fileSet> - <fileSet> - <outputDirectory>conf</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/scripts/conf</directory> - <excludes> - <exclude>ainode-env.*</exclude> - <exclude>**/ainode-env.*</exclude> - </excludes> - <fileMode>0755</fileMode> - </fileSet> - <fileSet> - <outputDirectory>conf</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/timecho-server/src/assembly/resources/conf</directory> - <excludes> - <exclude>META-INF/**</exclude> - </excludes> - </fileSet> - <fileSet> - <outputDirectory>conf</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/timecho-server/src/main/resources</directory> - <excludes> - <exclude>META-INF/**</exclude> - </excludes> - </fileSet> - <fileSet> - <outputDirectory>activation</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/activation</directory> - <fileMode>0755</fileMode> - <excludes> - <exclude>**/*</exclude> - </excludes> - </fileSet> - <fileSet> - <outputDirectory>sbin</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/scripts/sbin</directory> - <excludes> - <exclude>*ainode.*</exclude> - <exclude>**/*ainode.*</exclude> - </excludes> - <fileMode>0755</fileMode> - </fileSet> - <fileSet> - <outputDirectory>tools</outputDirectory> - <directory>${maven.multiModuleProjectDirectory}/scripts/tools</directory> - <excludes> - <exclude>*ainode.*</exclude> - <exclude>**/*ainode.*</exclude> - </excludes> - <fileMode>0755</fileMode> - </fileSet> - </fileSets> - <files> - <file> - <source>${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/conf/logback-backup.xml</source> - <outputDirectory>conf</outputDirectory> - <fileMode>0755</fileMode> - </file> - <file> - <source>${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties</source> - <outputDirectory>conf</outputDirectory> - <destName>iotdb-system.properties.template.efficiency</destName> - </file> - <file> - <source>${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf/safe/iotdb-system.properties</source> - <outputDirectory>conf</outputDirectory> - <destName>iotdb-system.properties.template.safe</destName> - </file> - <file> - <source>${maven.multiModuleProjectDirectory}/library-pipe/tsfile-remote-sink/target/tsfile-remote-sink-${project.version}-jar-with-dependencies.jar</source> - <outputDirectory>ext/pipe</outputDirectory> - </file> - </files> - <componentDescriptors> - <componentDescriptor>common-files.xml</componentDescriptor> - </componentDescriptors> -</assembly> diff --git a/distribution/src/assembly/library-pipe.xml b/distribution/src/assembly/library-pipe.xml new file mode 100644 index 00000000000..c6f17292c25 --- /dev/null +++ b/distribution/src/assembly/library-pipe.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly> + <id>library-pipe-bin</id> + <formats> + <format>dir</format> + <format>zip</format> + </formats> + <baseDirectory>timechodb-${project.version}-library-pipe-bin</baseDirectory> + <fileSets> + <fileSet> + <directory>${project.basedir}/../licenses</directory> + <outputDirectory>licenses</outputDirectory> + </fileSet> + </fileSets> + <files> + <file> + <source>${project.basedir}/../LICENSE-binary</source> + <outputDirectory>licenses</outputDirectory> + <destName>LICENSE</destName> + </file> + <file> + <source>${project.basedir}/../NOTICE-binary</source> + <outputDirectory>licenses</outputDirectory> + <destName>NOTICE</destName> + </file> + <file> + <source>${maven.multiModuleProjectDirectory}/library-pipe/tsfile-remote-sink/target/tsfile-remote-sink-${project.version}-jar-with-dependencies.jar</source> + <outputDirectory>/</outputDirectory> + </file> + </files> +</assembly> diff --git a/integration-test/pom.xml b/integration-test/pom.xml index ecce38175f7..4c1170520ae 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -734,6 +734,20 @@ <integrationTest.testEnv>Cluster1</integrationTest.testEnv> </properties> </profile> + <profile> + <id>TsFileBackupIT</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <properties> + <integrationTest.excludedGroups>org.apache.iotdb.itbase.category.ManualIT</integrationTest.excludedGroups> + <integrationTest.includedGroups>org.apache.iotdb.itbase.category.TsFileBackupIT</integrationTest.includedGroups> + <integrationTest.launchNodeInSameJVM>false</integrationTest.launchNodeInSameJVM> + <integrationTest.randomSelectWriteNode>false</integrationTest.randomSelectWriteNode> + <integrationTest.readAndVerifyWithMultiNode>false</integrationTest.readAndVerifyWithMultiNode> + <integrationTest.testEnv>Simple</integrationTest.testEnv> + </properties> + </profile> <profile> <id>AIClusterIT</id> <activation> diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/category/TsFileBackupIT.java b/integration-test/src/main/java/org/apache/iotdb/itbase/category/TsFileBackupIT.java new file mode 100644 index 00000000000..3bf426b3954 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/category/TsFileBackupIT.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.itbase.category; + +public interface TsFileBackupIT {} diff --git a/integration-test/src/test/java/org/apache/iotdb/tools/it/TsFileBackupScriptIT.java b/integration-test/src/test/java/org/apache/iotdb/tools/it/TsFileBackupScriptIT.java new file mode 100644 index 00000000000..4ef6d72246a --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/tools/it/TsFileBackupScriptIT.java @@ -0,0 +1,781 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tools.it; + +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TsFileBackupIT; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.sshd.common.file.nativefs.NativeFileSystemFactory; +import org.apache.sshd.scp.server.ScpCommandFactory; +import org.apache.sshd.server.SshServer; +import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.apache.sshd.server.shell.ProcessShellCommandFactory; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.record.Tablet; +import org.awaitility.Awaitility; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@RunWith(IoTDBTestRunner.class) +@Category({TsFileBackupIT.class}) +public class TsFileBackupScriptIT { + + private static final String IOTDB_USER = "root"; + private static final String IOTDB_PASSWORD = "TimechoDB@2021"; + private static final String PIPE_PLUGIN_NAME = "TSFILE_REMOTE_SINK"; + private static final String PIPE_PLUGIN_JAR_PREFIX = "tsfile-remote-sink-"; + private static final String PIPE_PLUGIN_JAR_SUFFIX = "-jar-with-dependencies.jar"; + private static final String TEST_SSH_USER = "tsfilebackup"; + private static final String TEST_SSH_PASSWORD = "tsfilebackup"; + + private static final String TABLE_T1 = "t1"; + private static final String TABLE_T2 = "t2"; + private static final String TREE_D1 = "d1"; + private static final String TREE_D2 = "d2"; + + private static final int HISTORY_INSERT_START = 1; + private static final int HISTORY_INSERT_ROWS = 100; + private static final int DELETE_OFFSET = 20; + private static final int DELETE_LENGTH = 25; + private static final long PROCESS_TIMEOUT_SECONDS = 60; + private static final long VERIFY_TIMEOUT_SECONDS = 120; + + private static BaseEnv senderEnv; + private static BaseEnv receiverEnv; + private static String ip; + private static String port; + private static String toolsPath; + private static String libPath; + private static Path artifactDir; + private static SshServer scpServer; + private static ExecutorService processOutputExecutor; + + @BeforeClass + public static void setUp() throws Exception { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + senderEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false) + .setPipeAutoSplitFullEnabled(false); + senderEnv.initClusterEnvironment(); + + receiverEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false) + .setPipeAutoSplitFullEnabled(false); + receiverEnv.initClusterEnvironment(); + + ip = senderEnv.getIP(); + port = senderEnv.getPort(); + toolsPath = senderEnv.getToolsPath(); + libPath = senderEnv.getLibPath(); + artifactDir = Paths.get(toolsPath).getParent().getParent().resolve("tsfile-backup-it"); + Files.createDirectories(artifactDir); + + copyPluginJarToToolHome(); + scpServer = startScpServer(artifactDir); + processOutputExecutor = Executors.newCachedThreadPool(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (processOutputExecutor != null) { + processOutputExecutor.shutdownNow(); + } + if (scpServer != null) { + scpServer.stop(true); + scpServer = null; + } + if (senderEnv != null) { + senderEnv.cleanClusterEnvironment(); + } + if (receiverEnv != null) { + receiverEnv.cleanClusterEnvironment(); + } + } + + @Test + public void testTableBackupSpecificTable() throws Exception { + assertUnixLikeEnvironment(); + final String dbName = "table_spec_" + generateRandomSuffix(); + + prepareTableData(senderEnv, dbName); + + final Path testRoot = Files.createTempDirectory(artifactDir, "table-spec-"); + final Path remoteDir = testRoot.resolve("remote"); + String pipeName = null; + + try { + List<String> extraArgs = + Arrays.asList("-sql_dialect", "table", "-db", dbName, "-table", TABLE_T1); + ProcessResult result = executeBackupScript(remoteDir, extraArgs); + assertProcessSuccess(senderEnv, result); + pipeName = extractPipeName(result.outputLines); + + waitForExportCompleted(senderEnv, pipeName, remoteDir); + loadExportedFiles(receiverEnv, remoteDir, dbName); + + verifyTableLoadedState( + receiverEnv, + dbName, + Collections.singletonList(TABLE_T1), + Collections.singletonList(TABLE_T2)); + } finally { + dropPipeIfExists(senderEnv, pipeName); + } + } + + @Test + public void testTableBackupDatabaseOnly() throws Exception { + assertUnixLikeEnvironment(); + final String dbName = "table_db_" + generateRandomSuffix(); + + prepareTableData(senderEnv, dbName); + + final Path testRoot = Files.createTempDirectory(artifactDir, "table-db-"); + final Path remoteDir = testRoot.resolve("remote"); + String pipeName = null; + + try { + List<String> extraArgs = Arrays.asList("-sql_dialect", "table", "-db", dbName); + ProcessResult result = executeBackupScript(remoteDir, extraArgs); + assertProcessSuccess(senderEnv, result); + pipeName = extractPipeName(result.outputLines); + + waitForExportCompleted(senderEnv, pipeName, remoteDir); + loadExportedFiles(receiverEnv, remoteDir, dbName); + + verifyTableLoadedState( + receiverEnv, dbName, Arrays.asList(TABLE_T1, TABLE_T2), Collections.emptyList()); + } finally { + dropPipeIfExists(senderEnv, pipeName); + } + } + + @Test + public void testTreeBackupSpecificPath() throws Exception { + assertUnixLikeEnvironment(); + final String dbName = "root.tree_path_" + generateRandomSuffix(); + + prepareTreeData(senderEnv, dbName); + + final Path testRoot = Files.createTempDirectory(artifactDir, "tree-path-"); + final Path remoteDir = testRoot.resolve("remote"); + String pipeName = null; + + try { + String targetPath = dbName + "." + TREE_D1 + ".**"; + List<String> extraArgs = Arrays.asList("-sql_dialect", "tree", "-path", targetPath); + ProcessResult result = executeBackupScript(remoteDir, extraArgs); + assertProcessSuccess(senderEnv, result); + pipeName = extractPipeName(result.outputLines); + + waitForExportCompleted(senderEnv, pipeName, remoteDir); + loadExportedFilesTree(receiverEnv, remoteDir, dbName); + + verifyTreeLoadedState( + receiverEnv, + dbName, + Collections.singletonList(TREE_D1), + Collections.singletonList(TREE_D2)); + } finally { + dropPipeIfExists(senderEnv, pipeName); + } + } + + private static String generateRandomSuffix() { + return UUID.randomUUID().toString().replace("-", "").substring(0, 8); + } + + private static void assertUnixLikeEnvironment() { + final String osName = System.getProperty("os.name", "").toLowerCase(Locale.ENGLISH); + if (osName.startsWith("windows")) { + throw new IllegalStateException("TsFile backup SCP IT only supports Unix-like environments."); + } + } + + private static void copyPluginJarToToolHome() throws IOException { + final Path pluginJar = locatePluginJar(); + final Path extPipeDir = Paths.get(toolsPath).getParent().resolve("ext").resolve("pipe"); + Files.createDirectories(extPipeDir); + Files.copy( + pluginJar, + extPipeDir.resolve(pluginJar.getFileName()), + StandardCopyOption.REPLACE_EXISTING); + } + + private static Path locatePluginJar() throws IOException { + Path current = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize(); + while (current != null) { + final Path targetDir = + current.resolve("library-pipe").resolve("tsfile-remote-sink").resolve("target"); + if (Files.isDirectory(targetDir)) { + try (Stream<Path> stream = Files.list(targetDir)) { + return stream + .filter(Files::isRegularFile) + .filter(path -> path.getFileName().toString().startsWith(PIPE_PLUGIN_JAR_PREFIX)) + .filter(path -> path.getFileName().toString().endsWith(PIPE_PLUGIN_JAR_SUFFIX)) + .min(Comparator.comparing(path -> path.getFileName().toString())) + .orElseThrow(() -> new IllegalStateException("Cannot find plugin jar")); + } + } + current = current.getParent(); + } + throw new IllegalStateException("Cannot locate plugin jar"); + } + + private static void prepareTableData(BaseEnv env, String dbName) throws Exception { + try (ITableSession session = env.getTableSessionConnection()) { + session.executeNonQueryStatement(String.format("DROP DATABASE IF EXISTS %s", dbName)); + session.executeNonQueryStatement(String.format("CREATE DATABASE IF NOT EXISTS %s", dbName)); + session.executeNonQueryStatement(String.format("USE %s", dbName)); + session.executeNonQueryStatement( + String.format( + "CREATE TABLE IF NOT EXISTS %s (id STRING TAG, file OBJECT FIELD)", TABLE_T1)); + session.executeNonQueryStatement( + String.format( + "CREATE TABLE IF NOT EXISTS %s (id STRING TAG, file OBJECT FIELD)", TABLE_T2)); + + insertTableRows(session, TABLE_T1); + insertTableRows(session, TABLE_T2); + TestUtils.executeNonQueryWithRetry(env, "flush"); + + final long deleteStart = HISTORY_INSERT_START + DELETE_OFFSET; + final long deleteEnd = deleteStart + DELETE_LENGTH - 1L; + session.executeNonQueryStatement( + String.format( + "DELETE FROM %s WHERE time >= %d AND time <= %d", TABLE_T1, deleteStart, deleteEnd)); + session.executeNonQueryStatement( + String.format( + "DELETE FROM %s WHERE time >= %d AND time <= %d", TABLE_T2, deleteStart, deleteEnd)); + TestUtils.executeNonQueryWithRetry(env, "flush"); + } + } + + private static void prepareTreeData(BaseEnv env, String dbName) throws Exception { + try (ISession session = env.getSessionConnection()) { + session.open(); + dropTreeDatabaseIfExists(session, dbName); + session.executeNonQueryStatement(String.format("CREATE DATABASE %s", dbName)); + session.executeNonQueryStatement( + String.format( + "CREATE TIMESERIES %s.%s.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN", dbName, TREE_D1)); + session.executeNonQueryStatement( + String.format( + "CREATE TIMESERIES %s.%s.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN", dbName, TREE_D2)); + + insertTreeRows(session, dbName + "." + TREE_D1); + insertTreeRows(session, dbName + "." + TREE_D2); + TestUtils.executeNonQueryWithRetry(env, "flush"); + + final long deleteStart = HISTORY_INSERT_START + DELETE_OFFSET; + final long deleteEnd = deleteStart + DELETE_LENGTH - 1L; + session.executeNonQueryStatement( + String.format( + "DELETE FROM %s.%s.s1 WHERE time >= %d AND time <= %d", + dbName, TREE_D1, deleteStart, deleteEnd)); + session.executeNonQueryStatement( + String.format( + "DELETE FROM %s.%s.s1 WHERE time >= %d AND time <= %d", + dbName, TREE_D2, deleteStart, deleteEnd)); + TestUtils.executeNonQueryWithRetry(env, "flush"); + } + } + + private static SshServer startScpServer(final Path serverRoot) throws IOException { + final SshServer server = SshServer.setUpDefaultServer(); + server.setHost("127.0.0.1"); + server.setPort(0); + server.setKeyPairProvider( + new SimpleGeneratorHostKeyProvider(serverRoot.resolve("hostkey.ser"))); + server.setPasswordAuthenticator( + (username, password, session) -> + TEST_SSH_USER.equals(username) && TEST_SSH_PASSWORD.equals(password)); + server.setFileSystemFactory(new NativeFileSystemFactory()); + + final ScpCommandFactory scpCommandFactory = new ScpCommandFactory(); + scpCommandFactory.setDelegateCommandFactory(ProcessShellCommandFactory.INSTANCE); + server.setCommandFactory(scpCommandFactory); + server.setShellFactory(scpCommandFactory); + server.start(); + return server; + } + + private ProcessResult executeBackupScript(final Path remoteDir, final List<String> extraArgs) + throws Exception { + List<String> command = + new ArrayList<>( + Arrays.asList( + "bash", + Paths.get(toolsPath, "tsfile-backup.sh").toString(), + "-h", + ip, + "-p", + port, + "-u", + IOTDB_USER, + "-pw", + IOTDB_PASSWORD, + "-t", + remoteDir.toString(), + "-th", + "127.0.0.1", + "-tu", + TEST_SSH_USER, + "-tpw", + TEST_SSH_PASSWORD, + "-tp", + String.valueOf(scpServer.getPort()))); + command.addAll(extraArgs); + + final ProcessBuilder builder = new ProcessBuilder(command); + builder.redirectErrorStream(true); + builder.environment().put("CLASSPATH", libPath); + + final Process process = builder.start(); + + Future<List<String>> outputFuture = + processOutputExecutor.submit( + () -> { + List<String> lines = new ArrayList<>(); + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + lines.add(line); + } + } + return lines; + }); + + boolean finished = process.waitFor(PROCESS_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!finished) { + process.destroyForcibly(); + throw new TimeoutException("Backup script timeout"); + } + + List<String> outputLines; + try { + outputLines = outputFuture.get(5, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + outputLines = Collections.emptyList(); + } + + return new ProcessResult(process.exitValue(), outputLines); + } + + private void assertProcessSuccess(BaseEnv env, ProcessResult result) + throws IoTDBConnectionException, StatementExecutionException { + String output = String.join(System.lineSeparator(), result.outputLines); + Assert.assertEquals("Script failed: " + output, 0, result.exitCode); + Assert.assertTrue("Pipe not submitted: " + output, output.contains("Pipe task submitted")); + Assert.assertTrue("Plugin not registered", isPluginRegistered(env)); + } + + private static boolean isPluginRegistered(BaseEnv env) + throws IoTDBConnectionException, StatementExecutionException { + try (ISession session = env.getSessionConnection()) { + session.open(); + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW PIPEPLUGINS")) { + final SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + final String pluginName = readPluginName(iterator); + if (PIPE_PLUGIN_NAME.equals(pluginName)) { + return true; + } + } + } + } + return false; + } + + private static String readPluginName(final SessionDataSet.DataIterator iterator) { + try { + if (!iterator.isNull(ColumnHeaderConstant.PLUGIN_NAME)) { + return iterator.getString(ColumnHeaderConstant.PLUGIN_NAME); + } + } catch (Exception ignored) { + } + try { + if (!iterator.isNull(ColumnHeaderConstant.PLUGIN_NAME_TABLE_MODEL)) { + return iterator.getString(ColumnHeaderConstant.PLUGIN_NAME_TABLE_MODEL); + } + } catch (Exception ignored) { + } + return null; + } + + private static String extractPipeName(final List<String> outputLines) { + for (String line : outputLines) { + final String prefix = "[INFO] Pipe name: "; + final int index = line.indexOf(prefix); + if (index >= 0) { + return line.substring(index + prefix.length()).trim(); + } + } + return null; + } + + private static void waitForExportCompleted( + BaseEnv env, final String pipeName, final Path remoteDir) { + try { + if (pipeName != null && !pipeName.isEmpty()) { + Awaitility.await() + .atMost(Duration.ofSeconds(VERIFY_TIMEOUT_SECONDS)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !isPipePresent(env, pipeName)); + } + + Awaitility.await() + .atMost(Duration.ofSeconds(PROCESS_TIMEOUT_SECONDS)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> containsTsFile(remoteDir)); + } catch (Exception e) { + throw new RuntimeException("Failed waiting for export completion.", e); + } + } + + private static boolean isPipePresent(BaseEnv env, final String pipeName) throws Exception { + try (ISession session = env.getSessionConnection()) { + session.open(); + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW PIPES")) { + final SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + if (pipeName.equals(iterator.getString("ID"))) { + return true; + } + } + } + } + return false; + } + + private static void dropPipeIfExists(BaseEnv env, final String pipeName) { + if (pipeName == null || pipeName.isEmpty()) { + return; + } + try (ISession session = env.getSessionConnection()) { + session.open(); + session.executeNonQueryStatement("DROP PIPE " + pipeName); + } catch (Exception ignored) { + } + } + + private static void loadExportedFiles(BaseEnv env, final Path dir, String targetDbName) + throws Exception { + if (!Files.exists(dir)) { + return; + } + try (ITableSession session = env.getTableSessionConnection(); + Stream<Path> pathStream = Files.walk(dir)) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS " + targetDbName); + session.executeNonQueryStatement("USE " + targetDbName); + + List<Path> tsFiles = + pathStream + .filter(Files::isRegularFile) + .filter(p -> p.toString().endsWith(".tsfile")) + .sorted() + .collect(Collectors.toList()); + + for (Path tsFile : tsFiles) { + session.executeNonQueryStatement( + String.format( + "LOAD '%s' WITH ('database'='%s')", + tsFile.toAbsolutePath().toString(), targetDbName)); + } + TestUtils.executeNonQueryWithRetry(env, "flush"); + } + } + + private static void loadExportedFilesTree(BaseEnv env, final Path dir, String targetDbName) + throws Exception { + if (!Files.exists(dir)) { + return; + } + try (ISession session = env.getSessionConnection(); + Stream<Path> pathStream = Files.walk(dir)) { + session.open(); + session.executeNonQueryStatement("CREATE DATABASE " + targetDbName); + + List<Path> tsFiles = + pathStream + .filter(Files::isRegularFile) + .filter(p -> p.toString().endsWith(".tsfile")) + .sorted() + .collect(Collectors.toList()); + + for (Path tsFile : tsFiles) { + session.executeNonQueryStatement( + String.format( + "LOAD '%s' WITH ('database'='%s')", + tsFile.toAbsolutePath().toString(), targetDbName)); + } + TestUtils.executeNonQueryWithRetry(env, "flush"); + } + } + + private void verifyTableLoadedState( + BaseEnv env, final String dbName, List<String> expectedTables, List<String> excludedTables) { + final List<Long> expectedTimestamps = buildExpectedTimestamps(); + try (ITableSession session = env.getTableSessionConnection()) { + session.executeNonQueryStatement("USE " + dbName); + for (String table : expectedTables) { + assertTableLoadedResult(session, table, expectedTimestamps); + } + for (String table : excludedTables) { + assertTableEmptyOrNotExist(session, table); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void verifyTreeLoadedState( + BaseEnv env, + final String dbName, + List<String> expectedDevices, + List<String> excludedDevices) { + final List<Long> expectedTimestamps = buildExpectedTimestamps(); + try (ISession session = env.getSessionConnection()) { + session.open(); + for (String device : expectedDevices) { + assertTreeLoadedResult(session, dbName + "." + device, expectedTimestamps); + } + for (String device : excludedDevices) { + assertTreeEmptyOrNotExist(session, dbName + "." + device); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void dropTreeDatabaseIfExists(final ISession session, final String database) + throws Exception { + try { + session.executeNonQueryStatement("DROP DATABASE " + database); + } catch (Exception e) { + if (e.getMessage() == null + || (!e.getMessage().contains("does not exist") + && !e.getMessage().contains("has not been created") + && !e.getMessage().contains("Path ["))) { + throw e; + } + } + } + + private static List<Long> buildExpectedTimestamps() { + final List<Long> expected = new ArrayList<>(); + final long deleteStart = HISTORY_INSERT_START + DELETE_OFFSET; + final long deleteEnd = deleteStart + DELETE_LENGTH - 1L; + for (long ts = HISTORY_INSERT_START; ts < HISTORY_INSERT_START + HISTORY_INSERT_ROWS; ts++) { + if (ts < deleteStart || ts > deleteEnd) { + expected.add(ts); + } + } + return expected; + } + + private static void assertTableLoadedResult( + final ITableSession session, final String tableName, final List<Long> expectedTimestamps) + throws Exception { + try (SessionDataSet dataSet = + session.executeQueryStatement( + String.format("SELECT time, READ_OBJECT(file) FROM %s ORDER BY time ASC", tableName))) { + final SessionDataSet.DataIterator iterator = dataSet.iterator(); + int index = 0; + while (iterator.next()) { + final Binary blob = iterator.getBlob(2); + if (blob == null || blob.getValues() == null) { + continue; + } + Assert.assertTrue(index < expectedTimestamps.size()); + final long expectedTimestamp = expectedTimestamps.get(index); + Assert.assertEquals(expectedTimestamp, iterator.getLong(1)); + Assert.assertArrayEquals(buildPayload(expectedTimestamp), blob.getValues()); + index++; + } + Assert.assertEquals(expectedTimestamps.size(), index); + } + } + + private static void assertTableEmptyOrNotExist( + final ITableSession session, final String tableName) throws Exception { + try (SessionDataSet dataSet = + session.executeQueryStatement(String.format("SELECT time FROM %s", tableName))) { + Assert.assertFalse(dataSet.iterator().next()); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains("does not exist") || e.getMessage().contains("not exist")); + } + } + + private static void assertTreeLoadedResult( + final ISession session, final String fullDevicePath, final List<Long> expectedTimestamps) + throws Exception { + try (SessionDataSet dataSet = + session.executeQueryStatement( + String.format("SELECT s1 FROM %s ORDER BY time ASC", fullDevicePath))) { + final SessionDataSet.DataIterator iterator = dataSet.iterator(); + int index = 0; + while (iterator.next()) { + Assert.assertTrue(index < expectedTimestamps.size()); + final long expectedTimestamp = expectedTimestamps.get(index); + Assert.assertEquals(expectedTimestamp, iterator.getLong(1)); + Assert.assertEquals( + new String(buildPayload(expectedTimestamp), StandardCharsets.UTF_8), + iterator.getString(2)); + index++; + } + Assert.assertEquals(expectedTimestamps.size(), index); + } + } + + private static void assertTreeEmptyOrNotExist(final ISession session, final String fullDevicePath) + throws Exception { + try (SessionDataSet dataSet = + session.executeQueryStatement(String.format("SELECT s1 FROM %s", fullDevicePath))) { + Assert.assertFalse(dataSet.iterator().next()); + } catch (Exception e) { + Assert.assertTrue( + e.getMessage().contains("does not exist") + || e.getMessage().contains("has not been created")); + } + } + + private static void insertTableRows(final ITableSession session, final String tableName) + throws Exception { + final List<String> columnNames = Arrays.asList("id", "file"); + final List<TSDataType> dataTypes = Arrays.asList(TSDataType.STRING, TSDataType.OBJECT); + final List<ColumnCategory> columnCategories = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD); + + final Tablet tablet = new Tablet(tableName, columnNames, dataTypes, columnCategories, 100); + for (long ts = HISTORY_INSERT_START; ts < HISTORY_INSERT_START + HISTORY_INSERT_ROWS; ts++) { + final int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, ts); + tablet.addValue(rowIndex, 0, "device1"); + tablet.addValue(rowIndex, 1, true, 0, buildPayload(ts)); + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + session.insert(tablet); + tablet.reset(); + } + } + if (tablet.getRowSize() > 0) { + session.insert(tablet); + } + } + + private static void insertTreeRows(final ISession session, final String devicePath) + throws Exception { + List<String> measurements = Collections.singletonList("s1"); + List<TSDataType> dataTypes = Collections.singletonList(TSDataType.TEXT); + + Tablet tablet = new Tablet(new StringArrayDeviceID(devicePath), measurements, dataTypes, 100); + for (long ts = HISTORY_INSERT_START; ts < HISTORY_INSERT_START + HISTORY_INSERT_ROWS; ts++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, ts); + tablet.addValue(rowIndex, 0, new String(buildPayload(ts), StandardCharsets.UTF_8)); + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + } + } + if (tablet.getRowSize() > 0) { + session.insertTablet(tablet); + } + } + + private static byte[] buildPayload(final long timestamp) { + return ("Payload_" + timestamp).getBytes(StandardCharsets.UTF_8); + } + + private static boolean containsTsFile(final Path dir) throws IOException { + if (!Files.exists(dir)) { + return false; + } + try (Stream<Path> stream = Files.walk(dir)) { + return stream + .filter(Files::isRegularFile) + .anyMatch(path -> path.getFileName().toString().endsWith(".tsfile")); + } + } + + private static final class ProcessResult { + private final int exitCode; + private final List<String> outputLines; + + private ProcessResult(final int exitCode, final List<String> outputLines) { + this.exitCode = exitCode; + this.outputLines = outputLines; + } + } +} diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java index 575f44aab59..0ca05669506 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java @@ -44,6 +44,8 @@ import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.Enumeration; import java.util.List; @@ -54,14 +56,16 @@ import java.util.List; public final class TsFileBackup { // ============================================================================================== - // Constants dictionary: plugin names, keys, defaults, env vars, and CLI options + // Constants dictionary: plugin names, keys, defaults, and CLI options // ============================================================================================== public static final String SINK_PLUGIN_NAME = "TSFILE_REMOTE_SINK"; public static final String SINK_PLUGIN_CLASS = "org.apache.iotdb.pipe.plugin.sink.tsfile.PipeTsFileRemoteSink"; - private static final String PLUGIN_JAR_PROPERTY = "tsfile.backup.plugin.jar"; - private static final String ENV_PLUGIN_JAR = "TSFILE_REMOTE_SINK_JAR"; + private static final String IOTDB_HOME_PROPERTY = "IOTDB_HOME"; + private static final String DEFAULT_PLUGIN_DIR = "ext/pipe"; + private static final String DEFAULT_PLUGIN_JAR_PREFIX = "tsfile-remote-sink-"; + private static final String DEFAULT_PLUGIN_JAR_SUFFIX = "-jar-with-dependencies.jar"; private static final IoTPrinter OUT = new IoTPrinter(System.out); @@ -338,18 +342,10 @@ public final class TsFileBackup { "object_thread_keep_alive_seconds must be a positive integer."); } - String jarPath = resolvePluginJarPath(line.getOptionValue(CliOptions.PLUGIN_JAR_LONG)); - this.pluginJar = new File(jarPath); - if (!this.pluginJar.isFile()) { - throw new IllegalArgumentException( - "Plugin JAR not found: " - + jarPath - + ". Set -D" - + PLUGIN_JAR_PROPERTY - + ", --plugin_jar, or " - + ENV_PLUGIN_JAR - + "."); - } + this.pluginJar = + resolvePluginJar( + line.getOptionValue(CliOptions.PLUGIN_JAR_LONG), + System.getProperty(IOTDB_HOME_PROPERTY)); } private String resolvePassword(CommandLine line, String opt, String prompt, String defaultPw) { @@ -763,16 +759,80 @@ public final class TsFileBackup { } } - private static String resolvePluginJarPath(String cliOverride) { - String prop = System.getProperty(PLUGIN_JAR_PROPERTY); - if (StringUtils.isNotBlank(prop)) { - return prop.trim(); + static File resolvePluginJar(String cliOverride, String iotdbHome) { + String configuredJarPath = normalizePathSetting(cliOverride); + if (StringUtils.isNotBlank(configuredJarPath)) { + File configuredPluginJar = new File(configuredJarPath); + if (configuredPluginJar.isFile()) { + return configuredPluginJar; + } + throw new IllegalArgumentException("Specified plugin JAR not found: " + configuredJarPath); } - if (StringUtils.isNotBlank(cliOverride)) { - return cliOverride.trim(); + + File defaultPluginDir = resolveDefaultPluginDir(iotdbHome); + File defaultPluginJar = findPluginJarInDirectory(defaultPluginDir); + if (defaultPluginJar != null) { + return defaultPluginJar; + } + + throw new IllegalArgumentException(buildMissingPluginJarMessage(defaultPluginDir)); + } + + private static File resolveDefaultPluginDir(String iotdbHome) { + iotdbHome = normalizePathSetting(iotdbHome); + if (StringUtils.isBlank(iotdbHome)) { + return null; + } + return new File(iotdbHome, DEFAULT_PLUGIN_DIR); + } + + private static File findPluginJarInDirectory(File pluginDir) { + if (pluginDir == null || !pluginDir.isDirectory()) { + return null; + } + + File[] pluginJars = + pluginDir.listFiles( + (dir, name) -> + name.startsWith(DEFAULT_PLUGIN_JAR_PREFIX) + && name.endsWith(DEFAULT_PLUGIN_JAR_SUFFIX)); + if (pluginJars == null || pluginJars.length == 0) { + return null; + } + + Arrays.sort(pluginJars, Comparator.comparing(File::getName)); + return pluginJars[0]; + } + + private static String buildMissingPluginJarMessage(File defaultPluginDir) { + String expectedJarName = DEFAULT_PLUGIN_JAR_PREFIX + "*" + DEFAULT_PLUGIN_JAR_SUFFIX; + if (defaultPluginDir == null) { + return "Pipe plugin JAR does not exist: --plugin_jar is not configured, and the default directory " + + DEFAULT_PLUGIN_DIR + + " cannot be resolved because " + + IOTDB_HOME_PROPERTY + + " is not set. Expected jar pattern: " + + expectedJarName + + "."; + } + return "Pipe plugin JAR does not exist: --plugin_jar is not configured, and the default directory " + + defaultPluginDir.getAbsolutePath() + + " does not contain " + + expectedJarName + + "."; + } + + private static String normalizePathSetting(String path) { + if (StringUtils.isBlank(path)) { + return ""; + } + + String normalized = path.trim(); + if ((normalized.startsWith("\"") && normalized.endsWith("\"")) + || (normalized.startsWith("'") && normalized.endsWith("'"))) { + normalized = normalized.substring(1, normalized.length() - 1).trim(); } - String env = System.getenv(ENV_PLUGIN_JAR); - return StringUtils.isNotBlank(env) ? env.trim() : ""; + return normalized; } private static String getLocalIpv4() { diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/pipe/TsFileBackupTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/pipe/TsFileBackupTest.java new file mode 100644 index 00000000000..e599de4e212 --- /dev/null +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/pipe/TsFileBackupTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tool.pipe; + +import org.junit.After; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TsFileBackupTest { + + private final List<Path> tempDirs = new ArrayList<>(); + + @After + public void tearDown() throws IOException { + for (Path tempDir : tempDirs) { + deleteRecursively(tempDir); + } + } + + @Test + public void testResolvePluginJarFromCliOverride() throws IOException { + Path tempDir = createTempDir(); + Path pluginJar = Files.createFile(tempDir.resolve("custom-plugin.jar")); + + File resolvedPluginJar = TsFileBackup.resolvePluginJar(pluginJar.toString(), null); + + assertEquals(pluginJar.toFile().getCanonicalFile(), resolvedPluginJar.getCanonicalFile()); + } + + @Test + public void testResolvePluginJarFromIoTDBHomeExtPipe() throws IOException { + Path tempDir = createTempDir(); + Path pluginDir = Files.createDirectories(tempDir.resolve("ext").resolve("pipe")); + Path pluginJar = + Files.createFile( + pluginDir.resolve("tsfile-remote-sink-2.0.8-SNAPSHOT-jar-with-dependencies.jar")); + + File resolvedPluginJar = TsFileBackup.resolvePluginJar(null, tempDir.toString()); + + assertEquals(pluginJar.toFile().getCanonicalFile(), resolvedPluginJar.getCanonicalFile()); + } + + @Test + public void testResolvePluginJarReportsMissingJarInIoTDBHomeExtPipe() throws IOException { + Path tempDir = createTempDir(); + Path pluginDir = Files.createDirectories(tempDir.resolve("ext").resolve("pipe")); + + try { + TsFileBackup.resolvePluginJar(null, tempDir.toString()); + fail("Expected missing plugin jar validation to fail."); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("--plugin_jar is not configured")); + assertTrue(e.getMessage().contains("tsfile-remote-sink-*-jar-with-dependencies.jar")); + assertTrue(e.getMessage().contains(pluginDir.toFile().getAbsolutePath())); + } + } + + private Path createTempDir() throws IOException { + Path tempDir = Files.createTempDirectory("tsfile-backup-test"); + tempDirs.add(tempDir); + return tempDir; + } + + private void deleteRecursively(Path root) throws IOException { + if (root == null || !Files.exists(root)) { + return; + } + try (java.util.stream.Stream<Path> stream = Files.walk(root)) { + stream.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + } +} diff --git a/scripts/tools/tsfile-backup.sh b/scripts/tools/tsfile-backup.sh index ce480e09859..bf3c35abf8f 100755 --- a/scripts/tools/tsfile-backup.sh +++ b/scripts/tools/tsfile-backup.sh @@ -29,18 +29,6 @@ if [ -z "${IOTDB_HOME}" ]; then export IOTDB_HOME="$(cd "`dirname "$0"`"/..; pwd)" fi -TOOL_ROOT="$(cd "$(dirname "$0")/.."; pwd)" -PLUGIN_JAR="" - -if [ -z "$PLUGIN_JAR" ] && [ -d "$TOOL_ROOT/ext/pipe" ]; then - for f in "$TOOL_ROOT/ext/pipe"/tsfile-remote-sink-*-jar-with-dependencies.jar; do - if [ -f "$f" ]; then - PLUGIN_JAR="$f" - break - fi - done -fi - if [ -z "${IOTDB_HOME}" ]; then echo "[ERROR] IOTDB_HOME is not set. Set it to your IoTDB installation root (directory that contains lib/)." >&2 exit 1 @@ -63,11 +51,8 @@ if [ -z "$JAVA" ] ; then fi JVM_OPTS="-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8" -if [ -n "$PLUGIN_JAR" ] && [ -f "$PLUGIN_JAR" ]; then - JVM_OPTS="${JVM_OPTS} -Dtsfile.backup.plugin.jar=${PLUGIN_JAR}" -fi -CLASSPATH="" +CLASSPATH="${CLASSPATH:-}" for f in "${IOTDB_HOME}"/lib/*.jar; do CLASSPATH="${CLASSPATH}:${f}" done diff --git a/scripts/tools/windows/tsfile-backup.bat b/scripts/tools/windows/tsfile-backup.bat index a116475c29f..324501ce0ab 100644 --- a/scripts/tools/windows/tsfile-backup.bat +++ b/scripts/tools/windows/tsfile-backup.bat @@ -36,12 +36,6 @@ popd >nul @REM ----------------------------------------------------------------------------- if NOT DEFINED JAVA_HOME goto :err_java -set "PLUGIN_JAR=" - -if NOT DEFINED PLUGIN_JAR for %%f in ("%TOOL_ROOT%\ext\pipe\tsfile-remote-sink-*-jar-with-dependencies.jar") do ( - if EXIST "%%~f" set "PLUGIN_JAR=%%~f" -) - @REM ----------------------------------------------------------------------------- @REM 3. JVM Options and Dependency Assembly @REM ----------------------------------------------------------------------------- @@ -49,7 +43,6 @@ if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.pipe.TsFileBackup @REM Centralized system properties and encoding settings set "JAVA_OPTS=-ea -Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8 -DIOTDB_HOME="%IOTDB_HOME%"" -if DEFINED PLUGIN_JAR if EXIST "%PLUGIN_JAR%" set "JAVA_OPTS=%JAVA_OPTS% -Dtsfile.backup.plugin.jar="%PLUGIN_JAR%"" @REM Elegant dependency loading using wildcards (*) to avoid string concatenation hell set "CLASSPATH=%TOOL_ROOT%\lib\*;%IOTDB_HOME%\lib\*"
