This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b396766f4f26e3bd59bc30d5e3fd79d790c5a3ec Author: Zhenyu Luo <[email protected]> AuthorDate: Fri Nov 14 15:14:37 2025 +0800 feat: encode load attributes in active load directories (#16722) * feat: encode load attributes in active load directories * spotless * fix * fix * fix * fix * fix * update it * update it * fix * update * update (cherry picked from commit 91e48f07f63a1161b46487a281858eb1b4b63479) --- .../iotdb/db/it/IoTDBLoadTsFileWithModIT.java | 60 ++++- .../protocol/thrift/IoTDBDataNodeReceiver.java | 12 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 13 +- .../plan/statement/crud/LoadTsFileStatement.java | 16 +- .../load/active/ActiveLoadDirScanner.java | 23 +- .../load/active/ActiveLoadPathHelper.java | 282 +++++++++++++++++++++ .../load/active/ActiveLoadPendingQueue.java | 17 +- .../load/active/ActiveLoadTsFileLoader.java | 36 +-- .../storageengine/load/active/ActiveLoadUtil.java | 37 +-- .../load/config/LoadTsFileConfigurator.java | 18 ++ 10 files changed, 460 insertions(+), 54 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java index 9d0a54f1672..5578023e4f5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java @@ -85,7 +85,7 @@ public class IoTDBLoadTsFileWithModIT { // write mods file resource .getExclusiveModFile() - .write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1"), 1, 2)); + .write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.de.s1"), 1, 2)); resource.getExclusiveModFile().close(); } @@ -93,7 +93,7 @@ public class IoTDBLoadTsFileWithModIT { throws IOException, DataRegionException, WriteProcessException, IllegalPathException { TsFileResource resource = generateFile(); ModificationFileV1 oldModFile = ModificationFileV1.getNormalMods(resource); - oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.s1"), Long.MAX_VALUE, 1, 2)); + oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.de.s1"), Long.MAX_VALUE, 1, 2)); oldModFile.close(); } @@ -102,11 +102,11 @@ public class IoTDBLoadTsFileWithModIT { File tsfile = new File(tmpDir, "1-1-0-0.tsfile"); try (TsFileWriter writer = new TsFileWriter(tsfile)) { writer.registerAlignedTimeseries( - "root.test.d1", + "root.test.d1.de", Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN))); Tablet tablet = new Tablet( - "root.test.d1", + "root.test.d1.de", Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN))); for (int i = 0; i < 5; i++) { tablet.addTimestamp(i, i); @@ -138,13 +138,61 @@ public class IoTDBLoadTsFileWithModIT { statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath())); try (final ResultSet resultSet = - statement.executeQuery("select count(s1) as c from root.test.d1")) { + statement.executeQuery("select count(s1) as c from root.test.d1.de")) { Assert.assertTrue(resultSet.next()); Assert.assertEquals(3, resultSet.getLong("c")); } } } + @Test + public void testWithNewModFileAndLoadAttributes() + throws SQLException, + IOException, + DataRegionException, + WriteProcessException, + IllegalPathException { + generateFileWithNewModFile(); + final String databaseName = "root.test.d1"; + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + + statement.execute( + String.format( + "load \'%s\' with (" + + "'database-name'='%s'," + + "'database-level'='2'," + + "'verify'='true'," + + "'on-success'='none'," + + "'async'='true')", + tmpDir.getAbsolutePath(), databaseName)); + + boolean databaseFound = false; + out: + for (int i = 0; i < 10; i++) { + try (final ResultSet resultSet = statement.executeQuery("show databases")) { + while (resultSet.next()) { + final String currentDatabase = resultSet.getString(1); + if (databaseName.equalsIgnoreCase(currentDatabase)) { + databaseFound = true; + break out; + } + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + } + Assert.assertTrue( + "The `database-level` parameter is not working; the generated database does not contain 'root.test.d1'.", + databaseFound); + } + } + @Test public void testWithOldModFile() throws SQLException, @@ -159,7 +207,7 @@ public class IoTDBLoadTsFileWithModIT { statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath())); try (final ResultSet resultSet = - statement.executeQuery("select count(s1) as c from root.test.d1")) { + statement.executeQuery("select count(s1) as c from root.test.d1.de")) { Assert.assertTrue(resultSet.next()); Assert.assertEquals(3, resultSet.getLong("c")); Assert.assertTrue( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 8a9ceb61c93..652530d11b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -100,6 +100,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; @@ -570,7 +571,16 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private TSStatus loadTsFileAsync(final String dataBaseName, final List<String> absolutePaths) throws IOException { - if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths, true)) { + final Map<String, String> loadAttributes = + ActiveLoadPathHelper.buildAttributes( + dataBaseName, + null, + shouldConvertDataTypeOnTypeMismatch, + validateTsFile.get(), + null, + shouldMarkAsPipeRequest.get()); + + if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) { throw new PipeException("Load active listening pipe dir is not set."); } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 4fe119566d7..076b529ab91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; @@ -281,7 +282,17 @@ public class LoadTsFileAnalyzer implements AutoCloseable { } else { databaseName = null; } - if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, databaseName, isDeleteAfterLoad)) { + final Map<String, String> activeLoadAttributes = + ActiveLoadPathHelper.buildAttributes( + databaseName, + databaseLevel, + isConvertOnTypeMismatch, + isVerifySchema, + tabletConversionThresholdBytes, + isGeneratedByPipe); + + if (ActiveLoadUtil.loadTsFileAsyncToActiveDir( + tsFiles, activeLoadAttributes, isDeleteAfterLoad)) { analysis.setFinishQueryAfterAnalyze(true); setRealStatement(analysis); return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index dbc4cb7b64d..2d74925971c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -48,6 +48,7 @@ import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurat import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE; +import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.PIPE_GENERATED_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY; public class LoadTsFileStatement extends Statement { @@ -63,8 +64,6 @@ public class LoadTsFileStatement extends Statement { private boolean isGeneratedByPipe = false; private boolean isAsyncLoad = false; - private Map<String, String> loadAttributes; - private List<File> tsFiles; private List<Boolean> isTableModel; private List<TsFileResource> resources; @@ -245,15 +244,14 @@ public class LoadTsFileStatement extends Statement { } public void setLoadAttributes(final Map<String, String> loadAttributes) { - this.loadAttributes = loadAttributes; - initAttributes(); + initAttributes(loadAttributes); } public boolean isAsyncLoad() { return isAsyncLoad; } - private void initAttributes() { + private void initAttributes(final Map<String, String> loadAttributes) { this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes); this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes); this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes); @@ -263,6 +261,9 @@ public class LoadTsFileStatement extends Statement { LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes); this.verifySchema = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes); this.isAsyncLoad = LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes); + if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes)) { + markIsGeneratedByPipe(); + } } public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> isMiniTsFile) { @@ -314,7 +315,7 @@ public class LoadTsFileStatement extends Statement { @Override public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelationalStatement( MPPQueryContext context) { - loadAttributes = new HashMap<>(); + final Map<String, String> loadAttributes = new HashMap<>(); loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel)); if (database != null) { @@ -326,6 +327,9 @@ public class LoadTsFileStatement extends Statement { loadAttributes.put( TABLET_CONVERSION_THRESHOLD_KEY, String.valueOf(tabletConversionThresholdBytes)); loadAttributes.put(ASYNC_LOAD_KEY, String.valueOf(isAsyncLoad)); + if (isGeneratedByPipe) { + loadAttributes.put(PIPE_GENERATED_KEY, String.valueOf(true)); + } return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java index 5ac33dfbdd4..470cf702b22 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java @@ -39,6 +39,7 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -117,13 +118,23 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService { .filter(this::isTsFileCompleted) .limit(currentAllowedPendingSize) .forEach( - file -> { - final File parentFile = new File(file).getParentFile(); + filePath -> { + final File tsFile = new File(filePath); + final Map<String, String> attributes = + ActiveLoadPathHelper.parseAttributes(tsFile, listeningDirFile); + + final File parentFile = tsFile.getParentFile(); + final boolean isTableModel = + ActiveLoadPathHelper.containsDatabaseName(attributes) + || (parentFile != null + && !Objects.equals( + parentFile.getAbsoluteFile(), + listeningDirFile.getAbsoluteFile())); + activeLoadTsFileLoader.tryTriggerTsFileLoad( - file, - parentFile != null - && !Objects.equals( - parentFile.getAbsoluteFile(), listeningDirFile.getAbsoluteFile()), + tsFile.getAbsolutePath(), + listeningDirFile.getAbsolutePath(), + isTableModel, isGeneratedByPipe); }); } catch (UncheckedIOException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java new file mode 100644 index 00000000000..7c83f20f369 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.active; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator; + +import java.io.File; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Utility methods for encoding and decoding load attributes into directory structures. */ +public final class ActiveLoadPathHelper { + + private static final String SEGMENT_SEPARATOR = "-"; + + private static final List<String> KEY_ORDER = + Collections.unmodifiableList( + Arrays.asList( + LoadTsFileConfigurator.DATABASE_NAME_KEY, + LoadTsFileConfigurator.DATABASE_LEVEL_KEY, + LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY, + LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY, + LoadTsFileConfigurator.VERIFY_KEY, + LoadTsFileConfigurator.DATABASE_KEY, + LoadTsFileConfigurator.PIPE_GENERATED_KEY)); + + private ActiveLoadPathHelper() { + throw new IllegalStateException("Utility class"); + } + + public static Map<String, String> buildAttributes( + final String databaseName, + final Integer databaseLevel, + final Boolean convertOnTypeMismatch, + final Boolean verify, + final Long tabletConversionThresholdBytes, + final Boolean pipeGenerated) { + final Map<String, String> attributes = new LinkedHashMap<>(); + if (Objects.nonNull(databaseName) && !databaseName.isEmpty()) { + attributes.put(LoadTsFileConfigurator.DATABASE_NAME_KEY, databaseName); + } + + if (Objects.nonNull(databaseLevel)) { + attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY, databaseLevel.toString()); + } + + if (Objects.nonNull(convertOnTypeMismatch)) { + attributes.put( + LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY, + Boolean.toString(convertOnTypeMismatch)); + } + + if (Objects.nonNull(tabletConversionThresholdBytes)) { + attributes.put( + LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY, + tabletConversionThresholdBytes.toString()); + } + + if (Objects.nonNull(verify)) { + attributes.put(LoadTsFileConfigurator.VERIFY_KEY, Boolean.toString(verify)); + } + + if (Objects.nonNull(pipeGenerated) && pipeGenerated) { + attributes.put(LoadTsFileConfigurator.PIPE_GENERATED_KEY, Boolean.TRUE.toString()); + } + return attributes; + } + + public static File resolveTargetDir(final File baseDir, final Map<String, String> attributes) { + File current = baseDir; + for (final String key : KEY_ORDER) { + final String value = attributes.get(key); + if (value == null) { + continue; + } + current = new File(current, formatSegment(key, value)); + } + return current; + } + + public static Map<String, String> parseAttributes(final File file, final File pendingDir) { + if (file == null) { + return Collections.emptyMap(); + } + + final Map<String, String> attributes = new HashMap<>(); + File current = file.getParentFile(); + while (current != null) { + final String dirName = current.getName(); + if (pendingDir != null && current.equals(pendingDir)) { + break; + } + for (final String key : KEY_ORDER) { + final String prefix = key + SEGMENT_SEPARATOR; + if (dirName.startsWith(prefix)) { + extractAndValidateAttributeValue(key, dirName, prefix.length()) + .ifPresent(value -> attributes.putIfAbsent(key, value)); + break; + } + } + current = current.getParentFile(); + } + return attributes; + } + + public static File findPendingDirectory(final File file) { + if (file == null) { + return null; + } + String[] dirs = IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(); + File current = file; + while (current != null) { + for (final String dir : dirs) { + if (current.isDirectory() && current.getAbsolutePath().equals(dir)) { + return current; + } + } + current = current.getParentFile(); + } + return null; + } + + public static void applyAttributesToStatement( + final Map<String, String> attributes, + final LoadTsFileStatement statement, + final boolean defaultVerify) { + + Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY)) + .filter(name -> !name.isEmpty()) + .ifPresent(statement::setDatabase); + + if (statement.getDatabase() == null || statement.getDatabase().isEmpty()) { + Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_KEY)) + .filter(name -> !name.isEmpty()) + .ifPresent(statement::setDatabase); + } + + Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY)) + .ifPresent( + level -> { + try { + statement.setDatabaseLevel(Integer.parseInt(level)); + } catch (final NumberFormatException ignored) { + // keep the default when parsing fails + } + }); + + Optional.ofNullable(attributes.get(LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY)) + .ifPresent(value -> statement.setConvertOnTypeMismatch(Boolean.parseBoolean(value))); + + Optional.ofNullable(attributes.get(LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY)) + .ifPresent( + threshold -> { + try { + statement.setTabletConversionThresholdBytes(Long.parseLong(threshold)); + } catch (final NumberFormatException ignored) { + // keep the default when parsing fails + } + }); + + if (attributes.containsKey(LoadTsFileConfigurator.VERIFY_KEY)) { + statement.setVerifySchema( + Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.VERIFY_KEY))); + } else { + statement.setVerifySchema(defaultVerify); + } + + if (attributes.containsKey(LoadTsFileConfigurator.PIPE_GENERATED_KEY) + && Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.PIPE_GENERATED_KEY))) { + statement.markIsGeneratedByPipe(); + } + } + + public static boolean containsDatabaseName(final Map<String, String> attributes) { + return attributes.containsKey(LoadTsFileConfigurator.DATABASE_NAME_KEY) + || attributes.containsKey(LoadTsFileConfigurator.DATABASE_KEY); + } + + private static String formatSegment(final String key, final String value) { + return key + SEGMENT_SEPARATOR + encodeValue(value); + } + + private static String encodeValue(final String value) { + try { + return URLEncoder.encode(value, StandardCharsets.UTF_8.toString()); + } catch (final UnsupportedEncodingException e) { + // UTF-8 should always be supported; fallback to raw value when unexpected + return value; + } + } + + private static Optional<String> extractAndValidateAttributeValue( + final String key, final String dirName, final int prefixLength) { + if (dirName.length() <= prefixLength) { + return Optional.empty(); + } + + final String encodedValue = dirName.substring(prefixLength); + final String decodedValue = decodeValue(encodedValue); + try { + validateAttributeValue(key, decodedValue); + return Optional.of(decodedValue); + } catch (final SemanticException e) { + return Optional.empty(); + } + } + + private static void validateAttributeValue(final String key, final String value) { + switch (key) { + case LoadTsFileConfigurator.DATABASE_NAME_KEY: + if (value == null || value.isEmpty()) { + throw new SemanticException("Database name must not be empty."); + } + break; + case LoadTsFileConfigurator.DATABASE_LEVEL_KEY: + LoadTsFileConfigurator.validateDatabaseLevelParam(value); + break; + case LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY: + LoadTsFileConfigurator.validateConvertOnTypeMismatchParam(value); + break; + case LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY: + validateTabletConversionThreshold(value); + break; + case LoadTsFileConfigurator.VERIFY_KEY: + LoadTsFileConfigurator.validateVerifyParam(value); + break; + default: + LoadTsFileConfigurator.validateParameters(key, value); + } + } + + private static void validateTabletConversionThreshold(final String value) { + try { + final long threshold = Long.parseLong(value); + if (threshold < 0) { + throw new SemanticException( + "Tablet conversion threshold must be a non-negative long value."); + } + } catch (final NumberFormatException e) { + throw new SemanticException( + String.format("Tablet conversion threshold '%s' is not a valid long value.", value)); + } + } + + private static String decodeValue(final String value) { + try { + return URLDecoder.decode(value, StandardCharsets.UTF_8.toString()); + } catch (final UnsupportedEncodingException e) { + return value; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java index 3ec283cd2e0..7b5f7166197 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java @@ -34,9 +34,13 @@ public class ActiveLoadPendingQueue { private final Set<String> loadingFileSet = new HashSet<>(); public synchronized boolean enqueue( - final String file, final boolean isGeneratedByPipe, final boolean isTableModel) { + final String file, + final String pendingDir, + final boolean isGeneratedByPipe, + final boolean isTableModel) { if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) { - pendingFileQueue.offer(new ActiveLoadEntry(file, isGeneratedByPipe, isTableModel)); + pendingFileQueue.offer( + new ActiveLoadEntry(file, pendingDir, isGeneratedByPipe, isTableModel)); ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1); return true; @@ -76,11 +80,14 @@ public class ActiveLoadPendingQueue { public static class ActiveLoadEntry { private final String file; + private final String pendingDir; private final boolean isGeneratedByPipe; private final boolean isTableModel; - public ActiveLoadEntry(String file, boolean isGeneratedByPipe, boolean isTableModel) { + public ActiveLoadEntry( + String file, String pendingDir, boolean isGeneratedByPipe, boolean isTableModel) { this.file = file; + this.pendingDir = pendingDir; this.isGeneratedByPipe = isGeneratedByPipe; this.isTableModel = isTableModel; } @@ -89,6 +96,10 @@ public class ActiveLoadPendingQueue { return file; } + public String getPendingDir() { + return pendingDir; + } + public boolean isGeneratedByPipe() { return isGeneratedByPipe; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java index 62feb562be7..20817c94146 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java @@ -56,6 +56,7 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.time.ZoneId; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; @@ -84,12 +85,12 @@ public class ActiveLoadTsFileLoader { } public void tryTriggerTsFileLoad( - String absolutePath, boolean isTabletMode, boolean isGeneratedByPipe) { + String absolutePath, String pendingDir, boolean isTabletMode, boolean isGeneratedByPipe) { if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { return; } - if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe, isTabletMode)) { + if (pendingQueue.enqueue(absolutePath, pendingDir, isGeneratedByPipe, isTabletMode)) { initFailDirIfNecessary(); adjustExecutorIfNecessary(); } @@ -216,24 +217,29 @@ public class ActiveLoadTsFileLoader { private TSStatus loadTsFile( final ActiveLoadPendingQueue.ActiveLoadEntry entry, final IClientSession session) throws FileNotFoundException { - final LoadTsFileStatement statement = new LoadTsFileStatement(entry.getFile()); + final File tsFile = new File(entry.getFile()); + final LoadTsFileStatement statement = new LoadTsFileStatement(tsFile.getAbsolutePath()); final List<File> files = statement.getTsFiles(); - // It should be noted here that the instructions in this code block do not need to use the - // DataBase, so the DataBase is assigned a value of null. If the DataBase is used later, an - // exception will be thrown. - final File parentFile; - statement.setDatabase( - files.isEmpty() - || !entry.isTableModel() - || (parentFile = files.get(0).getParentFile()) == null - ? null - : parentFile.getName()); statement.setDeleteAfterLoad(true); - statement.setConvertOnTypeMismatch(true); - statement.setVerifySchema(isVerify); statement.setAutoCreateDatabase( IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()); + + final File pendingDir = + entry.getPendingDir() == null + ? ActiveLoadPathHelper.findPendingDirectory(tsFile) + : new File(entry.getPendingDir()); + final Map<String, String> attributes = ActiveLoadPathHelper.parseAttributes(tsFile, pendingDir); + ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, isVerify); + + final File parentFile; + if (statement.getDatabase() == null && entry.isTableModel()) { + statement.setDatabase( + files.isEmpty() || (parentFile = files.get(0).getParentFile()) == null + ? null + : parentFile.getName()); + } + return executeStatement( entry.isGeneratedByPipe() ? new PipeEnrichedStatement(statement) : statement, session); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index 669013ec450..e3dbe43507d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check; @@ -45,14 +47,16 @@ public class ActiveLoadUtil { private static volatile ILoadDiskSelector loadDiskSelector = updateLoadDiskSelector(); public static boolean loadTsFileAsyncToActiveDir( - final List<File> tsFiles, final String dataBaseName, final boolean isDeleteAfterLoad) { + final List<File> tsFiles, + final Map<String, String> loadAttributes, + final boolean isDeleteAfterLoad) { if (tsFiles == null || tsFiles.isEmpty()) { return true; } try { for (File file : tsFiles) { - if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) { + if (!loadTsFilesToActiveDir(loadAttributes, file, isDeleteAfterLoad)) { return false; } } @@ -65,7 +69,7 @@ public class ActiveLoadUtil { } private static boolean loadTsFilesToActiveDir( - final String dataBaseName, final File file, final boolean isDeleteAfterLoad) + final Map<String, String> loadAttributes, final File file, final boolean isDeleteAfterLoad) throws IOException { if (file == null) { return true; @@ -84,12 +88,9 @@ public class ActiveLoadUtil { LOGGER.warn("Load active listening dir is not set."); return false; } - final File targetDir; - if (Objects.nonNull(dataBaseName)) { - targetDir = new File(targetFilePath, dataBaseName); - } else { - targetDir = targetFilePath; - } + final Map<String, String> attributes = + Objects.nonNull(loadAttributes) ? loadAttributes : Collections.emptyMap(); + final File targetDir = ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes); loadTsFileAsyncToTargetDir( targetDir, new File(file.getAbsolutePath() + ".resource"), isDeleteAfterLoad); @@ -100,7 +101,9 @@ public class ActiveLoadUtil { } public static boolean loadFilesToActiveDir( - final String dataBaseName, final List<String> files, final boolean isDeleteAfterLoad) + final Map<String, String> loadAttributes, + final List<String> files, + final boolean isDeleteAfterLoad) throws IOException { if (files == null || files.isEmpty()) { return true; @@ -120,12 +123,9 @@ public class ActiveLoadUtil { LOGGER.warn("Load active listening dir is not set."); return false; } - final File targetDir; - if (Objects.nonNull(dataBaseName)) { - targetDir = new File(targetFilePath, dataBaseName); - } else { - targetDir = targetFilePath; - } + final Map<String, String> attributes = + Objects.nonNull(loadAttributes) ? loadAttributes : Collections.emptyMap(); + final File targetDir = ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes); for (final String file : files) { loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad); @@ -138,6 +138,11 @@ public class ActiveLoadUtil { if (!file.exists()) { return; } + if (!targetDir.exists() && !targetDir.mkdirs()) { + if (!targetDir.exists()) { + throw new IOException("Failed to create target directory " + targetDir.getAbsolutePath()); + } + } RetryUtils.retryOnException( () -> { if (isDeleteAfterLoad) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index da94de532d1..8478486781b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -54,6 +54,9 @@ public class LoadTsFileConfigurator { case VERIFY_KEY: validateVerifyParam(value); break; + case PIPE_GENERATED_KEY: + validatePipeGeneratedParam(value); + break; case ASYNC_LOAD_KEY: validateAsyncLoadParam(value); break; @@ -183,6 +186,21 @@ public class LoadTsFileConfigurator { loadAttributes.getOrDefault(VERIFY_KEY, String.valueOf(VERIFY_DEFAULT_VALUE))); } + public static final String PIPE_GENERATED_KEY = "pipe-generated"; + + public static void validatePipeGeneratedParam(final String pipeGenerated) { + if (!"true".equalsIgnoreCase(pipeGenerated) && !"false".equalsIgnoreCase(pipeGenerated)) { + throw new SemanticException( + String.format( + "Given %s value '%s' is not supported, please input a valid boolean value.", + PIPE_GENERATED_KEY, pipeGenerated)); + } + } + + public static boolean parseOrGetDefaultPipeGenerated(final Map<String, String> loadAttributes) { + return Boolean.parseBoolean(loadAttributes.getOrDefault(PIPE_GENERATED_KEY, "false")); + } + public static final String ASYNC_LOAD_KEY = "async"; private static final boolean ASYNC_LOAD_DEFAULT_VALUE = false;
