This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch add_measurement_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b28f3de65f651a31e9e1efc0bc47f3b43ed526ee Author: Tian Jiang <[email protected]> AuthorDate: Tue Jun 9 15:26:23 2026 +0800 Add cache config --- .../it/env/cluster/config/MppCommonConfig.java | 7 + .../env/cluster/config/MppSharedCommonConfig.java | 7 + .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + ...DBSingleMeasurementCheckCachePerformanceIT.java | 205 +++++++++++++++++++++ 5 files changed, 226 insertions(+) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 4852b9d116e..64fa6aa7603 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -502,6 +502,13 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize) { + setProperty( + "single_measurement_check_cache_size", String.valueOf(singleMeasurementCheckCacheSize)); + return this; + } + @Override public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) { setProperty("dn_connection_timeout_ms", String.valueOf(connectionTimeoutMs)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 582c9a049e4..522c09e1199 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -521,6 +521,13 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } + @Override + public 
CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize) { + dnConfig.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize); + cnConfig.setSingleMeasurementCheckCacheSize(singleMeasurementCheckCacheSize); + return this; + } + @Override public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) { dnConfig.setDnConnectionTimeoutMs(connectionTimeoutMs); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 48c157e957b..1420420f6ad 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -368,6 +368,11 @@ public class RemoteCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize) { + return this; + } + @Override public CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index dc21234e2ba..51a2ccd12f7 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -162,6 +162,8 @@ public interface CommonConfig { CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize); + CommonConfig setSingleMeasurementCheckCacheSize(int singleMeasurementCheckCacheSize); + CommonConfig setDnConnectionTimeoutMs(int connectionTimeoutMs); CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta( diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java new file mode 100644 index 00000000000..879fa1a136d --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/performance/IoTDBSingleMeasurementCheckCachePerformanceIT.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.it.performance; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.enums.TSDataType; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBSingleMeasurementCheckCachePerformanceIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBSingleMeasurementCheckCachePerformanceIT.class); + + private static final ExperimentGroup[] EXPERIMENT_GROUPS = { + new ExperimentGroup(1_000, 500), + new ExperimentGroup(2_000, 2_000), + new ExperimentGroup(4_000, 8_000) + }; + private static final int CACHE_DISABLED_SIZE = 0; + private static final int REPEAT_COUNT = 3; + private static final int BATCH_COUNT = 800; + private static final int ROWS_PER_BATCH = 100; + private static final String DEVICE = "root.sg_cache_perf.d1"; + + @Test + public void testEndToEndWritePerformanceWithDifferentSingleMeasurementCheckCacheSizes() + throws Exception { + for (ExperimentGroup experimentGroup : EXPERIMENT_GROUPS) { + long totalDisabledCost = 0; + long totalEnabledCost = 0; + for (int repeatIndex = 0; repeatIndex < REPEAT_COUNT; repeatIndex++) { + long disabledCost = + runWritePerformanceExperiment( + new Experiment(experimentGroup.measurementCount, CACHE_DISABLED_SIZE)); + long enabledCost = + runWritePerformanceExperiment( + new Experiment(experimentGroup.measurementCount, experimentGroup.cacheSize)); + totalDisabledCost += disabledCost; + totalEnabledCost += 
enabledCost; + LOGGER.info( + "End-to-end write cost repeat {}/{} with measurementCount={}, cache disabled: {} ms, " + + "cacheSize={} (cacheSize {} measurementCount): {} ms, enabled/disabled ratio: {}", + repeatIndex + 1, + REPEAT_COUNT, + experimentGroup.measurementCount, + disabledCost / 1_000_000, + experimentGroup.cacheSize, + experimentGroup.cacheSizeRelation(), + enabledCost / 1_000_000, + String.format("%.3f", (double) enabledCost / disabledCost)); + } + long averageDisabledCost = totalDisabledCost / REPEAT_COUNT; + long averageEnabledCost = totalEnabledCost / REPEAT_COUNT; + LOGGER.info( + "Average end-to-end write cost after {} repeats with measurementCount={}, cache disabled: " + + "{} ms, cacheSize={} (cacheSize {} measurementCount): {} ms, enabled/disabled " + + "ratio: {}", + REPEAT_COUNT, + experimentGroup.measurementCount, + averageDisabledCost / 1_000_000, + experimentGroup.cacheSize, + experimentGroup.cacheSizeRelation(), + averageEnabledCost / 1_000_000, + String.format("%.3f", (double) averageEnabledCost / averageDisabledCost)); + Assert.assertTrue(totalDisabledCost > 0); + Assert.assertTrue(totalEnabledCost > 0); + } + } + + private long runWritePerformanceExperiment(Experiment experiment) throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setSingleMeasurementCheckCacheSize(experiment.cacheSize) + .setAutoCreateSchemaEnabled(false); + EnvFactory.getEnv().initClusterEnvironment(); + try { + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + createTimeseries(session, experiment.measurementCount); + long cost = executeWriteWorkload(session, experiment.measurementCount); + assertRowCount(session); + return cost; + } + } finally { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + } + + private void createTimeseries(ISession session, int measurementCount) + throws IoTDBConnectionException, StatementExecutionException { + session.executeNonQueryStatement("CREATE DATABASE root.sg_cache_perf"); + 
for (int i = 0; i < measurementCount; i++) { + session.executeNonQueryStatement( + "CREATE TIMESERIES " + + DEVICE + + ".`sensor+" + + i + + "` WITH DATATYPE=INT64, ENCODING=PLAIN"); + } + } + + private long executeWriteWorkload(ISession session, int measurementCount) + throws IoTDBConnectionException, StatementExecutionException { + List<String> measurements = new ArrayList<>(measurementCount); + List<TSDataType> types = new ArrayList<>(measurementCount); + List<Object> values = new ArrayList<>(measurementCount); + for (int i = 0; i < measurementCount; i++) { + measurements.add("`sensor+" + i + "`"); + types.add(TSDataType.INT64); + values.add((long) i); + } + + long startTime = System.nanoTime(); + for (int batchIndex = 0; batchIndex < BATCH_COUNT; batchIndex++) { + List<Long> timestamps = new ArrayList<>(ROWS_PER_BATCH); + List<List<String>> measurementsList = new ArrayList<>(ROWS_PER_BATCH); + List<List<TSDataType>> typesList = new ArrayList<>(ROWS_PER_BATCH); + List<List<Object>> valuesList = new ArrayList<>(ROWS_PER_BATCH); + for (int rowIndex = 0; rowIndex < ROWS_PER_BATCH; rowIndex++) { + timestamps.add((long) batchIndex * ROWS_PER_BATCH + rowIndex); + measurementsList.add(measurements); + typesList.add(types); + valuesList.add(values); + } + session.insertRecordsOfOneDevice(DEVICE, timestamps, measurementsList, typesList, valuesList); + } + return System.nanoTime() - startTime; + } + + private void assertRowCount(ISession session) + throws IoTDBConnectionException, StatementExecutionException { + try (org.apache.iotdb.isession.SessionDataSet dataSet = + session.executeQueryStatement("SELECT COUNT(`sensor+0`) FROM " + DEVICE)) { + Assert.assertTrue(dataSet.hasNext()); + Assert.assertEquals( + (long) BATCH_COUNT * ROWS_PER_BATCH, dataSet.next().getFields().get(0).getLongV()); + Assert.assertFalse(dataSet.hasNext()); + } + } + + private static class ExperimentGroup { + + private final int measurementCount; + private final int cacheSize; + + private 
ExperimentGroup(int measurementCount, int cacheSize) { + this.measurementCount = measurementCount; + this.cacheSize = cacheSize; + } + + private String cacheSizeRelation() { + if (cacheSize < measurementCount) { + return "<"; + } + if (cacheSize == measurementCount) { + return "="; + } + return ">"; + } + } + + private static class Experiment { + + private final int measurementCount; + private final int cacheSize; + + private Experiment(int measurementCount, int cacheSize) { + this.measurementCount = measurementCount; + this.cacheSize = cacheSize; + } + } +}
