This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f66794ba229 [To dev/1.3] feat: encode load attributes in active load directories (#16722) (#16758)
f66794ba229 is described below
commit f66794ba229185b3afe826d7f4eaec6132112f50
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 14 18:36:39 2025 +0800
[To dev/1.3] feat: encode load attributes in active load directories (#16722) (#16758)
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 41 ++++
.../protocol/thrift/IoTDBDataNodeReceiver.java | 10 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 11 +-
.../plan/statement/crud/LoadTsFileStatement.java | 10 +-
.../load/active/ActiveLoadDirScanner.java | 17 +-
.../active/ActiveLoadFailedMessageHandler.java | 45 ++--
.../load/active/ActiveLoadPathHelper.java | 256 +++++++++++++++++++++
.../load/active/ActiveLoadPendingQueue.java | 45 +++-
.../load/active/ActiveLoadTsFileLoader.java | 156 ++++++++-----
.../storageengine/load/active/ActiveLoadUtil.java | 37 +--
.../load/config/LoadTsFileConfigurator.java | 18 ++
11 files changed, 527 insertions(+), 119 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index a19612245c5..d8a51b53501 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -998,6 +998,47 @@ public class IoTDBLoadTsFileIT {
}
}
+ @Test
+ public void testAsyncLoadLocally() throws Exception {
+ registerSchema();
+
+ final long writtenPoint1;
+ // device 0, device 1, sg 0
+ try (final TsFileGenerator generator =
+ new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_0,
Collections.singletonList(SchemaConfig.MEASUREMENT_00));
+ generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL /
10_000, false);
+ writtenPoint1 = generator.getTotalNumber();
+ }
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ String.format(
+ "load \"%s\" with ('async'='true','database-level'='2')",
tmpDir.getAbsolutePath()));
+
+ for (int i = 0; i < 20; i++) {
+ try (final ResultSet resultSet =
+ statement.executeQuery("select count(*) from root.** group by
level=1,2")) {
+ if (resultSet.next()) {
+ final long sg1Count =
resultSet.getLong("count(root.sg.test_0.*.*)");
+ Assert.assertEquals(writtenPoint1, sg1Count);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ } catch (final Throwable e) {
+ if (i < 19) {
+ Thread.sleep(1000);
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
@Test
@Ignore("Load with conversion is currently banned")
public void testLoadWithConvertOnTypeMismatch() throws Exception {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 9c04baba6d4..a8654103349 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -83,6 +83,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -475,7 +476,14 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws
IOException {
- if (!ActiveLoadUtil.loadFilesToActiveDir(null, absolutePaths, true)) {
+ final Map<String, String> loadAttributes =
+ ActiveLoadPathHelper.buildAttributes(
+ null,
+ shouldConvertDataTypeOnTypeMismatch,
+ validateTsFile.get(),
+ null,
+ shouldMarkAsPipeRequest.get());
+ if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths,
true)) {
throw new PipeException("Load active listening pipe dir is not set.");
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 765300a65c1..eb36ddeb19c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -65,6 +65,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
@@ -240,7 +241,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private boolean doAsyncLoad(final Analysis analysis) {
final long startTime = System.nanoTime();
try {
- if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, null,
isDeleteAfterLoad)) {
+ final Map<String, String> activeLoadAttributes =
+ ActiveLoadPathHelper.buildAttributes(
+ databaseLevel,
+ isConvertOnTypeMismatch,
+ isVerifySchema,
+ tabletConversionThresholdBytes,
+ isGeneratedByPipe);
+ if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
+ tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
analysis.setStatement(loadTsFileStatement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 77e2d14840b..88a00d39599 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -51,8 +51,6 @@ public class LoadTsFileStatement extends Statement {
private boolean isGeneratedByPipe = false;
private boolean isAsyncLoad = false;
- private Map<String, String> loadAttributes;
-
private List<File> tsFiles;
private List<TsFileResource> resources;
private List<Long> writePointCountList;
@@ -212,15 +210,14 @@ public class LoadTsFileStatement extends Statement {
}
public void setLoadAttributes(final Map<String, String> loadAttributes) {
- this.loadAttributes = loadAttributes;
- initAttributes();
+ initAttributes(loadAttributes);
}
public boolean isAsyncLoad() {
return isAsyncLoad;
}
- private void initAttributes() {
+ private void initAttributes(final Map<String, String> loadAttributes) {
this.databaseLevel =
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
@@ -229,6 +226,9 @@ public class LoadTsFileStatement extends Statement {
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
this.verifySchema =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
this.isAsyncLoad =
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
+ if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes))
{
+ markIsGeneratedByPipe();
+ }
}
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index d029fbdecc8..f4c705c17b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -39,6 +39,7 @@ import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,8 +102,9 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
final boolean isGeneratedByPipe =
listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+ final File listeningDirFile = new File(listeningDir);
try (final Stream<File> fileStream =
- FileUtils.streamFiles(new File(listeningDir), true, (String[])
null)) {
+ FileUtils.streamFiles(listeningDirFile, true, (String[]) null)) {
try {
fileStream
.filter(file ->
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
@@ -115,7 +117,18 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
.filter(this::isTsFileCompleted)
.limit(currentAllowedPendingSize)
.forEach(
- file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file,
isGeneratedByPipe));
+ filePath -> {
+ final File tsFile = new File(filePath);
+ final Map<String, String> attributes =
+ ActiveLoadPathHelper.parseAttributes(tsFile,
listeningDirFile);
+
+ final File parentFile = tsFile.getParentFile();
+
+ activeLoadTsFileLoader.tryTriggerTsFileLoad(
+ tsFile.getAbsolutePath(),
+ listeningDirFile.getAbsolutePath(),
+ isGeneratedByPipe);
+ });
} catch (UncheckedIOException e) {
LOGGER.debug("The file has been deleted. Ignore this exception.");
} catch (final Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
index be66e591d5d..c2d9af94725 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.load.active;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,72 +40,72 @@ public class ActiveLoadFailedMessageHandler {
// system is memory constrains
put(
"memory",
- filePair ->
+ entry ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe =
{}) due to memory constraints, will retry later.",
- filePair.getLeft(),
- filePair.getRight()));
+ entry.getFile(),
+ entry.isGeneratedByPipe()));
// system is read only
put(
"read only",
- filePair ->
+ entry ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe =
{}) due to the system is read only, will retry later.",
- filePair.getLeft(),
- filePair.getRight()));
+ entry.getFile(),
+ entry.isGeneratedByPipe()));
// Timed out to wait for procedure return. The procedure is
still running.
put(
"procedure return",
- filePair ->
+ entry ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe =
{}) due to time out to wait for procedure return, will retry later.",
- filePair.getLeft(),
- filePair.getRight()));
+ entry.getFile(),
+ entry.isGeneratedByPipe()));
// DataNode is not enough, please register more.
put(
"not enough",
- filePair ->
+ entry ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe =
{}) due to the datanode is not enough, will retry later.",
- filePair.getLeft(),
- filePair.getRight()));
+ entry.getFile(),
+ entry.isGeneratedByPipe()));
// Fail to connect to any config node. Please check status of
ConfigNodes or logs of
// connected DataNode.
put(
"any config node",
- filePair ->
+ entry ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe =
{}) due to fail to connect to any config node, will retry later.",
- filePair.getLeft(),
- filePair.getRight()));
+ entry.getFile(),
+ entry.isGeneratedByPipe()));
// Current query is time out, query start time is 1729653161797,
ddl is
// -3046040214706, current time is 1729653184210, please check
your statement or
// modify timeout parameter
put(
"query is time out",
- filePair ->
+ entry ->
LOGGER.info(
"Rejecting auto load tsfile {} (isGeneratedByPipe =
{}) due to current query is time out, will retry later.",
- filePair.getLeft(),
- filePair.getRight()));
+ entry.getFile(),
+ entry.isGeneratedByPipe()));
}
});
@FunctionalInterface
private interface ExceptionMessageHandler {
- void handle(final Pair<String, Boolean> filePair);
+ void handle(final ActiveLoadPendingQueue.ActiveLoadEntry activeLoadEntry);
}
public static boolean isExceptionMessageShouldRetry(
- final Pair<String, Boolean> filePair, final String message) {
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String
message) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
- EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(filePair);
+ EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(entry);
return true;
}
for (String key : EXCEPTION_MESSAGE_HANDLER_MAP.keySet()) {
if (message != null && message.contains(key)) {
- EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(filePair);
+ EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(entry);
return true;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
new file mode 100644
index 00000000000..965f2941dc6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utility methods for encoding and decoding load attributes into directory
structures. */
+public final class ActiveLoadPathHelper {
+
+ private static final String SEGMENT_SEPARATOR = "-";
+
+ private static final List<String> KEY_ORDER =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
+ LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+ LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+ LoadTsFileConfigurator.VERIFY_KEY,
+ LoadTsFileConfigurator.PIPE_GENERATED_KEY));
+
+ private ActiveLoadPathHelper() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static Map<String, String> buildAttributes(
+ final Integer databaseLevel,
+ final Boolean convertOnTypeMismatch,
+ final Boolean verify,
+ final Long tabletConversionThresholdBytes,
+ final Boolean pipeGenerated) {
+ final Map<String, String> attributes = new LinkedHashMap<>();
+
+ if (Objects.nonNull(databaseLevel)) {
+ attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
databaseLevel.toString());
+ }
+
+ if (Objects.nonNull(convertOnTypeMismatch)) {
+ attributes.put(
+ LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
+ Boolean.toString(convertOnTypeMismatch));
+ }
+
+ if (Objects.nonNull(tabletConversionThresholdBytes)) {
+ attributes.put(
+ LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
+ tabletConversionThresholdBytes.toString());
+ }
+
+ if (Objects.nonNull(verify)) {
+ attributes.put(LoadTsFileConfigurator.VERIFY_KEY,
Boolean.toString(verify));
+ }
+
+ if (Objects.nonNull(pipeGenerated) && pipeGenerated) {
+ attributes.put(LoadTsFileConfigurator.PIPE_GENERATED_KEY,
Boolean.TRUE.toString());
+ }
+ return attributes;
+ }
+
+ public static File resolveTargetDir(final File baseDir, final Map<String,
String> attributes) {
+ File current = baseDir;
+ for (final String key : KEY_ORDER) {
+ final String value = attributes.get(key);
+ if (value == null) {
+ continue;
+ }
+ current = new File(current, formatSegment(key, value));
+ }
+ return current;
+ }
+
+ public static Map<String, String> parseAttributes(final File file, final
File pendingDir) {
+ if (file == null) {
+ return Collections.emptyMap();
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ File current = file.getParentFile();
+ while (current != null) {
+ final String dirName = current.getName();
+ if (pendingDir != null && current.equals(pendingDir)) {
+ break;
+ }
+ for (final String key : KEY_ORDER) {
+ final String prefix = key + SEGMENT_SEPARATOR;
+ if (dirName.startsWith(prefix)) {
+ extractAndValidateAttributeValue(key, dirName, prefix.length())
+ .ifPresent(value -> attributes.putIfAbsent(key, value));
+ break;
+ }
+ }
+ current = current.getParentFile();
+ }
+ return attributes;
+ }
+
+ public static File findPendingDirectory(final File file) {
+ if (file == null) {
+ return null;
+ }
+ String[] dirs =
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+ File current = file;
+ while (current != null) {
+ for (final String dir : dirs) {
+ if (current.isDirectory() && current.getAbsolutePath().equals(dir)) {
+ return current;
+ }
+ }
+ current = current.getParentFile();
+ }
+ return null;
+ }
+
+ public static void applyAttributesToStatement(
+ final Map<String, String> attributes,
+ final LoadTsFileStatement statement,
+ final boolean defaultVerify) {
+
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY))
+ .ifPresent(
+ level -> {
+ try {
+ statement.setDatabaseLevel(Integer.parseInt(level));
+ } catch (final NumberFormatException ignored) {
+ // keep the default when parsing fails
+ }
+ });
+
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY))
+ .ifPresent(value ->
statement.setConvertOnTypeMismatch(Boolean.parseBoolean(value)));
+
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY))
+ .ifPresent(
+ threshold -> {
+ try {
+
statement.setTabletConversionThresholdBytes(Long.parseLong(threshold));
+ } catch (final NumberFormatException ignored) {
+ // keep the default when parsing fails
+ }
+ });
+
+ if (attributes.containsKey(LoadTsFileConfigurator.VERIFY_KEY)) {
+ statement.setVerifySchema(
+
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.VERIFY_KEY)));
+ } else {
+ statement.setVerifySchema(defaultVerify);
+ }
+
+ if (attributes.containsKey(LoadTsFileConfigurator.PIPE_GENERATED_KEY)
+ &&
Boolean.parseBoolean(attributes.get(LoadTsFileConfigurator.PIPE_GENERATED_KEY)))
{
+ statement.markIsGeneratedByPipe();
+ }
+ }
+
+ private static String formatSegment(final String key, final String value) {
+ return key + SEGMENT_SEPARATOR + encodeValue(value);
+ }
+
+ private static String encodeValue(final String value) {
+ try {
+ return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
+ } catch (final UnsupportedEncodingException e) {
+ // UTF-8 should always be supported; fallback to raw value when
unexpected
+ return value;
+ }
+ }
+
+ private static Optional<String> extractAndValidateAttributeValue(
+ final String key, final String dirName, final int prefixLength) {
+ if (dirName.length() <= prefixLength) {
+ return Optional.empty();
+ }
+
+ final String encodedValue = dirName.substring(prefixLength);
+ final String decodedValue = decodeValue(encodedValue);
+ try {
+ validateAttributeValue(key, decodedValue);
+ return Optional.of(decodedValue);
+ } catch (final SemanticException e) {
+ return Optional.empty();
+ }
+ }
+
+ private static void validateAttributeValue(final String key, final String
value) {
+ switch (key) {
+ case LoadTsFileConfigurator.DATABASE_LEVEL_KEY:
+ LoadTsFileConfigurator.validateDatabaseLevelParam(value);
+ break;
+ case LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY:
+ LoadTsFileConfigurator.validateConvertOnTypeMismatchParam(value);
+ break;
+ case LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY:
+ validateTabletConversionThreshold(value);
+ break;
+ case LoadTsFileConfigurator.VERIFY_KEY:
+ LoadTsFileConfigurator.validateVerifyParam(value);
+ break;
+ default:
+ LoadTsFileConfigurator.validateParameters(key, value);
+ }
+ }
+
+ private static void validateTabletConversionThreshold(final String value) {
+ try {
+ final long threshold = Long.parseLong(value);
+ if (threshold < 0) {
+ throw new SemanticException(
+ "Tablet conversion threshold must be a non-negative long value.");
+ }
+ } catch (final NumberFormatException e) {
+ throw new SemanticException(
+ String.format("Tablet conversion threshold '%s' is not a valid long
value.", value));
+ }
+ }
+
+ private static String decodeValue(final String value) {
+ try {
+ return URLDecoder.decode(value, StandardCharsets.UTF_8.toString());
+ } catch (final UnsupportedEncodingException e) {
+ return value;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 6c2b2cd41f5..bb611b842be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.storageengine.load.active;
import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
-import org.apache.tsfile.utils.Pair;
-
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
@@ -31,13 +29,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class ActiveLoadPendingQueue {
private final Set<String> pendingFileSet = new HashSet<>();
- private final Queue<Pair<String, Boolean>> pendingFileQueue = new
ConcurrentLinkedQueue<>();
+ private final Queue<ActiveLoadEntry> pendingFileQueue = new
ConcurrentLinkedQueue<>();
private final Set<String> loadingFileSet = new HashSet<>();
- public synchronized boolean enqueue(final String file, final boolean
isGeneratedByPipe) {
+ public synchronized boolean enqueue(
+ final String file, final String pendingDir, final boolean
isGeneratedByPipe) {
if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
- pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
+ pendingFileQueue.offer(new ActiveLoadEntry(file, pendingDir,
isGeneratedByPipe));
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
return true;
@@ -45,16 +44,16 @@ public class ActiveLoadPendingQueue {
return false;
}
- public synchronized Pair<String, Boolean> dequeueFromPending() {
- final Pair<String, Boolean> pair = pendingFileQueue.poll();
- if (pair != null) {
- pendingFileSet.remove(pair.left);
- loadingFileSet.add(pair.left);
+ public synchronized ActiveLoadEntry dequeueFromPending() {
+ final ActiveLoadEntry entry = pendingFileQueue.poll();
+ if (entry != null) {
+ pendingFileSet.remove(entry.getFile());
+ loadingFileSet.add(entry.getFile());
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1);
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1);
}
- return pair;
+ return entry;
}
public synchronized void removeFromLoading(final String file) {
@@ -74,4 +73,28 @@ public class ActiveLoadPendingQueue {
public boolean isEmpty() {
return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
}
+
+ public static class ActiveLoadEntry {
+ private final String file;
+ private final String pendingDir;
+ private final boolean isGeneratedByPipe;
+
+ public ActiveLoadEntry(String file, String pendingDir, boolean
isGeneratedByPipe) {
+ this.file = file;
+ this.pendingDir = pendingDir;
+ this.isGeneratedByPipe = isGeneratedByPipe;
+ }
+
+ public String getFile() {
+ return file;
+ }
+
+ public String getPendingDir() {
+ return pendingDir;
+ }
+
+ public boolean isGeneratedByPipe() {
+ return isGeneratedByPipe;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index b1f950321dd..0aeb83c959e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,10 +24,13 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -41,7 +44,6 @@ import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetr
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +56,7 @@ import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.ZoneId;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
@@ -67,6 +70,8 @@ public class ActiveLoadTsFileLoader {
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
private static final int MAX_PENDING_SIZE = 1000;
private final ActiveLoadPendingQueue pendingQueue = new
ActiveLoadPendingQueue();
@@ -79,12 +84,13 @@ public class ActiveLoadTsFileLoader {
return MAX_PENDING_SIZE - pendingQueue.size();
}
- public void tryTriggerTsFileLoad(String absolutePath, boolean
isGeneratedByPipe) {
+ public void tryTriggerTsFileLoad(
+ String absolutePath, String pendingDir, boolean isGeneratedByPipe) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
return;
}
- if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe)) {
+ if (pendingQueue.enqueue(absolutePath, pendingDir, isGeneratedByPipe)) {
initFailDirIfNecessary();
adjustExecutorIfNecessary();
}
@@ -149,42 +155,55 @@ public class ActiveLoadTsFileLoader {
}
private void tryLoadPendingTsFiles() {
- while (true) {
- final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
- if (!filePair.isPresent()) {
- return;
- }
+ final IClientSession session =
+ new InternalClientSession(
+ String.format(
+ "%s_%s",
+ ActiveLoadTsFileLoader.class.getSimpleName(),
Thread.currentThread().getName()));
+ session.setUsername(AuthorityChecker.SUPER_USER);
+ session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+ session.setZoneId(ZoneId.systemDefault());
+
+ try {
+ while (true) {
+ final Optional<ActiveLoadPendingQueue.ActiveLoadEntry> loadEntry =
tryGetNextPendingFile();
+ if (!loadEntry.isPresent()) {
+ return;
+ }
- try {
- final TSStatus result = loadTsFile(filePair.get());
- if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- LOGGER.info(
- "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
- filePair.get().getLeft(),
- filePair.get().getRight());
- } else {
- handleLoadFailure(filePair.get(), result);
+ try {
+ final TSStatus result = loadTsFile(loadEntry.get(), session);
+ if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ LOGGER.info(
+ "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
+ loadEntry.get().getFile(),
+ loadEntry.get().isGeneratedByPipe());
+ } else {
+ handleLoadFailure(loadEntry.get(), result);
+ }
+ } catch (final FileNotFoundException e) {
+ handleFileNotFoundException(loadEntry.get());
+ } catch (final Exception e) {
+ handleOtherException(loadEntry.get(), e);
+ } finally {
+ pendingQueue.removeFromLoading(loadEntry.get().getFile());
}
- } catch (final FileNotFoundException e) {
- handleFileNotFoundException(filePair.get());
- } catch (final Exception e) {
- handleOtherException(filePair.get(), e);
- } finally {
- pendingQueue.removeFromLoading(filePair.get().getLeft());
}
+ } finally {
+ SESSION_MANAGER.closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
}
}
- private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
+ private Optional<ActiveLoadPendingQueue.ActiveLoadEntry>
tryGetNextPendingFile() {
final long maxRetryTimes =
Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds()
<< 1);
long currentRetryTimes = 0;
while (true) {
- final Pair<String, Boolean> filePair = pendingQueue.dequeueFromPending();
- if (Objects.nonNull(filePair)) {
- return Optional.of(filePair);
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry =
pendingQueue.dequeueFromPending();
+ if (Objects.nonNull(entry)) {
+ return Optional.of(entry);
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
@@ -195,58 +214,75 @@ public class ActiveLoadTsFileLoader {
}
}
- private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws
FileNotFoundException {
- final LoadTsFileStatement statement = new
LoadTsFileStatement(filePair.getLeft());
+ private TSStatus loadTsFile(
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry, final IClientSession
session)
+ throws FileNotFoundException {
+ final File tsFile = new File(entry.getFile());
+ final LoadTsFileStatement statement = new
LoadTsFileStatement(entry.getFile());
+
statement.setDeleteAfterLoad(true);
- statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(isVerify);
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
- return executeStatement(filePair.getRight() ? new
PipeEnrichedStatement(statement) : statement);
+
+ final File pendingDir =
+ entry.getPendingDir() == null
+ ? ActiveLoadPathHelper.findPendingDirectory(tsFile)
+ : new File(entry.getPendingDir());
+ final Map<String, String> attributes =
ActiveLoadPathHelper.parseAttributes(tsFile, pendingDir);
+ ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement,
isVerify);
+
+ return executeStatement(
+ entry.isGeneratedByPipe() ? new PipeEnrichedStatement(statement) :
statement, session);
}
- private TSStatus executeStatement(final Statement statement) {
- return Coordinator.getInstance()
- .executeForTreeModel(
- statement,
- SessionManager.getInstance().requestQueryId(),
- new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
- "",
- ClusterPartitionFetcher.getInstance(),
- ClusterSchemaFetcher.getInstance(),
- IOTDB_CONFIG.getQueryTimeoutThreshold(),
- false)
- .status;
+ private TSStatus executeStatement(final Statement statement, final
IClientSession session) {
+ SESSION_MANAGER.registerSession(session);
+ try {
+ return Coordinator.getInstance()
+ .executeForTreeModel(
+ statement,
+ SessionManager.getInstance().requestQueryId(),
+ new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+ IOTDB_CONFIG.getQueryTimeoutThreshold(),
+ false)
+ .status;
+ } finally {
+ SESSION_MANAGER.removeCurrSession();
+ }
}
- private void handleLoadFailure(final Pair<String, Boolean> filePair, final
TSStatus status) {
- if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(
- filePair, status.getMessage())) {
+ private void handleLoadFailure(
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus
status) {
+ if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry,
status.getMessage())) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}.
File will be moved to fail directory.",
- filePair.getLeft(),
- filePair.getRight(),
+ entry.getFile(),
+ entry.isGeneratedByPipe(),
status);
- removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+ removeFileAndResourceAndModsToFailDir(entry.getFile());
}
}
- private void handleFileNotFoundException(final Pair<String, Boolean>
filePair) {
+ private void handleFileNotFoundException(final
ActiveLoadPendingQueue.ActiveLoadEntry entry) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}) due to file
not found, will skip this file.",
- filePair.getLeft(),
- filePair.getRight());
- removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+ entry.getFile(),
+ entry.isGeneratedByPipe());
+ removeFileAndResourceAndModsToFailDir(entry.getFile());
}
- private void handleOtherException(final Pair<String, Boolean> filePair,
final Exception e) {
- if
(!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(filePair,
e.getMessage())) {
+ private void handleOtherException(
+ final ActiveLoadPendingQueue.ActiveLoadEntry entry, final Exception e) {
+ if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry,
e.getMessage())) {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of
an unexpected exception. File will be moved to fail directory.",
- filePair.getLeft(),
- filePair.getRight(),
+ entry.getFile(),
+ entry.isGeneratedByPipe(),
e);
- removeFileAndResourceAndModsToFailDir(filePair.getLeft());
+ removeFileAndResourceAndModsToFailDir(entry.getFile());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
index 5bfa9b71105..1ab573a14cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
@@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
@@ -45,14 +47,16 @@ public class ActiveLoadUtil {
private static volatile ILoadDiskSelector loadDiskSelector =
updateLoadDiskSelector();
public static boolean loadTsFileAsyncToActiveDir(
- final List<File> tsFiles, final String dataBaseName, final boolean
isDeleteAfterLoad) {
+ final List<File> tsFiles,
+ final Map<String, String> loadAttributes,
+ final boolean isDeleteAfterLoad) {
if (tsFiles == null || tsFiles.isEmpty()) {
return true;
}
try {
for (File file : tsFiles) {
- if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) {
+ if (!loadTsFilesToActiveDir(loadAttributes, file, isDeleteAfterLoad)) {
return false;
}
}
@@ -65,7 +69,7 @@ public class ActiveLoadUtil {
}
private static boolean loadTsFilesToActiveDir(
- final String dataBaseName, final File file, final boolean
isDeleteAfterLoad)
+ final Map<String, String> loadAttributes, final File file, final boolean
isDeleteAfterLoad)
throws IOException {
if (file == null) {
return true;
@@ -84,12 +88,9 @@ public class ActiveLoadUtil {
LOGGER.warn("Load active listening dir is not set.");
return false;
}
- final File targetDir;
- if (Objects.nonNull(dataBaseName)) {
- targetDir = new File(targetFilePath, dataBaseName);
- } else {
- targetDir = targetFilePath;
- }
+ final Map<String, String> attributes =
+ Objects.nonNull(loadAttributes) ? loadAttributes :
Collections.emptyMap();
+ final File targetDir =
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad);
loadTsFileAsyncToTargetDir(
@@ -100,7 +101,9 @@ public class ActiveLoadUtil {
}
public static boolean loadFilesToActiveDir(
- final String dataBaseName, final List<String> files, final boolean
isDeleteAfterLoad)
+ final Map<String, String> loadAttributes,
+ final List<String> files,
+ final boolean isDeleteAfterLoad)
throws IOException {
if (files == null || files.isEmpty()) {
return true;
@@ -120,12 +123,9 @@ public class ActiveLoadUtil {
LOGGER.warn("Load active listening dir is not set.");
return false;
}
- final File targetDir;
- if (Objects.nonNull(dataBaseName)) {
- targetDir = new File(targetFilePath, dataBaseName);
- } else {
- targetDir = targetFilePath;
- }
+ final Map<String, String> attributes =
+ Objects.nonNull(loadAttributes) ? loadAttributes :
Collections.emptyMap();
+ final File targetDir =
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
for (final String file : files) {
loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad);
@@ -138,6 +138,11 @@ public class ActiveLoadUtil {
if (!file.exists()) {
return;
}
+ if (!targetDir.exists() && !targetDir.mkdirs()) {
+ if (!targetDir.exists()) {
+ throw new IOException("Failed to create target directory " +
targetDir.getAbsolutePath());
+ }
+ }
RetryUtils.retryOnException(
() -> {
if (isDeleteAfterLoad) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index b1d2472603c..6ab188915f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -48,6 +48,9 @@ public class LoadTsFileConfigurator {
case VERIFY_KEY:
validateVerifyParam(value);
break;
+ case PIPE_GENERATED_KEY:
+ validatePipeGeneratedParam(value);
+ break;
case ASYNC_LOAD_KEY:
validateAsyncLoadParam(value);
break;
@@ -155,6 +158,21 @@ public class LoadTsFileConfigurator {
loadAttributes.getOrDefault(VERIFY_KEY,
String.valueOf(VERIFY_DEFAULT_VALUE)));
}
+ public static final String PIPE_GENERATED_KEY = "pipe-generated";
+
+ public static void validatePipeGeneratedParam(final String pipeGenerated) {
+ if (!"true".equalsIgnoreCase(pipeGenerated) &&
!"false".equalsIgnoreCase(pipeGenerated)) {
+ throw new SemanticException(
+ String.format(
+ "Given %s value '%s' is not supported, please input a valid
boolean value.",
+ PIPE_GENERATED_KEY, pipeGenerated));
+ }
+ }
+
+ public static boolean parseOrGetDefaultPipeGenerated(final Map<String,
String> loadAttributes) {
+ return
Boolean.parseBoolean(loadAttributes.getOrDefault(PIPE_GENERATED_KEY, "false"));
+ }
+
public static final String ASYNC_LOAD_KEY = "async";
private static final boolean ASYNC_LOAD_DEFAULT_VALUE = false;