This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch update_last_cache_in_load in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8f6f578c7853ca54cfc6b7857a9b9059206d6691 Author: Tian Jiang <[email protected]> AuthorDate: Wed May 28 10:44:17 2025 +0800 add cacheLastValuesForLoad --- .../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java | 185 ++++++++++++++++++++- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 + .../pipeconsensus/PipeConsensusReceiver.java | 12 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 10 +- .../db/storageengine/dataregion/DataRegion.java | 22 +++ .../dataregion/tsfile/TsFileResource.java | 11 ++ .../dataregion/utils/TsFileResourceUtils.java | 45 ++++- .../file/AbstractTsFileRecoverPerformer.java | 2 +- .../db/storageengine/load/LoadTsFileManager.java | 46 +++++ 10 files changed, 338 insertions(+), 12 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java index f05683056f0..9b54a9ea724 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java @@ -27,6 +27,7 @@ import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.apache.iotdb.jdbc.IoTDBSQLException; +import com.google.common.util.concurrent.RateLimiter; import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.ColumnSchema; @@ -55,13 +56,17 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.List; import java.util.Objects; +import java.util.Random; import java.util.concurrent.TimeUnit; +@SuppressWarnings({"ResultOfMethodCallIgnored", "UnstableApiUsage"}) @RunWith(Parameterized.class) @Category({LocalStandaloneIT.class, ClusterIT.class}) public class IoTDBLoadLastCacheIT { @@ -276,8 +281,6 @@ public class IoTDBLoadLastCacheIT { @Test public void testTableModelLoadWithLastCache() throws Exception { - registerSchema(); - final String database = SchemaConfig.DATABASE_0; final String table = SchemaConfig.TABLE_0; final String measurement = SchemaConfig.MEASUREMENT_00.getMeasurementName(); @@ -350,6 +353,184 @@ public class IoTDBLoadLastCacheIT { } } + private static class PerformanceSchemas { + + private final String database; + private final TableSchema tableSchema; + private final List<String> columnNames; + private final List<TSDataType> dataTypes; + + public PerformanceSchemas(String database, String tableName, int measurementNum) { + this.database = database; + List<ColumnSchema> columnSchemas = new ArrayList<>(measurementNum); + columnNames = new ArrayList<>(measurementNum); + dataTypes = new ArrayList<>(measurementNum); + + columnSchemas.add(new ColumnSchema("device_id", TSDataType.STRING, ColumnCategory.TAG)); + columnNames.add("device_id"); + dataTypes.add(TSDataType.STRING); + for (int i = 0; i < measurementNum; i++) { + columnSchemas.add(new ColumnSchema("s" + i, TSDataType.INT64, ColumnCategory.FIELD)); + columnNames.add("s" + i); + dataTypes.add(TSDataType.INT64); + } + tableSchema = new TableSchema(tableName, columnSchemas); + } + } + + private void generateAndLoadOne( + int deviceCnt, + int measurementCnt, + int pointCnt, + int offset, + PerformanceSchemas schemas, + int fileNum) + throws Exception { + File file = new File("target" + File.separator + fileNum + ".tsfile"); + try (ITsFileWriter tsFileWriter = + new TsFileWriterBuilder().file(file).tableSchema(schemas.tableSchema).build()) { + Tablet tablet = new Tablet(schemas.columnNames, schemas.dataTypes, pointCnt * deviceCnt); + int rowIndex = 0; + for (int i = 0; i < deviceCnt; i++) { + for (int j = 0; j < pointCnt; j++) { + tablet.addTimestamp(rowIndex, j + offset); + tablet.addValue(rowIndex, 0, "d" + i); + for (int k = 0; k < measurementCnt; k++) { + tablet.addValue(rowIndex, k + 1, (long) j + offset); + } + rowIndex++; + } + } + tsFileWriter.write(tablet); + } + + try (final Connection connection = EnvFactory.getEnv().getTableConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("USE " + schemas.database); + statement.execute( + String.format( + "load '%s' with ('database-name'='%s')", file.getAbsolutePath(), schemas.database)); + } + file.delete(); + } + + private void generateAndLoadAll( + int deviceCnt, int measurementCnt, int pointCnt, PerformanceSchemas schemas, int fileNum) + throws Exception { + for (int i = 0; i < fileNum; i++) { + generateAndLoadOne(deviceCnt, measurementCnt, pointCnt, pointCnt * i, schemas, fileNum); + } + } + + private long queryLastOnce(int deviceNum, int measurementNum, PerformanceSchemas schemas) + throws SQLException { + try (final Connection connection = EnvFactory.getEnv().getTableConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("USE " + schemas.database); + + try (final ResultSet resultSet = + statement.executeQuery( + String.format( + "select last(%s) from %s where device_id='%s'", + "s" + measurementNum, schemas.tableSchema.getTableName(), "d" + deviceNum))) { + if (resultSet.next()) { + return resultSet.getLong("_col0"); + } else { + return -1; + } + } catch (SQLException e) { + if (!e.getMessage().contains("does not exist")) { + throw e; + } + } + } + return -1; + } + + @SuppressWarnings("BusyWait") + private void queryAll( + int deviceCnt, + int measurementCnt, + int pointCnt, + int fileCnt, + PerformanceSchemas schemas, + RateLimiter rateLimiter) + throws SQLException { + Random random = new Random(); + long totalStart = System.currentTimeMillis(); + List<Long> timeConsumptions = new ArrayList<>(); + + while (true) { + int deviceNum = random.nextInt(deviceCnt); + int measurementNum = random.nextInt(measurementCnt); + rateLimiter.acquire(); + long start = System.currentTimeMillis(); + long result = queryLastOnce(deviceNum, measurementNum, schemas); + long timeConsumption = System.currentTimeMillis() - start; + if (result == -1) { + try { + Thread.sleep(1000); + continue; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + System.out.printf("%s: %s %s%n", new Date(), result, timeConsumption); + timeConsumptions.add(timeConsumption); + if (result == (long) pointCnt * fileCnt - 1) { + break; + } + } + System.out.printf( + "Synchronization ends after %dms%n, query latency avg %f", + System.currentTimeMillis() - totalStart, + timeConsumptions.stream().mapToLong(i -> i).average().orElse(0.0)); + } + + // @Ignore("Performance") + @Test + public void testTableLoadPerformance() throws Exception { + int deviceCnt = 1000; + int measurementCnt = 100; + int pointCnt = 100; + int fileCnt = 100; + int queryPerSec = 10; + + PerformanceSchemas schemas = new PerformanceSchemas("test", "test_table", measurementCnt); + + try (final Connection connection = EnvFactory.getEnv().getTableConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE IF NOT EXISTS " + schemas.database); + } + + Thread loadThread = + new Thread( + () -> { + try { + generateAndLoadAll(deviceCnt, measurementCnt, pointCnt, schemas, fileCnt); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + RateLimiter rateLimiter = RateLimiter.create(queryPerSec); + Thread queryThread = + new Thread( + () -> { + try { + queryAll(deviceCnt, measurementCnt, pointCnt, fileCnt, schemas, rateLimiter); + } catch (SQLException e) { + e.printStackTrace(); + } + }); + + loadThread.start(); + queryThread.start(); + + loadThread.join(); + queryThread.join(); + } + private static class SchemaConfig { private static final String DATABASE_0 = "db"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index adc0b70cd31..47a080f5d7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1151,6 +1151,8 @@ public class IoTDBConfig { private LastCacheLoadStrategy lastCacheLoadStrategy = LastCacheLoadStrategy.UPDATE; + private boolean cacheLastValuesForLoad = true; + IoTDBConfig() {} public int getMaxLogEntriesNumPerBatch() { @@ -4079,4 +4081,14 @@ public class IoTDBConfig { public void setLastCacheLoadStrategy(LastCacheLoadStrategy lastCacheLoadStrategy) { this.lastCacheLoadStrategy = lastCacheLoadStrategy; } + + public boolean isCacheLastValuesForLoad() { + return (lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE + || lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB) + && cacheLastValuesForLoad; + } + + public void setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) { + this.cacheLastValuesForLoad = cacheLastValuesForLoad; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index c927c0f4cfb..e1a9d155c98 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2267,6 +2267,11 @@ public class IoTDBDescriptor { LastCacheLoadStrategy.valueOf( properties.getProperty( "last_cache_operation_on_load", LastCacheLoadStrategy.UPDATE.name()))); + + conf.setCacheLastValuesForLoad( + Boolean.parseBoolean( + properties.getProperty( + "cache_last_values_for_load", String.valueOf(conf.isCacheLastValuesForLoad())))); } private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 843cc1765c9..e13d2a08297 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -718,7 +718,11 @@ public class PipeConsensusReceiver { DataRegion region = StorageEngine.getInstance().getDataRegion(((DataRegionId) consensusGroupId)); if (region != null) { - TsFileResource resource = generateTsFileResource(filePath, progressIndex); + TsFileResource resource = + generateTsFileResource( + filePath, + progressIndex, + IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()); region.loadNewTsFile(resource, true, false, true); } else { // Data region is null indicates that dr has been removed or migrated. In those cases, there @@ -773,13 +777,13 @@ public class PipeConsensusReceiver { dataRegion, databaseName, writePointCount, true))); } - private TsFileResource generateTsFileResource(String filePath, ProgressIndex progressIndex) - throws IOException { + private TsFileResource generateTsFileResource( + String filePath, ProgressIndex progressIndex, boolean cacheLastValues) throws IOException { final File tsFile = new File(filePath); final TsFileResource tsFileResource = new TsFileResource(tsFile); try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { - TsFileResourceUtils.updateTsFileResource(reader, tsFileResource); + TsFileResourceUtils.updateTsFileResource(reader, tsFileResource, cacheLastValues); } tsFileResource.setStatus(TsFileResourceStatus.NORMAL); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 4fccee49e12..8232a71d30d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -519,7 +519,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable { // Update time index no matter if resource file exists or not, because resource file may be // untrusted - TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata, tsFileResource); + TsFileResourceUtils.updateTsFileResource( + device2TimeseriesMetadata, + tsFileResource, + IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()); getOrCreateTreeSchemaVerifier().setCurrentTimeIndex(tsFileResource.getTimeIndex()); if (isAutoCreateSchemaOrVerifySchemaEnabled) { @@ -578,7 +581,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable { // Update time index no matter if resource file exists or not, because resource file may be // untrusted - TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata, tsFileResource); + TsFileResourceUtils.updateTsFileResource( + device2TimeseriesMetadata, + tsFileResource, + IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()); getOrCreateTableSchemaCache().setCurrentTimeIndex(tsFileResource.getTimeIndex()); for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 2d749622ce6..1947eb9ba9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3131,6 +3131,28 @@ public class DataRegion implements IDataRegionForQuery { throws Exception { boolean isTableModel = isTableModelDatabase(databaseName); + Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues = + newTsFileResource.getLastValues(); + if (lastValues != null) { + for (Entry<IDeviceID, List<Pair<String, TimeValuePair>>> entry : lastValues.entrySet()) { + IDeviceID deviceID = entry.getKey(); + String[] measurements = entry.getValue().stream().map(Pair::getLeft).toArray(String[]::new); + TimeValuePair[] timeValuePairs = + entry.getValue().stream().map(Pair::getRight).toArray(TimeValuePair[]::new); + if (isTableModel) { + TableDeviceSchemaCache.getInstance() + .updateLastCacheIfExists(databaseName, deviceID, measurements, timeValuePairs); + } else { + // we do not update schema here, so aligned is not relevant + TreeDeviceSchemaCacheManager.getInstance() + .updateLastCacheIfExists( + databaseName, deviceID, measurements, timeValuePairs, false, null); + } + } + newTsFileResource.setLastValues(null); + return; + } + try (TsFileLastReader lastReader = new TsFileLastReader(newTsFileResource.getTsFilePath(), true, ignoreBlob)) { while (lastReader.hasNext()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 2d2fbea1e74..1beb48ee246 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -56,6 +56,7 @@ import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; +import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.FilePathUtils; import org.apache.tsfile.utils.Pair; @@ -205,6 +206,8 @@ public class TsFileResource implements PersistentResource { private InsertionCompactionCandidateStatus insertionCompactionCandidateStatus = InsertionCompactionCandidateStatus.NOT_CHECKED; + private Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues; + @TestOnly public TsFileResource() { this.tsFileID = new TsFileID(); @@ -1565,4 +1568,12 @@ public class TsFileResource implements PersistentResource { public void setCompactionModFile(ModificationFile compactionModFile) { this.compactionModFile = compactionModFile; } + + public Map<IDeviceID, List<Pair<String, TimeValuePair>>> getLastValues() { + return lastValues; + } + + public void setLastValues(Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues) { + this.lastValues = lastValues; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java index 6ff2b605cf4..16e7939069f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java @@ -42,12 +42,14 @@ import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.reader.page.PageReader; import org.apache.tsfile.read.reader.page.TimePageReader; import org.apache.tsfile.read.reader.page.ValuePageReader; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +64,7 @@ import java.util.Map; import java.util.Set; public class TsFileResourceUtils { + private static final Logger logger = LoggerFactory.getLogger(TsFileResourceUtils.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final String VALIDATE_FAILED = "validate failed,"; @@ -409,27 +412,63 @@ public class TsFileResourceUtils { } public static void updateTsFileResource( - TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException { - updateTsFileResource(reader.getAllTimeseriesMetadata(false), tsFileResource); + TsFileSequenceReader reader, TsFileResource tsFileResource, boolean cacheLastValues) + throws IOException { + updateTsFileResource(reader.getAllTimeseriesMetadata(false), tsFileResource, cacheLastValues); tsFileResource.updatePlanIndexes(reader.getMinPlanIndex()); tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex()); } public static void updateTsFileResource( - Map<IDeviceID, List<TimeseriesMetadata>> device2Metadata, TsFileResource tsFileResource) { + Map<IDeviceID, List<TimeseriesMetadata>> device2Metadata, + TsFileResource tsFileResource, + boolean cacheLastValue) { // For async recover tsfile, there might be a FileTimeIndex, we need a new newTimeIndex ITimeIndex newTimeIndex = tsFileResource.getTimeIndex().getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE ? config.getTimeIndexLevel().getTimeIndex() : tsFileResource.getTimeIndex(); + Map<IDeviceID, List<Pair<String, TimeValuePair>>> deviceLastValues = + tsFileResource.getLastValues(); + if (cacheLastValue && deviceLastValues == null) { + deviceLastValues = new HashMap<>(device2Metadata.size()); + } for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) { + List<Pair<String, TimeValuePair>> seriesLastValues = null; + if (cacheLastValue) { + seriesLastValues = new ArrayList<>(entry.getValue().size()); + } + for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) { newTimeIndex.updateStartTime( entry.getKey(), timeseriesMetaData.getStatistics().getStartTime()); newTimeIndex.updateEndTime(entry.getKey(), timeseriesMetaData.getStatistics().getEndTime()); + if (cacheLastValue) { + if (timeseriesMetaData.getTsDataType() != TSDataType.BLOB) { + TsPrimitiveType value; + value = + TsPrimitiveType.getByType( + timeseriesMetaData.getTsDataType() == TSDataType.VECTOR + ? TSDataType.INT64 + : timeseriesMetaData.getTsDataType(), + timeseriesMetaData.getStatistics().getLastValue()); + seriesLastValues.add( + new Pair<>( + timeseriesMetaData.getMeasurementId(), + new TimeValuePair(timeseriesMetaData.getStatistics().getEndTime(), value))); + } else { + seriesLastValues.add(new Pair<>(timeseriesMetaData.getMeasurementId(), null)); + } + } + } + if (cacheLastValue) { + deviceLastValues + .computeIfAbsent(entry.getKey(), deviceID -> new ArrayList<>()) + .addAll(seriesLastValues); } } tsFileResource.setTimeIndex(newTimeIndex); + tsFileResource.setLastValues(deviceLastValues); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java index 73621a5208e..62a6a465410 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java @@ -115,7 +115,7 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable { protected void reconstructResourceFile() throws IOException { try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath())) { - TsFileResourceUtils.updateTsFileResource(reader, tsFileResource); + TsFileResourceUtils.updateTsFileResource(reader, tsFileResource, false); } // set progress index for pipe to avoid data loss diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index d6962e2ae35..b483266a544 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -54,9 +54,13 @@ import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyT import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,12 +72,14 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT; @@ -533,12 +539,52 @@ public class LoadTsFileManager { TsFileIOWriter writer, TsFileResource tsFileResource, ProgressIndex progressIndex) throws IOException { // Update time index by chunk groups still in memory + Map<IDeviceID, Map<String, TimeValuePair>> deviceLastValues = null; + if (IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()) { + deviceLastValues = new HashMap<>(); + } for (final ChunkGroupMetadata chunkGroupMetadata : writer.getChunkGroupMetadataList()) { final IDeviceID device = chunkGroupMetadata.getDevice(); for (final ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) { tsFileResource.updateStartTime(device, chunkMetadata.getStartTime()); tsFileResource.updateEndTime(device, chunkMetadata.getEndTime()); + if (deviceLastValues != null) { + deviceLastValues + .computeIfAbsent(device, d -> new HashMap<>()) + .compute( + chunkMetadata.getMeasurementUid(), + (m, oldPair) -> { + if (oldPair != null && oldPair.getTimestamp() > chunkMetadata.getEndTime()) { + return oldPair; + } + TsPrimitiveType lastValue = + chunkMetadata.getStatistics() != null + && chunkMetadata.getDataType() != TSDataType.BLOB + ? TsPrimitiveType.getByType( + chunkMetadata.getDataType() == TSDataType.VECTOR + ? TSDataType.INT64 + : chunkMetadata.getDataType(), + chunkMetadata.getStatistics().getLastValue()) + : null; + return new TimeValuePair(chunkMetadata.getEndTime(), lastValue); + }); + } + } + } + if (deviceLastValues != null) { + Map<IDeviceID, List<Pair<String, TimeValuePair>>> finalDeviceLastValues; + finalDeviceLastValues = new HashMap<>(deviceLastValues.size()); + for (final Map.Entry<IDeviceID, Map<String, TimeValuePair>> entry : + deviceLastValues.entrySet()) { + final IDeviceID device = entry.getKey(); + Map<String, TimeValuePair> lastValues = entry.getValue(); + List<Pair<String, TimeValuePair>> pairList = + lastValues.entrySet().stream() + .map(e -> new Pair<>(e.getKey(), e.getValue())) + .collect(Collectors.toList()); + finalDeviceLastValues.put(device, pairList); } + tsFileResource.setLastValues(finalDeviceLastValues); } tsFileResource.setStatus(TsFileResourceStatus.NORMAL); tsFileResource.setProgressIndex(progressIndex);
