This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 91e48f07f63 feat: encode load attributes in active load directories
(#16722)
91e48f07f63 is described below
commit 91e48f07f63a1161b46487a281858eb1b4b63479
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 14 15:14:37 2025 +0800
feat: encode load attributes in active load directories (#16722)
* feat: encode load attributes in active load directories
* spotless
* fix
* fix
* fix
* fix
* fix
* update it
* update it
* fix
* update
* update
---
.../iotdb/db/it/IoTDBLoadTsFileWithModIT.java | 60 ++++-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 12 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 13 +-
.../plan/statement/crud/LoadTsFileStatement.java | 16 +-
.../load/active/ActiveLoadDirScanner.java | 23 +-
.../load/active/ActiveLoadPathHelper.java | 282 +++++++++++++++++++++
.../load/active/ActiveLoadPendingQueue.java | 17 +-
.../load/active/ActiveLoadTsFileLoader.java | 36 +--
.../storageengine/load/active/ActiveLoadUtil.java | 37 +--
.../load/config/LoadTsFileConfigurator.java | 18 ++
10 files changed, 460 insertions(+), 54 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
index 9d0a54f1672..5578023e4f5 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileWithModIT.java
@@ -85,7 +85,7 @@ public class IoTDBLoadTsFileWithModIT {
// write mods file
resource
.getExclusiveModFile()
- .write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1"),
1, 2));
+ .write(new TreeDeletionEntry(new
MeasurementPath("root.test.d1.de.s1"), 1, 2));
resource.getExclusiveModFile().close();
}
@@ -93,7 +93,7 @@ public class IoTDBLoadTsFileWithModIT {
throws IOException, DataRegionException, WriteProcessException,
IllegalPathException {
TsFileResource resource = generateFile();
ModificationFileV1 oldModFile = ModificationFileV1.getNormalMods(resource);
- oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.s1"),
Long.MAX_VALUE, 1, 2));
+ oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.de.s1"),
Long.MAX_VALUE, 1, 2));
oldModFile.close();
}
@@ -102,11 +102,11 @@ public class IoTDBLoadTsFileWithModIT {
File tsfile = new File(tmpDir, "1-1-0-0.tsfile");
try (TsFileWriter writer = new TsFileWriter(tsfile)) {
writer.registerAlignedTimeseries(
- "root.test.d1",
+ "root.test.d1.de",
Collections.singletonList(new MeasurementSchema("s1",
TSDataType.BOOLEAN)));
Tablet tablet =
new Tablet(
- "root.test.d1",
+ "root.test.d1.de",
Collections.singletonList(new MeasurementSchema("s1",
TSDataType.BOOLEAN)));
for (int i = 0; i < 5; i++) {
tablet.addTimestamp(i, i);
@@ -138,13 +138,61 @@ public class IoTDBLoadTsFileWithModIT {
statement.execute(String.format("load \'%s\'",
tmpDir.getAbsolutePath()));
try (final ResultSet resultSet =
- statement.executeQuery("select count(s1) as c from root.test.d1")) {
+ statement.executeQuery("select count(s1) as c from
root.test.d1.de")) {
Assert.assertTrue(resultSet.next());
Assert.assertEquals(3, resultSet.getLong("c"));
}
}
}
+ @Test
+ public void testWithNewModFileAndLoadAttributes()
+ throws SQLException,
+ IOException,
+ DataRegionException,
+ WriteProcessException,
+ IllegalPathException {
+ generateFileWithNewModFile();
+ final String databaseName = "root.test.d1";
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ String.format(
+ "load \'%s\' with ("
+ + "'database-name'='%s',"
+ + "'database-level'='2',"
+ + "'verify'='true',"
+ + "'on-success'='none',"
+ + "'async'='true')",
+ tmpDir.getAbsolutePath(), databaseName));
+
+ boolean databaseFound = false;
+ out:
+ for (int i = 0; i < 10; i++) {
+ try (final ResultSet resultSet = statement.executeQuery("show
databases")) {
+ while (resultSet.next()) {
+ final String currentDatabase = resultSet.getString(1);
+ if (databaseName.equalsIgnoreCase(currentDatabase)) {
+ databaseFound = true;
+ break out;
+ }
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ Assert.assertTrue(
+ "The `database-level` parameter is not working; the generated
database does not contain 'root.test.d1'.",
+ databaseFound);
+ }
+ }
+
@Test
public void testWithOldModFile()
throws SQLException,
@@ -159,7 +207,7 @@ public class IoTDBLoadTsFileWithModIT {
statement.execute(String.format("load \'%s\'",
tmpDir.getAbsolutePath()));
try (final ResultSet resultSet =
- statement.executeQuery("select count(s1) as c from root.test.d1")) {
+ statement.executeQuery("select count(s1) as c from
root.test.d1.de")) {
Assert.assertTrue(resultSet.next());
Assert.assertEquals(3, resultSet.getLong("c"));
Assert.assertTrue(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8a9ceb61c93..652530d11b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -100,6 +100,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -570,7 +571,16 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus loadTsFileAsync(final String dataBaseName, final
List<String> absolutePaths)
throws IOException {
- if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths,
true)) {
+ final Map<String, String> loadAttributes =
+ ActiveLoadPathHelper.buildAttributes(
+ dataBaseName,
+ null,
+ shouldConvertDataTypeOnTypeMismatch,
+ validateTsFile.get(),
+ null,
+ shouldMarkAsPipeRequest.get());
+
+ if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths,
true)) {
throw new PipeException("Load active listening pipe dir is not set.");
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 4fe119566d7..076b529ab91 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
@@ -281,7 +282,17 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
} else {
databaseName = null;
}
- if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, databaseName,
isDeleteAfterLoad)) {
+ final Map<String, String> activeLoadAttributes =
+ ActiveLoadPathHelper.buildAttributes(
+ databaseName,
+ databaseLevel,
+ isConvertOnTypeMismatch,
+ isVerifySchema,
+ tabletConversionThresholdBytes,
+ isGeneratedByPipe);
+
+ if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
+ tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
analysis.setFinishQueryAfterAnalyze(true);
setRealStatement(analysis);
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index dbc4cb7b64d..2d74925971c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -48,6 +48,7 @@ import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurat
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
+import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.PIPE_GENERATED_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY;
public class LoadTsFileStatement extends Statement {
@@ -63,8 +64,6 @@ public class LoadTsFileStatement extends Statement {
private boolean isGeneratedByPipe = false;
private boolean isAsyncLoad = false;
- private Map<String, String> loadAttributes;
-
private List<File> tsFiles;
private List<Boolean> isTableModel;
private List<TsFileResource> resources;
@@ -245,15 +244,14 @@ public class LoadTsFileStatement extends Statement {
}
public void setLoadAttributes(final Map<String, String> loadAttributes) {
- this.loadAttributes = loadAttributes;
- initAttributes();
+ initAttributes(loadAttributes);
}
public boolean isAsyncLoad() {
return isAsyncLoad;
}
- private void initAttributes() {
+ private void initAttributes(final Map<String, String> loadAttributes) {
this.databaseLevel =
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
@@ -263,6 +261,9 @@ public class LoadTsFileStatement extends Statement {
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
this.verifySchema =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
this.isAsyncLoad =
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
+ if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes))
{
+ markIsGeneratedByPipe();
+ }
}
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
@@ -314,7 +315,7 @@ public class LoadTsFileStatement extends Statement {
@Override
public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
toRelationalStatement(
MPPQueryContext context) {
- loadAttributes = new HashMap<>();
+ final Map<String, String> loadAttributes = new HashMap<>();
loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel));
if (database != null) {
@@ -326,6 +327,9 @@ public class LoadTsFileStatement extends Statement {
loadAttributes.put(
TABLET_CONVERSION_THRESHOLD_KEY,
String.valueOf(tabletConversionThresholdBytes));
loadAttributes.put(ASYNC_LOAD_KEY, String.valueOf(isAsyncLoad));
+ if (isGeneratedByPipe) {
+ loadAttributes.put(PIPE_GENERATED_KEY, String.valueOf(true));
+ }
return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 5ac33dfbdd4..470cf702b22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -39,6 +39,7 @@ import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -117,13 +118,23 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
.filter(this::isTsFileCompleted)
.limit(currentAllowedPendingSize)
.forEach(
- file -> {
- final File parentFile = new File(file).getParentFile();
+ filePath -> {
+ final File tsFile = new File(filePath);
+ final Map<String, String> attributes =
+ ActiveLoadPathHelper.parseAttributes(tsFile,
listeningDirFile);
+
+ final File parentFile = tsFile.getParentFile();
+ final boolean isTableModel =
+ ActiveLoadPathHelper.containsDatabaseName(attributes)
+ || (parentFile != null
+ && !Objects.equals(
+ parentFile.getAbsoluteFile(),
+ listeningDirFile.getAbsoluteFile()));
+
activeLoadTsFileLoader.tryTriggerTsFileLoad(
- file,
- parentFile != null
- && !Objects.equals(
- parentFile.getAbsoluteFile(),
listeningDirFile.getAbsoluteFile()),
+ tsFile.getAbsolutePath(),
+ listeningDirFile.getAbsolutePath(),
+ isTableModel,
isGeneratedByPipe);
});
} catch (UncheckedIOException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
new file mode 100644
index 00000000000..7c83f20f369
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utility methods for encoding and decoding load attributes into directory
structures. */
+public final class ActiveLoadPathHelper {
+
+ private static final String SEGMENT_SEPARATOR = "-";
+
+ private static final List<String> KEY_ORDER =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ LoadTsFileConfigurator.DATABASE_NAME_KEY,
+ LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
+ LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+ LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+ LoadTsFileConfigurator.VERIFY_KEY,
+ LoadTsFileConfigurator.DATABASE_KEY,
+ LoadTsFileConfigurator.PIPE_GENERATED_KEY));
+
+ private ActiveLoadPathHelper() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static Map<String, String> buildAttributes(
+ final String databaseName,
+ final Integer databaseLevel,
+ final Boolean convertOnTypeMismatch,
+ final Boolean verify,
+ final Long tabletConversionThresholdBytes,
+ final Boolean pipeGenerated) {
+ final Map<String, String> attributes = new LinkedHashMap<>();
+ if (Objects.nonNull(databaseName) && !databaseName.isEmpty()) {
+ attributes.put(LoadTsFileConfigurator.DATABASE_NAME_KEY, databaseName);
+ }
+
+ if (Objects.nonNull(databaseLevel)) {
+ attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
databaseLevel.toString());
+ }
+
+ if (Objects.nonNull(convertOnTypeMismatch)) {
+ attributes.put(
+ LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+ Boolean.toString(convertOnTypeMismatch));
+ }
+
+ if (Objects.nonNull(tabletConversionThresholdBytes)) {
+ attributes.put(
+ LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+ tabletConversionThresholdBytes.toString());
+ }
+
+ if (Objects.nonNull(verify)) {
+ attributes.put(LoadTsFileConfigurator.VERIFY_KEY,
Boolean.toString(verify));
+ }
+
+ if (Objects.nonNull(pipeGenerated) && pipeGenerated) {
+ attributes.put(LoadTsFileConfigurator.PIPE_GENERATED_KEY,
Boolean.TRUE.toString());
+ }
+ return attributes;
+ }
+
+ public static File resolveTargetDir(final File baseDir, final Map<String,
String> attributes) {
+ File current = baseDir;
+ for (final String key : KEY_ORDER) {
+ final String value = attributes.get(key);
+ if (value == null) {
+ continue;
+ }
+ current = new File(current, formatSegment(key, value));
+ }
+ return current;
+ }
+
+ public static Map<String, String> parseAttributes(final File file, final
File pendingDir) {
+ if (file == null) {
+ return Collections.emptyMap();
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ File current = file.getParentFile();
+ while (current != null) {
+ final String dirName = current.getName();
+ if (pendingDir != null && current.equals(pendingDir)) {
+ break;
+ }
+ for (final String key : KEY_ORDER) {
+ final String prefix = key + SEGMENT_SEPARATOR;
+ if (dirName.startsWith(prefix)) {
+ extractAndValidateAttributeValue(key, dirName, prefix.length())
+ .ifPresent(value -> attributes.putIfAbsent(key, value));
+ break;
+ }
+ }
+ current = current.getParentFile();
+ }
+ return attributes;
+ }
+
+ public static File findPendingDirectory(final File file) {
+ if (file == null) {
+ return null;
+ }
+ String[] dirs =
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+ File current = file;
+ while (current != null) {
+ for (final String dir : dirs) {
+ if (current.isDirectory() && current.getAbsolutePath().equals(dir)) {
+ return current;
+ }
+ }
+ current = current.getParentFile();
+ }
+ return null;
+ }
+
+ public static void applyAttributesToStatement(
+ final Map<String, String> attributes,
+ final LoadTsFileStatement statement,
+ final boolean defaultVerify) {
+
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY))
+ .filter(name -> !name.isEmpty())
+ .ifPresent(statement::setDatabase);
+
+ if (statement.getDatabase() == null || statement.getDatabase().isEmpty()) {
+ Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_KEY))
+ .filter(name -> !name.isEmpty())
+ .ifPresent(statement::setDatabase);
+ }
+
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY))
+ .ifPresent(
+ level -> {
+ try {
+ statement.setDatabaseLevel(Integer.parseInt(level));
+ } catch (final NumberFormatException ignored) {
+ // keep the default when parsing fails
+ }
+ });
+
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY))
+ .ifPresent(value ->
statement.setConvertOnTypeMismatch(Boolean.parseBoolean(value)));
+
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY))
+ .ifPresent(
+ threshold -> {
+ try {
+
statement.setTabletConversionThresholdBytes(Long.parseLong(threshold));
+ } catch (final NumberFormatException ignored) {
+ // keep the default when parsing fails
+ }
+ });
+
+ if (attributes.containsKey(LoadTsFileConfigurator.VERIFY_KEY)) {
+ statement.setVerifySchema(
+
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.VERIFY_KEY)));
+ } else {
+ statement.setVerifySchema(defaultVerify);
+ }
+
+ if (attributes.containsKey(LoadTsFileConfigurator.PIPE_GENERATED_KEY)
+ &&
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.PIPE_GENERATED_KEY)))
{
+ statement.markIsGeneratedByPipe();
+ }
+ }
+
+ public static boolean containsDatabaseName(final Map<String, String>
attributes) {
+ return attributes.containsKey(LoadTsFileConfigurator.DATABASE_NAME_KEY)
+ || attributes.containsKey(LoadTsFileConfigurator.DATABASE_KEY);
+ }
+
+ private static String formatSegment(final String key, final String value) {
+ return key + SEGMENT_SEPARATOR + encodeValue(value);
+ }
+
+ private static String encodeValue(final String value) {
+ try {
+ return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
+ } catch (final UnsupportedEncodingException e) {
+ // UTF-8 should always be supported; fallback to raw value when
unexpected
+ return value;
+ }
+ }
+
+ private static Optional<String> extractAndValidateAttributeValue(
+ final String key, final String dirName, final int prefixLength) {
+ if (dirName.length() <= prefixLength) {
+ return Optional.empty();
+ }
+
+ final String encodedValue = dirName.substring(prefixLength);
+ final String decodedValue = decodeValue(encodedValue);
+ try {
+ validateAttributeValue(key, decodedValue);
+ return Optional.of(decodedValue);
+ } catch (final SemanticException e) {
+ return Optional.empty();
+ }
+ }
+
+ private static void validateAttributeValue(final String key, final String
value) {
+ switch (key) {
+ case LoadTsFileConfigurator.DATABASE_NAME_KEY:
+ if (value == null || value.isEmpty()) {
+ throw new SemanticException("Database name must not be empty.");
+ }
+ break;
+ case LoadTsFileConfigurator.DATABASE_LEVEL_KEY:
+ LoadTsFileConfigurator.validateDatabaseLevelParam(value);
+ break;
+ case LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY:
+ LoadTsFileConfigurator.validateConvertOnTypeMismatchParam(value);
+ break;
+ case LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY:
+ validateTabletConversionThreshold(value);
+ break;
+ case LoadTsFileConfigurator.VERIFY_KEY:
+ LoadTsFileConfigurator.validateVerifyParam(value);
+ break;
+ default:
+ LoadTsFileConfigurator.validateParameters(key, value);
+ }
+ }
+
+ private static void validateTabletConversionThreshold(final String value) {
+ try {
+ final long threshold = Long.parseLong(value);
+ if (threshold < 0) {
+ throw new SemanticException(
+ "Tablet conversion threshold must be a non-negative long value.");
+ }
+ } catch (final NumberFormatException e) {
+ throw new SemanticException(
+ String.format("Tablet conversion threshold '%s' is not a valid long
value.", value));
+ }
+ }
+
+ private static String decodeValue(final String value) {
+ try {
+ return URLDecoder.decode(value, StandardCharsets.UTF_8.toString());
+ } catch (final UnsupportedEncodingException e) {
+ return value;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 3ec283cd2e0..7b5f7166197 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -34,9 +34,13 @@ public class ActiveLoadPendingQueue {
private final Set<String> loadingFileSet = new HashSet<>();
public synchronized boolean enqueue(
- final String file, final boolean isGeneratedByPipe, final boolean
isTableModel) {
+ final String file,
+ final String pendingDir,
+ final boolean isGeneratedByPipe,
+ final boolean isTableModel) {
if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
- pendingFileQueue.offer(new ActiveLoadEntry(file, isGeneratedByPipe,
isTableModel));
+ pendingFileQueue.offer(
+ new ActiveLoadEntry(file, pendingDir, isGeneratedByPipe,
isTableModel));
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
return true;
@@ -76,11 +80,14 @@ public class ActiveLoadPendingQueue {
public static class ActiveLoadEntry {
private final String file;
+ private final String pendingDir;
private final boolean isGeneratedByPipe;
private final boolean isTableModel;
- public ActiveLoadEntry(String file, boolean isGeneratedByPipe, boolean
isTableModel) {
+ public ActiveLoadEntry(
+ String file, String pendingDir, boolean isGeneratedByPipe, boolean
isTableModel) {
this.file = file;
+ this.pendingDir = pendingDir;
this.isGeneratedByPipe = isGeneratedByPipe;
this.isTableModel = isTableModel;
}
@@ -89,6 +96,10 @@ public class ActiveLoadPendingQueue {
return file;
}
+ public String getPendingDir() {
+ return pendingDir;
+ }
+
public boolean isGeneratedByPipe() {
return isGeneratedByPipe;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 62feb562be7..20817c94146 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -56,6 +56,7 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.ZoneId;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
@@ -84,12 +85,12 @@ public class ActiveLoadTsFileLoader {
}
public void tryTriggerTsFileLoad(
- String absolutePath, boolean isTabletMode, boolean isGeneratedByPipe) {
+ String absolutePath, String pendingDir, boolean isTabletMode, boolean
isGeneratedByPipe) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
return;
}
- if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe, isTabletMode)) {
+ if (pendingQueue.enqueue(absolutePath, pendingDir, isGeneratedByPipe,
isTabletMode)) {
initFailDirIfNecessary();
adjustExecutorIfNecessary();
}
@@ -216,24 +217,29 @@ public class ActiveLoadTsFileLoader {
private TSStatus loadTsFile(
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final IClientSession
session)
throws FileNotFoundException {
- final LoadTsFileStatement statement = new
LoadTsFileStatement(entry.getFile());
+ final File tsFile = new File(entry.getFile());
+ final LoadTsFileStatement statement = new
LoadTsFileStatement(tsFile.getAbsolutePath());
final List<File> files = statement.getTsFiles();
- // It should be noted here that the instructions in this code block do not
need to use the
- // DataBase, so the DataBase is assigned a value of null. If the DataBase
is used later, an
- // exception will be thrown.
- final File parentFile;
- statement.setDatabase(
- files.isEmpty()
- || !entry.isTableModel()
- || (parentFile = files.get(0).getParentFile()) == null
- ? null
- : parentFile.getName());
statement.setDeleteAfterLoad(true);
- statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(isVerify);
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
+
+ final File pendingDir =
+ entry.getPendingDir() == null
+ ? ActiveLoadPathHelper.findPendingDirectory(tsFile)
+ : new File(entry.getPendingDir());
+ final Map<String, String> attributes =
ActiveLoadPathHelper.parseAttributes(tsFile, pendingDir);
+ ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement,
isVerify);
+
+ final File parentFile;
+ if (statement.getDatabase() == null && entry.isTableModel()) {
+ statement.setDatabase(
+ files.isEmpty() || (parentFile = files.get(0).getParentFile()) ==
null
+ ? null
+ : parentFile.getName());
+ }
+
return executeStatement(
entry.isGeneratedByPipe() ? new PipeEnrichedStatement(statement) :
statement, session);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
index 669013ec450..e3dbe43507d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
@@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
@@ -45,14 +47,16 @@ public class ActiveLoadUtil {
private static volatile ILoadDiskSelector loadDiskSelector =
updateLoadDiskSelector();
public static boolean loadTsFileAsyncToActiveDir(
- final List<File> tsFiles, final String dataBaseName, final boolean
isDeleteAfterLoad) {
+ final List<File> tsFiles,
+ final Map<String, String> loadAttributes,
+ final boolean isDeleteAfterLoad) {
if (tsFiles == null || tsFiles.isEmpty()) {
return true;
}
try {
for (File file : tsFiles) {
- if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) {
+ if (!loadTsFilesToActiveDir(loadAttributes, file, isDeleteAfterLoad)) {
return false;
}
}
@@ -65,7 +69,7 @@ public class ActiveLoadUtil {
}
private static boolean loadTsFilesToActiveDir(
- final String dataBaseName, final File file, final boolean
isDeleteAfterLoad)
+ final Map<String, String> loadAttributes, final File file, final boolean
isDeleteAfterLoad)
throws IOException {
if (file == null) {
return true;
@@ -84,12 +88,9 @@ public class ActiveLoadUtil {
LOGGER.warn("Load active listening dir is not set.");
return false;
}
- final File targetDir;
- if (Objects.nonNull(dataBaseName)) {
- targetDir = new File(targetFilePath, dataBaseName);
- } else {
- targetDir = targetFilePath;
- }
+ final Map<String, String> attributes =
+ Objects.nonNull(loadAttributes) ? loadAttributes :
Collections.emptyMap();
+ final File targetDir =
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
loadTsFileAsyncToTargetDir(
targetDir, new File(file.getAbsolutePath() + ".resource"),
isDeleteAfterLoad);
@@ -100,7 +101,9 @@ public class ActiveLoadUtil {
}
public static boolean loadFilesToActiveDir(
- final String dataBaseName, final List<String> files, final boolean
isDeleteAfterLoad)
+ final Map<String, String> loadAttributes,
+ final List<String> files,
+ final boolean isDeleteAfterLoad)
throws IOException {
if (files == null || files.isEmpty()) {
return true;
@@ -120,12 +123,9 @@ public class ActiveLoadUtil {
LOGGER.warn("Load active listening dir is not set.");
return false;
}
- final File targetDir;
- if (Objects.nonNull(dataBaseName)) {
- targetDir = new File(targetFilePath, dataBaseName);
- } else {
- targetDir = targetFilePath;
- }
+ final Map<String, String> attributes =
+ Objects.nonNull(loadAttributes) ? loadAttributes :
Collections.emptyMap();
+ final File targetDir =
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
for (final String file : files) {
loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad);
@@ -138,6 +138,11 @@ public class ActiveLoadUtil {
if (!file.exists()) {
return;
}
+ if (!targetDir.exists() && !targetDir.mkdirs()) {
+ if (!targetDir.exists()) {
+ throw new IOException("Failed to create target directory " +
targetDir.getAbsolutePath());
+ }
+ }
RetryUtils.retryOnException(
() -> {
if (isDeleteAfterLoad) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index da94de532d1..8478486781b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -54,6 +54,9 @@ public class LoadTsFileConfigurator {
case VERIFY_KEY:
validateVerifyParam(value);
break;
+ case PIPE_GENERATED_KEY:
+ validatePipeGeneratedParam(value);
+ break;
case ASYNC_LOAD_KEY:
validateAsyncLoadParam(value);
break;
@@ -183,6 +186,21 @@ public class LoadTsFileConfigurator {
loadAttributes.getOrDefault(VERIFY_KEY,
String.valueOf(VERIFY_DEFAULT_VALUE)));
}
+ public static final String PIPE_GENERATED_KEY = "pipe-generated";
+
+ public static void validatePipeGeneratedParam(final String pipeGenerated) {
+ if (!"true".equalsIgnoreCase(pipeGenerated) &&
!"false".equalsIgnoreCase(pipeGenerated)) {
+ throw new SemanticException(
+ String.format(
+ "Given %s value '%s' is not supported, please input a valid
boolean value.",
+ PIPE_GENERATED_KEY, pipeGenerated));
+ }
+ }
+
+ public static boolean parseOrGetDefaultPipeGenerated(final Map<String,
String> loadAttributes) {
+ return
Boolean.parseBoolean(loadAttributes.getOrDefault(PIPE_GENERATED_KEY, "false"));
+ }
+
public static final String ASYNC_LOAD_KEY = "async";
private static final boolean ASYNC_LOAD_DEFAULT_VALUE = false;