This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1a3548e6fa6 Add cache for single measurement validation (#17899)
1a3548e6fa6 is described below
commit 1a3548e6fa61dace592987b5df8a368f48d47b2e
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Jun 11 19:02:08 2026 +0800
Add cache for single measurement validation (#17899)
* Add cache
* Add cache config
* Add a log when resource is missing in snapshot
* Ignore measurement cache performance IT
* Only warn when resource not exist in the snapshot
* remove unused exception
* add import
* spotless
---
.../it/env/cluster/config/MppCommonConfig.java | 7 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
...DBSingleMeasurementCheckCachePerformanceIT.java | 207 +++++++++++++++++++++
.../dataregion/snapshot/SnapshotLoader.java | 15 ++
.../conf/iotdb-system.properties.template | 6 +
.../apache/iotdb/commons/conf/CommonConfig.java | 10 +
.../iotdb/commons/conf/CommonDescriptor.java | 9 +
.../org/apache/iotdb/commons/utils/PathUtils.java | 56 +++++-
10 files changed, 320 insertions(+), 4 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 8cece160c13..35d35121820 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -523,6 +523,13 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setSingleMeasurementCheckCacheSize(int
singleMeasurementCheckCacheSize) {
+ setProperty(
+ "single_measurement_check_cache_size",
String.valueOf(singleMeasurementCheckCacheSize));
+ return this;
+ }
+
@Override
public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) {
setProperty("dn_connection_timeout_ms",
String.valueOf(connectionTimeoutMs));
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 8720e18ab22..e41c06c622a 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -542,6 +542,13 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+ @Override
+ public CommonConfig setSingleMeasurementCheckCacheSize(int
singleMeasurementCheckCacheSize) {
+
dnConfig.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize);
+
cnConfig.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize);
+ return this;
+ }
+
@Override
public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) {
dnConfig.setDnConnectionTimeoutMs(connectionTimeoutMs);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 4db030e607b..ce97db93af1 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -383,6 +383,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setSingleMeasurementCheckCacheSize(int
singleMeasurementCheckCacheSize) {
+ return this;
+ }
+
@Override
public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 5c1b1350adc..ca94b6d3251 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -168,6 +168,8 @@ public interface CommonConfig {
CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize);
+ CommonConfig setSingleMeasurementCheckCacheSize(int
singleMeasurementCheckCacheSize);
+
CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs);
CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java
new file mode 100644
index 00000000000..5606bf6c773
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.performance;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Ignore
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSingleMeasurementCheckCachePerformanceIT {
+
+ private static final Logger LOGGER =
+
LoggerFactory.getLogger(IoTDBSingleMeasurementCheckCachePerformanceIT.class);
+
+ private static final ExperimentGroup[] EXPERIMENT_GROUPS = {
+ new ExperimentGroup(1_000, 500),
+ new ExperimentGroup(2_000, 2_000),
+ new ExperimentGroup(4_000, 8_000)
+ };
+ private static final int CACHE_DISABLED_SIZE = 0;
+ private static final int REPEAT_COUNT = 3;
+ private static final int BATCH_COUNT = 800;
+ private static final int ROWS_PER_BATCH = 100;
+ private static final String DEVICE = "root.sg_cache_perf.d1";
+
+ @Test
+ public void
testEndToEndWritePerformanceWithDifferentSingleMeasurementCheckCacheSizes()
+ throws Exception {
+ for (ExperimentGroup experimentGroup : EXPERIMENT_GROUPS) {
+ long totalDisabledCost = 0;
+ long totalEnabledCost = 0;
+ for (int repeatIndex = 0; repeatIndex < REPEAT_COUNT; repeatIndex++) {
+ long disabledCost =
+ runWritePerformanceExperiment(
+ new Experiment(experimentGroup.measurementCount,
CACHE_DISABLED_SIZE));
+ long enabledCost =
+ runWritePerformanceExperiment(
+ new Experiment(experimentGroup.measurementCount,
experimentGroup.cacheSize));
+ totalDisabledCost += disabledCost;
+ totalEnabledCost += enabledCost;
+ LOGGER.info(
+ "End-to-end write cost repeat {}/{} with measurementCount={},
cache disabled: {} ms, "
+ + "cacheSize={} (cacheSize {} measurementCount): {} ms,
enabled/disabled ratio: {}",
+ repeatIndex + 1,
+ REPEAT_COUNT,
+ experimentGroup.measurementCount,
+ disabledCost / 1_000_000,
+ experimentGroup.cacheSize,
+ experimentGroup.cacheSizeRelation(),
+ enabledCost / 1_000_000,
+ String.format("%.3f", (double) enabledCost / disabledCost));
+ }
+ long averageDisabledCost = totalDisabledCost / REPEAT_COUNT;
+ long averageEnabledCost = totalEnabledCost / REPEAT_COUNT;
+ LOGGER.info(
+ "Average end-to-end write cost after {} repeats with
measurementCount={}, cache disabled: "
+ + "{} ms, cacheSize={} (cacheSize {} measurementCount): {} ms,
enabled/disabled "
+ + "ratio: {}",
+ REPEAT_COUNT,
+ experimentGroup.measurementCount,
+ averageDisabledCost / 1_000_000,
+ experimentGroup.cacheSize,
+ experimentGroup.cacheSizeRelation(),
+ averageEnabledCost / 1_000_000,
+ String.format("%.3f", (double) averageEnabledCost /
averageDisabledCost));
+ Assert.assertTrue(totalDisabledCost > 0);
+ Assert.assertTrue(totalEnabledCost > 0);
+ }
+ }
+
+ private long runWritePerformanceExperiment(Experiment experiment) throws
Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setSingleMeasurementCheckCacheSize(experiment.cacheSize)
+ .setAutoCreateSchemaEnabled(false);
+ EnvFactory.getEnv().initClusterEnvironment();
+ try {
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ createTimeseries(session, experiment.measurementCount);
+ long cost = executeWriteWorkload(session, experiment.measurementCount);
+ assertRowCount(session);
+ return cost;
+ }
+ } finally {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+ }
+
+ private void createTimeseries(ISession session, int measurementCount)
+ throws IoTDBConnectionException, StatementExecutionException {
+ session.executeNonQueryStatement("CREATE DATABASE root.sg_cache_perf");
+ for (int i = 0; i < measurementCount; i++) {
+ session.executeNonQueryStatement(
+ "CREATE TIMESERIES "
+ + DEVICE
+ + ".`sensor+"
+ + i
+ + "` WITH DATATYPE=INT64, ENCODING=PLAIN");
+ }
+ }
+
+ private long executeWriteWorkload(ISession session, int measurementCount)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<String> measurements = new ArrayList<>(measurementCount);
+ List<TSDataType> types = new ArrayList<>(measurementCount);
+ List<Object> values = new ArrayList<>(measurementCount);
+ for (int i = 0; i < measurementCount; i++) {
+ measurements.add("`sensor+" + i + "`");
+ types.add(TSDataType.INT64);
+ values.add((long) i);
+ }
+
+ long startTime = System.nanoTime();
+ for (int batchIndex = 0; batchIndex < BATCH_COUNT; batchIndex++) {
+ List<Long> timestamps = new ArrayList<>(ROWS_PER_BATCH);
+ List<List<String>> measurementsList = new ArrayList<>(ROWS_PER_BATCH);
+ List<List<TSDataType>> typesList = new ArrayList<>(ROWS_PER_BATCH);
+ List<List<Object>> valuesList = new ArrayList<>(ROWS_PER_BATCH);
+ for (int rowIndex = 0; rowIndex < ROWS_PER_BATCH; rowIndex++) {
+ timestamps.add((long) batchIndex * ROWS_PER_BATCH + rowIndex);
+ measurementsList.add(measurements);
+ typesList.add(types);
+ valuesList.add(values);
+ }
+ session.insertRecordsOfOneDevice(DEVICE, timestamps, measurementsList,
typesList, valuesList);
+ }
+ return System.nanoTime() - startTime;
+ }
+
+ private void assertRowCount(ISession session)
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (org.apache.iotdb.isession.SessionDataSet dataSet =
+ session.executeQueryStatement("SELECT COUNT(`sensor+0`) FROM " +
DEVICE)) {
+ Assert.assertTrue(dataSet.hasNext());
+ Assert.assertEquals(
+ (long) BATCH_COUNT * ROWS_PER_BATCH,
dataSet.next().getFields().get(0).getLongV());
+ Assert.assertFalse(dataSet.hasNext());
+ }
+ }
+
+ private static class ExperimentGroup {
+
+ private final int measurementCount;
+ private final int cacheSize;
+
+ private ExperimentGroup(int measurementCount, int cacheSize) {
+ this.measurementCount = measurementCount;
+ this.cacheSize = cacheSize;
+ }
+
+ private String cacheSizeRelation() {
+ if (cacheSize < measurementCount) {
+ return "<";
+ }
+ if (cacheSize == measurementCount) {
+ return "=";
+ }
+ return ">";
+ }
+ }
+
+ private static class Experiment {
+
+ private final int measurementCount;
+ private final int cacheSize;
+
+ private Experiment(int measurementCount, int cacheSize) {
+ this.measurementCount = measurementCount;
+ this.cacheSize = cacheSize;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
index d39d964df7b..053cb8a8b86 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
@@ -28,7 +28,9 @@ import org.apache.iotdb.db.i18n.StorageEngineMessages;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.external.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -360,6 +362,8 @@ public class SnapshotLoader {
String targetSuffix, File[] files, FolderManager folderManager) throws
IOException {
Map<String, String> fileTarget = new HashMap<>();
for (File file : files) {
+ checkTsFileResourceExists(file);
+
String fileKey = file.getName().split("\\.")[0];
String dataDir = fileTarget.get(fileKey);
@@ -394,6 +398,17 @@ public class SnapshotLoader {
}
}
+ private void checkTsFileResourceExists(File file) {
+ if (!file.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ return;
+ }
+
+ String resourceFileName = file.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX;
+ if (!new File(resourceFileName).exists()) {
+ LOGGER.warn("The associated resource file of {} is not found in the
snapshot", file);
+ }
+ }
+
private void createLinksFromSnapshotDirToDataDirWithLog() throws IOException
{
String snapshotId = logAnalyzer.getSnapshotId();
int loggedFileNum = logAnalyzer.getTotalFileCountInSnapshot();
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 945d0907039..48c564634fb 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -909,6 +909,12 @@ tag_attribute_flush_interval=1000
# Datatype: int
tag_attribute_total_size=700
+# The maximum number of single measurement check results cached by PathUtils.
+# Set to 0 to disable this cache.
+# effectiveMode: restart
+# Datatype: int
+single_measurement_check_cache_size=10000
+
# max measurement num of internal request
# When creating timeseries with Session.createMultiTimeseries, the user input
plan, the timeseries num of
# which exceeds this num, will be split to several plans with timeseries no
more than this num.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 252d00b5a4f..37de1b03833 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -431,6 +431,8 @@ public class CommonConfig {
// Max size for tag and attribute of one time series
private int tagAttributeTotalSize = 700;
+ private int singleMeasurementCheckCacheSize = 10_000;
+
// maximum number of Cluster Databases allowed
private int databaseLimitThreshold = -1;
@@ -2783,6 +2785,14 @@ public class CommonConfig {
this.tagAttributeTotalSize = tagAttributeTotalSize;
}
+ public int getSingleMeasurementCheckCacheSize() {
+ return singleMeasurementCheckCacheSize;
+ }
+
+ public void setSingleMeasurementCheckCacheSize(int
singleMeasurementCheckCacheSize) {
+ this.singleMeasurementCheckCacheSize = singleMeasurementCheckCacheSize;
+ }
+
public int getDatabaseLimitThreshold() {
return databaseLimitThreshold;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index fa853f61f47..cfe6ad6007e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -276,6 +276,15 @@ public class CommonDescriptor {
properties.getProperty(
"tag_attribute_total_size",
String.valueOf(config.getTagAttributeTotalSize()))));
+ int singleMeasurementCheckCacheSize =
+ Integer.parseInt(
+ properties.getProperty(
+ "single_measurement_check_cache_size",
+ String.valueOf(config.getSingleMeasurementCheckCacheSize())));
+ if (singleMeasurementCheckCacheSize >= 0) {
+
config.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize);
+ }
+
config.setTimePartitionOrigin(
Long.parseLong(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
index 615bd62453f..1803ebb68ca 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
@@ -18,10 +18,13 @@
*/
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.exception.PathParseException;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -35,6 +38,12 @@ import java.util.Map;
public class PathUtils {
+ private static final Cache<String, SingleMeasurementCheckResult>
SINGLE_MEASUREMENT_CHECK_CACHE =
+ Caffeine.newBuilder()
+ .maximumSize(
+
CommonDescriptor.getInstance().getConfig().getSingleMeasurementCheckCacheSize())
+ .build();
+
/**
* @param path the path will split. ex, root.ln.
* @return string array. ex, [root, ln]
@@ -162,20 +171,29 @@ public class PathUtils {
if (measurement == null) {
return null;
}
+ SingleMeasurementCheckResult result =
+ SINGLE_MEASUREMENT_CHECK_CACHE.get(measurement,
PathUtils::checkSingleMeasurement);
+ if (result.isLegal()) {
+ return result.getMeasurement();
+ }
+ throw new IllegalPathException(measurement);
+ }
+
+ private static SingleMeasurementCheckResult checkSingleMeasurement(String
measurement) {
if (measurement.startsWith(TsFileConstant.BACK_QUOTE_STRING)
&& measurement.endsWith(TsFileConstant.BACK_QUOTE_STRING)) {
if (checkBackQuotes(measurement.substring(1, measurement.length() - 1)))
{
- return removeBackQuotesIfNecessary(measurement);
+ return
SingleMeasurementCheckResult.legal(removeBackQuotesIfNecessary(measurement));
} else {
- throw new IllegalPathException(measurement);
+ return SingleMeasurementCheckResult.illegal();
}
}
if (IoTDBConstant.reservedWords.contains(measurement.toUpperCase())
|| isRealNumber(measurement)
|| !TsFileConstant.NODE_NAME_PATTERN.matcher(measurement).matches()) {
- throw new IllegalPathException(measurement);
+ return SingleMeasurementCheckResult.illegal();
}
- return measurement;
+ return SingleMeasurementCheckResult.legal(measurement);
}
/** Return true if the str is a real number. Examples: 1.0; +1.0; -1.0;
0011; 011e3; +23e-3 */
@@ -225,4 +243,34 @@ public class PathUtils {
public static boolean isTableModelDatabase(final String databaseName) {
return !databaseName.startsWith("root.");
}
+
+ private static class SingleMeasurementCheckResult {
+
+ private static final SingleMeasurementCheckResult ILLEGAL =
+ new SingleMeasurementCheckResult(false, null);
+
+ private final boolean legal;
+ private final String measurement;
+
+ private SingleMeasurementCheckResult(boolean legal, String measurement) {
+ this.legal = legal;
+ this.measurement = measurement;
+ }
+
+ private static SingleMeasurementCheckResult legal(String measurement) {
+ return new SingleMeasurementCheckResult(true, measurement);
+ }
+
+ private static SingleMeasurementCheckResult illegal() {
+ return ILLEGAL;
+ }
+
+ private boolean isLegal() {
+ return legal;
+ }
+
+ private String getMeasurement() {
+ return measurement;
+ }
+ }
}