This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 56bc93ef02e [To dev/1.3] Update last cache in load (#15637)
56bc93ef02e is described below
commit 56bc93ef02ecfa195ddebe884c01ea6e59514f51
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Jun 4 15:18:23 2025 +0800
[To dev/1.3] Update last cache in load (#15637)
---
.../it/env/cluster/config/MppDataNodeConfig.java | 12 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 10 +
.../apache/iotdb/itbase/env/DataNodeConfig.java | 4 +
.../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java | 556 +++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 38 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 17 +
.../pipeconsensus/PipeConsensusReceiver.java | 26 +-
.../plan/analyze/LoadTsFileAnalyzer.java | 5 +-
.../cache/schema/LastCacheLoadStrategy.java | 30 +-
.../cache/schema/TimeSeriesSchemaCache.java | 2 +-
.../scheduler/load/LoadTsFileDispatcherImpl.java | 3 +-
.../db/storageengine/dataregion/DataRegion.java | 146 +++++-
.../dataregion/tsfile/TsFileResource.java | 11 +
.../dataregion/utils/TsFileResourceUtils.java | 70 ++-
.../file/AbstractTsFileRecoverPerformer.java | 2 +-
.../db/storageengine/load/LoadTsFileManager.java | 79 ++-
.../conf/iotdb-system.properties.template | 18 +
pom.xml | 2 +-
18 files changed, 993 insertions(+), 38 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index e6fa19367fd..01636b7bf0b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -88,4 +88,16 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes));
return this;
}
+
+  /**
+   * Sets the {@code last_cache_operation_on_load} DataNode property.
+   *
+   * @param strategyName name of the last-cache load strategy to use
+   * @return this config, for chaining
+   */
+  @Override
+  public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
+    final String propertyKey = "last_cache_operation_on_load";
+    setProperty(propertyKey, strategyName);
+    return this;
+  }
+
+  /**
+   * Sets the {@code cache_last_values_for_load} DataNode property.
+   *
+   * @param cacheLastValuesForLoad whether last values should be cached during load
+   * @return this config, for chaining
+   */
+  @Override
+  public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
+    final String propertyValue = Boolean.toString(cacheLastValuesForLoad);
+    setProperty("cache_last_values_for_load", propertyValue);
+    return this;
+  }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 2c85a7e5ddf..80d9d98d23d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -53,4 +53,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
return this;
}
+
+ /**
+ * Ignored here: like setLoadTsFileAnalyzeSchemaMemorySizeInBytes in this class, it only returns
+ * {@code this} and does not change the remote DataNode's configuration.
+ */
+ @Override
+ public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
+ return this;
+ }
+
+ /**
+ * Ignored here: like setLoadTsFileAnalyzeSchemaMemorySizeInBytes in this class, it only returns
+ * {@code this} and does not change the remote DataNode's configuration.
+ */
+ @Override
+ public DataNodeConfig setCacheLastValuesForLoad(boolean
cacheLastValuesForLoad) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 980ab74c10c..c6112a0e639 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -35,4 +35,8 @@ public interface DataNodeConfig {
DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
long loadTsFileAnalyzeSchemaMemorySizeInBytes);
+
+ /**
+ * Sets the last-cache strategy used on load (the {@code last_cache_operation_on_load} property
+ * in the MPP environment). Implementations may ignore it; returns this config for chaining.
+ */
+ DataNodeConfig setLoadLastCacheStrategy(String strategyName);
+
+ /**
+ * Sets whether last values are cached for load (the {@code cache_last_values_for_load} property
+ * in the MPP environment). Implementations may ignore it; returns this config for chaining.
+ */
+ DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
new file mode 100644
index 00000000000..a005f80d19d
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.utils.TsFileGenerator;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@SuppressWarnings({"ResultOfMethodCallIgnored", "UnstableApiUsage"})
+@RunWith(Parameterized.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLoadLastCacheIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBLoadLastCacheIT.class);
+ // Time partition interval applied in setUp; also used to size the generated test data.
+ private static final long PARTITION_INTERVAL = 10 * 1000L;
+ private static final int connectionTimeoutInMS = (int)
TimeUnit.SECONDS.toMillis(300);
+ // 10 KiB; NOTE(review): presumably kept small to exercise the analyzer memory limit — confirm.
+ private static final long loadTsFileAnalyzeSchemaMemorySizeInBytes = 10 *
1024L;
+
+ // Temporary directory for generated TsFiles; created in setUp, removed in tearDown.
+ private File tmpDir;
+ // Strategy under test, supplied by the parameterized runner from data().
+ private final LastCacheLoadStrategy lastCacheLoadStrategy;
+
+  /**
+   * Supplies one test run per last-cache load strategy.
+   *
+   * @return parameter rows, each holding a single {@link LastCacheLoadStrategy}
+   */
+  @Parameters(name = "loadLastCacheStrategy={0}")
+  public static Collection<Object[]> data() {
+    final Object[] cleanAll = {LastCacheLoadStrategy.CLEAN_ALL};
+    final Object[] update = {LastCacheLoadStrategy.UPDATE};
+    final Object[] updateNoBlob = {LastCacheLoadStrategy.UPDATE_NO_BLOB};
+    final Object[] cleanDevice = {LastCacheLoadStrategy.CLEAN_DEVICE};
+    return Arrays.asList(cleanAll, update, updateNoBlob, cleanDevice);
+  }
+
+ /** Creates one test instance for the given strategy; rows come from {@link #data()}. */
+ public IoTDBLoadLastCacheIT(LastCacheLoadStrategy lastCacheLoadStrategy) {
+ this.lastCacheLoadStrategy = lastCacheLoadStrategy;
+ }
+
+  /**
+   * Creates the temporary TsFile directory, applies the test configuration (time partition
+   * interval, connection timeout, analyzer schema memory, the parameterized last-cache load
+   * strategy with caching of last values enabled) and starts the environment.
+   */
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(Files.createTempDirectory("load").toUri());
+
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setTimePartitionInterval(PARTITION_INTERVAL);
+
+    // Connection and analyzer limits.
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeConfig()
+        .setConnectionTimeoutInMS(connectionTimeoutInMS)
+        .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);
+
+    // Last-cache behaviour under test.
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeConfig()
+        .setLoadLastCacheStrategy(lastCacheLoadStrategy.name())
+        .setCacheLastValuesForLoad(true);
+
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  /**
+   * Drops the test databases, cleans the environment and removes the temporary directory. A
+   * failure to delete the directory is logged rather than failing the test.
+   */
+  @After
+  public void tearDown() throws Exception {
+    deleteSG();
+    EnvFactory.getEnv().cleanClusterEnvironment();
+
+    final boolean tmpDirDeleted = deleteDir();
+    if (tmpDirDeleted) {
+      return;
+    }
+    LOGGER.error("Can not delete tmp dir for loading tsfile.");
+  }
+
+  /**
+   * Creates both test databases and registers the series used by the tree-model test:
+   * sensor_00..03 on DEVICE_0 (non-aligned), sensor_10..17 on DEVICE_1 (aligned), one
+   * non-aligned series each on DEVICE_2 and DEVICE_3, and one aligned series on DEVICE_4.
+   */
+  private void registerSchema() throws SQLException {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE " + SchemaConfig.STORAGE_GROUP_0);
+      statement.execute("CREATE DATABASE " + SchemaConfig.STORAGE_GROUP_1);
+
+      final List<MeasurementSchema> device0Schemas =
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_00,
+              SchemaConfig.MEASUREMENT_01,
+              SchemaConfig.MEASUREMENT_02,
+              SchemaConfig.MEASUREMENT_03);
+      for (final MeasurementSchema device0Schema : device0Schemas) {
+        statement.execute(convert2SQL(SchemaConfig.DEVICE_0, device0Schema));
+      }
+
+      final List<IMeasurementSchema> device1Schemas =
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_10,
+              SchemaConfig.MEASUREMENT_11,
+              SchemaConfig.MEASUREMENT_12,
+              SchemaConfig.MEASUREMENT_13,
+              SchemaConfig.MEASUREMENT_14,
+              SchemaConfig.MEASUREMENT_15,
+              SchemaConfig.MEASUREMENT_16,
+              SchemaConfig.MEASUREMENT_17);
+      statement.execute(convert2AlignedSQL(SchemaConfig.DEVICE_1, device1Schemas));
+
+      statement.execute(convert2SQL(SchemaConfig.DEVICE_2, SchemaConfig.MEASUREMENT_20));
+      statement.execute(convert2SQL(SchemaConfig.DEVICE_3, SchemaConfig.MEASUREMENT_30));
+
+      final List<IMeasurementSchema> device4Schemas =
+          Collections.singletonList(SchemaConfig.MEASUREMENT_40);
+      statement.execute(convert2AlignedSQL(SchemaConfig.DEVICE_4, device4Schemas));
+    }
+  }
+
+  /**
+   * Builds (and logs) a {@code create timeseries} statement for one non-aligned series.
+   *
+   * @param device full device path
+   * @param schema measurement to create under {@code device}
+   * @return the SQL text
+   */
+  private String convert2SQL(final String device, final MeasurementSchema schema) {
+    final String fullPath = new Path(device, schema.getMeasurementId(), true).getFullPath();
+    final String typeName = schema.getType().name();
+    final String sql = String.format("create timeseries %s %s", fullPath, typeName);
+    LOGGER.info("schema execute: {}", sql);
+    return sql;
+  }
+
+  /**
+   * Builds (and logs) a {@code create aligned timeseries} statement for the given device.
+   *
+   * <p>The closing parenthesis is always appended, so the statement stays well-formed even for an
+   * empty schema list (previously the ")" was only emitted after the last element).
+   *
+   * @param device full device path
+   * @param schemas measurements to create, in order
+   * @return the SQL text
+   */
+  private String convert2AlignedSQL(final String device, final List<IMeasurementSchema> schemas) {
+    final StringBuilder sql =
+        new StringBuilder(String.format("create aligned timeseries %s(", device));
+    for (int i = 0; i < schemas.size(); i++) {
+      if (i > 0) {
+        sql.append(",");
+      }
+      final IMeasurementSchema schema = schemas.get(i);
+      sql.append(String.format("%s %s", schema.getMeasurementId(), schema.getType().name()));
+    }
+    sql.append(")");
+    LOGGER.info("schema execute: {}.", sql);
+    return sql.toString();
+  }
+
+  /**
+   * Best-effort removal of both test databases. Each drop is attempted independently, so a failed
+   * drop of the first database (e.g. it was never created) no longer skips the second.
+   */
+  private void deleteSG() throws SQLException {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      for (final String database :
+          Arrays.asList(SchemaConfig.STORAGE_GROUP_0, SchemaConfig.STORAGE_GROUP_1)) {
+        try {
+          statement.execute(String.format("delete database %s", database));
+        } catch (final IoTDBSQLException ignored) {
+          // Deliberately ignored: cleanup is best-effort, the database may not exist.
+        }
+      }
+    } catch (final IoTDBSQLException ignored) {
+      // Deliberately ignored, as before: cleanup must not fail the test.
+    }
+  }
+
+ private boolean deleteDir() {
+ for (final File file : Objects.requireNonNull(tmpDir.listFiles())) {
+ if (!file.delete()) {
+ return false;
+ }
+ }
+ return tmpDir.delete();
+ }
+
+ @Test
+ public void testTreeModelLoadWithLastCache() throws Exception {
+ registerSchema();
+
+ final String device = SchemaConfig.DEVICE_0;
+ final String measurement = SchemaConfig.MEASUREMENT_00.getMeasurementId();
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ String.format("insert into %s(timestamp, %s) values(100, 100)",
device, measurement));
+
+ try (final ResultSet resultSet =
+ statement.executeQuery(String.format("select last %s from %s",
measurement, device))) {
+ if (resultSet.next()) {
+ final String lastValue =
resultSet.getString(ColumnHeaderConstant.VALUE);
+ Assert.assertEquals("100", lastValue);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+
+ final File file1 = new File(tmpDir, "1-0-0-0.tsfile");
+ final File file2 = new File(tmpDir, "2-0-0-0.tsfile");
+ // device 0, device 1, sg 0
+ try (final TsFileGenerator generator = new TsFileGenerator(file1)) {
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_0,
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_00,
+ SchemaConfig.MEASUREMENT_01,
+ SchemaConfig.MEASUREMENT_02,
+ SchemaConfig.MEASUREMENT_03,
+ SchemaConfig.MEASUREMENT_04,
+ SchemaConfig.MEASUREMENT_05,
+ SchemaConfig.MEASUREMENT_06,
+ SchemaConfig.MEASUREMENT_07));
+ generator.registerAlignedTimeseries(
+ SchemaConfig.DEVICE_1,
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_10,
+ SchemaConfig.MEASUREMENT_11,
+ SchemaConfig.MEASUREMENT_12,
+ SchemaConfig.MEASUREMENT_13,
+ SchemaConfig.MEASUREMENT_14,
+ SchemaConfig.MEASUREMENT_15,
+ SchemaConfig.MEASUREMENT_16,
+ SchemaConfig.MEASUREMENT_17));
+ generator.generateData(SchemaConfig.DEVICE_0, 10000, PARTITION_INTERVAL
/ 10_000, false);
+ generator.generateData(SchemaConfig.DEVICE_1, 10000, PARTITION_INTERVAL
/ 10_000, true);
+ }
+
+ // device 2, device 3, device4, sg 1
+ try (final TsFileGenerator generator = new TsFileGenerator(file2)) {
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_2,
Collections.singletonList(SchemaConfig.MEASUREMENT_20));
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_3,
Collections.singletonList(SchemaConfig.MEASUREMENT_30));
+ generator.registerAlignedTimeseries(
+ SchemaConfig.DEVICE_4,
Collections.singletonList(SchemaConfig.MEASUREMENT_40));
+ generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL
/ 10_000, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL
/ 10_000, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL
/ 10_000, true);
+ }
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+
+ statement.execute(String.format("load \"%s\" sglevel=2",
tmpDir.getAbsolutePath()));
+
+ try (final ResultSet resultSet =
+ statement.executeQuery(String.format("select last %s from %s",
measurement, device))) {
+ if (resultSet.next()) {
+ final String lastTime =
resultSet.getString(ColumnHeaderConstant.TIME);
+ Assert.assertEquals(String.valueOf(PARTITION_INTERVAL), lastTime);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+ }
+
+  /** Measurement layout used by the load performance test. */
+  private static class PerformanceSchemas {
+
+    private final String database;
+    private final List<MeasurementSchema> measurementSchemas;
+    private final List<String> columnNames;
+    private final List<TSDataType> dataTypes;
+
+    /**
+     * Builds the layout.
+     *
+     * <p>Measurements {@code s0 .. s(measurementNum-1)} are INT64, followed by
+     * {@code blobMeasurementNum} BLOB measurements. The column list starts with a STRING
+     * {@code device_id} column.
+     *
+     * @param database database name
+     * @param tableName not used by this class
+     * @param measurementNum number of INT64 measurements
+     * @param blobMeasurementNum number of BLOB measurements
+     */
+    public PerformanceSchemas(
+        String database, String tableName, int measurementNum, int blobMeasurementNum) {
+      this.database = database;
+      final int total = measurementNum + blobMeasurementNum;
+      columnNames = new ArrayList<>(total);
+      dataTypes = new ArrayList<>(total);
+      measurementSchemas = new ArrayList<>(total);
+
+      columnNames.add("device_id");
+      dataTypes.add(TSDataType.STRING);
+      for (int idx = 0; idx < total; idx++) {
+        final TSDataType type = idx < measurementNum ? TSDataType.INT64 : TSDataType.BLOB;
+        final String name = "s" + idx;
+        columnNames.add(name);
+        dataTypes.add(type);
+        measurementSchemas.add(new MeasurementSchema(name, type));
+      }
+    }
+  }
+
+ private void generateAndLoadOne(
+ int deviceCnt,
+ int measurementCnt,
+ int blobMeasurementCnt,
+ int pointCnt,
+ int offset,
+ PerformanceSchemas schemas,
+ int fileNum,
+ Statement statement)
+ throws Exception {
+ File file = new File("target" + File.separator + fileNum + ".tsfile");
+ try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+ int rowIndex = 0;
+ for (int i = 0; i < deviceCnt; i++) {
+ PartialPath devicePath = new PartialPath(new PlainDeviceID("d" + i));
+ tsFileWriter.registerAlignedTimeseries(devicePath,
schemas.measurementSchemas);
+ Tablet tablet =
+ new Tablet("root.db1.d" + i, schemas.measurementSchemas, pointCnt
* deviceCnt);
+ for (int j = 0; j < pointCnt; j++) {
+ tablet.addTimestamp(rowIndex, j + offset);
+ for (int k = 0; k < measurementCnt; k++) {
+ tablet.addValue("s" + k, rowIndex, (long) j + offset);
+ }
+ for (int k = 0; k < blobMeasurementCnt; k++) {
+ tablet.addValue("s" + (k + 1 + measurementCnt), rowIndex,
String.valueOf(j + offset));
+ }
+ rowIndex++;
+ }
+ tsFileWriter.writeAligned(tablet);
+ }
+ }
+
+ statement.execute(String.format("load '%s'", file.getAbsolutePath(),
schemas.database));
+
+ file.delete();
+ }
+
+  /**
+   * Generates and loads {@code fileNum} files sequentially over one connection.
+   *
+   * <p>File {@code i} holds timestamps {@code [pointCnt * i, pointCnt * (i + 1))} for every
+   * device.
+   */
+  private void generateAndLoadAll(
+      int deviceCnt,
+      int measurementCnt,
+      int blobMeasurementCnt,
+      int pointCnt,
+      PerformanceSchemas schemas,
+      int fileNum)
+      throws Exception {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      for (int i = 0; i < fileNum; i++) {
+        generateAndLoadOne(
+            deviceCnt,
+            measurementCnt,
+            blobMeasurementCnt,
+            pointCnt,
+            pointCnt * i,
+            schemas,
+            // Per-file index; previously the total file count was passed for every file.
+            i,
+            statement);
+      }
+    }
+  }
+
+  /**
+   * Runs {@code select last s<measurementNum> from root.db1.d<deviceNum>}.
+   *
+   * @return the time of the last point, or -1 if the result is empty or the series does not
+   *     exist yet
+   * @throws SQLException for any failure other than a missing series
+   */
+  private long queryLastOnce(int deviceNum, int measurementNum, Statement statement)
+      throws SQLException {
+    // The device segment is "d<n>"; the previous version prefixed "d" twice ("root.db1.dd<n>").
+    final String sql =
+        String.format("select last s%d from root.db1.d%d", measurementNum, deviceNum);
+    try (final ResultSet resultSet = statement.executeQuery(sql)) {
+      // "_col0" is a table-model column name. A tree-model last query exposes Time/Value columns
+      // instead, and the generated data's last time equals the expected final point.
+      return resultSet.next() ? resultSet.getLong(ColumnHeaderConstant.TIME) : -1;
+    } catch (SQLException e) {
+      final String message = e.getMessage();
+      if (message == null || !message.contains("does not exist")) {
+        throw e;
+      }
+      return -1;
+    }
+  }
+
+  /**
+   * Repeatedly queries the last point of a random series at the rate allowed by
+   * {@code rateLimiter}.
+   *
+   * <p>It stops once a query returns the final timestamp {@code pointCnt * fileCnt - 1}. Each
+   * result and its latency (ns) is printed, then the total duration and average latency. It
+   * returns early, without the summary, if the thread is interrupted. {@code schemas} is unused.
+   */
+  @SuppressWarnings("BusyWait")
+  private void queryAll(
+      int deviceCnt,
+      int measurementCnt,
+      int pointCnt,
+      int fileCnt,
+      PerformanceSchemas schemas,
+      RateLimiter rateLimiter)
+      throws SQLException {
+    final Random random = new Random();
+    final long totalStart = System.currentTimeMillis();
+    final long expectedLast = (long) pointCnt * fileCnt - 1;
+    final List<Long> timeConsumptions = new ArrayList<>();
+
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      while (true) {
+        final int deviceNum = random.nextInt(deviceCnt);
+        final int measurementNum = random.nextInt(measurementCnt);
+        rateLimiter.acquire();
+        final long start = System.nanoTime();
+        final long result = queryLastOnce(deviceNum, measurementNum, statement);
+        final long timeConsumption = System.nanoTime() - start;
+        if (result == -1) {
+          // Nothing visible yet: back off and retry.
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            // Restore the flag and stop. Continuing would make every later sleep throw at once,
+            // turning this loop into a busy spin.
+            Thread.currentThread().interrupt();
+            return;
+          }
+          continue;
+        }
+        System.out.printf(
+            "%s: d%d.s%d %s %s%n", new Date(), deviceNum, measurementNum, result, timeConsumption);
+        timeConsumptions.add(timeConsumption);
+        if (result == expectedLast) {
+          break;
+        }
+      }
+    }
+
+    System.out.printf(
+        "Synchronization ends after %dms, query latency avg %fms %n",
+        System.currentTimeMillis() - totalStart,
+        timeConsumptions.stream().mapToLong(i -> i).average().orElse(0.0) / 1000000);
+  }
+
+ @Ignore("Performance")
+ @Test
+ public void testLoadPerformance() throws Exception {
+ int deviceCnt = 100;
+ int measurementCnt = 100;
+ int blobMeasurementCnt = 10;
+ int pointCnt = 100;
+ int fileCnt = 100000;
+ int queryPerSec = 100;
+ int queryThreadsNum = 10;
+
+ PerformanceSchemas schemas =
+ new PerformanceSchemas("test", "test_table", measurementCnt,
blobMeasurementCnt);
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE IF NOT EXISTS " + schemas.database);
+ }
+
+ Thread loadThread =
+ new Thread(
+ () -> {
+ try {
+ generateAndLoadAll(
+ deviceCnt, measurementCnt, blobMeasurementCnt, pointCnt,
schemas, fileCnt);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ });
+
+ RateLimiter rateLimiter = RateLimiter.create(queryPerSec);
+ List<Thread> queryThreads = new ArrayList<>(queryThreadsNum);
+ for (int i = 0; i < queryThreadsNum; i++) {
+ Thread queryThread =
+ new Thread(
+ () -> {
+ try {
+ queryAll(
+ deviceCnt,
+ measurementCnt + blobMeasurementCnt,
+ pointCnt,
+ fileCnt,
+ schemas,
+ rateLimiter);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ });
+ queryThreads.add(queryThread);
+ }
+
+ loadThread.start();
+ queryThreads.forEach(Thread::start);
+
+ loadThread.join();
+ for (Thread queryThread : queryThreads) {
+ queryThread.join();
+ }
+ }
+
+ /** Databases, devices and measurement schemas used by testTreeModelLoadWithLastCache. */
+ private static class SchemaConfig {
+
+ private static final String STORAGE_GROUP_0 = "root.sg.test_0";
+ private static final String STORAGE_GROUP_1 = "root.sg.test_1";
+
+ // device 0, nonaligned, sg 0
+ private static final String DEVICE_0 = "root.sg.test_0.d_0";
+ private static final MeasurementSchema MEASUREMENT_00 =
+ new MeasurementSchema("sensor_00", TSDataType.INT32, TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_01 =
+ new MeasurementSchema("sensor_01", TSDataType.INT64, TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_02 =
+ new MeasurementSchema("sensor_02", TSDataType.DOUBLE,
TSEncoding.GORILLA);
+ private static final MeasurementSchema MEASUREMENT_03 =
+ new MeasurementSchema("sensor_03", TSDataType.TEXT, TSEncoding.PLAIN);
+ // sensor_04..07 are not created by registerSchema; they only appear in the generated TsFile.
+ private static final MeasurementSchema MEASUREMENT_04 =
+ new MeasurementSchema("sensor_04", TSDataType.TIMESTAMP,
TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_05 =
+ new MeasurementSchema("sensor_05", TSDataType.DATE, TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_06 =
+ new MeasurementSchema("sensor_06", TSDataType.BLOB, TSEncoding.PLAIN);
+ private static final MeasurementSchema MEASUREMENT_07 =
+ new MeasurementSchema("sensor_07", TSDataType.STRING,
TSEncoding.PLAIN);
+
+ // device 1, aligned, sg 0
+ private static final String DEVICE_1 = "root.sg.test_0.a_1";
+ private static final MeasurementSchema MEASUREMENT_10 =
+ new MeasurementSchema("sensor_10", TSDataType.INT32, TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_11 =
+ new MeasurementSchema("sensor_11", TSDataType.INT64, TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_12 =
+ new MeasurementSchema("sensor_12", TSDataType.DOUBLE,
TSEncoding.GORILLA);
+ private static final MeasurementSchema MEASUREMENT_13 =
+ new MeasurementSchema("sensor_13", TSDataType.TEXT, TSEncoding.PLAIN);
+ private static final MeasurementSchema MEASUREMENT_14 =
+ new MeasurementSchema("sensor_14", TSDataType.TIMESTAMP,
TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_15 =
+ new MeasurementSchema("sensor_15", TSDataType.DATE, TSEncoding.RLE);
+ private static final MeasurementSchema MEASUREMENT_16 =
+ new MeasurementSchema("sensor_16", TSDataType.BLOB, TSEncoding.PLAIN);
+ private static final MeasurementSchema MEASUREMENT_17 =
+ new MeasurementSchema("sensor_17", TSDataType.STRING,
TSEncoding.PLAIN);
+
+ // device 2, non aligned, sg 1
+ private static final String DEVICE_2 = "root.sg.test_1.d_2";
+ private static final MeasurementSchema MEASUREMENT_20 =
+ new MeasurementSchema("sensor_20", TSDataType.INT32, TSEncoding.RLE);
+
+ // device 3, non aligned, sg 1
+ private static final String DEVICE_3 = "root.sg.test_1.d_3";
+ private static final MeasurementSchema MEASUREMENT_30 =
+ new MeasurementSchema("sensor_30", TSDataType.INT32, TSEncoding.RLE);
+
+ // device 4, aligned, sg 1
+ private static final String DEVICE_4 = "root.sg.test_1.a_4";
+ private static final MeasurementSchema MEASUREMENT_40 =
+ new MeasurementSchema("sensor_40", TSDataType.INT32, TSEncoding.RLE);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 61625b8a2ad..7f31bbefd48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.audit.AuditLogOperation;
import org.apache.iotdb.db.audit.AuditLogStorage;
import org.apache.iotdb.db.exception.LoadConfigurationException;
import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
@@ -1210,6 +1211,17 @@ public class IoTDBConfig {
private CompressionType WALCompressionAlgorithm = CompressionType.LZ4;
+ /**
+ * What to do with the last cache when TsFiles are loaded. Read from the
+ * {@code last_cache_operation_on_load} property; defaults to UPDATE.
+ */
+ private LastCacheLoadStrategy lastCacheLoadStrategy =
LastCacheLoadStrategy.UPDATE;
+
+ /**
+ * Whether to cache last values when constructing TsFileResource during LOAD. When set to true,
+ * blob series will be forcibly ignored even if lastCacheLoadStrategy =
+ * LastCacheLoadStrategy.UPDATE.
+ *
+ * <p>Read from {@code cache_last_values_for_load}. Note that isCacheLastValuesForLoad() also
+ * returns false unless the strategy is UPDATE or UPDATE_NO_BLOB.
+ */
+ private boolean cacheLastValuesForLoad = true;
+
+ /**
+ * Read from {@code cache_last_values_memory_budget_in_byte}; defaults to 4 MiB.
+ * NOTE(review): presumably bounds the memory used by last values cached during load; confirm
+ * against its users.
+ */
+ private long cacheLastValuesMemoryBudgetInByte = 4 * 1024 * 1024;
+
IoTDBConfig() {}
public int getMaxLogEntriesNumPerBatch() {
@@ -4276,4 +4288,30 @@ public class IoTDBConfig {
public void setWALCompressionAlgorithm(CompressionType
WALCompressionAlgorithm) {
this.WALCompressionAlgorithm = WALCompressionAlgorithm;
}
+
+  /** Returns the configured action applied to the last cache when TsFiles are loaded. */
+  public LastCacheLoadStrategy getLastCacheLoadStrategy() {
+    return this.lastCacheLoadStrategy;
+  }
+
+  /** Replaces the action applied to the last cache when TsFiles are loaded. */
+  public void setLastCacheLoadStrategy(LastCacheLoadStrategy lastCacheLoadStrategy) {
+    this.lastCacheLoadStrategy = lastCacheLoadStrategy;
+  }
+
+  /**
+   * Returns whether last values should be cached while building TsFileResources for LOAD.
+   *
+   * <p>This is true only when the flag is enabled and the strategy is UPDATE or UPDATE_NO_BLOB.
+   */
+  public boolean isCacheLastValuesForLoad() {
+    if (!cacheLastValuesForLoad) {
+      return false;
+    }
+    return lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE
+        || lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB;
+  }
+
+  /** Sets the raw flag; the effective value also depends on the last-cache load strategy. */
+  public void setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
+    this.cacheLastValuesForLoad = cacheLastValuesForLoad;
+  }
+
+  /** Returns the {@code cache_last_values_memory_budget_in_byte} setting, in bytes. */
+  public long getCacheLastValuesMemoryBudgetInByte() {
+    return this.cacheLastValuesMemoryBudgetInByte;
+  }
+
+  /** Sets the {@code cache_last_values_memory_budget_in_byte} setting, in bytes. */
+  public void setCacheLastValuesMemoryBudgetInByte(long cacheLastValuesMemoryBudgetInByte) {
+    this.cacheLastValuesMemoryBudgetInByte = cacheLastValuesMemoryBudgetInByte;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6f058156b61..94576d0aabf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
@@ -2463,6 +2464,22 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_disk_select_strategy_for_pipe_and_iotv2",
ILoadDiskSelector.LoadDiskSelectorType.INHERIT_LOAD.getValue()));
+
+ conf.setLastCacheLoadStrategy(
+ LastCacheLoadStrategy.valueOf(
+ properties.getProperty(
+ "last_cache_operation_on_load",
LastCacheLoadStrategy.UPDATE.name())));
+
+ conf.setCacheLastValuesForLoad(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "cache_last_values_for_load",
String.valueOf(conf.isCacheLastValuesForLoad()))));
+
+ conf.setCacheLastValuesMemoryBudgetInByte(
+ Long.parseLong(
+ properties.getProperty(
+ "cache_last_values_memory_budget_in_byte",
+ String.valueOf(conf.getCacheLastValuesMemoryBudgetInByte()))));
}
private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws
IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index e9a242a69b7..c9823985727 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -641,9 +641,23 @@ public class PipeConsensusReceiver {
+ /**
+ * Loads the received TsFile into this receiver's data region. If the region is gone (removed or
+ * migrated), the load is skipped and logged. SUCCESS_STATUS is returned in both cases.
+ */
private TSStatus loadFileToDataRegion(String filePath, ProgressIndex
progressIndex)
throws IOException, LoadFileException {
- StorageEngine.getInstance()
- .getDataRegion(((DataRegionId) consensusGroupId))
- .loadNewTsFile(generateTsFileResource(filePath, progressIndex), true,
false);
+ DataRegion region =
+ StorageEngine.getInstance().getDataRegion(((DataRegionId)
consensusGroupId));
+ if (region != null) {
+ // Last values are cached into the resource only when isCacheLastValuesForLoad() is true.
+ TsFileResource resource =
+ generateTsFileResource(
+ filePath,
+ progressIndex,
+
IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad());
+ region.loadNewTsFile(resource, true, false, true);
+ } else {
+ // A null data region means the region has been removed or migrated. In that case there is no
+ // need to replicate the data; return success so the leader does not keep retrying.
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: skip load tsfile-{} when sealing,
because this region has been removed or migrated.",
+ consensusPipeName,
+ filePath);
+ }
return RpcUtils.SUCCESS_STATUS;
}
@@ -687,13 +701,13 @@ public class PipeConsensusReceiver {
dataRegion, databaseName, writePointCount, true));
}
- private TsFileResource generateTsFileResource(String filePath, ProgressIndex
progressIndex)
- throws IOException {
+ private TsFileResource generateTsFileResource(
+ String filePath, ProgressIndex progressIndex, boolean cacheLastValues)
throws IOException {
final File tsFile = new File(filePath);
final TsFileResource tsFileResource = new TsFileResource(tsFile);
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
- TsFileResourceUtils.updateTsFileResource(reader, tsFileResource);
+ TsFileResourceUtils.updateTsFileResource(reader, tsFileResource,
cacheLastValues);
}
tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index cb504aee9c5..19d5cd636d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -457,7 +457,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
timeseriesMetadataIterator.next();
// Update time index no matter if resource file exists or not, because
resource file may be
// untrusted
- TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
+ TsFileResourceUtils.updateTsFileResource(
+ device2TimeseriesMetadata,
+ tsFileResource,
+
IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad());
schemaAutoCreatorAndVerifier.setCurrentTimeIndex(tsFileResource.getTimeIndex());
if (isAutoCreateSchemaOrVerifySchemaEnabled) {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/LastCacheLoadStrategy.java
similarity index 55%
copy from
integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/LastCacheLoadStrategy.java
index 980ab74c10c..d9738e7d304 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/LastCacheLoadStrategy.java
@@ -16,23 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.iotdb.itbase.env;
-
-import java.util.List;
-
-/** This interface is used to handle properties in iotdb-datanode.properties.
*/
-public interface DataNodeConfig {
- DataNodeConfig setMetricReporterType(List<String> metricReporterTypes);
-
- DataNodeConfig setMetricPrometheusReporterUsername(String username);
-
- DataNodeConfig setMetricPrometheusReporterPassword(String password);
-
- DataNodeConfig setEnableRestService(boolean enableRestService);
-
- DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS);
-
- DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
- long loadTsFileAnalyzeSchemaMemorySizeInBytes);
+package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache;
+
/** What a DataNode does to its LastCache once a TsFile has been loaded successfully. */
public enum LastCacheLoadStrategy {
  /** Read the loaded TsFile's data and use it to update the LastCache. */
  UPDATE,
  /** Like {@link #UPDATE}, except that Blob series are invalidated rather than updated. */
  UPDATE_NO_BLOB,
  /** Invalidate the LastCache entries of every device contained in the loaded TsFile. */
  CLEAN_DEVICE,
  /** Invalidate the entire LastCache. */
  CLEAN_ALL;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
index 99d3d17f666..fadd38c5ae4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
@@ -309,7 +309,7 @@ public class TimeSeriesSchemaCache {
if (entry == null) {
synchronized (dualKeyCache) {
entry = dualKeyCache.get(devicePath, measurements[index]);
- if (null == entry) {
+ if (null == entry && measurementSchemas != null) {
entry = new SchemaCacheEntry(database, measurementSchemas[index],
null, isAligned);
dualKeyCache.put(devicePath, measurements[index], entry);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index f50533bec47..28660fd0e5d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -177,7 +177,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
.loadNewTsFile(
tsFileResource,
((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
- isGeneratedByPipe);
+ isGeneratedByPipe,
+ false);
} catch (LoadFileException e) {
LOGGER.warn("Load TsFile Node {} error.", planNode, e);
TSStatus resultStatus = new TSStatus();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a900e5164e6..5e9b25bdda1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.PartialPath;
@@ -60,6 +61,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
import org.apache.iotdb.db.service.SettleService;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -96,8 +98,10 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheReader;
import
org.apache.iotdb.db.storageengine.dataregion.utils.validate.TsFileValidator;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
@@ -130,7 +134,9 @@ import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.read.reader.TsFileLastReader;
import org.apache.tsfile.utils.FSUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -2908,7 +2914,8 @@ public class DataRegion implements IDataRegionForQuery {
public void loadNewTsFile(
final TsFileResource newTsFileResource,
final boolean deleteOriginFile,
- final boolean isGeneratedByPipe)
+ final boolean isGeneratedByPipe,
+ final boolean isFromConsensus)
throws LoadFileException {
final File tsfileToBeInserted = newTsFileResource.getTsFile();
final long newFilePartitionId =
newTsFileResource.getTimePartitionWithCheck();
@@ -2918,6 +2925,23 @@ public class DataRegion implements IDataRegionForQuery {
"tsfile validate failed, " +
newTsFileResource.getTsFile().getName());
}
+ TsFileLastReader lastReader = null;
+ LastCacheLoadStrategy lastCacheLoadStrategy =
config.getLastCacheLoadStrategy();
+ if ((lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE
+ || lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB)
+ && newTsFileResource.getLastValues() == null) {
+ try {
+ // init reader outside of lock to boost performance
+ lastReader =
+ new TsFileLastReader(
+ newTsFileResource.getTsFilePath(),
+ true,
+ lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB);
+ } catch (IOException e) {
+ throw new LoadFileException(e);
+ }
+ }
+
writeLock("loadNewTsFile");
try {
if (deleted) {
@@ -2971,6 +2995,7 @@ public class DataRegion implements IDataRegionForQuery {
false);
}
+ onTsFileLoaded(newTsFileResource, isFromConsensus, lastReader);
logger.info("TsFile {} is successfully loaded in unsequence list.",
newFileName);
} catch (final DiskSpaceInsufficientException e) {
logger.error(
@@ -2978,15 +3003,128 @@ public class DataRegion implements IDataRegionForQuery
{
tsfileToBeInserted.getAbsolutePath(),
tsfileToBeInserted.getParentFile().getName());
throw new LoadFileException(e);
+ } catch (Exception e) {
+ throw new LoadFileException(e);
} finally {
writeUnlock();
- // TODO: do more precise control
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- DataNodeSchemaCache.getInstance().invalidateAll();
+ if (lastReader != null) {
+ try {
+ lastReader.close();
+ } catch (Exception e) {
+ logger.warn("Cannot close last reader after loading TsFile {}",
newTsFileResource, e);
+ }
+ }
+ }
+ }
+
+ private void onTsFileLoaded(
+ TsFileResource newTsFileResource, boolean isFromConsensus,
TsFileLastReader lastReader) {
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() &&
!isFromConsensus) {
+ switch (config.getLastCacheLoadStrategy()) {
+ case UPDATE:
+ case UPDATE_NO_BLOB:
+ updateLastCache(newTsFileResource, lastReader);
+ break;
+ case CLEAN_ALL:
+ // The inner cache is shared by TreeDeviceSchemaCacheManager and
+ // TableDeviceSchemaCacheManager,
+ // so cleaning either of them is enough
+ DataNodeSchemaCache.getInstance().invalidateAll();
+ break;
+ case CLEAN_DEVICE:
+ ITimeIndex timeIndex = newTsFileResource.getTimeIndex();
+ if (timeIndex instanceof DeviceTimeIndex) {
+ DeviceTimeIndex deviceTimeIndex = (DeviceTimeIndex) timeIndex;
+ for (IDeviceID deviceID : deviceTimeIndex.getDevices()) {
+ try {
+ DataNodeSchemaCache.getInstance()
+ .invalidateLastCache(new PartialPath(deviceID, "**"));
+ } catch (IllegalPathException e) {
+ logger.error(
+ "Failed to construct path for invalidating last cache of
{}", deviceID, e);
+ DataNodeSchemaCache.getInstance().invalidateAll();
+ return;
+ }
+ }
+ } else {
+ DataNodeSchemaCache.getInstance().invalidateAll();
+ }
+ break;
+ default:
+ logger.warn(
+ "Unrecognized LastCacheLoadStrategy: {}, fall back to CLEAN_ALL",
+
IoTDBDescriptor.getInstance().getConfig().getLastCacheLoadStrategy());
+ DataNodeSchemaCache.getInstance().invalidateAll();
+ break;
}
}
}
+ @SuppressWarnings("java:S112")
+ private void updateLastCache(TsFileResource newTsFileResource,
TsFileLastReader lastReader) {
+
+ Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues =
+ newTsFileResource.getLastValues();
+ if (lastValues != null) {
+ for (Entry<IDeviceID, List<Pair<String, TimeValuePair>>> entry :
lastValues.entrySet()) {
+ IDeviceID deviceID = entry.getKey();
+ String[] measurements =
entry.getValue().stream().map(Pair::getLeft).toArray(String[]::new);
+ TimeValuePair[] timeValuePairs =
+
entry.getValue().stream().map(Pair::getRight).toArray(TimeValuePair[]::new);
+ try {
+ // we do not update schema here, so aligned is not relevant
+ DataNodeSchemaCache.getInstance()
+ .updateLastCache(
+ databaseName,
+ new PartialPath(deviceID),
+ measurements,
+ null,
+ false,
+ value -> timeValuePairs[value],
+ i -> true,
+ true,
+ 0L);
+ } catch (IllegalPathException e) {
+ logger.error("Failed to construct path for invalidating last cache
of {}", deviceID, e);
+ DataNodeSchemaCache.getInstance().invalidateAll();
+ return;
+ }
+ }
+ newTsFileResource.setLastValues(null);
+ return;
+ }
+
+ if (lastReader != null) {
+ while (lastReader.hasNext()) {
+ Pair<IDeviceID, List<Pair<String, TimeValuePair>>> nextDevice =
lastReader.next();
+ IDeviceID deviceID = nextDevice.left;
+ String[] measurements =
nextDevice.right.stream().map(Pair::getLeft).toArray(String[]::new);
+ TimeValuePair[] timeValuePairs =
+
nextDevice.right.stream().map(Pair::getRight).toArray(TimeValuePair[]::new);
+ try {
+ // we do not update schema here, so aligned is not relevant
+ DataNodeSchemaCache.getInstance()
+ .updateLastCache(
+ databaseName,
+ new PartialPath(deviceID),
+ measurements,
+ null,
+ false,
+ value -> timeValuePairs[value],
+ i -> true,
+ true,
+ 0L);
+ } catch (IllegalPathException e) {
+ logger.error("Failed to construct path for invalidating last cache
of {}", deviceID, e);
+ DataNodeSchemaCache.getInstance().invalidateAll();
+ return;
+ }
+ }
+ } else {
+ DataNodeSchemaCache.getInstance().invalidateAll();
+ }
+ }
+
private long getAndSetNewVersion(long timePartitionId, TsFileResource
tsFileResource) {
long version =
partitionMaxFileVersions.compute(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 87ae50e872f..7761ea5c3b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -47,6 +47,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.FilePathUtils;
import org.apache.tsfile.utils.Pair;
@@ -176,6 +177,8 @@ public class TsFileResource {
private InsertionCompactionCandidateStatus
insertionCompactionCandidateStatus =
InsertionCompactionCandidateStatus.NOT_CHECKED;
+ private Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues;
+
@TestOnly
public TsFileResource() {
this.tsFileID = new TsFileID();
@@ -1251,4 +1254,12 @@ public class TsFileResource {
/** Records whether this file has been checked as an insertion-compaction candidate. */
public void setInsertionCompactionTaskCandidate(InsertionCompactionCandidateStatus status) {
  this.insertionCompactionCandidateStatus = status;
}
+
+ public Map<IDeviceID, List<Pair<String, TimeValuePair>>> getLastValues() {
+ return lastValues;
+ }
+
+ public void setLastValues(Map<IDeviceID, List<Pair<String, TimeValuePair>>>
lastValues) {
+ this.lastValues = lastValues;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
index 5b42dc4ecd3..3fe05b5df55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
@@ -43,12 +43,15 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.reader.page.PageReader;
import org.apache.tsfile.read.reader.page.TimePageReader;
import org.apache.tsfile.read.reader.page.ValuePageReader;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +66,7 @@ import java.util.Map;
import java.util.Set;
public class TsFileResourceUtils {
+
private static final Logger logger =
LoggerFactory.getLogger(TsFileResourceUtils.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private static final String VALIDATE_FAILED = "validate failed,";
@@ -407,27 +411,87 @@ public class TsFileResourceUtils {
}
public static void updateTsFileResource(
- TsFileSequenceReader reader, TsFileResource tsFileResource) throws
IOException {
- updateTsFileResource(reader.getAllTimeseriesMetadata(false),
tsFileResource);
+ TsFileSequenceReader reader, TsFileResource tsFileResource, boolean
cacheLastValues)
+ throws IOException {
+ updateTsFileResource(reader.getAllTimeseriesMetadata(false),
tsFileResource, cacheLastValues);
tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
}
public static void updateTsFileResource(
- Map<IDeviceID, List<TimeseriesMetadata>> device2Metadata, TsFileResource
tsFileResource) {
+ Map<IDeviceID, List<TimeseriesMetadata>> device2Metadata,
+ TsFileResource tsFileResource,
+ boolean cacheLastValue) {
// For async recover tsfile, there might be a FileTimeIndex, we need a new
newTimeIndex
ITimeIndex newTimeIndex =
tsFileResource.getTimeIndex().getTimeIndexType() ==
ITimeIndex.FILE_TIME_INDEX_TYPE
? config.getTimeIndexLevel().getTimeIndex()
: tsFileResource.getTimeIndex();
+ Map<IDeviceID, List<Pair<String, TimeValuePair>>> deviceLastValues =
+ tsFileResource.getLastValues();
+ long lastValueMemCost = 0;
+ if (cacheLastValue && deviceLastValues == null) {
+ deviceLastValues = new HashMap<>(device2Metadata.size());
+ }
for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
device2Metadata.entrySet()) {
+ List<Pair<String, TimeValuePair>> seriesLastValues = null;
+ if (deviceLastValues != null) {
+ seriesLastValues = new ArrayList<>(entry.getValue().size());
+ }
+
for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
newTimeIndex.updateStartTime(
entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
newTimeIndex.updateEndTime(entry.getKey(),
timeseriesMetaData.getStatistics().getEndTime());
+ if (deviceLastValues != null) {
+ if (timeseriesMetaData.getTsDataType() != TSDataType.BLOB) {
+ TsPrimitiveType value;
+ value =
+ TsPrimitiveType.getByType(
+ timeseriesMetaData.getTsDataType() == TSDataType.VECTOR
+ ? TSDataType.INT64
+ : timeseriesMetaData.getTsDataType(),
+ timeseriesMetaData.getStatistics().getLastValue());
+ seriesLastValues.add(
+ new Pair<>(
+ timeseriesMetaData.getMeasurementId(),
+ new
TimeValuePair(timeseriesMetaData.getStatistics().getEndTime(), value)));
+ } else {
+ seriesLastValues.add(new
Pair<>(timeseriesMetaData.getMeasurementId(), null));
+ }
+ }
+ }
+ if (deviceLastValues != null) {
+ lastValueMemCost += entry.getKey().ramBytesUsed();
+ for (Pair<String, TimeValuePair> lastValue : seriesLastValues) {
+ if (lastValue == null) {
+ continue;
+ }
+ // pair
+ lastValueMemCost += RamUsageEstimator.shallowSizeOf(lastValue);
+ // measurement name
+ lastValueMemCost += RamUsageEstimator.sizeOf(lastValue.left);
+ TimeValuePair right = lastValue.getRight();
+ lastValueMemCost +=
+ right != null ? right.getSize() :
RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+ }
+ // ArrayList
+ lastValueMemCost +=
+ (long) seriesLastValues.size() *
RamUsageEstimator.NUM_BYTES_OBJECT_REF
+ + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+ lastValueMemCost += RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ if (lastValueMemCost <= config.getCacheLastValuesMemoryBudgetInByte())
{
+ deviceLastValues
+ .computeIfAbsent(entry.getKey(), deviceID -> new ArrayList<>())
+ .addAll(seriesLastValues);
+ } else {
+ deviceLastValues = null;
+ }
}
}
tsFileResource.setTimeIndex(newTimeIndex);
+ tsFileResource.setLastValues(deviceLastValues);
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
index 0880697b02c..094636234d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -116,7 +116,7 @@ public abstract class AbstractTsFileRecoverPerformer
implements Closeable {
protected void reconstructResourceFile() throws IOException {
try (TsFileSequenceReader reader =
new
TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath())) {
- TsFileResourceUtils.updateTsFileResource(reader, tsFileResource);
+ TsFileResourceUtils.updateTsFileResource(reader, tsFileResource, false);
}
// set progress index for pipe to avoid data loss
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 4ab5acae15f..25b594631aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -53,10 +53,15 @@ import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyT
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,12 +73,15 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -502,7 +510,7 @@ public class LoadTsFileManager {
tsFileResource,
timePartitionProgressIndexMap.getOrDefault(
entry.getKey().getTimePartitionSlot(),
MinimumProgressIndex.INSTANCE));
- dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
+ dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe,
false);
// Metrics
dataRegion
@@ -518,12 +526,81 @@ public class LoadTsFileManager {
TsFileIOWriter writer, TsFileResource tsFileResource, ProgressIndex
progressIndex)
throws IOException {
// Update time index by chunk groups still in memory
+ Map<IDeviceID, Map<String, TimeValuePair>> deviceLastValues = null;
+ if
(IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()) {
+ deviceLastValues = new HashMap<>();
+ }
+ AtomicLong lastValuesMemCost = new AtomicLong(0);
+
for (final ChunkGroupMetadata chunkGroupMetadata :
writer.getChunkGroupMetadataList()) {
final IDeviceID device = chunkGroupMetadata.getDevice();
for (final ChunkMetadata chunkMetadata :
chunkGroupMetadata.getChunkMetadataList()) {
tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
+ if (deviceLastValues != null) {
+ Map<String, TimeValuePair> deviceMap =
+ deviceLastValues.computeIfAbsent(
+ device,
+ d -> {
+ Map<String, TimeValuePair> map = new HashMap<>();
+
lastValuesMemCost.addAndGet(RamUsageEstimator.shallowSizeOf(map));
+ lastValuesMemCost.addAndGet(device.ramBytesUsed());
+ return map;
+ });
+ int prevSize = deviceMap.size();
+ deviceMap.compute(
+ chunkMetadata.getMeasurementUid(),
+ (m, oldPair) -> {
+ if (oldPair != null && oldPair.getTimestamp() >
chunkMetadata.getEndTime()) {
+ return oldPair;
+ }
+ TsPrimitiveType lastValue =
+ chunkMetadata.getStatistics() != null
+ && chunkMetadata.getDataType() != TSDataType.BLOB
+ ? TsPrimitiveType.getByType(
+ chunkMetadata.getDataType() == TSDataType.VECTOR
+ ? TSDataType.INT64
+ : chunkMetadata.getDataType(),
+ chunkMetadata.getStatistics().getLastValue())
+ : null;
+ TimeValuePair timeValuePair =
+ lastValue != null
+ ? new TimeValuePair(chunkMetadata.getEndTime(),
lastValue)
+ : null;
+ if (oldPair != null) {
+ lastValuesMemCost.addAndGet(-oldPair.getSize());
+ }
+ if (timeValuePair != null) {
+ lastValuesMemCost.addAndGet(timeValuePair.getSize());
+ }
+ return timeValuePair;
+ });
+ int afterSize = deviceMap.size();
+ lastValuesMemCost.addAndGet(
+ (afterSize - prevSize) *
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ if (lastValuesMemCost.get()
+ > IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getCacheLastValuesMemoryBudgetInByte()) {
+ deviceLastValues = null;
+ }
+ }
+ }
+ }
+ if (deviceLastValues != null) {
+ Map<IDeviceID, List<Pair<String, TimeValuePair>>>
finalDeviceLastValues;
+ finalDeviceLastValues = new HashMap<>(deviceLastValues.size());
+ for (final Map.Entry<IDeviceID, Map<String, TimeValuePair>> entry :
+ deviceLastValues.entrySet()) {
+ final IDeviceID device = entry.getKey();
+ Map<String, TimeValuePair> lastValues = entry.getValue();
+ List<Pair<String, TimeValuePair>> pairList =
+ lastValues.entrySet().stream()
+ .map(e -> new Pair<>(e.getKey(), e.getValue()))
+ .collect(Collectors.toList());
+ finalDeviceLastValues.put(device, pairList);
}
+ tsFileResource.setLastValues(finalDeviceLastValues);
}
tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
tsFileResource.setProgressIndex(progressIndex);
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 8e718667646..ac30871fdc3 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -2048,6 +2048,24 @@ load_active_listening_max_thread_num=0
# Datatype: int
load_active_listening_check_interval_seconds=5
+# The operation performed on the LastCache after a TsFile is successfully loaded.
+# UPDATE: use the data in the TsFile to update the LastCache;
+# UPDATE_NO_BLOB: same as UPDATE, but invalidates the LastCache of blob series instead of updating it;
+# CLEAN_DEVICE: invalidate the LastCache of the devices contained in the TsFile;
+# CLEAN_ALL: clear the whole LastCache.
+# Datatype: String
+last_cache_operation_on_load=UPDATE_NO_BLOB
+
+# Whether to cache the last values of a TsFile in memory before it is loaded. Only effective when
+# last_cache_operation_on_load is UPDATE or UPDATE_NO_BLOB.
+# When set to true, blob series are skipped when the cached values are used, even with
+# last_cache_operation_on_load=UPDATE.
+# Enabling this increases memory usage while TsFiles are being loaded.
+# Datatype: boolean
+cache_last_values_for_load=true
+
+# When cache_last_values_for_load=true, the maximum memory (in bytes) that may be used to cache the
+# last values of a TsFile. If this limit is exceeded, the cached values are discarded and the last
+# values are read from the TsFile in a streaming manner instead. The default, 4194304, is 4 MiB.
+cache_last_values_memory_budget_in_byte=4194304
+
####################
### Dispatch Retry Configuration
####################
diff --git a/pom.xml b/pom.xml
index 04ed597d290..9c67e1f1734 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,7 +175,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
- <tsfile.version>1.1.1</tsfile.version>
+ <tsfile.version>1.1.2-250603-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim